Streaming Actions

Note

Burr actions can stream results! This enables you to display results to the user as tokens are streamed in.

Actions can be implemented to stream their results. This enables a lower time-to-first-token and a more interactive interface, whether in an AI application or when streaming metrics out of a model-training application. Broadly, it is a tool for quicker user feedback in longer-running actions that require user focus.

Streaming actions can be implemented as a class or a function, just like standard actions. However, they follow a few additional rules:

  1. They yield intermediate results to the caller as they are generated, in the form of a generator

  2. They are responsible for determining the relationship between the intermediate results and the final result

  3. They have a return statement that handles result collection and state update

If you’re not familiar with return statements in generators, you can read about them here. The high-level idea is that a generator’s return statement raises a StopIteration exception carrying the returned value. The framework uses this value to update the state of the action and to collect the final result.
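
For example, the mechanism looks like this in plain Python, independent of Burr:

def count_up_to(n: int):
    total = 0
    for i in range(1, n + 1):
        total += i
        yield i  # intermediate result, delivered as it is produced
    return total  # final result, attached to the StopIteration

gen = count_up_to(3)
try:
    while True:
        print(next(gen))  # prints 1, 2, 3
except StopIteration as e:
    print(e.value)  # prints 6, the value from the return statement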

A class-based streaming action might look like this:

from typing import Generator

import openai

from burr.core import State
from burr.core.action import StreamingAction

class StreamingChatCall(StreamingAction):
    def stream_run(self, state: State, **run_kwargs) -> Generator[dict, None, dict]:
        client = openai.Client()
        response = client.chat.completions.create(
            model='gpt-3.5-turbo',
            messages=[{
                'role': 'user',
                'content': state["prompt"]
            }],
            temperature=0,
            stream=True,
        )
        buffer = []
        for chunk in response:
            delta = chunk.choices[0].delta.content
            if delta is None:  # the final chunk carries no content
                continue
            buffer.append(delta)
            yield {'response': delta}  # intermediate result
        full_response = ''.join(buffer)
        return {'response': full_response}  # final result, passed to update()

    @property
    def reads(self) -> list[str]:
        return ["prompt"]

    @property
    def writes(self) -> list[str]:
        return ["response"]

    def update(self, result: dict, state: State) -> State:
        return state.append(response=result["response"])

The logic is split between stream_run, which is responsible for generating the intermediate results and joining them into the final result, and update, which is responsible for writing that final result to state. The return statement in stream_run hands the final result back to the framework, which passes it to update.
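
Under the hood, the framework effectively drives the generator to exhaustion, catches the StopIteration, and feeds its value to update. The following is an illustrative sketch of that control flow, not Burr's actual implementation:

def drive(action: StreamingChatCall, state: State) -> State:
    # drive the generator, surfacing each intermediate result to the caller
    gen = action.stream_run(state)
    try:
        while True:
            intermediate = next(gen)  # yielded dict, e.g. {'response': delta}
    except StopIteration as e:
        final_result = e.value  # the dict from stream_run's return statement
    return action.update(final_result, state)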

The function-based equivalent would look very similar:

from typing import Generator, Tuple

import openai

from burr.core import State
from burr.core.action import streaming_action

@streaming_action(reads=["prompt"], writes=["response"])
def streaming_chat_call(state: State, **run_kwargs) -> Generator[dict, None, Tuple[dict, State]]:
    client = openai.Client()
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta is None:  # the final chunk carries no content
            continue
        buffer.append(delta)
        yield {'response': delta}  # intermediate result
    full_response = ''.join(buffer)
    # return both the final result and the updated state
    return {'response': full_response}, state.append(response=full_response)

As you can see above, the function-based version does the same thing, with a bit of syntactic sugar that combines the update logic and the return statement into one.
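
On the calling side, you iterate over the stream and then collect the final result. Here is a minimal sketch, assuming a running application whose graph includes this action under the name streaming_chat_call:

# application is a Burr Application containing streaming_chat_call
action, streaming_result = application.stream_result(halt_after=["streaming_chat_call"])
for intermediate in streaming_result:
    print(intermediate["response"], end="")  # each delta as it arrives
final_result, state = streaming_result.get()  # final result and updated state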

Async streaming actions are not currently supported, but support is coming shortly. Stay tuned, and follow the issue on GitHub for more information.