Applications

Use this to build and manage a state Machine. You should only ever instantiate the ApplicationBuilder class, and not the Application class directly.

class burr.core.application.ApplicationBuilder
build() Application

Builds the application.

This function is a bit messy as we iron out the exact logic and rigor we want around things.

Returns:

The application object

initialize_from(
initializer: BaseStateLoader,
resume_at_next_action: bool,
default_state: dict,
default_entrypoint: str,
) ApplicationBuilder

Initializes the application we will build from some prior state object. Note that you can either call this or use with_state and with_entrypoint – this also assigns application ID, partition key, and sequence ID.

Parameters:
  • initializer – The persister object to use for initialization. Likely the same one called with with_state_persister.

  • resume_at_next_action – Whether to resume at the next action, or default to the default_entrypoint

  • default_state – The default state to use if it does not exist. This is a dictionary.

  • default_entrypoint – The default entry point to use if it does not exist or you elect not to resume_at_next_action.

Returns:

The application builder for future chaining.

with_actions(
*action_list: Action | Callable,
**action_dict: Action | Callable,
) ApplicationBuilder

Adds an action to the application. The actions are granted names (using the with_name) method post-adding, using the kw argument. If it already has a name (or you wish to use the function name, raw, and it is a function-based-action), then you can use the args parameter. This is the only supported way to add actions.

Parameters:
  • action_list – Actions to add – these must have a name or be function-based (in which case we will use the function-name)

  • action_dict – Actions to add, keyed by name

Returns:

The application builder for future chaining.

with_entrypoint(action: str) ApplicationBuilder

Adds an entrypoint to the application. This is the action that will be run first. This can only be called once.

Parameters:

action – The name of the action to set as the entrypoint

Returns:

The application builder for future chaining.

with_hooks(
*adapters: PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreRunApplicationHook | PreRunApplicationHookAsync | PostRunApplicationHook | PostRunApplicationHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync,
) ApplicationBuilder

Adds a lifecycle adapter to the application. This is a way to add hooks to the application so that they are run at the appropriate times. You can use this to synchronize state out, log results, etc…

Parameters:

adapters – Adapter to add

Returns:

The application builder for future chaining.

with_identifiers(
app_id: str = None,
partition_key: str = None,
sequence_id: int = None,
) ApplicationBuilder

Assigns various identifiers to the application. This is used for tracking, persistence, etc…

Parameters:
  • app_id – Application ID – this will be assigned to a uuid if not set.

  • partition_key – Partition key – this is used for disambiguating groups of applications. For instance, a unique user ID, etc… This is coupled to persistence, and is used to query for/select application runs.

  • sequence_id – Sequence ID that we want this to start at. If you’re using .initialize, this will be set. Otherwise this is solely for resetting/starting at a specified position.

Returns:

The application builder for future chaining.

with_state(**kwargs) ApplicationBuilder

Sets initial values in the state. If you want to load from a prior state, you can do so here and pass the values in.

TODO – enable passing in a state object instead of **kwargs

Parameters:

kwargs – Key-value pairs to set in the state

Returns:

The application builder for future chaining.

with_state_persister(
persister: BaseStateSaver | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreRunApplicationHook | PreRunApplicationHookAsync | PostRunApplicationHook | PostRunApplicationHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync,
on_every: str = 'step',
) ApplicationBuilder

Adds a state persister to the application. This is a way to persist state out to a database, file, etc… at the specified interval. This is one of two options:

  1. [normal mode] A BaseStateSaver object – this is a utility class that makes it easy to save/load

  2. [power-user-mode] A lifecycle adapter – this is a custom class that you use to save state.

The framework will wrap the BaseStateSaver object in a PersisterHook, which is a post-run.

Parameters:
  • persister – The persister to add

  • on_every – The interval to persist state. Currently only “step” is supported.

Returns:

The application builder for future chaining.

with_tracker(
tracker: Literal['local'] | PreRunStepHook | PreRunStepHookAsync | PostRunStepHook | PostRunStepHookAsync | PreRunApplicationHook | PreRunApplicationHookAsync | PostRunApplicationHook | PostRunApplicationHookAsync | PostApplicationCreateHook | PreStartSpanHook | PreStartSpanHookAsync | PostEndSpanHook | PostEndSpanHookAsync = 'local',
project: str = 'default',
params: Dict[str, Any] = None,
)

