Applications¶
Use this to build and manage a state Machine. You should only ever instantiate the ApplicationBuilder
class,
and not the Application
class directly.
- class burr.core.application.ApplicationBuilder¶
- abuild() Application[StateType] ¶
Builds the application for asynchronous runs.
We support both synchronous and asynchronous applications. To save/load in an asynchronous manner add the async persister versions and use this method to initialize them. This will also enforce the application to be run with async methods as an additional safety check that you are not introducing unnecessary bottlenecks.
Note: When you run an async application you can still use the normal sync functionalities, i.e. sync hooks of other adapters, but they will block the async event loop until finished.
In case you are using state initializers and persisters, the asynchronous application should be used in the following cases:
¶ Cases (persister and app methods)
Status
Sync and Sync
❌
Sync and Async
❌
Async and Sync
❌
Async and Async
✅
- Returns:
The application object.
- build() Application[StateType] ¶
Builds the application for synchronous runs.
We support both synchronous and asynchronous applications. In case you are using state initializers and persisters, the synchronous application should be used in the following cases:
¶ Cases (persister and app methods)
Status
Remarks
Sync and Sync
✅
Sync and Async
✅ ⚠️
Will be deprecated
Async and Sync
❌
Async and Async
❌
We originally only had sync persistence and as such this still can be used when running the app async. However, we strongly encourage to switch to async persisters if you are running an async application.
- Returns:
The application object.
- initialize_from(
- initializer: BaseStateLoader | AsyncBaseStateLoader,
- resume_at_next_action: bool,
- default_state: dict,
- default_entrypoint: str,
- fork_from_app_id: str = None,
- fork_from_partition_key: str = None,
- fork_from_sequence_id: int = None,
Initializes the application we will build from some prior state object.
Note (1) that you can either call this or use with_state and with_entrypoint.
Note (2) if you want to continue a prior application and don’t want to fork it into a new application ID, the values in .with_identifiers() will be used to query for prior state.
- Parameters:
initializer – The persister object to use for initialization. Likely the same one called with
with_state_persister
.resume_at_next_action – Whether to resume at the next action, or default to the
default_entrypoint
default_state – The default state to use if it does not exist. This is a dictionary.
default_entrypoint – The default entry point to use if it does not exist or you elect not to resume_at_next_action.
fork_from_app_id – The app ID to fork from, not to be confused with the current app_id that is set with .with_identifiers(). This is used to fork from a prior application run.
fork_from_partition_key – The partition key to fork from a prior application. Optional. fork_from_app_id required.
fork_from_sequence_id – The sequence ID to fork from a prior application run. Optional, defaults to latest. fork_from_app_id required.
- Returns:
The application builder for future chaining.
- with_actions( ) ApplicationBuilder[StateType] ¶
Adds an action to the application. The actions are granted names (using the with_name) method post-adding, using the kw argument. If it already has a name (or you wish to use the function name, raw, and it is a function-based-action), then you can use the args parameter. This is the only supported way to add actions.
- Parameters:
action_list – Actions to add – these must have a name or be function-based (in which case we will use the function-name)
action_dict – Actions to add, keyed by name
- Returns:
The application builder for future chaining.
- with_entrypoint(
- action: str,
Adds an entrypoint to the application. This is the action that will be run first. This can only be called once.
- Parameters:
action – The name of the action to set as the entrypoint
- Returns:
The application builder for future chaining.
- with_graph(
- graph: Graph,
Adds a prebuilt graph – this is an alternative to using the with_actions and with_transitions methods. While you will likely use with_actions and with_transitions, you may want this in a few cases:
You want to reuse the same graph object for different applications
You want the logic that constructs the graph to be separate from that which constructs the application
You want to serialize/deserialize a graph object and run it in an application
- Parameters:
graph – Graph object built with the
GraphBuilder
- Returns:
The application builder for future chaining.
- with_hooks(
- *adapters: DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync,
Adds a lifecycle adapter to the application. This is a way to add hooks to the application so that they are run at the appropriate times. You can use this to synchronize state out, log results, etc…
- Parameters:
adapters – Adapter to add
- Returns:
The application builder for future chaining.
- with_identifiers(
- app_id: str = None,
- partition_key: str = None,
- sequence_id: int = None,
Assigns various identifiers to the application. This is used for tracking, persistence, etc…
- Parameters:
app_id – Application ID – this will be assigned to a uuid if not set.
partition_key – Partition key – this is used for disambiguating groups of applications. For instance, a unique user ID, etc… This is coupled to persistence, and is used to query for/select application runs.
sequence_id – Sequence ID that we want this to start at. If you’re using
.initialize
, this will be set. Otherwise this is solely for resetting/starting at a specified position.
- Returns:
The application builder for future chaining.
- with_parallel_executor(executor_factory: ~burr.core.application.<lambda>)¶
Assigns a default executor to be used for recursive/parallel sub-actions. This effectively allows for executing multiple Burr apps in parallel. See https://burr.dagworks.io/concepts/parallelism/ for more details.
This will default to a simple threadpool executor, meaning that you will be bound by the number of threads your computer can handle. If you want to use a more advanced executor, you can pass it in here – any subclass of concurrent.futures.Executor will work.
If you specify executors for specific tasks, this will default to that.
Note that, if you are using asyncio, you cannot specify an executor. It will default to using asyncio.gather with asyncio’s event loop.
- Parameters:
executor
- Returns:
- with_spawning_parent(
- app_id: str,
- sequence_id: int,
- partition_key: str | None = None,
Sets the ‘spawning’ parent application that created this app. This is used for tracking purposes. Doing this creates a parent/child relationship. There can be many spawned children from a single sequence ID (just as there can be many forks of an app).
Note the difference between this and forking. Forking allows you to create a new app where the old one left off. This suggests that this application is wholly contained within the parent application.
- Parameters:
app_id – ID of application that spawned this app
sequence_id – Sequence ID of the parent app that spawned this app
partition_key – Partition key of the parent app that spawned this app
- Returns:
The application builder for future chaining.
- with_state(
- state: State | StateTypeToSet | None = None,
- **kwargs,
Sets initial values in the state. If you want to load from a prior state, you can do so here and pass the values in.
TODO – enable passing in a state object instead of **kwargs
- Parameters:
kwargs – Key-value pairs to set in the state
- Returns:
The application builder for future chaining.
- with_state_persister(
- persister: BaseStateSaver | AsyncBaseStateSaver | DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync,
- on_every: str = 'step',
Adds a state persister to the application. This is a way to persist state out to a database, file, etc… at the specified interval. This is one of two options:
[normal mode] A BaseStateSaver object – this is a utility class that makes it easy to save/load
[power-user-mode] A lifecycle adapter – this is a custom class that you use to save state.
The framework will wrap the BaseStateSaver object in a PersisterHook, which is a post-run.
- Parameters:
persister – The persister to add
on_every – The interval to persist state. Currently only “step” is supported.
- Returns:
The application builder for future chaining.
- with_tracker(
- tracker: Literal['local'] | 'TrackingClient' = 'local',
- project: str = 'default',
- params: Dict[str, Any] = None,
- use_otel_tracing: bool = False,
Adds a “tracker” to the application. The tracker specifies a project name (used for disambiguating groups of tracers), and plugs into the Burr UI. This can either be:
A string (the only supported one right now is “local”), and a set of parameters for a set of supported trackers.
A lifecycle adapter object that does tracking (up to you how to implement it).
internally creates a
LocalTrackingClient
object, and adds it to the lifecycle adapters.adds the lifecycle adapter to the lifecycle adapters.
- Parameters:
tracker – Tracker to use.
local
creates one, else pass one in.project – Project name – used if the tracker is string-specified (local).
params – Parameters to pass to the tracker if it’s string-specified (local).
use_otel_tracing – Whether to log opentelemetry traces to the Burr UI. This is experimental but we will be adding full support shortly. This requires burr[opentelemetry] installed. Note you can also log burr to OpenTelemetry using the OpenTelemetry adapter
- Returns:
The application builder for future chaining.
- with_transitions(
- *transitions: Tuple[str | list[str], str] | Tuple[str | list[str], str, Condition],
- Adds transitions to the application. Transitions are specified as tuples of either:
(from, to, condition)
(from, to) – condition is set to DEFAULT (which is a fallback)
Transitions will be evaluated in order of specification – if one is met, the others will not be evaluated. Note that one transition can be terminal – the system doesn’t have
- Parameters:
transitions – Transitions to add
- Returns:
The application builder for future chaining.
- with_typing(
- typing_system: TypingSystem[StateTypeToSet],
Sets the typing system for the application. This is used to enforce typing on the state.
- Parameters:
typing_system – Typing system to use
- Returns:
The application builder for future chaining.
- class burr.core.application.Application(
- graph: Graph,
- state: State[ApplicationStateType],
- partition_key: str | None,
- uid: str,
- entrypoint: str,
- sequence_id: int | None = None,
- adapter_set: LifecycleAdapterSet | None = None,
- builder: 'ApplicationBuilder' | None = None,
- fork_parent_pointer: burr_types.ParentPointer | None = None,
- spawning_parent_pointer: burr_types.ParentPointer | None = None,
- tracker: 'TrackingClient' | None = None,
- parallel_executor_factory: Executor | None = None,
- state_persister: BaseStateSaver | LifecycleAdapter | None = None,
- state_initializer: BaseStateLoader | LifecycleAdapter | None = None,
- __init__(
- graph: Graph,
- state: State[ApplicationStateType],
- partition_key: str | None,
- uid: str,
- entrypoint: str,
- sequence_id: int | None = None,
- adapter_set: LifecycleAdapterSet | None = None,
- builder: 'ApplicationBuilder' | None = None,
- fork_parent_pointer: burr_types.ParentPointer | None = None,
- spawning_parent_pointer: burr_types.ParentPointer | None = None,
- tracker: 'TrackingClient' | None = None,
- parallel_executor_factory: Executor | None = None,
- state_persister: BaseStateSaver | LifecycleAdapter | None = None,
- state_initializer: BaseStateLoader | LifecycleAdapter | None = None,
Instantiates an Application. This is an internal API – use the builder!
- Parameters:
actions – Actions to run
transitions – Transitions between actions
state – State to run with
initial_step – Step name to start at
partition_key – Partition key for the application (optional)
uid – Unique identifier for the application
sequence_id – Sequence ID for the application. Note this will be incremented every run. So if this starts at 0, the first one you will see will be 1.
adapter_set – Set of lifecycle adapters
builder – Builder that created this application
- aiterate(
- *,
- halt_before: list[str] = None,
- halt_after: list[str] = None,
- inputs: Dict[str, Any] | None = None,
Returns a generator that calls step() in a row, enabling you to see the state of the system as it updates. This is the asynchronous version so it has no capability of t
- Parameters:
halt_before – The list of actions to halt before execution of. It will halt on the first one.
halt_after – The list of actions to halt after execution of. It will halt on the first one.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.
- Returns:
Each iteration returns the result of running step. This returns nothing – it’s an async generator which is not allowed to have a return value.
- arun(
- *,
- halt_before: list[str] = None,
- halt_after: list[str] = None,
- inputs: Dict[str, Any] | None = None,
Runs your application through until completion, using async. Does not give access to the state along the way – if you want that, use iterate().
- Parameters:
halt_before – The list of actions to halt before execution of. It will halt on the first one.
halt_after – The list of actions to halt after execution of. It will halt on the first one.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world
- Returns:
The final state, and the results of running the actions in the order that they were specified.
- async astep(
- inputs: Dict[str, Any] | None = None,
Asynchronous version of step.
- Parameters:
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world
- Returns:
Tuple[Function, dict, State] – the action that was just ran, the result of running it, and the new state
- astream_result(
- halt_after: list[str],
- halt_before: list[str] | None = None,
- inputs: Dict[str, Any] | None = None,
Streams a result out in an asynchronous manner.
- Parameters:
halt_after – The list of actions to halt after execution of. It will halt on the first one.
halt_before – The list of actions to halt before execution of. It will halt on the first one. Note that if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world
- Returns:
An asynchronous
AsyncStreamingResultContainer
, which is a generator that will yield results as they come in, as well as cache/give you the final result, and update state accordingly.
This is meant to be used with streaming actions –
streaming_action
orStreamingAction
It returns aStreamingResultContainer
, which has two capabilities:It is a generator that streams out the intermediate results of the action
It has an async
.get()
method that returns the final result of the action, and the final state.
If
.get()
is called before the generator is exhausted, it will block until the generator is exhausted.While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case, the generator will be empty, and the
.get()
method will return the final result and state.The rules for halt_before and halt_after are the same as for
iterate
, andrun
. In this case, halt_before will indicate a non streaming action, which will be empty. Thushalt_after
takes precedence – if it is met, the streaming result container will contain the result of the halt_after condition.The
AsyncStreamingResultContainer
is meant as a convenience – specifically this allows for hooks, callbacks, etc… so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.To see how this works, let’s take the following action (simplified as a single-node workflow) as an example:
client = openai.AsyncClient() @streaming_action(reads=[], writes=['response']) async def streaming_response(state: State, prompt: str) -> Generator[dict, None, Tuple[dict, State]]: response = client.chat.completions.create( model='gpt-3.5-turbo', messages=[{ 'role': 'user', 'content': prompt }], temperature=0, ) buffer = [] async for chunk in response: # use an async for loop delta = chunk.choices[0].delta.content buffer.append(delta) # yield partial results yield {'response': delta}, None # indicate that we are not done by returning a `None` state! # make sure to join with the buffer! full_response = ''.join(buffer) # yield the final result at the end + the state update yield {'response': full_response}, state.update(response=full_response)
To use streaming_result, you pass in names of streaming actions (such as the one above) to the halt_after parameter:
application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt}) async for result in streaming_result: print(result['response']) # one by one result, state = await streaming_result.get() print(result['response']) # all at once
Note that if you have multiple halt_after conditions, you can use the
.action
attribute to get the action that was run.application = ApplicationBuilder().with_actions( streaming_response=streaming_response, error=error # another function that outputs an error, streaming )...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = await application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt}) color = "red" if action.name == "error" else "green" for result in streaming_result: print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
application = ApplicationBuilder().with_actions( streaming_response=streaming_response, error=non_streaming_error # a non-streaming function that outputs an error )...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = await application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt}) color = "red" if action.name == "error" else "green" if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action async for result in streaming_result: print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape else: result, state = await output.get() print(format(result['response'], color))
- property builder: ApplicationBuilder[ApplicationStateType] | None¶
Returns the application builder that was used to build this application. Note that this asusmes the application was built using the builder. Otherwise,
- Returns:
The application builder
- property context: ApplicationContext¶
Gives the application context. This has information you need for the tracker, sequence ID, application, etc…
- Returns:
Application context
- property graph: ApplicationGraph¶
Application graph object – if you want to inspect, visualize, etc.. this is what you want.
- Returns:
The application graph object
- has_next_action() bool ¶
Returns whether or not there is a next action to run.
- Returns:
True if there is a next action, False otherwise
- iterate(
- *,
- halt_before: list[str] = None,
- halt_after: list[str] = None,
- inputs: Dict[str, Any] | None = None,
Returns a generator that calls step() in a row, enabling you to see the state of the system as it updates. Note this returns a generator, and also the final result (for convenience).
Note the nuance with halt_before and halt_after. halt_before conditions will take precedence to halt_after. Furthermore, a single iteration will always be executed prior to testing for any halting conditions.
- Parameters:
halt_before – The list of actions to halt before execution of. It will halt prior to the execution of the first one it sees.
halt_after – The list of actions to halt after execution of. It will halt after the execution of the first one it sees.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.
- Returns:
Each iteration returns the result of running step. This generator also returns a tuple of [action, result, current state]
- property parent_pointer: ParentPointer | None¶
Gives the parent pointer of an application (from where it was forked). This is None if it was not forked.
Forking is the process of starting an application off of another.
- Returns:
The parent pointer object.
- property partition_key: str | None¶
Partition key for the application. This is designed to add semantic meaning to the application, and be leveraged by persistence systems to select/find applications.
Note this is optional – if it is not included, you will need to use a persister that supports a null partition key.
- Returns:
The partition key, None if not set
- reset_to_entrypoint() None ¶
Resets the state machine to the entrypoint action – you probably want to consider having a loop in your graph, but this will do the trick if you need it!
- run(
- *,
- halt_before: list[str] = None,
- halt_after: list[str] = None,
- inputs: Dict[str, Any] | None = None,
Runs your application through until completion. Does not give access to the state along the way – if you want that, use iterate().
- Parameters:
halt_before – The list of actions to halt before execution of. It will halt on the first one.
halt_after – The list of actions to halt after execution of. It will halt on the first one.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.
- Returns:
The final state, and the results of running the actions in the order that they were specified.
- property sequence_id: int | None¶
gives the sequence ID of the current (next) action. This is incremented prior to every step. Any logging, etc… will use the current step’s sequence ID
- Returns:
The sequence ID of the current (next) action
- property spawning_parent_pointer: ParentPointer | None¶
Gives the parent pointer of an application (from where it was spawned). This is None if it was not spawned.
Spawning is the process of launching an application from within a step of another. This is used for recursive tracking.
- Returns:
The parent pointer object.
- property state: State[ApplicationStateType]¶
Gives the state. Recall that state is purely immutable – anything you do with this state will not be persisted unless you subsequently call update_state.
- Returns:
The current state object.
- step(
- inputs: Dict[str, Any] | None = None,
Performs a single step, advancing the state machine along. This returns a tuple of the action that was run, the result of running the action, and the new state.
Use this if you just want to do something with the state and not rely on generators. E.G. press forward/backwards, human in the loop, etc… Odds are this is not the method you want – you’ll want iterate() (if you want to see the state/ results along the way), or run() (if you just want the final state/results).
- Parameters:
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world
- Returns:
Tuple[Function, dict, State] – the function that was just ran, the result of running it, and the new state
- stream_result(
- halt_after: list[str],
- halt_before: list[str] | None = None,
- inputs: Dict[str, Any] | None = None,
Streams a result out.
- Parameters:
halt_after – The list of actions to halt after execution of. It will halt on the first one.
halt_before – The list of actions to halt before execution of. It will halt on the first one. Note that if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.
inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world
- Returns:
A streaming result container, which is a generator that will yield results as they come in, as well as cache/give you the final result, and update state accordingly.
This is meant to be used with streaming actions –
streaming_action
orStreamingAction
It returns aStreamingResultContainer
, which has two capabilities:It is a generator that streams out the intermediate results of the action
It has a
.get()
method that returns the final result of the action, and the final state.
If
.get()
is called before the generator is exhausted, it will block until the generator is exhausted.While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case, the generator will be empty, and the
.get()
method will return the final result and state.The rules for halt_before and halt_after are the same as for
iterate
, andrun
. In this case, halt_before will indicate a non streaming action, which will be empty. Thushalt_after
takes precedence – if it is met, the streaming result container will contain the result of the halt_after condition.The
StreamingResultContainer
is meant as a convenience – specifically this allows for hooks, callbacks, etc… so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.To see how this works, let’s take the following action (simplified as a single-node workflow) as an example:
@streaming_action(reads=[], writes=['response']) def streaming_response(state: State, prompt: str) -> Generator[dict, None, Tuple[dict, State]]: response = client.chat.completions.create( model='gpt-3.5-turbo', messages=[{ 'role': 'user', 'content': prompt }], temperature=0, ) buffer = [] for chunk in response: delta = chunk.choices[0].delta.content buffer.append(delta) # yield partial results yield {'response': delta}, None # indicate that we are not done by returning a `None` state! full_response = ''.join(buffer) # return the final result yield {'response': full_response}, state.update(response=full_response)
To use streaming_result, you pass in names of streaming actions (such as the one above) to the halt_after parameter:
application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt}) for result in streaming_result: print(result['response']) # one by one result, state = streaming_result.get() print(result) # all at once
Note that if you have multiple halt_after conditions, you can use the
.action
attribute to get the action that was run.application = ApplicationBuilder().with_actions( streaming_response=streaming_response, error=error # another function that outputs an error, streaming )...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt}) color = "red" if action.name == "error" else "green" for result in streaming_result: print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
application = ApplicationBuilder().with_actions( streaming_response=streaming_response, error=non_streaming_error # a non-streaming function that outputs an error )...build() prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..." action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt}) color = "red" if action.name == "error" else "green" if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action for result in output: print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape else: result, state = output.get() print(format(result['response'], color))
- property uid: str¶
Unique ID for the application. This must be unique across all applications in a search space. This is used by persistence/tracking to ensure that applications have meanings.
Every application has this – if not assigned, it will be randomly generated.
- Returns:
The unique ID for the application
- update_state(
- new_state: State[ApplicationStateType],
Updates state – this is meant to be called if you need to do anything with the state. For example: 1. Reset it (after going through a loop) 2. Store to some external source/log out
- Parameters:
new_state
- Returns:
- validate_correct_async_use()¶
Validates that the application is meant to run async. This validation is performed in synchronous application methods.
- visualize(
- output_file_path: str | None = None,
- include_conditions: bool = False,
- include_state: bool = False,
- view: bool = False,
- engine: Literal['graphviz'] = 'graphviz',
- write_dot: bool = False,
- **engine_kwargs: Any,
Visualizes the application graph using graphviz. This will render the graph.
- Parameters:
output_file_path – The path to save this to, None if you don’t want to save. Do not pass an extension for graphviz, instead pass format in engine_kwargs (e.g. format=”png”)
include_conditions – Whether to include condition strings on the edges (this can get noisy)
include_state – Whether to indicate the action “signature” (reads/writes) on the nodes
view – Whether to bring up a view
engine – The engine to use – only graphviz is supported for now
write_dot – If True, produce a graphviz dot file
engine_kwargs – Additional kwargs to pass to the engine
- Returns:
The graphviz object
- class burr.core.application.ApplicationGraph( )¶
User-facing representation of the state machine. This has
All the action objects
All the transition objects
The entrypoint action
- class burr.core.application.ApplicationContext(
- app_id: str,
- partition_key: str | None,
- sequence_id: int | None,
- tracker: 'TrackingClient' | None,
- parallel_executor_factory: Callable[[], Executor],
- state_initializer: BaseStateLoader | None,
- state_persister: BaseStateSaver | None,
Application context. This is anything your node might need to know about the application. Often used for recursive tracking.
Note this is also a context manager (allowing you to pass context to sub-applications).
To access this object in a running application, you can use the __context variable in the action signature:
from burr.core import action, State, ApplicationContext @action(reads=[...], writes=[...]) def my_action(state: State, __context: ApplicationContext) -> State: app_id = __context.app_id partition_key = __context.partition_key ...
- static get() ApplicationContext | None ¶
Provides the context-local application context. You can use this instead of declaring __context in an application. You really should only be using this if you’re wiring through multiple layers of abstraction and want to connect two applications.
- Returns:
The ApplicationContext you’ll want to use
Graph APIs¶
You can, optionally, use the graph API along with the burr.core.application.ApplicationBuilder.with_graph()
method. While this is a little more verbose, it helps decouple application logic from graph logic, and is useful in a host
of situations.
The GraphBuilder
class is used to build a graph, and the Graph
class can be passed to the application builder.
- class burr.core.graph.GraphBuilder¶
GraphBuilder class. This allows you to construct a graph without considering application concerns. While you can (and at first, should) use the ApplicationBuidler (which has the same API), this is lower level and allows you to decouple concerns, reuse the same graph object, etc…
- with_actions( ) GraphBuilder ¶
Adds an action to the application. The actions are granted names (using the with_name) method post-adding, using the kw argument. If it already has a name (or you wish to use the function name, raw, and it is a function-based-action), then you can use the args parameter. This is the only supported way to add actions.
- Parameters:
action_list – Actions to add – these must have a name or be function-based (in which case we will use the function-name)
action_dict – Actions to add, keyed by name
- Returns:
The application builder for future chaining.
- with_transitions(
- *transitions: Tuple[str | list[str], str] | Tuple[str | list[str], str, Condition],
- Adds transitions to the graph. Transitions are specified as tuples of either:
(from, to, condition)
(from, to) – condition is set to DEFAULT (which is a fallback)
Transitions will be evaluated in order of specification – if one is met, the others will not be evaluated. Note that one transition can be terminal – the system doesn’t have
- Parameters:
transitions – Transitions to add
- Returns:
The application builder for future chaining.
- class burr.core.graph.Graph(
- actions: List[Action],
- transitions: List[Transition],
Graph class allows you to specify actions and transitions between them. You will never instantiate this directly, just through the GraphBuilder, or indirectly through the ApplicationBuilder.
- get_next_node(
- prior_step: str | None,
- state: State,
- entrypoint: str,
Gives the next node to execute given state + prior step.
- visualize(
- output_file_path: str | Path | None = None,
- include_conditions: bool = False,
- include_state: bool = False,
- view: bool = False,
- engine: Literal['graphviz'] = 'graphviz',
- write_dot: bool = False,
- **engine_kwargs: Any,
Visualizes the graph using graphviz. This will render the graph.
- Parameters:
output_file_path – The path to save this to, None if you don’t want to save. Do not pass an extension for graphviz, instead pass format in engine_kwargs (e.g. format=”png”)
include_conditions – Whether to include condition strings on the edges (this can get noisy)
include_state – Whether to indicate the action “signature” (reads/writes) on the nodes
view – Whether to bring up a view
engine – The engine to use – only graphviz is supported for now
write_dot – If True, produce a graphviz dot file
engine_kwargs – Additional kwargs to pass to the engine
- Returns:
The graphviz object