State Persistence

Burr provides a set of tools to make loading and saving state easy. These are functions that will be used by lifecycle hooks to save and load state.

If you want to implement your own state persister (to bridge it with a database), you should implement the BaseStatePersister interface.
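As a rough illustration of what such an implementation has to provide, here is a minimal in-memory sketch that mirrors the method signatures documented in this section. It deliberately does not import Burr so that it runs on its own: the class name `InMemoryPersister`, the plain-dict state, and the record field names are all assumptions made for illustration. A real persister would subclass `BaseStatePersister`, receive a `State` object, and return `PersistedStateData`.

```python
from typing import Literal, Optional


class InMemoryPersister:
    """Stand-in mirroring the BaseStatePersister method signatures.

    Assumptions: a real persister would subclass
    burr.core.persistence.BaseStatePersister and receive a burr State;
    here the state is a plain dict, and the record field names are
    illustrative rather than the PersistedStateData definition.
    """

    def __init__(self):
        # (partition_key, app_id, sequence_id, position) -> record
        self._records = {}
        self._initialized = False

    def initialize(self):
        # A database-backed persister would create its tables here.
        self._initialized = True

    def is_initialized(self) -> bool:
        return self._initialized

    def list_app_ids(self, partition_key: Optional[str], **kwargs) -> list:
        return sorted({key[1] for key in self._records if key[0] == partition_key})

    def save(self, partition_key: Optional[str], app_id: str, sequence_id: int,
             position: str, state: dict,
             status: Literal["completed", "failed"], **kwargs) -> None:
        self._records[(partition_key, app_id, sequence_id, position)] = {
            "partition_key": partition_key,
            "app_id": app_id,
            "sequence_id": sequence_id,
            "position": position,
            "state": dict(state),  # copy so later mutation does not leak in
            "status": status,
        }

    def load(self, partition_key: Optional[str], app_id: Optional[str],
             sequence_id: Optional[int] = None, **kwargs) -> Optional[dict]:
        matches = [
            record for record in self._records.values()
            if record["partition_key"] == partition_key
            and record["app_id"] == app_id
        ]
        if sequence_id is None:
            # No sequence_id given: use the latest fully completed action.
            matches = [r for r in matches if r["status"] == "completed"]
        else:
            matches = [r for r in matches if r["sequence_id"] == sequence_id]
        return max(matches, key=lambda r: r["sequence_id"], default=None)
```

Keying records by the full (partition_key, app_id, sequence_id, position) tuple follows the uniqueness rule described for save(); a database-backed version would typically express the same rule as a composite key.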

class burr.core.persistence.BaseStatePersister

Bases: BaseStateLoader, BaseStateSaver

Utility interface for a state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.

__init__()
initialize()

Initializes the app for saving. Set up any databases or other resources you need here.

is_initialized() bool

Check if the persister has been initialized appropriately.

abstract list_app_ids(partition_key: str, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

abstract load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Loads the state for a given app_id

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time, specifically the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.

Returns:

PersistedStateData or None

abstract save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – Application UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The name of the action that was executed

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Internally, this interface combines the following two interfaces:

class burr.core.persistence.BaseStateLoader

Base class for state initialization. This goes together with a BaseStateSaver to form the database for your application.

__init__()
abstract list_app_ids(partition_key: str, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

abstract load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Loads the state for a given app_id

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time, specifically the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.

Returns:

PersistedStateData or None

class burr.core.persistence.BaseStateSaver

Bases: ABC

Base class for state writing. This goes together with a BaseStateLoader to form the database for your application.

__init__()
initialize()

Initializes the app for saving. Set up any databases or other resources you need here.

is_initialized() bool

Check if the persister has been initialized appropriately.

abstract save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – Application UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The name of the action that was executed

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Note we also have the corresponding async implementations:

class burr.core.persistence.AsyncBaseStatePersister

Bases: AsyncBaseStateLoader, AsyncBaseStateSaver

Utility interface for an asynchronous state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.

__init__()
async initialize()

Initializes the app for saving. Set up any databases or other resources you need here.

async is_initialized() bool

Check if the persister has been initialized appropriately.

abstract async list_app_ids(partition_key: str, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

abstract async load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Loads the state for a given app_id

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time, specifically the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.

Returns:

PersistedStateData or None

abstract async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – Application UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The name of the action that was executed

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Internally, this interface combines the following two interfaces:

class burr.core.persistence.AsyncBaseStateLoader

Asynchronous base class for state initialization. This goes together with an AsyncBaseStateSaver to form the database for your application.

__init__()
abstract async list_app_ids(partition_key: str, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

abstract async load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Loads the state for a given app_id

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time, specifically the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.

Returns:

PersistedStateData or None

class burr.core.persistence.AsyncBaseStateSaver

Bases: ABC

Asynchronous base class for state writing. This goes together with an AsyncBaseStateLoader to form the database for your application.

__init__()
async initialize()

Initializes the app for saving. Set up any databases or other resources you need here.

async is_initialized() bool

Check if the persister has been initialized appropriately.

abstract async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

Parameters:
  • partition_key – the partition key. Note this could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – Application UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The name of the action that was executed

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.
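To show how the asynchronous variants are meant to be called, the following sketch uses a stand-in class with the same coroutine method names. `AsyncInMemoryPersister` is a hypothetical name, it does not inherit from any Burr class, and the state and records are plain dicts; the point is only that every call, including initialize() and is_initialized(), has to be awaited.

```python
import asyncio


class AsyncInMemoryPersister:
    """Stand-in mirroring the async method names; not a Burr class."""

    def __init__(self):
        # (partition_key, app_id, sequence_id) -> record
        self._records = {}
        self._initialized = False

    async def initialize(self):
        self._initialized = True

    async def is_initialized(self) -> bool:
        return self._initialized

    async def save(self, partition_key, app_id, sequence_id, position,
                   state, status, **kwargs):
        await asyncio.sleep(0)  # yield control, as real database I/O would
        self._records[(partition_key, app_id, sequence_id)] = {
            "sequence_id": sequence_id,
            "position": position,
            "state": dict(state),
            "status": status,
        }

    async def load(self, partition_key, app_id, sequence_id=None, **kwargs):
        await asyncio.sleep(0)
        matches = [
            record for (pk, app, _), record in self._records.items()
            if pk == partition_key and app == app_id
        ]
        if sequence_id is None:
            # No sequence_id given: use the latest fully completed action.
            matches = [r for r in matches if r["status"] == "completed"]
        else:
            matches = [r for r in matches if r["sequence_id"] == sequence_id]
        return max(matches, key=lambda r: r["sequence_id"], default=None)


async def main():
    persister = AsyncInMemoryPersister()
    if not await persister.is_initialized():
        await persister.initialize()
    # Several saves can be in flight at once on the same event loop.
    await asyncio.gather(*(
        persister.save("user-1", "app-a", i, "increment", {"count": i}, "completed")
        for i in range(3)
    ))
    return await persister.load("user-1", "app-a")


latest = asyncio.run(main())
```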

Supported Sync Implementations

Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.

class burr.core.persistence.SQLitePersister(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
)

Class for SQLite persistence of state. This is a simple implementation.

__init__(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
)

Constructor

Parameters:
  • db_path – the path where the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

  • connect_kwargs – kwargs to pass to the sqlite3.connect method. Use check_same_thread=False to enable use in a multithreaded context.
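The effect of check_same_thread can be seen with the standard library alone. The sketch below calls sqlite3.connect directly with the same keyword you would place in connect_kwargs (for example `connect_kwargs={"check_same_thread": False}`); the table name `t` and the lock are illustrative assumptions. Note that disabling the check only removes the guard, so writes are still serialized here with a lock.

```python
import sqlite3
import threading

# Default behaviour: a connection may only be used by the thread that created it.
strict = sqlite3.connect(":memory:")
errors = []


def use_from_other_thread():
    try:
        strict.execute("SELECT 1")
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)


worker = threading.Thread(target=use_from_other_thread)
worker.start()
worker.join()

# With check_same_thread=False the same connection can be shared across threads.
shared = sqlite3.connect(":memory:", check_same_thread=False)
shared.execute("CREATE TABLE t (x INTEGER)")
lock = threading.Lock()


def insert(value):
    # Serialize access ourselves; sqlite3 no longer enforces thread ownership.
    with lock:
        shared.execute("INSERT INTO t (x) VALUES (?)", (value,))
        shared.commit()


threads = [threading.Thread(target=insert, args=(i,)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

row_count = shared.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```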

create_table_if_not_exists(table_name: str)

Helper function to create the table where things are stored if it doesn’t exist.

initialize()

Creates the table if it doesn’t exist

is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.
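The check described above (use a cached flag first, otherwise ask the database) might look roughly like the following. This is a sketch of the idea rather than the persister's actual code: the class name `TableCheck` and the attribute `_initialized` are assumptions, and the lookup relies on SQLite's built-in sqlite_master catalog table.

```python
import sqlite3


class TableCheck:
    """Illustrative helper; not part of Burr."""

    def __init__(self, connection: sqlite3.Connection, table_name: str = "burr_state"):
        self.connection = connection
        self.table_name = table_name
        self._initialized = False

    def is_initialized(self) -> bool:
        # Fast path: a previous call already found the table.
        if self._initialized:
            return True
        # Otherwise ask SQLite's catalog whether the table exists.
        row = self.connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
            (self.table_name,),
        ).fetchone()
        self._initialized = row is not None
        return self._initialized
```

Caching only the positive result means a table created later by another component is still picked up on the next call.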

list_app_ids(partition_key: str | None, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

load(
partition_key: str | None,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

Parameters:
  • partition_key

  • app_id

  • sequence_id

Returns:

save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, and position.

This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.

Parameters:
  • partition_key – The partition key. This could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Returns:

None
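The save and load behaviour described for this class can be approximated with plain sqlite3 and json. The sketch below is not the persister's implementation: the column layout, the use of rowid to find "the last thing written", and the requirement that partition_key is a string are all assumptions made to keep the example short.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE burr_state (
        partition_key TEXT,
        app_id TEXT,
        sequence_id INTEGER,
        position TEXT,
        status TEXT,
        state TEXT,
        PRIMARY KEY (partition_key, app_id, sequence_id, position)
    )
    """
)


def save(partition_key, app_id, sequence_id, position, state, status):
    # The state is stored as a JSON string, one row per saved step.
    conn.execute(
        "INSERT INTO burr_state "
        "(partition_key, app_id, sequence_id, position, status, state) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (partition_key, app_id, sequence_id, position, status, json.dumps(state)),
    )
    conn.commit()


def load(partition_key, app_id=None, sequence_id=None):
    # Narrow the query step by step: partition only, then app_id, then sequence_id.
    query = (
        "SELECT app_id, sequence_id, position, status, state "
        "FROM burr_state WHERE partition_key = ?"
    )
    params = [partition_key]
    if app_id is not None:
        query += " AND app_id = ?"
        params.append(app_id)
    if sequence_id is not None:
        query += " AND sequence_id = ?"
        params.append(sequence_id)
    # rowid grows with each insert here, so the newest row comes first.
    query += " ORDER BY rowid DESC LIMIT 1"
    row = conn.execute(query, params).fetchone()
    if row is None:
        return None
    return {
        "app_id": row[0],
        "sequence_id": row[1],
        "position": row[2],
        "status": row[3],
        "state": json.loads(row[4]),
    }
```

Calling load with only a partition key returns the most recent row in that partition, adding app_id restricts it to that app, and adding sequence_id pins it to one step, which matches the three cases listed in the load description.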

class burr.integrations.persisters.postgresql.PostgreSQLPersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Class for PostgreSQL persistence of state. This is a simple implementation.

To try it out locally with docker – here’s a command – change the values as appropriate.

# --name sets the container name, -v mounts a volume for data persistence,
# -p maps host port 54320 to container port 5432, -e sets the superuser password,
# -d runs the container detached, and postgres is the image to run.
docker run --name local-psql \
           -v local_psql_data:/SOME/FILE_PATH/ \
           -p 54320:5432 \
           -e POSTGRES_PASSWORD=my_password \
           -d postgres

Then you should be able to create the class like this:

p = PostgreSQLPersister.from_values("postgres", "postgres", "my_password",
                                   "localhost", 54320, table_name="burr_state")

__init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Constructor

Parameters:
  • connection – the connection to the PostgreSQL database.

  • table_name – the table name to store things under.

create_table(table_name: str)

Helper function to create the table where things are stored.

classmethod from_config(
config: dict,
) PostgreSQLPersister

Creates a new instance of the PostgreSQLPersister from a configuration dictionary.

classmethod from_values(
db_name: str,
user: str,
password: str,
host: str,
port: int,
table_name: str = 'burr_state',
)

Builds a new instance of the PostgreSQLPersister from the provided values.

Parameters:
  • db_name – the name of the PostgreSQL database.

  • user – the username to connect to the PostgreSQL database.

  • password – the password to connect to the PostgreSQL database.

  • host – the host of the PostgreSQL database.

  • port – the port of the PostgreSQL database.

  • table_name – the table name to store things under.

initialize()

Creates the table

is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

list_app_ids(partition_key: str, **kwargs) list[str]

Lists the app_ids for a given partition_key.

load(
partition_key: str | None,
app_id: str,
sequence_id: int = None,
**kwargs,
) PersistedStateData | None

Loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

Parameters:
  • partition_key

  • app_id

  • sequence_id

Returns:

save(
partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Saves the state for a given app_id, sequence_id, and position.

This method uses the PostgreSQL connection, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes.

Parameters:
  • partition_key – The partition key. This could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Returns:

None

set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_redis.RedisBasePersister(connection, serde_kwargs: dict = None, namespace: str = None)

Main class for Redis persister.

Use this class if you want to directly control injecting the Redis client.

This class is responsible for persisting state data to a Redis database. It inherits from the BaseStatePersister class.

Note: We didn’t create the right constructor for the initial implementation of the RedisPersister class, so this is an attempt to fix that in a backwards compatible way.

__init__(connection, serde_kwargs: dict = None, namespace: str = None)

Initializes the RedisBasePersister class.

Parameters:
  • connection – the redis connection object.

  • serde_kwargs – serialization and deserialization keyword arguments to pass to state SERDE.

  • namespace – The name of the project to optionally use in the key prefix.

create_key(app_id, partition_key, sequence_id)

Create a key for the Redis database.

classmethod from_values(
host: str,
port: int,
db: int,
password: str = None,
serde_kwargs: dict = None,
redis_client_kwargs: dict = None,
namespace: str = None,
) RedisBasePersister

Creates a new instance of the RedisBasePersister from passed in values.

list_app_ids(partition_key: str, **kwargs) list[str]

List the app ids for a given partition key.

load(
partition_key: str,
app_id: str,
sequence_id: int = None,
**kwargs,
) PersistedStateData | None

Load the state data for a given partition key, app id, and sequence id.

If the sequence id is not given, it will be looked up in the Redis database. If it is not found, None will be returned.

Parameters:
  • partition_key

  • app_id

  • sequence_id

  • kwargs

Returns:

Value or None.
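The lookup described above (resolve the latest sequence id when none is given, return None when nothing is found) can be pictured with a plain dict standing in for Redis. Everything in this sketch is hypothetical: the key layout produced by `create_key`, the `"latest"` pointer entry, and the `namespace` prefix format are chosen for illustration and may differ from what the persister actually stores.

```python
from typing import Optional

# Stand-in for a Redis client: key -> stored value.
fake_redis = {}


def create_key(app_id: str, partition_key: str, sequence_id,
               namespace: Optional[str] = None) -> str:
    # Hypothetical key layout, with an optional namespace prefix.
    key = f"{partition_key}:{app_id}:{sequence_id}"
    return f"{namespace}:{key}" if namespace else key


def save(partition_key, app_id, sequence_id, position, state, namespace=None):
    fake_redis[create_key(app_id, partition_key, sequence_id, namespace)] = {
        "sequence_id": sequence_id,
        "position": position,
        "state": state,
    }
    # Remember the newest sequence id so load() can find it without scanning.
    fake_redis[create_key(app_id, partition_key, "latest", namespace)] = sequence_id


def load(partition_key, app_id, sequence_id=None, namespace=None):
    if sequence_id is None:
        # Look the sequence id up first; give up if nothing was ever saved.
        sequence_id = fake_redis.get(
            create_key(app_id, partition_key, "latest", namespace)
        )
        if sequence_id is None:
            return None
    return fake_redis.get(create_key(app_id, partition_key, sequence_id, namespace))
```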

save(
partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Save the state data to the Redis database.

Parameters:
  • partition_key

  • app_id

  • sequence_id

  • position

  • state

  • status

  • kwargs

Returns:

class burr.integrations.persisters.b_mongodb.MongoDBBasePersister(
client,
db_name='mydatabase',
collection_name='mystates',
serde_kwargs: dict = None,
)

A class used to represent a MongoDB Persister.

Example usage:

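from burr.core import state
from burr.integrations.persisters.b_mongodb import MongoDBBasePersister

persister = MongoDBBasePersister.from_values(uri='mongodb://user:pass@localhost:27017',
                                             db_name='mydatabase',
                                             collection_name='mystates')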
persister.save(
    partition_key='example_partition',
    app_id='example_app',
    sequence_id=1,
    position='example_position',
    state=state.State({'key': 'value'}),
    status='completed'
)
loaded_state = persister.load(partition_key='example_partition', app_id='example_app', sequence_id=1)
print(loaded_state)

Note: this is called MongoDBBasePersister because we had to change the constructor and wanted to make this change backwards compatible.

__init__(
client,
db_name='mydatabase',
collection_name='mystates',
serde_kwargs: dict = None,
)

Initializes the MongoDBBasePersister class.

Parameters:
  • client – the mongodb client to use

  • db_name – the name of the database to use

  • collection_name – the name of the collection to use

  • serde_kwargs – serializer/deserializer keyword arguments to pass to the state object

classmethod from_values(
uri='mongodb://localhost:27017',
db_name='mydatabase',
collection_name='mystates',
serde_kwargs: dict = None,
mongo_client_kwargs: dict = None,
) MongoDBBasePersister

Creates a new instance of the MongoDBBasePersister from passed in values.

list_app_ids(partition_key: str, **kwargs) list[str]

List the app ids for a given partition key.

load(
partition_key: str,
app_id: str,
sequence_id: int = None,
**kwargs,
) PersistedStateData | None

Load the state data for a given partition key, app id, and sequence id.

save(
partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Save the state data to the MongoDB database.

Note that the LocalTrackingClient leverages the BaseStateLoader to allow loading state, although it uses different mechanisms to save state (as it tracks more than just state).

Supported Async Implementations

Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.

class burr.integrations.persisters.b_aiosqlite.AsyncSQLitePersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Class for asynchronous SQLite persistence of state. This is a simple implementation.

SQLite is specifically single-threaded and aiosqlite creates async support through multi-threading. This persister is mainly here for quick prototyping and testing; we suggest considering a different database with native async support for production.

Note that the third-party library aiosqlite is maintained and considered stable: https://github.com/omnilib/aiosqlite/issues/309.
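The general idea of getting async behaviour out of a blocking driver by moving the work onto another thread can be illustrated with the standard library. This is not how aiosqlite is implemented internally; it is a simplified sketch using asyncio.to_thread (Python 3.9+), and the function name `_blocking_insert_and_count` and table `t` are made up for the example.

```python
import asyncio
import sqlite3


def _blocking_insert_and_count(db_path: str) -> int:
    # Ordinary blocking sqlite3 code; the connection is created and used
    # entirely inside the worker thread, so no thread check is violated.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS t (x INTEGER)")
        conn.execute("INSERT INTO t (x) VALUES (1)")
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    finally:
        conn.close()


async def main() -> int:
    # The event loop stays free while the worker thread talks to SQLite.
    return await asyncio.to_thread(_blocking_insert_and_count, ":memory:")


count = asyncio.run(main())
```

Because each call still occupies a thread for the duration of the query, this pattern does not scale like a driver with native async I/O, which is the reason for the production caveat above.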

__init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Constructor.

NOTE: you are responsible for closing the connection and handling teardown manually. To help, we provide a close() method.

Parameters:
  • connection – the open aiosqlite connection to the database.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

async create_table_if_not_exists(table_name: str)

Helper function to create the table where things are stored if it doesn’t exist.

async classmethod from_values(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
) AsyncSQLitePersister

Creates a new instance of the AsyncSQLitePersister from passed in values.

Parameters:
  • db_path – the path where the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

  • connect_kwargs – kwargs to pass to the aiosqlite.connect method.

Returns:

async sqlite persister instance with an open connection. You are responsible for closing the connection yourself.
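Since closing is left to the caller, one way to make sure it always happens is a small async context manager. The sketch below uses a stand-in class so that it runs without aiosqlite; `FakeAsyncPersister` and `closing_persister` are hypothetical names, and it assumes close() is a coroutine, which this page does not state explicitly.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncPersister:
    """Stand-in with an async close(); not the real AsyncSQLitePersister."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def closing_persister(persister):
    # Hand the persister to the caller and close it even if the body raises.
    try:
        yield persister
    finally:
        await persister.close()


async def main():
    persister = FakeAsyncPersister()
    async with closing_persister(persister) as p:
        assert p.closed is False  # still open while in use

    failing = FakeAsyncPersister()
    try:
        async with closing_persister(failing):
            raise RuntimeError("simulated failure during use")
    except RuntimeError:
        pass
    return persister.closed, failing.closed


closed_after_success, closed_after_error = asyncio.run(main())
```

With the real class, the object returned by awaiting from_values(...) would take the place of the stand-in.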

async initialize()

Asynchronously creates the table if it doesn’t exist

async is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

async list_app_ids(partition_key: str | None = None, **kwargs) list[str]

Returns the list of app IDs for a given partition key.

async load(
partition_key: str | None,
app_id: str | None,
sequence_id: int | None = None,
**kwargs,
) PersistedStateData | None

Asynchronously loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

Parameters:
  • partition_key

  • app_id

  • sequence_id

Returns:

async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],
**kwargs,
)

Asynchronously saves the state for a given app_id, sequence_id, and position.

This method uses the persister’s open SQLite connection, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes; closing the connection remains your responsibility.

Parameters:
  • partition_key – The partition key. This could be None, but it’s up to the persister whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Returns:

None