Parallelism

Tools to make sub-actions/sub-graphs easier to work with. Read the docs on Parallelism for more information.

class burr.core.parallelism.RunnableGraph(graph: Graph, entrypoint: str, halt_after: List[str])

Contains a graph with information it needs to run. This is a bit more than a graph – we have entrypoints + halt_after points. This is the core element of a recursive action – your recursive generators can yield these (as well as actions/functions, which both get turned into single-node graphs…)

static create(
from_: Action | Callable | RunnableGraph,
) RunnableGraph

Creates a RunnableGraph from a callable/action. This will create a single-node runnable graph, so we can wrap it up in a task.

Parameters:

from – Callable or Action to wrap

Returns:

RunnableGraph

class burr.core.parallelism.SubGraphTask(
graph: RunnableGraph,
inputs: Dict[str, Any],
state: State,
application_id: str,
tracker: SyncTrackingClient | None = None,
state_persister: BaseStateSaver | None = None,
state_initializer: BaseStateLoader | None = None,
)

Task to run a subgraph. Has runtime-specific information, like inputs, state, and the application ID. This is the lower-level component – the user will only directly interact with this if they use the TaskBasedParallelAction interface, which produces a generator of these.

run(
parent_context: ApplicationContext,
) State

Runs the task – this simply executes it by instantiating a sub-application

class burr.core.parallelism.TaskBasedParallelAction

The base class for actions that run a set of tasks in parallel and reduce the results. This is more power-user mode – if you need fine-grained control over the set of tasks your parallel action utilizes, then this is for you. If not, you’ll want to see:

If you’re unfamiliar about where to start, you’ll want to see the docs on parallelism.

This is responsible for two things:

  1. Creating a set of tasks to run in parallel

  2. Reducing the results of those tasks into a single state for the action to return.

The following example shows how to call a set of prompts over a set of different models in parallel and return the result.

from burr.core import action, state, ApplicationContext
from burr.core.parallelism import MapStates, RunnableGraph
from typing import Callable, Generator, List

@action(reads=["prompt", "model"], writes=["llm_output"])
def query_llm(state: State, model: str) -> State:
    # TODO -- implement _query_my_llm to call litellm or something
    return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model))

class MultipleTaskExample(TaskBasedParallelAction):
    def tasks(state: State, context: ApplicationContext) -> Generator[SubGraphTask, None, None]:
        for prompt in state["prompts"]:
            for action in [
                query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
                query_llm.bind(model="o1").with_name("o1_answer"),
                query_llm.bind(model="claude").with_name("claude_answer"),
            ]
                yield SubGraphTask(
                    action=action, # can be a RunnableGraph as well
                    state=state.update(prompt=prompt),
                    inputs={},
                    # stable hash -- up to you to ensure uniqueness
                    application_id=hashlib.sha256(context.application_id + action.name + prompt).hexdigest(),
                    # a few other parameters we might add -- see advanced usage -- failure conditions, etc...
                )

    def reduce(self, state: State, states: Generator[State, None, None]) -> State:
        all_llm_outputs = []
        for sub_state in states:
            all_llm_outputs.append(
                {
                    "output" : sub_state["llm_output"],
                    "model" : sub_state["model"],
                    "prompt" : sub_state["prompt"],
                }
            )
        return state.update(all_llm_outputs=all_llm_outputs)

Note that it can be synchronous or asynchronous. Synchronous implementations will use the standard/ supplied executor. Asynchronous implementations will use asyncio.gather. Note that, while asynchronous implementations may implement the tasks as either synchronous or asynchronous generators, synchronous implementations can only use synchronous generators. Furthermore, with asynchronous implementations, the generator for reduce will be asynchronous as well (regardless of whether your task functions are asynchronous).

property inputs: list[str] | tuple[list[str], list[str]]

Inputs from this – if you want to override you’ll want to call super() first so you get these inputs.

Returns:

the list of inputs that will populate kwargs.

is_async() bool