Adds a “tracker” to the application. The tracker specifies a project name (used for disambiguating groups of tracers), and plugs into the Burr UI. This can either be:

  1. A string (the only supported one right now is “local”), and a set of parameters for a set of supported trackers.

  2. A lifecycle adapter object that does tracking (up to you how to implement it).

  1. internally creates a LocalTrackingClient object, and adds it to the lifecycle adapters.

  2. adds the lifecycle adapter to the lifecycle adapters.

Parameters:
  • tracker – Tracker to use. local creates one, else pass one in.

  • project – Project name – used if the tracker is string-specified (local).

  • params – Parameters to pass to the tracker if it’s string-specified (local).

Returns:

The application builder for future chaining.

with_transitions(
*transitions: Tuple[str | list[str], str] | Tuple[str | list[str], str, Condition],
) ApplicationBuilder
Adds transitions to the application. Transitions are specified as tuples of either:
  1. (from, to, condition)

  2. (from, to) – condition is set to DEFAULT (which is a fallback)

Transitions will be evaluated in order of specification – if one is met, the others will not be evaluated. Note that one transition can be terminal – the system doesn’t have

Parameters:

transitions – Transitions to add

Returns:

The application builder for future chaining.

class burr.core.application.Application(
actions: List[Action],
transitions: List[Transition],
state: State,
initial_step: str,
partition_key: str | None,
uid: str,
sequence_id: int | None,
adapter_set: LifecycleAdapterSet | None = None,
builder: ApplicationBuilder | None = None,
)
__init__(
actions: List[Action],
transitions: List[Transition],
state: State,
initial_step: str,
partition_key: str | None,
uid: str,
sequence_id: int | None,
adapter_set: LifecycleAdapterSet | None = None,
builder: ApplicationBuilder | None = None,
)
aiterate(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) AsyncGenerator[Tuple[Action, dict, State], None]

Returns a generator that calls step() in a row, enabling you to see the state of the system as it updates. This is the asynchronous version so it has no capability of t

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

Each iteration returns the result of running step. This returns nothing – it’s an async generator which is not allowed to have a return value.

arun(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict | None, State]

Runs your application through until completion, using async. Does not give access to the state along the way – if you want that, use iterate().

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

The final state, and the results of running the actions in the order that they were specified.

async astep(
inputs: Dict[str, Any] = None,
) Tuple[Action, dict, State] | None

Asynchronous version of step.

Parameters:

inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

Tuple[Function, dict, State] – the action that was just ran, the result of running it, and the new state

astream_result(
halt_after: list[str],
halt_before: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, ...]

Placeholder for the async version of stream_result. This is not yet implemented.

property builder: ApplicationBuilder | None

Returns the application builder that was used to build this application. Note that this asusmes the application was built using the builder. Otherwise,

Returns:

The application builder

property graph: ApplicationGraph

Application graph object – if you want to inspect, visualize, etc.. this is what you want.

Returns:

The application graph object

has_next_action() bool

Returns whether or not there is a next action to run.

Returns:

True if there is a next action, False otherwise

