Parallelism
Tools to make sub-actions/sub-graphs easier to work with. Read the docs on Parallelism for more information.
- class burr.core.parallelism.RunnableGraph(graph: Graph, entrypoint: str, halt_after: List[str])
Contains a graph with information it needs to run. This is a bit more than a graph – we have entrypoints + halt_after points. This is the core element of a recursive action – your recursive generators can yield these (as well as actions/functions, which both get turned into single-node graphs…)
- static create(
- from_: Action | Callable | RunnableGraph,
Creates a RunnableGraph from a callable/action. This will create a single-node runnable graph, so we can wrap it up in a task.
- Parameters:
from_ – Callable or Action to wrap
- Returns:
RunnableGraph
- class burr.core.parallelism.SubGraphTask(
- graph: RunnableGraph,
- inputs: Dict[str, Any],
- state: State,
- application_id: str,
- tracker: SyncTrackingClient | None = None,
- state_persister: BaseStateSaver | None = None,
- state_initializer: BaseStateLoader | None = None,
Task to run a subgraph. Has runtime-specific information, like inputs, state, and the application ID. This is the lower-level component – the user will only directly interact with this if they use the TaskBasedParallelAction interface, which produces a generator of these.
- run(
- parent_context: ApplicationContext,
Runs the task – this simply executes it by instantiating a sub-application
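Each task carries its own application ID, and keeping that ID stable across runs is left to the caller. As a minimal sketch of one way to derive such an ID using only the standard library (the helper name `stable_sub_application_id` and the choice of separator are assumptions for illustration, not part of Burr):

```python
import hashlib


def stable_sub_application_id(parent_app_id: str, action_name: str, key: str) -> str:
    """Derive a deterministic ID for one sub-application (hypothetical helper)."""
    # Join the parts with a separator so ("ab", "c") and ("a", "bc") cannot collide.
    raw = "\x1f".join([parent_app_id, action_name, key])
    # sha256 operates on bytes, so the string has to be encoded first.
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


first = stable_sub_application_id("app-123", "gpt_4_answer", "What is the meaning of life?")
second = stable_sub_application_id("app-123", "gpt_4_answer", "What is the meaning of life?")
other = stable_sub_application_id("app-123", "o1_answer", "What is the meaning of life?")
```

The same inputs always produce the same ID, while changing any component (here, the action name) produces a different one. The resulting string could then be passed as the `application_id` of a task.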
- class burr.core.parallelism.TaskBasedParallelAction
The base class for actions that run a set of tasks in parallel and reduce the results. This is more power-user mode – if you need fine-grained control over the set of tasks your parallel action utilizes, then this is for you. If not, you’ll want to see:
- MapActionsAndStates – a cartesian product of actions/states
- MapActions – a map of actions over a single state
- MapStates – a map of a single action over multiple states
If you are unsure where to start, see the docs on parallelism.
This is responsible for two things:
Creating a set of tasks to run in parallel
Reducing the results of those tasks into a single state for the action to return.
The following example shows how to call a set of prompts over a set of different models in parallel and return the result.
    import hashlib
    from typing import Any, Dict, Generator, List

    from burr.core import ApplicationContext, State, action
    from burr.core.parallelism import RunnableGraph, SubGraphTask, TaskBasedParallelAction


    @action(reads=["prompt"], writes=["llm_output", "model"])
    def query_llm(state: State, model: str) -> State:
        # TODO -- implement _query_my_llm to call litellm or something
        # "model" is a bound input (see .bind() below); write it to state so reduce() can read it
        return state.update(
            llm_output=_query_my_llm(prompt=state["prompt"], model=model),
            model=model,
        )


    class MultipleTaskExample(TaskBasedParallelAction):
        def tasks(
            self, state: State, context: ApplicationContext, inputs: Dict[str, Any]
        ) -> Generator[SubGraphTask, None, None]:
            for prompt in state["prompts"]:
                for action in [
                    query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
                    query_llm.bind(model="o1").with_name("o1_answer"),
                    query_llm.bind(model="claude").with_name("claude_answer"),
                ]:
                    yield SubGraphTask(
                        # wrap the action in a single-node graph (you can also pass your own RunnableGraph)
                        graph=RunnableGraph.create(action),
                        state=state.update(prompt=prompt),
                        inputs={},
                        # stable hash -- up to you to ensure uniqueness (sha256 needs bytes, hence encode())
                        application_id=hashlib.sha256(
                            (context.application_id + action.name + prompt).encode()
                        ).hexdigest(),
                        # a few other parameters we might add -- see advanced usage -- failure conditions, etc...
                    )

        def reduce(self, state: State, states: Generator[State, None, None]) -> State:
            all_llm_outputs = []
            for sub_state in states:
                all_llm_outputs.append(
                    {
                        "output": sub_state["llm_output"],
                        "model": sub_state["model"],
                        "prompt": sub_state["prompt"],
                    }
                )
            return state.update(all_llm_outputs=all_llm_outputs)

        @property
        def reads(self) -> List[str]:
            return ["prompts"]

        @property
        def writes(self) -> List[str]:
            return ["all_llm_outputs"]
Note that this action can be synchronous or asynchronous. Synchronous implementations use the standard/supplied executor; asynchronous implementations use asyncio.gather. While asynchronous implementations may implement tasks as either a synchronous or an asynchronous generator, synchronous implementations can only use synchronous generators. Furthermore, with asynchronous implementations, the generator passed to reduce is asynchronous as well (regardless of whether your task functions are asynchronous).
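The two execution modes described above can be sketched with the standard library alone. This is an illustration of the pattern, not Burr's implementation: plain dicts stand in for State, zero-argument callables stand in for sub-graph tasks, and the names `make_tasks`, `run_sync`, and `run_async` are hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, Callable, Dict, Generator, List


def make_tasks(prompts: List[str]) -> Generator[Callable[[], Dict[str, str]], None, None]:
    """Yield zero-argument callables, standing in for sub-graph tasks."""
    for prompt in prompts:
        # Bind prompt as a default argument so each task keeps its own value.
        yield lambda p=prompt: {"prompt": p, "output": p.upper()}


def run_sync(prompts: List[str]) -> List[Dict[str, str]]:
    """Synchronous fan-out: submit every task to an executor, then collect in order."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task) for task in make_tasks(prompts)]
        return [future.result() for future in futures]


async def run_async(prompts: List[str]) -> List[Dict[str, str]]:
    """Asynchronous fan-out: wrap tasks as coroutines and await them together."""

    async def run_one(task: Callable[[], Dict[str, str]]) -> Dict[str, str]:
        return task()

    # A synchronous generator of tasks is consumed here, inside async code.
    results = await asyncio.gather(*(run_one(task) for task in make_tasks(prompts)))

    async def as_async_generator() -> AsyncGenerator[Dict[str, str], None]:
        # In the asynchronous case the reducer iterates an async generator.
        for result in results:
            yield result

    return [result async for result in as_async_generator()]


sync_results = run_sync(["a", "b"])
async_results = asyncio.run(run_async(["a", "b"]))
```

Both paths produce the same results in task order; only the consumption style differs (a plain `for` loop versus `async for`).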
- property inputs: list[str] | tuple[list[str], list[str]]
The inputs this action requires. If you override this property, call super() first so that these inputs are included.
- Returns:
the list of inputs that will populate kwargs.
- is_async() → bool
Whether or not the action is async. You have to override this if you have async tasks and want to use asyncio.gather on them. Otherwise, leave it as is.
- Returns:
Whether or not the action is async
- abstract property reads: list[str]
Returns the keys from the state that this function reads
- Returns:
A list of keys
- abstract reduce( ) → State
Reduces the states from the tasks into a single state.
- Parameters:
states – State outputs from the subtasks
- Returns:
Reduced state
- run_and_update(
- state: State,
- **run_kwargs,
Runs and updates. This is not user-facing, so do not override it. This runs all actions in parallel (using the supplied executor, from the context), and then reduces the results.
- Parameters:
state – Input state
run_kwargs – Additional inputs (runtime inputs)
- Returns:
A tuple of (results, updated state). The results are currently empty, but we may add more in the future.
- abstract tasks(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Creates all tasks that this action will run, given the state/inputs. This produces a generator of SubGraphTasks that will be run in parallel.
- Parameters:
state – State prior to action’s execution
context – Context for the action
inputs – Runtime inputs to the action
- Yield:
SubGraphTasks to run
- abstract property writes: list[str]
Returns the keys from the state that this reducer writes.
- Returns:
A list of keys
- class burr.core.parallelism.MapActionsAndStates
Base class to run a cartesian product of actions x states.
For example, if you want to run n prompts over m models, this class makes it easy to do.
If you need fine-grained control, you can use TaskBasedParallelAction, which allows you to specify the tasks individually. If you just want to vary actions or states (and not both), use the MapActions or MapStates implementations.
The following shows how to run a set of prompts over a set of models in parallel and return the results.
    from typing import Any, Callable, Dict, Generator, List

    from burr.core import Action, ApplicationContext, State, action
    from burr.core.parallelism import MapActionsAndStates, RunnableGraph


    @action(reads=["prompt"], writes=["llm_output", "model"])
    def query_llm(state: State, model: str) -> State:
        # TODO -- implement _query_my_llm to call litellm or something
        # "model" is a bound input; write it to state so reduce() can read it
        return state.update(
            llm_output=_query_my_llm(prompt=state["prompt"], model=model),
            model=model,
        )


    class TestModelsOverPrompts(MapActionsAndStates):
        def actions(
            self, state: State, context: ApplicationContext, inputs: Dict[str, Any]
        ) -> Generator[Action | Callable | RunnableGraph, None, None]:
            # make sure to add a name to the action
            # This is not necessary for subgraphs, as actions will already have names
            for action in [
                query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
                query_llm.bind(model="o1").with_name("o1_answer"),
                query_llm.bind(model="claude").with_name("claude_answer"),
            ]:
                yield action

        def states(
            self, state: State, context: ApplicationContext, inputs: Dict[str, Any]
        ) -> Generator[State, None, None]:
            for prompt in [
                "What is the meaning of life?",
                "What is the airspeed velocity of an unladen swallow?",
                "What is the best way to cook a steak?",
            ]:
                yield state.update(prompt=prompt)

        def reduce(self, state: State, states: Generator[State, None, None]) -> State:
            all_llm_outputs = []
            for sub_state in states:
                all_llm_outputs.append(
                    {
                        "output": sub_state["llm_output"],
                        "model": sub_state["model"],
                        "prompt": sub_state["prompt"],
                    }
                )
            return state.update(all_llm_outputs=all_llm_outputs)

        @property
        def reads(self) -> List[str]:
            return ["prompts"]

        @property
        def writes(self) -> List[str]:
            return ["all_llm_outputs"]
- abstract actions(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Yields actions to run in parallel. These will be merged with the states as a cartesian product.
- Parameters:
state – Input state at the time of running the “parent” action.
context – Context for the action
inputs – Runtime Inputs to the action
- Returns:
Generator of actions to run
- abstract reduce( ) → State
Reduces the states from the tasks into a single state.
- Parameters:
states – State outputs from the subtasks
- Returns:
Reduced state
- state_initializer(
- **kwargs,
State initializer for the action – what initializer does the sub-application use?
This can be one of:
- “cascade”: inherit from parent
- None: don’t use an initializer
- Object: use the specified initializer
Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.
- Parameters:
kwargs – Additional arguments, reserved for later
- Returns:
the specified behavior
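The three documented choices ("cascade", None, or an explicit object) amount to a small resolution rule. The following is a minimal sketch of that rule, assuming the parent's value is available at resolution time; `resolve_option` is a hypothetical helper for illustration and is not Burr's internal logic.

```python
from typing import Any, Optional

CASCADE = "cascade"  # the string option named in the description above


def resolve_option(configured: Any, parent_value: Optional[Any]) -> Optional[Any]:
    """Pick what the sub-application should use (hypothetical helper)."""
    if isinstance(configured, str) and configured == CASCADE:
        # "cascade": inherit whatever the parent application uses.
        return parent_value
    if configured is None:
        # None: the sub-application gets nothing.
        return None
    # Any other object is used as-is.
    return configured


parent = object()
custom = object()
inherited = resolve_option("cascade", parent)
disabled = resolve_option(None, parent)
explicit = resolve_option(custom, parent)
```

The same rule would apply equally to the persister and tracker options, which offer the same three choices.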
- state_persister(
- **kwargs,
Persister for the action – what persister does the sub-application use?
This can be one of:
- “cascade”: inherit from parent
- None: don’t use a persister
- Object: use the specified persister
Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.
- Parameters:
kwargs – Additional arguments, reserved for later
- Returns:
The specified behavior.
- abstract states(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Yields states to run in parallel. These will be merged with the actions as a cartesian product.
- Parameters:
state – Input state at the time of running the “parent” action.
context – Context for the action
inputs – Runtime Inputs to the action
- Returns:
Generator of states to run
- tasks(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Takes the cartesian product of actions and states, creating tasks for each.
- Parameters:
state – Input state at the time of running the “parent” action.
context – Context for the action
inputs – Runtime Inputs to the action
- Returns:
Generator of tasks to run
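As a rough illustration of the cartesian product described above, the pairing can be sketched with `itertools.product`. Plain strings and dicts stand in for actions and states, `cartesian_tasks` is a hypothetical name, and the iteration order shown (actions in the outer loop) is an assumption rather than a documented guarantee.

```python
from itertools import product
from typing import Dict, Generator, List, Tuple


def cartesian_tasks(
    actions: List[str], states: List[Dict[str, str]]
) -> Generator[Tuple[str, Dict[str, str]], None, None]:
    """Yield one (action, state) pair per combination."""
    for action_name, state in product(actions, states):
        # Copy the state so each task works on its own dict.
        yield action_name, dict(state)


pairs = list(
    cartesian_tasks(
        ["gpt_4_answer", "o1_answer"],
        [{"prompt": "p1"}, {"prompt": "p2"}, {"prompt": "p3"}],
    )
)
```

Two actions and three states give six pairs, one per task.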
- tracker(
- **kwargs,
Tracker for the action – what tracker does the sub-application use?
This can be one of:
- “cascade”: inherit from parent
- None: don’t use a tracker
- Object: use the specified tracker
Note this is global – if you want to override it on a per-subgraph basis, you’ll need to use the task-level API.
- Parameters:
kwargs – Additional arguments, reserved for later
- Returns:
the specified behavior
- class burr.core.parallelism.MapActions
Base class to run a set of actions over the same state. Actions can be functions (decorated with @action), action objects, or subdags implemented as RunnableGraph objects. With this, you can do the following:
- Specify the actions to run
- Specify the state to run the actions over
- Reduce the results into a single state
This is useful, for example, to run different LLMs over the same prompt.
Here is an example (with some pseudocode) of doing just that:
    from typing import Any, Callable, Dict, Generator, List

    from burr.core import Action, ApplicationContext, State, action
    from burr.core.parallelism import MapActions, RunnableGraph


    @action(reads=["prompt"], writes=["llm_output"])
    def query_llm(state: State, model: str) -> State:
        # TODO -- implement _query_my_llm to call litellm or something
        return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model))


    class TestMultipleModels(MapActions):
        def actions(
            self, state: State, inputs: Dict[str, Any], context: ApplicationContext
        ) -> Generator[Action | Callable | RunnableGraph, None, None]:
            # Make sure to add a name to the action if you use bind() with a function,
            # note that these can be different actions, functions, etc...
            # in this case we're using `.bind()` to create multiple actions, but we can use some mix of
            # subgraphs, functions, action objects, etc...
            for action in [
                query_llm.bind(model="gpt-4").with_name("gpt_4_answer"),
                query_llm.bind(model="o1").with_name("o1_answer"),
                query_llm.bind(model="claude").with_name("claude_answer"),
            ]:
                yield action

        def state(self, state: State, inputs: Dict[str, Any]) -> State:
            return state.update(prompt="What is the meaning of life?")

        def reduce(self, state: State, states: Generator[State, None, None]) -> State:
            # use a separate loop variable so the parent state is not shadowed
            all_llm_outputs = []
            for sub_state in states:
                all_llm_outputs.append(sub_state["llm_output"])
            return state.update(all_llm_outputs=all_llm_outputs)

        @property
        def reads(self) -> List[str]:
            return ["prompt"]  # we're just running this on a single prompt, for multiple actions

        @property
        def writes(self) -> List[str]:
            return ["all_llm_outputs"]
- abstract actions(
- state: State,
- inputs: Dict[str, Any],
- context: ApplicationContext,
Gives all actions to map over, given the state/inputs.
- Parameters:
state – State at the time of running the action
inputs – Runtime Inputs to the action
context – Context for the action
- Returns:
Generator of actions to run
- abstract reduce( ) → State
Reduces the task’s results into a single state. Runs through all outputs and combines them together, to form the final state for the action.
- Parameters:
states – State outputs from the subtasks
- Returns:
Reduced state
- state(state: State, inputs: Dict[str, Any])
Gives the state for each of the actions. By default, this returns the current state. That said, you may want to adjust this, e.g. to translate the state into a format the sub-actions expect.
- Parameters:
state – State at the time of running the action
inputs – Runtime inputs to the action
- Returns:
State for the action
- states(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Just converts the state into a generator of 1, so we can use the superclass. This is internal.
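To see why a generator of one state is enough to reuse the cartesian-product machinery, consider this small sketch. It uses plain dicts and strings as stand-ins for State and actions, and `single_state` is a hypothetical name, not the library's internal method.

```python
from itertools import product
from typing import Dict, Generator


def single_state(state: Dict[str, str]) -> Generator[Dict[str, str], None, None]:
    """Wrap one state in a generator that yields exactly once."""
    yield state


# With a single state, the product degenerates to "map every action over that state".
pairs = list(
    product(
        ["gpt_4_answer", "o1_answer", "claude_answer"],
        single_state({"prompt": "What is the meaning of life?"}),
    )
)
```

Three actions combined with one state give three pairs, all sharing the same state.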
- class burr.core.parallelism.MapStates
Base class to run a single action over a set of states. States are given as updates (manipulations) of the action’s input state, specified by the states generator.
With this, you can do the following:
- Specify the states to run
- Specify the action to run over all the states
- Reduce the results into a single state
This is useful, for example, to run different prompts over the same LLM.
Here is an example (with some pseudocode) of doing just that:
    from typing import Any, Callable, Dict, Generator, List

    from burr.core import Action, ApplicationContext, State, action
    from burr.core.parallelism import MapStates, RunnableGraph


    @action(reads=["prompt"], writes=["llm_output"])
    def query_llm(state: State) -> State:
        # TODO -- implement _query_my_llm to call litellm or something
        return state.update(llm_output=_query_my_llm(prompt=state["prompt"]))


    class TestMultiplePrompts(MapStates):
        def action(self, state: State, inputs: Dict[str, Any]) -> Action | Callable | RunnableGraph:
            # make sure to add a name to the action
            # This is not necessary for subgraphs, as actions will already have names
            return query_llm.with_name("query_llm")

        def states(
            self, state: State, context: ApplicationContext, inputs: Dict[str, Any]
        ) -> Generator[State, None, None]:
            # You could easily have a list_prompts upstream action that writes to "prompts" in state
            # And loop through those
            # This hardcodes for simplicity
            for prompt in [
                "What is the meaning of life?",
                "What is the airspeed velocity of an unladen swallow?",
                "What is the best way to cook a steak?",
            ]:
                yield state.update(prompt=prompt)

        def reduce(self, state: State, states: Generator[State, None, None]) -> State:
            all_llm_outputs = []
            for sub_state in states:
                all_llm_outputs.append(sub_state["llm_output"])
            return state.update(all_llm_outputs=all_llm_outputs)

        @property
        def reads(self) -> List[str]:
            return ["prompts"]

        @property
        def writes(self) -> List[str]:
            return ["all_llm_outputs"]
- abstract action(
- state: State,
- inputs: Dict[str, Any],
The single action to apply to each state. This can be a function (decorated with @action), an action object, or a subdag.
- Parameters:
state – State to run the action over
inputs – Runtime inputs to the action
- Returns:
Action to run
- actions(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Maps the action over each state generated by the states method. Internally used, do not implement.
- abstract states(
- state: State,
- context: ApplicationContext,
- inputs: Dict[str, Any],
Generates all states to map over, given the state and inputs. Each state will be an update to the input state.
For instance, you may want to take an input state that has a list field, and expand it into a set of states, each with a different value from the list.
For example:
    def states(
        self, state: State, context: ApplicationContext, inputs: Dict[str, Any]
    ) -> Generator[State, None, None]:
        for item in state["multiple_fields"]:
            yield state.update(individual_field=item)
- Parameters:
state – Initial state
context – Context for the action
inputs – Runtime inputs to the action
- Returns:
Generator of states to run
- parallelism.map_reduce_action(
- state: Callable[[State, ApplicationContext, Dict[str, Any]], Generator[State, None, None] | AsyncGenerator[State, None] | List[State] | State],
- reducer: Callable[[State, Generator[State, None, None] | AsyncGenerator[State, None]], State],
- reads: List[str],
- writes: List[str],
- inputs: List[str],
Experimental API for creating a map-reduce action easily. We’ll be improving this.
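The signature above allows the state callable to return a generator, a list, or a single state. As a loose sketch of how those shapes could be normalized before reducing (synchronous cases only), the following uses plain dicts in place of State. The names `iter_states` and `map_reduce`, and the `action_fn` parameter, are assumptions for illustration and do not reflect how Burr implements this.

```python
import inspect
from typing import Any, Callable, Dict, Iterator

StateLike = Dict[str, Any]  # stand-in for burr's State in this sketch


def iter_states(produced: Any) -> Iterator[StateLike]:
    """Normalize a generator, a list, or a single state into one iterator."""
    if inspect.isgenerator(produced) or isinstance(produced, list):
        yield from produced
    else:
        yield produced


def map_reduce(
    state: StateLike,
    state_fn: Callable[[StateLike], Any],
    action_fn: Callable[[StateLike], StateLike],
    reducer: Callable[[StateLike, Iterator[StateLike]], StateLike],
) -> StateLike:
    """Expand the state, apply the action to each piece, then reduce."""
    mapped = (action_fn(sub_state) for sub_state in iter_states(state_fn(state)))
    return reducer(state, mapped)


def expand(state: StateLike) -> list:
    return [{"n": n} for n in state["numbers"]]


def double(sub_state: StateLike) -> StateLike:
    return {"n": sub_state["n"] * 2}


def collect(state: StateLike, states: Iterator[StateLike]) -> StateLike:
    return {**state, "doubled": [s["n"] for s in states]}


result = map_reduce({"numbers": [1, 2, 3]}, expand, double, collect)
```

Here the reducer receives the original state plus a lazy iterator of mapped states, mirroring the shape of the `reducer` parameter in the signature.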