This says whether or not the action is async. Note you have to override this if you have async tasks and want to use asyncio.gather on them. Otherwise leave this blank.

Returns:

Whether or not the action is async

abstract property reads: list[str]

Returns the keys from the state that this function reads

Returns:

A list of keys

abstract reduce(
state: State,
states: Generator[State, None, None] | AsyncGenerator[State, None],
) State

Reduces the states from the tasks into a single state.

Parameters:

states – State outputs from the subtasks

Returns:

Reduced state

run_and_update(
state: State,
**run_kwargs,
) Tuple[dict, State]

Runs and updates. This is not user-facing, so do not override it. This runs all actions in parallel (using the supplied executor, from the context), and then reduces the results.

Parameters:
  • state – Input state

  • run_kwargs – Additional inputs (runtime inputs)

Returns:

The results, updated state tuple. The results are empty, but we may add more in the future.

abstract tasks(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[SubGraphTask, None, None] | AsyncGenerator[SubGraphTask, None]

Creates all tasks that this action will run, given the state/inputs. This produces a generator of SubGraphTasks that will be run in parallel.

Parameters:
  • state – State prior to action’s execution

  • context – Context for the action

Yield:

SubGraphTasks to run

abstract property writes: list[str]

Returns the keys from the state that this reducer writes.

Returns:

A list of keys

class burr.core.parallelism.MapActionsAndStates

Base class to run a cartesian product of actions x states.

For example, if you want to run the following:

  • n prompts

  • m models

This will make it easy to do. If you need fine-grained control, you can use the TaskBasedParallelAction, which allows you to specify the tasks individually. If you just want to vary actions/states (and not both), use MapActions or MapStates implementations.

The following shows how to run a set of prompts over a set of models in parallel and return the results.

from burr.core import action, state
from burr.core.parallelism import MapActionsAndStates, RunnableGraph
from typing import Callable, Generator, List

@action(reads=["prompt", "model"], writes=["llm_output"])
def query_llm(state: State, model: str) -> State:
    # TODO -- implement _query_my_llm to call litellm or something
    return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model))

class TestModelsOverPrompts(MapActionsAndStates):

    def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]) -> Generator[Action | Callable | RunnableGraph, None, None]:
        # make sure to add a name to the action
        # This is not necessary for subgraphs, as actions will already have names
        for action in [
            query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
            query_llm.bind(model="o1").with_name("o1_answer"),
            query_llm.bind(model="claude").with_name("claude_answer"),
        ]:
            yield action

    def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]) -> Generator[State, None, None]:
        for prompt in [
            "What is the meaning of life?",
            "What is the airspeed velocity of an unladen swallow?",
            "What is the best way to cook a steak?",
        ]:
            yield state.update(prompt=prompt)

    def reduce(self, state: State, states: Generator[State, None, None]) -> State:
        all_llm_outputs = []
        for sub_state in states:
            all_llm_outputs.append(
                {
                    "output" : sub_state["llm_output"],
                    "model" : sub_state["model"],
                    "prompt" : sub_state["prompt"],
                }
            )
        return state.update(all_llm_outputs=all_llm_outputs)

    @property
    def reads(self) -> list[str]:
        return ["prompts"]

    @property
    def writes(self) -> list[str]:
        return ["all_llm_outputs"]