iterate(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Generator[Tuple[Action, dict, State], None, Tuple[Action, dict | None, State]]

Returns a generator that calls step() in a row, enabling you to see the state of the system as it updates. Note this returns a generator, and also the final result (for convenience).

Note the nuance with halt_before and halt_after. halt_before conditions will take precedence to halt_after. Furthermore, a single iteration will always be executed prior to testing for any halting conditions.

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt prior to the execution of the first one it sees.

  • halt_after – The list of actions to halt after execution of. It will halt after the execution of the first one it sees.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

Each iteration returns the result of running step. This generator also returns a tuple of [action, result, current state]

property partition_key: str | None

Partition key for the application. This is designed to add semantic meaning to the application, and be leveraged by persistence systems to select/find applications.

Note this is optional – if it is not included, you will need to use a persister that supports a null partition key.

Returns:

The partition key, None if not set

reset_to_entrypoint() None

Resets the state machine to the entrypoint action.

run(
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict | None, State]

Runs your application through until completion. Does not give access to the state along the way – if you want that, use iterate().

Parameters:
  • halt_before – The list of actions to halt before execution of. It will halt on the first one.

  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world. Note that this is only used for the first iteration – subsequent iterations will not use this.

Returns:

The final state, and the results of running the actions in the order that they were specified.

property sequence_id: int | None

gives the sequence ID of the current (next) action. This is incremented prior to every step. Any logging, etc… will use the current step’s sequence ID

Returns:

The sequence ID of the current (next) action

property state: State

Gives the state. Recall that state is purely immutable – anything you do with this state will not be persisted unless you subsequently call update_state.

Returns:

The current state object.

step(
inputs: Dict[str, Any] | None = None,
) Tuple[Action, dict, State] | None

Performs a single step, advancing the state machine along. This returns a tuple of the action that was run, the result of running the action, and the new state.

Use this if you just want to do something with the state and not rely on generators. E.G. press forward/backwards, human in the loop, etc… Odds are this is not the method you want – you’ll want iterate() (if you want to see the state/ results along the way), or run() (if you just want the final state/results).

Parameters:

inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

Tuple[Function, dict, State] – the function that was just ran, the result of running it, and the new state

stream_result(
halt_after: list[str],
halt_before: list[str] = None,
inputs: Dict[str, Any] | None = None,
) Tuple[Action, StreamingResultContainer]

Streams a result out.

Parameters:
  • halt_after – The list of actions to halt after execution of. It will halt on the first one.

  • halt_before – The list of actions to halt before execution of. It will halt on the first one. Note that if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.

  • inputs – Inputs to the action – this is if this action requires an input that is passed in from the outside world

Returns:

A streaming result container, which is a generator that will yield results as they come in, as wel as cache/give you the final result, and update state accordingly.

This is meant to be used with streaming actions – streaming_action or StreamingAction It returns a StreamingResultContainer, which has two capabilities:

  1. It is a generator that streams out the intermediate results of the action

  2. It has a .get() method that returns the final result of the action, and the final state.

If .get() is called before the generator is exhausted, it will block until the generator is exhausted.

While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case, the generator will be empty, and the .get() method will return the final result and state.

The rules for halt_before and halt_after are the same as for iterate, and run. In this case, halt_before will indicate a non streaming action, which will be empty. Thus halt_after takes precedence – if it is met, the streaming result container will contain the result of the halt_after condition.

The StreamingResultContainer is meant as a convenience – specifically this allows for hooks, callbacks, etc… so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.

To see how this works, let’s take the following action (simplified as a single-node workflow) as an example:

@streaming_action(reads=[], writes=['response'])
def streaming_response(state: State, prompt: str) -> Generator[dict, None, Tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': prompt
            }],
        temperature=0,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)

To use streaming_result, you pass in names of streaming actions (such as the one above) to the halt_after parameter:

application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt})
for result in streaming_result:
    print(result['response']) # one by one

result, state = streaming_result.get()
print(result) #  all at once

Note that if you have multiple halt_after conditions, you can use the .action attribute to get the action that was run.

application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=error # another function that outputs an error, streaming
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
for result in streaming_result:
    print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
application = ApplicationBuilder().with_actions(
    streaming_response=streaming_response,
    error=non_streaming_error # a non-streaming function that outputs an error
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = application.stream_result(halt_after='streaming_response', inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action
    for result in output:
         print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
else:
    result, state = output.get()
    print(format(result['response'], color))
property uid: str

Unique ID for the application. This must be unique across all applications in a search space. This is used by persistence/tracking to ensure that applications have meanings.

Every application has this – if not assigned, it will be randomly generated.

Returns:

The unique ID for the application

update_state(new_state: State)

Updates state – this is meant to be called if you need to do anything with the state. For example: 1. Reset it (after going through a loop) 2. Store to some external source/log out

Parameters:

new_state

Returns:

visualize(
output_file_path: str | None = None,
include_conditions: bool = False,
include_state: bool = False,
view: bool = False,
engine: Literal['graphviz'] = 'graphviz',
**engine_kwargs: Any,
) graphviz.Digraph | None

Visualizes the application graph using graphviz. This will render the graph.

Parameters:
  • output_file_path – The path to save this to, None if you don’t want to save. Do not pass an extension for graphviz, instead pass format in engine_kwargs (e.g. format=”png”)

  • include_conditions – Whether to include condition strings on the edges (this can get noisy)

  • include_state – Whether to indicate the action “signature” (reads/writes) on the nodes

  • view – Whether to bring up a view

  • engine – The engine to use – only graphviz is supported for now

  • engine_kwargs – Additional kwargs to pass to the engine

Returns:

The graphviz object

class burr.core.application.ApplicationGraph(
actions: List[Action],
transitions: List[Transition],
entrypoint: Action,
)

User-facing representation of the state machine. This has

  1. All the action objects

  2. All the transition objects

  3. The entrypoint action