Applications

Use this to build and manage a state machine. You should only ever instantiate the ApplicationBuilder class, not the Application class directly.
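
For example, a minimal two-action application might be assembled like this (a sketch – the action names and state fields are illustrative, not part of the API):

from typing import Tuple

from burr.core import ApplicationBuilder, State, action

@action(reads=[], writes=["prompt"])
def get_prompt(state: State, user_input: str) -> Tuple[dict, State]:
    # takes an external input and writes it to state
    result = {"prompt": user_input}
    return result, state.update(**result)

@action(reads=["prompt"], writes=["response"])
def respond(state: State) -> Tuple[dict, State]:
    # illustrative "response" – replace with your own logic
    result = {"response": f"you said: {state['prompt']}"}
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(get_prompt, respond)
    .with_transitions(("get_prompt", "respond"), ("respond", "get_prompt"))
    .with_state(prompt="", response="")
    .with_entrypoint("get_prompt")
    .build()
)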

class burr.core.application.ApplicationBuilder
build() Application[StateType]

Builds the application.

This function is a bit messy as we iron out the exact logic and rigor we want around things.

Returns:

The application object

initialize_from(
initializer: BaseStateLoader,
resume_at_next_action: bool,
default_state: dict,
default_entrypoint: str,
fork_from_app_id: str = None,
fork_from_partition_key: str = None,
fork_from_sequence_id: int = None,
) ApplicationBuilder[StateType]

Initializes the application we will build from some prior state object.

Note (1) that you can either call this or use with_state and with_entrypoint.

Note (2) if you want to continue a prior application and don’t want to fork it into a new application ID, the values in .with_identifiers() will be used to query for prior state.

Parameters:
  • initializer – The persister object to use for initialization. Likely the same one passed to with_state_persister.

  • resume_at_next_action – Whether to resume at the next action, or default to the default_entrypoint

  • default_state – The default state to use if it does not exist. This is a dictionary.

  • default_entrypoint – The default entry point to use if it does not exist or you elect not to resume_at_next_action.

  • fork_from_app_id – The app ID to fork from, not to be confused with the current app_id that is set with .with_identifiers(). This is used to fork from a prior application run.

  • fork_from_partition_key – The partition key to fork from a prior application. Optional. fork_from_app_id required.

  • fork_from_sequence_id – The sequence ID to fork from a prior application run. Optional, defaults to latest. fork_from_app_id required.

Returns:

The application builder for future chaining.
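
A sketch of continuing from prior state (assuming persister is a BaseStateLoader/BaseStateSaver you have constructed, e.g. one of the persisters in burr.core.persistence, and that the action names come from your own application):

app = (
    ApplicationBuilder()
    .with_actions(get_prompt, respond)
    .with_transitions(("get_prompt", "respond"), ("respond", "get_prompt"))
    .with_identifiers(app_id="conversation-123", partition_key="user-456")
    .initialize_from(
        persister,
        resume_at_next_action=True,                    # pick up where the prior run left off
        default_state={"prompt": "", "response": ""},  # used if no prior state is found
        default_entrypoint="get_prompt",               # used if no prior state is found
    )
    .with_state_persister(persister)                   # keep saving new steps as they run
    .build()
)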

with_actions(
*action_list: Action | Callable,
**action_dict: Action | Callable,
) ApplicationBuilder[StateType]

Adds actions to the application. Actions passed as keyword arguments are given those names (via the with_name method) after being added. Actions that already have a name – or function-based actions, where the raw function name is used – can be passed as positional arguments. This is the only supported way to add actions.

Parameters:
  • action_list – Actions to add – these must have a name or be function-based (in which case we will use the function-name)

  • action_dict – Actions to add, keyed by name

Returns:

The application builder for future chaining.
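
Both forms can be mixed, for example (action names illustrative):

builder = ApplicationBuilder().with_actions(
    get_prompt,              # positional: function-based action, named after the function
    respond,                 # positional: same
    respond_verbose=respond, # keyword: the same action registered under a different name
)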

with_entrypoint(
action: str,
) ApplicationBuilder[StateType]

Adds an entrypoint to the application. This is the action that will be run first. This can only be called once.

Parameters:

action – The name of the action to set as the entrypoint

Returns:

The application builder for future chaining.

with_graph(
graph: Graph,
) ApplicationBuilder[StateType]

Adds a prebuilt graph – this is an alternative to using the with_actions and with_transitions methods. While you will likely use with_actions and with_transitions, you may want this in a few cases:

  1. You want to reuse the same graph object for different applications

  2. You want the logic that constructs the graph to be separate from that which constructs the application

  3. You want to serialize/deserialize a graph object and run it in an application

Parameters:

graph – Graph object built with the GraphBuilder

Returns:

The application builder for future chaining.

with_hooks(
*adapters: DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync,
) ApplicationBuilder[StateType]

Adds a lifecycle adapter to the application. This is a way to add hooks to the application so that they are run at the appropriate times. You can use this to synchronize state out, log results, etc…

Parameters:

adapters – Adapter to add

Returns:

The application builder for future chaining.
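
For instance, a minimal post-run-step hook might look like this (a sketch; the logging is illustrative):

from burr.core import Action, State
from burr.lifecycle import PostRunStepHook

class PrintStepHook(PostRunStepHook):
    def post_run_step(self, *, action: Action, state: State, **future_kwargs):
        # called after every step with keyword-only arguments
        print(f"finished running {action.name}")

builder = ApplicationBuilder().with_hooks(PrintStepHook())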

with_identifiers(
app_id: str = None,
partition_key: str = None,
sequence_id: int = None,
) ApplicationBuilder[StateType]

Assigns various identifiers to the application. This is used for tracking, persistence, etc…

Parameters:
  • app_id – Application ID – this will be assigned a UUID if not set.

  • partition_key – Partition key – this is used for disambiguating groups of applications. For instance, a unique user ID, etc… This is coupled to persistence, and is used to query for/select application runs.

  • sequence_id – Sequence ID that we want this to start at. If you’re using .initialize_from(), this will be set. Otherwise this is solely for resetting/starting at a specified position.

Returns:

The application builder for future chaining.

with_parallel_executor(executor_factory: Callable[[], Executor]) ApplicationBuilder[StateType]

Assigns a default executor to be used for recursive/parallel sub-actions. This effectively allows for executing multiple Burr apps in parallel. See https://burr.dagworks.io/pull/concepts/parallelism/ for more details.

This will default to a simple threadpool executor, meaning that you will be bound by the number of threads your computer can handle. If you want to use a more advanced executor, you can pass it in here – any subclass of concurrent.futures.Executor will work.

If you specify an executor for a specific task, that executor will take precedence over this default.

Note that, if you are using asyncio, you cannot specify an executor. It will default to using asyncio.gather with asyncio’s event loop.

Parameters:

executor_factory – A callable that returns the Executor to use for parallel sub-actions

Returns:

The application builder for future chaining.
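
For example, to use a process pool instead of the default thread pool (a sketch; the parameter takes a zero-argument factory, as its name suggests, and the pool size here is arbitrary):

import concurrent.futures

builder = ApplicationBuilder().with_parallel_executor(
    # pass a factory (a zero-argument callable), not an executor instance
    lambda: concurrent.futures.ProcessPoolExecutor(max_workers=4)
)
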
with_spawning_parent(
app_id: str,
sequence_id: int,
partition_key: str | None = None,
) ApplicationBuilder[StateType]

Sets the ‘spawning’ parent application that created this app. This is used for tracking purposes. Doing this creates a parent/child relationship. There can be many spawned children from a single sequence ID (just as there can be many forks of an app).

Note the difference between this and forking. Forking creates a new app that picks up where the old one left off. Spawning, by contrast, implies that this application is wholly contained within a step of the parent application.

Parameters:
  • app_id – ID of application that spawned this app

  • sequence_id – Sequence ID of the parent app that spawned this app

  • partition_key – Partition key of the parent app that spawned this app

Returns:

The application builder for future chaining.

with_state(
state: State | StateTypeToSet | None = None,
**kwargs,
) ApplicationBuilder[StateType]

Sets initial values in the state. If you want to load from a prior state, you can do so here and pass the values in.

Parameters:
  • state – A State object (or a typed state object, if a typing system is set) to initialize from. Optional.

  • kwargs – Key-value pairs to set in the state

Returns:

The application builder for future chaining.
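
For example (field names illustrative):

builder = ApplicationBuilder().with_state(
    prompt="",        # each keyword becomes a field in the initial state
    chat_history=[],
)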

with_state_persister(
persister: BaseStateSaver | DoLogAttributeHook | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreApplicationExecuteCallHook | PreApplicationExecuteCallHookAsync | PostApplicationExecuteCallHook | PostApplicationExecuteCallHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync | PreStartStreamHook | PostStreamItemHook | PostEndStreamHook | PreStartStreamHookAsync | PostStreamItemHookAsync | PostEndStreamHookAsync,
on_every: str = 'step',
) ApplicationBuilder[StateType]

Adds a state persister to the application. This is a way to persist state out to a database, file, etc… at the specified interval. This is one of two options:

  1. [normal mode] A BaseStateSaver object – this is a utility class that makes it easy to save/load

  2. [power-user-mode] A lifecycle adapter – this is a custom class that you use to save state.

The framework will wrap the BaseStateSaver object in a PersisterHook, which runs as a post-run-step hook.

Parameters:
  • persister – The persister to add

  • on_every – The interval to persist state. Currently only “step” is supported.

Returns:

The application builder for future chaining.
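
A sketch using the SQLite persister shipped with Burr (this assumes burr.core.persistence.SQLLitePersister and its initialize() method; check the persistence reference for the exact constructor arguments):

from burr.core.persistence import SQLLitePersister

persister = SQLLitePersister(db_path="./state.db", table_name="burr_state")
persister.initialize()  # create the backing table if it does not exist

app = (
    ApplicationBuilder()
    .with_actions(get_prompt, respond)
    .with_transitions(("get_prompt", "respond"))
    .with_entrypoint("get_prompt")
    .with_identifiers(app_id="conversation-123", partition_key="user-456")
    .with_state_persister(persister)  # state is saved after every step
    .build()
)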

with_tracker(
tracker: Literal['local'] | 'TrackingClient' = 'local',
project: str = 'default',
params: Dict[str, Any] = None,
use_otel_tracing: bool = False,
) ApplicationBuilder[StateType]

Adds a “tracker” to the application. The tracker specifies a project name (used for disambiguating groups of traces), and plugs into the Burr UI. This can either be:

  1. A string (the only supported value right now is “local”), plus a set of parameters for the supported trackers. This internally creates a LocalTrackingClient object and adds it to the lifecycle adapters.

  2. A lifecycle adapter object that does tracking (up to you how to implement it). This adds the lifecycle adapter to the lifecycle adapters.

Parameters:
  • tracker – Tracker to use. Passing “local” creates one for you; otherwise pass in your own tracker/lifecycle adapter.

  • project – Project name – used if the tracker is string-specified (local).

  • params – Parameters to pass to the tracker if it’s string-specified (local).

  • use_otel_tracing – Whether to log OpenTelemetry traces to the Burr UI. This is experimental but we will be adding full support shortly. This requires burr[opentelemetry] to be installed. Note you can also log Burr to OpenTelemetry using the OpenTelemetry adapter.

Returns:

The application builder for future chaining.
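
For example, to track runs into a local project that the Burr UI can read (project name illustrative):

builder = ApplicationBuilder().with_tracker("local", project="my_chatbot")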

with_transitions(
*transitions: Tuple[str | list[str], str] | Tuple[str | list[str], str, Condition],
) ApplicationBuilder[StateType]
Adds transitions to the application. Transitions are specified as tuples of either:
  1. (from, to, condition)

  2. (from, to) – condition is set to DEFAULT (which is a fallback)

Transitions will be evaluated in order of specification – if one is met, the others will not be evaluated. Note that an action can be terminal – if the system doesn’t have a transition out of it, execution will stop there.

Parameters:

transitions – Transitions to add

Returns:

The application builder for future chaining.
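
A sketch of the tuple forms, using the when/expr/default condition helpers from burr.core (action names illustrative):

from burr.core import ApplicationBuilder, default, expr, when

builder = ApplicationBuilder().with_actions(get_prompt, respond, give_up).with_transitions(
    ("get_prompt", "respond", when(valid=True)),      # condition: state["valid"] == True
    ("get_prompt", "give_up", expr("attempts > 3")),  # condition: expression over state
    ("get_prompt", "get_prompt", default),            # fallback if nothing above matched
    (["respond", "give_up"], "get_prompt"),           # list form: several sources, one target
)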

with_typing(
typing_system: TypingSystem[StateTypeToSet],
) ApplicationBuilder[StateTypeToSet]

Sets the typing system for the application. This is used to enforce typing on the state.

Parameters:

typing_system – Typing system to use

Returns:

The application builder for future chaining.

class burr.core.application.Application(
graph: Graph,
state: State[ApplicationStateType],
partition_key: str | None,
uid: str,
entrypoint: str,
sequence_id: int | None = None,
adapter_set: LifecycleAdapterSet | None = None,
builder: 'ApplicationBuilder' | None = None,
fork_parent_pointer: burr_types.ParentPointer | None = None,
spawning_parent_pointer: burr_types.ParentPointer | None = None,
tracker: 'TrackingClient' | None = None,
parallel_executor_factory: Executor | None = None,
)
__init__(
graph: Graph,
state: State[ApplicationStateType],
partition_key: str | None,
uid: str,
entrypoint: str,
sequence_id: int | None = None,
adapter_set: LifecycleAdapterSet | None = None,
builder: 'ApplicationBuilder' | None = None,
fork_parent_pointer: burr_types.ParentPointer | None = None,
spawning_parent_pointer: burr_types.ParentPointer | None = None,
tracker: 'TrackingClient' | None = None,
parallel_executor_factory: Executor | None = None,
)

Instantiates an Application. This is an internal API – use the builder!

Parameters:
  • graph – Graph of actions and transitions to run

  • state – State to run with

  • entrypoint – Name of the action to start at

  • partition_key – Partition key for the application (optional)

  • uid – Unique identifier for the application

  • sequence_id – Sequence ID for the application. Note this will be incremented every run. So if this starts at 0, the first one you will see will be 1.

  • adapter_set – Set of lifecycle adapters

  • builder – Builder that created this application

aiterate(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) AsyncGenerator[Tuple[Action, dict, State[ApplicationStateType]], None]

Returns a generator that calls step() repeatedly, enabling you to see the state of the system as it updates. This is the asynchronous version, so it has no capability of returning a final value – async generators are not allowed to have a return value. Use arun() if you just want the final result.

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

Each iteration yields the result of running step(). The generator itself returns nothing – it’s an async generator, which is not allowed to have a return value.
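
A sketch of driving an application asynchronously (assuming app was built as in the earlier examples, with get_prompt requiring a user_input input):

import asyncio

async def main():
    # yields (action, result, state) after each step until "respond" has run
    async for action, result, state in app.aiterate(
        halt_after=["respond"], inputs={"user_input": "hello"}
    ):
        print(action.name, result)

asyncio.run(main())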

arun(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict | None, State]

Runs your application through until completion, using async. Does not give access to the state along the way – if you want that, use iterate().

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

The final state, and the results of running the actions in the order that they were specified.

async astep(
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict, State[ApplicationStateType]] | None

Asynchronous version of step.

Parameters:

inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

Tuple[Action, dict, State] – the action that was just run, the result of running it, and the new state

astream_result(
halt_after: list[str],
halt_before: list[str] | None = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, AsyncStreamingResultContainer[ApplicationStateType, dict | Any]]

Streams a result out in an asynchronous manner.

Parameters:
  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • halt_before – The list of actions to halt before execution of. It will halt on the first one. Note that if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

An asynchronous AsyncStreamingResultContainer, which is a generator that will yield results as they come in, as well as cache/give you the final result, and update state accordingly.

This is meant to be used with streaming actions – streaming_action or StreamingAction. It returns an AsyncStreamingResultContainer, which has two capabilities:

  1. It is a generator that streams out the intermediate results of the action

  2. It has an async .get() method that returns the final result of the action, and the final state.

If .get() is called before the generator is exhausted, it will block until the generator is exhausted.

While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case, the generator will be empty, and the .get() method will return the final result and state.

The rules for halt_before and halt_after are the same as for iterate and run. A halt_before condition indicates a non-streaming result, so the container’s generator will be empty. Thus halt_after takes precedence – if it is met, the streaming result container will contain the result of the halt_after action.

The AsyncStreamingResultContainer is meant as a convenience – specifically this allows for hooks, callbacks, etc… so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.

To see how this works, let’s take the following action (simplified as a single-node workflow) as an example:

import openai

from burr.core import State
from burr.core.action import streaming_action

client = openai.AsyncOpenAI()

@streaming_action(reads=[], writes=['response'])
async def streaming_response(state: State, prompt: str):
    response = await client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': prompt
            }],
        temperature=0,
        stream=True, # ask the API for a streaming response
    )
    buffer = []
    async for chunk in response: # use an async for loop
        delta = chunk.choices[0].delta.content or '' # the final chunk's delta can be None
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}, None # indicate that we are not done by returning a `None` state!
    # make sure to join with the buffer!
    full_response = ''.join(buffer)
    # yield the final result at the end + the state update
    yield {'response': full_response}, state.update(response=full_response)

To use astream_result, you pass in names of streaming actions (such as the one above) to the halt_after parameter:

application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after=["streaming_response"], inputs={"prompt": prompt})
async for result in streaming_result:
    print(result['response']) # one by one

result, state = await streaming_result.get()
print(result['response']) #  all at once

Note that if you have multiple halt_after conditions, you can use the .action attribute to get the action that was run.

application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=error # another function that outputs an error, streaming
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after=["streaming_response", "error"], inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
async for result in streaming_result:
    print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=non_streaming_error # a non-streaming function that outputs an error
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after=["streaming_response", "error"], inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action
    async for result in streaming_result:
         print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
else:
    result, state = await streaming_result.get()
    print(format(result['response'], color))
property builder: ApplicationBuilder[ApplicationStateType] | None

Returns the application builder that was used to build this application. Note that this assumes the application was built using the builder; otherwise, this will be None.

Returns:

The application builder

property context: ApplicationContext

Gives the application context. This has information you need for the tracker, sequence ID, application, etc…

Returns:

Application context

property graph: ApplicationGraph

Application graph object – if you want to inspect, visualize, etc.. this is what you want.

Returns:

The application graph object

has_next_action() bool

Returns whether or not there is a next action to run.

Returns:

True if there is a next action, False otherwise

iterate(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Generator[Tuple[Action, dict, State[ApplicationStateType]], None, Tuple[Action, dict | None, State[ApplicationStateType]]]

Returns a generator that calls step() repeatedly, enabling you to see the state of the system as it updates. Note this returns a generator that also produces the final result as its return value (for convenience).

Note the nuance with halt_before and halt_after. halt_before conditions take precedence over halt_after. Furthermore, a single iteration will always be executed prior to testing for any halting conditions.

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt prior to the execution of the first one it sees.

  • halt_after – The list of actions to halt after execution of. It will halt after the execution of the first one it sees.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

Each iteration yields the result of running step(). Upon completion, the generator also returns a tuple of (action, result, current state) as its return value.
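
A sketch (assuming the same illustrative app as in the earlier examples):

# observe every step; the loop variables hold the values from the halting step afterwards
for action, result, state in app.iterate(
    halt_after=["respond"], inputs={"user_input": "hello"}
):
    print(action.name, result)
print(state["response"])  # state after the halting action ran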

property parent_pointer: ParentPointer | None

Gives the parent pointer of an application (from where it was forked). This is None if it was not forked.

Forking is the process of starting an application off of another.

Returns:

The parent pointer object.

property partition_key: str | None

Partition key for the application. This is designed to add semantic meaning to the application, and be leveraged by persistence systems to select/find applications.

Note this is optional – if it is not included, you will need to use a persister that supports a null partition key.

Returns:

The partition key, None if not set

reset_to_entrypoint() None

Resets the state machine to the entrypoint action – you probably want to consider having a loop in your graph, but this will do the trick if you need it!

run(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict | None, State[ApplicationStateType]]

Runs your application through until completion. Does not give access to the state along the way – if you want that, use iterate().

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

The final state, and the results of running the actions in the order that they were specified.
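
A sketch (same illustrative app as above):

final_action, final_result, final_state = app.run(
    halt_after=["respond"],
    inputs={"user_input": "hello"},  # consumed by the first action only
)
print(final_state["response"])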

property sequence_id: int | None

Gives the sequence ID of the current (next) action. This is incremented prior to every step. Any logging, etc… will use the current step’s sequence ID.

Returns:

The sequence ID of the current (next) action

property spawning_parent_pointer: ParentPointer | None

Gives the parent pointer of an application (from where it was spawned). This is None if it was not spawned.

Spawning is the process of launching an application from within a step of another. This is used for recursive tracking.

Returns:

The parent pointer object.

property state: State[ApplicationStateType]

Gives the state. Recall that state is purely immutable – anything you do with this state will not be persisted unless you subsequently call update_state.

Returns:

The current state object.

step(
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict, State] | None

Performs a single step, advancing the state machine along. This returns a tuple of the action that was run, the result of running the action, and the new state.

Use this if you just want to do something with the state and not rely on generators, e.g. press forward/backwards, human in the loop, etc… Odds are this is not the method you want – you’ll want iterate() (if you want to see the state/results along the way), or run() (if you just want the final state/results).

Parameters:

inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

Tuple[Action, dict, State] – the action that was just run, the result of running it, and the new state
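
A sketch of stepping manually, e.g. for a human-in-the-loop flow (same illustrative app as above):

# first step needs the external input; subsequent steps do not
action, result, state = app.step(inputs={"user_input": "hello"})
while app.has_next_action() and action.name != "respond":
    action, result, state = app.step()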

stream_result(
halt_after: list[str],
halt_before: list[str] | None = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, StreamingResultContainer[ApplicationStateType, dict | Any]]

Streams a result out.

Parameters:
  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • halt_before – The list of actions to halt before execution of. It will halt on the first one. Note that if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

A streaming result container, which is a generator that will yield results as they come in, as well as cache/give you the final result, and update state accordingly.

This is meant to be used with streaming actions – streaming_action or StreamingAction. It returns a StreamingResultContainer, which has two capabilities:

  1. It is a generator that streams out the intermediate results of the action

  2. It has a .get() method that returns the final result of the action, and the final state.

If .get() is called before the generator is exhausted, it will block until the generator is exhausted.

While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case, the generator will be empty, and the .get() method will return the final result and state.

The rules for halt_before and halt_after are the same as for iterate and run. A halt_before condition indicates a non-streaming result, so the container’s generator will be empty. Thus halt_after takes precedence – if it is met, the streaming result container will contain the result of the halt_after action.

The StreamingResultContainer is meant as a convenience – specifically this allows for hooks, callbacks, etc… so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.

To see how this works, let’s take the following action (simplified as a single-node workflow) as an example:

import openai

from burr.core import State
from burr.core.action import streaming_action

client = openai.OpenAI()

@streaming_action(reads=[], writes=['response'])
def streaming_response(state: State, prompt: str):
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': prompt
            }],
        temperature=0,
        stream=True, # ask the API for a streaming response
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content or '' # the final chunk's delta can be None
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}, None # indicate that we are not done by returning a `None` state!
    full_response = ''.join(buffer)
    # yield the final result at the end + the state update
    yield {'response': full_response}, state.update(response=full_response)

To use stream_result, you pass in names of streaming actions (such as the one above) to the halt_after parameter:

application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after=["streaming_response"], inputs={"prompt": prompt})
for result in streaming_result:
    print(result['response']) # one by one

result, state = streaming_result.get()
print(result['response']) #  all at once

Note that if you have multiple halt_after conditions, you can use the .action attribute to get the action that was run.

application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=error # another function that outputs an error, streaming
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after=["streaming_response", "error"], inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
for result in streaming_result:
    print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=non_streaming_error # a non-streaming function that outputs an error
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after=["streaming_response", "error"], inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action
    for result in streaming_result:
         print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
else:
    result, state = streaming_result.get()
    print(format(result['response'], color))
property uid: str

Unique ID for the application. This must be unique across all applications in a search space. This is used by persistence/tracking to uniquely identify application runs.

Every application has this – if not assigned, it will be randomly generated.

Returns:

The unique ID for the application

update_state(
new_state: State[ApplicationStateType],
)

Updates state – this is meant to be called if you need to do anything with the state. For example:

  1. Reset it (after going through a loop)

  2. Store it to some external source / log it out

Parameters:

new_state – The new state to set

Returns:

visualize(
output_file_path: str | None = None,
include_conditions: bool = False,
include_state: bool = False,
view: bool = False,
engine: Literal['graphviz'] = 'graphviz',
write_dot: bool = False,
**engine_kwargs: Any,
) 'graphviz.Digraph' | None

Visualizes the application graph using graphviz. This will render the graph.

Parameters:
  • output_file_path – The path to save this to, None if you don’t want to save. Do not pass an extension for graphviz, instead pass format in engine_kwargs (e.g. format=”png”)

  • include_conditions – Whether to include condition strings on the edges (this can get noisy)

  • include_state – Whether to indicate the action “signature” (reads/writes) on the nodes

  • view – Whether to bring up a view

  • engine – The engine to use – only graphviz is supported for now

  • write_dot – If True, produce a graphviz dot file

  • engine_kwargs – Additional kwargs to pass to the engine

Returns:

The graphviz object
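
For example (output path illustrative):

# writes my_graph.png and returns the graphviz.Digraph object
app.visualize(
    output_file_path="my_graph",  # no extension; the format is an engine kwarg
    include_conditions=True,
    format="png",
)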

class burr.core.application.ApplicationGraph(
actions: List[Action],
transitions: List[Transition],
entrypoint: Action,
)

User-facing representation of the state machine. This has:

  1. All the action objects

  2. All the transition objects

  3. The entrypoint action

class burr.core.application.ApplicationContext(
app_id: str,
partition_key: str | None,
sequence_id: int | None,
tracker: 'TrackingClient' | None,
parallel_executor_factory: Callable[[], Executor],
)

Application context. This is anything your node might need to know about the application. Often used for recursive tracking.

Note this is also a context manager (allowing you to pass context to sub-applications).

static get() ApplicationContext | None

Provides the context-local application context. You can use this instead of declaring __context as a parameter in an action. You really should only be using this if you’re wiring through multiple layers of abstraction and want to connect two applications.

Returns:

The ApplicationContext you’ll want to use
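
A sketch of using the context to spawn a tracked sub-application from within an action (the action, project name, and state fields are illustrative; ApplicationContext is assumed importable from burr.core):

from burr.core import ApplicationBuilder, ApplicationContext, State, action

@action(reads=["prompt"], writes=["response"])
def run_sub_app(state: State):
    ctx = ApplicationContext.get()  # context-local; set while the parent app is executing
    sub_app = (
        ApplicationBuilder()
        .with_actions(respond)
        .with_entrypoint("respond")
        .with_state(prompt=state["prompt"])
        .with_spawning_parent(
            app_id=ctx.app_id,
            sequence_id=ctx.sequence_id,
            partition_key=ctx.partition_key,
        )
        .with_tracker("local", project="my_chatbot")
        .build()
    )
    _, result, _ = sub_app.run(halt_after=["respond"])
    return result, state.update(response=result["response"])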

Graph APIs

You can, optionally, use the graph API along with the burr.core.application.ApplicationBuilder.with_graph() method. While this is a little more verbose, it helps decouple application logic from graph logic, and is useful in a host of situations.

The GraphBuilder class is used to build a graph, and the Graph class can be passed to the application builder.
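
For example (action names illustrative):

from burr.core import ApplicationBuilder
from burr.core.graph import GraphBuilder

# build the graph once, without any application concerns...
graph = (
    GraphBuilder()
    .with_actions(get_prompt, respond)
    .with_transitions(("get_prompt", "respond"), ("respond", "get_prompt"))
    .build()
)

# ...then reuse it across applications
app = (
    ApplicationBuilder()
    .with_graph(graph)
    .with_entrypoint("get_prompt")
    .with_state(prompt="", response="")
    .build()
)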

class burr.core.graph.GraphBuilder

GraphBuilder class. This allows you to construct a graph without considering application concerns. While you can (and at first, should) use the ApplicationBuilder (which has the same API), this is lower level and allows you to decouple concerns, reuse the same graph object, etc…

build() Graph

Builds/finalizes the graph.

Returns:

The graph object

with_actions(
*action_list: Action | Callable,
**action_dict: Action | Callable,
) GraphBuilder

Adds actions to the graph. Actions passed as keyword arguments are given those names (via the with_name method) after being added. Actions that already have a name – or function-based actions, where the raw function name is used – can be passed as positional arguments. This is the only supported way to add actions.

Parameters:
  • action_list – Actions to add – these must have a name or be function-based (in which case we will use the function-name)

  • action_dict – Actions to add, keyed by name

Returns:

The graph builder for future chaining.

with_transitions(
*transitions: Tuple[str | list[str], str] | Tuple[str | list[str], str, Condition],
) GraphBuilder
Adds transitions to the graph. Transitions are specified as tuples of either:
  1. (from, to, condition)

  2. (from, to) – condition is set to DEFAULT (which is a fallback)

Transitions will be evaluated in order of specification – if one is met, the others will not be evaluated. Note that an action can be terminal – if the system doesn’t have a transition out of it, execution will stop there.

Parameters:

transitions – Transitions to add

Returns:

The graph builder for future chaining.

class burr.core.graph.Graph(
actions: List[Action],
transitions: List[Transition],
)

Graph class allows you to specify actions and transitions between them. You will never instantiate this directly, just through the GraphBuilder, or indirectly through the ApplicationBuilder.

get_action(action_name: str) Action | None

Gets an action object given the action name

get_next_node(
prior_step: str | None,
state: State,
entrypoint: str,
) Action | None

Gives the next node to execute given state + prior step.

visualize(
output_file_path: str | Path | None = None,
include_conditions: bool = False,
include_state: bool = False,
view: bool = False,
engine: Literal['graphviz'] = 'graphviz',
write_dot: bool = False,
**engine_kwargs: Any,
) graphviz.Digraph | None

Visualizes the graph using graphviz. This will render the graph.

Parameters:
  • output_file_path – The path to save this to, None if you don’t want to save. Do not pass an extension for graphviz, instead pass format in engine_kwargs (e.g. format=”png”)

  • include_conditions – Whether to include condition strings on the edges (this can get noisy)

  • include_state – Whether to indicate the action “signature” (reads/writes) on the nodes

  • view – Whether to bring up a view

  • engine – The engine to use – only graphviz is supported for now

  • write_dot – If True, produce a graphviz dot file

  • engine_kwargs – Additional kwargs to pass to the engine

Returns:

The graphviz object