abstract actions(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[Action | Callable | RunnableGraph, None, None] | AsyncGenerator[Action | Callable | RunnableGraph, None]

Yields actions to run in parallel. These will be merged with the states as a cartesian product.

Parameters:
  • state – Input state at the time of running the “parent” action.

  • inputs – Runtime Inputs to the action

Returns:

Generator of actions to run

abstract reduce(
state: State,
states: Generator[State, None, None],
) State

Reduces the states from the tasks into a single state.

Parameters:

states – State outputs from the subtasks

Returns:

Reduced state

state_initializer(
**kwargs,
) Literal['cascade'] | BaseStateLoader | DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync | None

State initializer for the action – what initializer does the sub-application use?

This can either be: - “cascade”: inherit from parent - None: don’t use an initializer - Object: use the specified initializer

Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.

Parameters:

kwargs – Additional arguments, reserverd for later

Returns:

the specified behavior

state_persister(
**kwargs,
) Literal['cascade'] | BaseStateSaver | DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync | None

Persister for the action – what persister does the sub-application use?

This can either be: - “cascade”: inherit from parent - None: don’t use a persister - Object: use the specified persister

Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.

Parameters:

kwargs – Additional arguments, reserverd for later

Returns:

The specified behavior.

abstract states(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[State, None, None] | AsyncGenerator[State, None]

Yields states to run in parallel. These will be merged with the actions as a cartesian product.

Parameters:
  • state – Input state at the time of running the “parent” action.

  • context – Context for the action

  • inputs – Runtime Inputs to the action

Returns:

Generator of states to run

tasks(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[SubGraphTask, None, None] | AsyncGenerator[SubGraphTask, None]

Takes the cartesian product of actions and states, creating tasks for each.

Parameters:
  • state – Input state at the time of running the “parent” action.

  • context – Context for the action

  • inputs – Runtime Inputs to the action

Returns:

Generator of tasks to run

tracker(
**kwargs,
) Literal['cascade'] | None | SyncTrackingClient

Tracker for the action – what tracker does the sub-application use?

This can either be: - “cascade”: inherit from parent - None: don’t use a tracker - Object: use the specified tracker

Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.

Parameters:

kwargs – Additional arguments, reserverd for later

Returns:

the specified behavior

class burr.core.parallelism.MapActions

Base class to run a set of actions over the same state. Actions can be functions (decorated with @action), action objects, or subdags implemented as RunnableGraph objects. With this, you can do the following:

  1. Specify the actions to run

  2. Specify the state to run the actions over

  3. Reduce the results into a single state

This is useful, for example, to run different LLMs over the same set of prompts,

Here is an example (with some pseudocode) of doing just that:

from burr.core import action, state
from burr.core.parallelism import MapActions, RunnableGraph
from typing import Callable, Generator, List

@action(reads=["prompt", "model"], writes=["llm_output"])
def query_llm(state: State, model: str) -> State:
    # TODO -- implement _query_my_llm to call litellm or something
    return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model))

class TestMultipleModels(MapActions):

    def actions(self, state: State, inputs: Dict[str, Any], context: ApplicationContext) -> Generator[Action | Callable | RunnableGraph, None, None]:
        # Make sure to add a name to the action if you use bind() with a function,
        # note that these can be different actions, functions, etc...
        # in this case we're using `.bind()` to create multiple actions, but we can use some mix of
        # subgraphs, functions, action objects, etc...
        for action in [
            query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
            query_llm.bind(model="o1").with_name("o1_answer"),
            query_llm.bind(model="claude").with_name("claude_answer"),
        ]:
            yield action

    def state(self, state: State, inputs: Dict[str, Any]) -> State:
        return state.update(prompt="What is the meaning of life?")

    def reduce(self, states: Generator[State, None, None]) -> State:
        all_llm_outputs = []
        for state in states:
            all_llm_outputs.append(state["llm_output"])
        return state.update(all_llm_outputs=all_llm_outputs)

    @property
    def reads(self) -> List[str]:
        return ["prompt"] # we're just running this on a single prompt, for multiple actions

    @property
    def writes(self) -> List[str]:
        return ["all_llm_outputs"]
abstract actions(
state: State,
inputs: Dict[str, Any],
context: ApplicationContext,
) Generator[Action | Callable | RunnableGraph, None, None] | AsyncGenerator[Action | Callable | RunnableGraph, None]

Gives all actions to map over, given the state/inputs.

Parameters:
  • state – State at the time of running the action

  • inputs – Runtime Inputs to the action

  • context – Context for the action

Returns:

Generator of actions to run

abstract reduce(
state: State,
states: Generator[State, None, None] | AsyncGenerator[State, None],
) State

Reduces the task’s results into a single state. Runs through all outputs and combines them together, to form the final state for the action.

Parameters:

states – State outputs from the subtasks

Returns:

Reduced state

state(state: State, inputs: Dict[str, Any])

Gives the state for each of the actions. By default, this will give out the current state. That said, you may want to adjust this – E.G. to translate state into a format the sub-actions would expect.

Parameters:
  • state – State at the time of running the action

  • inputs – Runtime inputs to the action

Returns:

State for the action

states(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[State, None, None]

Just converts the state into a generator of 1, so we can use the superclass. This is internal.

class burr.core.parallelism.MapStates

Base class to run a single action over a set of states. States are given as updates (manipulations) of the action’s input state, specified by the states generator.

With this, you can do the following:

  1. Specify the states to run

  2. Specify the action to run over all the states

  3. Reduce the results into a single state

This is useful, for example, to run different prompts over the same LLM,

Here is an example (with some pseudocode) of doing just that:

from burr.core import action, state
from burr.core.parallelism import MapStates, RunnableGraph
from typing import Callable, Generator, List

@action(reads=["prompt"], writes=["llm_output"])
def query_llm(state: State) -> State:
    return state.update(llm_output=_query_my_llm(prompt=state["prompt"]))

class TestMultiplePrompts(MapStates):

    def action(self, state: State, inputs: Dict[str, Any]) -> Action | Callable | RunnableGraph:
        # make sure to add a name to the action
        # This is not necessary for subgraphs, as actions will already have names
        return query_llm.with_name("query_llm")

    def states(self, state: State, inputs: Dict[str, Any], context: ApplicationContext) -> Generator[State, None, None]:
        # You could easily have a list_prompts upstream action that writes to "prompts" in state
        # And loop through those
        # This hardcodes for simplicity
        for prompt in [
            "What is the meaning of life?",
            "What is the airspeed velocity of an unladen swallow?",
            "What is the best way to cook a steak?",
        ]:
            yield state.update(prompt=prompt)

    def reduce(self, state: State, states: Generator[State, None, None]) -> State:
        all_llm_outputs = []
        for sub_state in states:
            all_llm_outputs.append(sub_state["llm_output"])
        return state.update(all_llm_outputs=all_llm_outputs)

    @property
    def reads(self) -> List[str]:
        return ["prompts"]

    @property
    def writes(self) -> List[str]:
        return ["all_llm_outputs"]
abstract action(
state: State,
inputs: Dict[str, Any],
) Action | Callable | RunnableGraph

The single action to apply to each state. This can be a function (decorated with @action, action object, or subdag).

Parameters:
  • state – State to run the action over

  • inputs – Runtime inputs to the action

Returns:

Action to run

actions(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[Action | Callable | RunnableGraph, None, None] | AsyncGenerator[Action | Callable | RunnableGraph, None]

Maps the action over each state generated by the states method. Internally used, do not implement.

abstract reduce(
state: State,
results: Generator[State, None, None] | AsyncGenerator[State, None],
) State

Reduces the task’s results

Parameters:

results

Returns:

abstract states(
state: State,
context: ApplicationContext,
inputs: Dict[str, Any],
) Generator[State, None, None] | AsyncGenerator[State, None]

Generates all states to map over, given the state and inputs. Each state will be an update to the input state.

For instance, you may want to take an input state that has a list field, and expand it into a set of states, each with a different value from the list.

For example:

def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]) -> Generator[State, None, None]:
    for item in state["multiple_fields"]:
        yield state.update(individual_field=item)
Parameters:
  • state – Initial state

  • context – Context for the action

  • inputs – Runtime inputs to the action

Returns:

Generator of states to run

parallelism.map_reduce_action(
state: Callable[[State, ApplicationContext, Dict[str, Any]], Generator[State, None, None] | AsyncGenerator[State, None] | List[State] | State],
reducer: Callable[[State, Generator[State, None, None] | AsyncGenerator[State, None]], State],
reads: List[str],
writes: List[str],
inputs: List[str],
)

Experimental API for creating a map-reduce action easily. We’ll be improving this.