State Persistence

Burr provides a set of tools to make loading and saving state easy. These are functions that will be used by lifecycle hooks to save and load state.

We currently support the following database integrations:

Burr Implemented Persisters






















We follow the naming convention b_dependency-library, where the b_ is used to avoid name clashing with the underlying library. We chose the library name in case we implement the same database persister with different dependency libraries to keep the class naming convention.

If you want to implement your own state persister (to bridge it with a database), you should implement the BaseStatePersister or the AsyncBaseStatePersister interface.

class burr.core.persistence.BaseStatePersister

Bases: BaseStateLoader, BaseStateSaver

Utility interface for a state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.


Initializes the app for saving, set up any databases, etc.. you want to here.

is_initialized() bool

Check if the persister has been initialized appropriately.

abstract list_app_ids(partition_key: str, **kwargs) list[str]

Returns list of app IDs for a given primary key

abstract load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Loads the state for a given app_id

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time. Specifically state at the end of the action with this sequence_id. If sequence_id is not provided, persistor should return the state from the latest fully completed action.


PersistedStateData or None

abstract save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – Appliaction UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The action name that was implemented

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Internally, this interface combines the following two interfaces:

class burr.core.persistence.BaseStateLoader

Base class for state initialization. This goes together with a BaseStateSaver to form the database for your application.

abstract list_app_ids(partition_key: str, **kwargs) list[str]

Returns list of app IDs for a given primary key

abstract load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Loads the state for a given app_id

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time. Specifically state at the end of the action with this sequence_id. If sequence_id is not provided, persistor should return the state from the latest fully completed action.


PersistedStateData or None

class burr.core.persistence.BaseStateSaver

Bases: ABC

Base class for state writing. This goes together with a BaseStateLoader to form the database for your application.


Initializes the app for saving, set up any databases, etc.. you want to here.

is_initialized() bool

Check if the persister has been initialized appropriately.

abstract save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – Appliaction UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The action name that was implemented

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Note we also have the corresponding async implementations:

class burr.core.persistence.AsyncBaseStatePersister

Bases: AsyncBaseStateLoader, AsyncBaseStateSaver

Utility interface for an asynchronous state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.

async initialize()

Initializes the app for saving, set up any databases, etc.. you want to here.

async is_initialized() bool

Check if the persister has been initialized appropriately.

abstract async list_app_ids(partition_key: str, **kwargs) list[str]

Returns list of app IDs for a given primary key

abstract async load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Loads the state for a given app_id

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time. Specifically state at the end of the action with this sequence_id. If sequence_id is not provided, persistor should return the state from the latest fully completed action.


PersistedStateData or None

abstract async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – Appliaction UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The action name that was implemented

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Internally, this interface combines the following two interfaces:

class burr.core.persistence.AsyncBaseStateLoader

Asynchronous base class for state initialization. This goes together with a AsyncBaseStateSaver to form the database for your application.

abstract async list_app_ids(partition_key: str, **kwargs) list[str]

Returns list of app IDs for a given primary key

abstract async load(
partition_key: str,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Loads the state for a given app_id

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – the identifier for the app instance being recorded.

  • sequence_id – optional, the state corresponding to a specific point in time. Specifically state at the end of the action with this sequence_id. If sequence_id is not provided, persistor should return the state from the latest fully completed action.


PersistedStateData or None

class burr.core.persistence.AsyncBaseStateSaver

Bases: ABC

Asynchronous base class for state writing. This goes together with a AsyncBaseStateLoader to form the database for your application.

async initialize()

Initializes the app for saving, set up any databases, etc.. you want to here.

async is_initialized() bool

Check if the persister has been initialized appropriately.

abstract async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, position

(PK, App_id, sequence_id, position) are a unique identifier for the state. Why not just (PK, App_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position so might as well make it a quadruple.

  • partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – Appliaction UID to write with

  • sequence_id – Sequence ID of the last executed step

  • position – The action name that was implemented

  • state – The current state of the application

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.

Supported Sync Implementations

Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.

class burr.core.persistence.SQLitePersister(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
connection: Connection = None,

Class for SQLite persistence of state. This is a simple implementation.

db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
connection: Connection = None,


  • db_path – the path the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

  • connect_kwargs – kwargs to pass to the sqlite3.connect method. Use check_same_thread=False to enable use ina multithreaded context

  • connection – ability to instantiate a db outside of the persister and pass in the connection to be used.


Closes the connection to the database.

create_table_if_not_exists(table_name: str)

Helper function to create the table where things are stored if it doesn’t exist.

classmethod from_config(config: dict) SQLitePersister

Creates a new instance of the SQLitePersister from a configuration dictionary.

The config key:value pair needed are: db_path: str, table_name: str, serde_kwargs: dict, connect_kwargs: dict,

classmethod from_values(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
) SQLitePersister

Creates a new instance of the SQLitePersister from passed in values.

  • db_path – the path the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

  • connect_kwargs – kwargs to pass to the aiosqlite.connect method.


async sqlite persister instance with an open connection. You are responsible for closing the connection yourself.


Creates the table if it doesn’t exist

is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

list_app_ids(partition_key: str | None, **kwargs) list[str]

Returns list of app IDs for a given primary key

partition_key: str | None,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

  • partition_key

  • app_id

  • sequence_id


partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, and position.

This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.

  • partition_key – The partition key. This could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.



set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_psycopg2.PostgreSQLPersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Class for PostgreSQL persistence of state. This is a simple implementation.

To try it out locally with docker – here’s a command – change the values as appropriate.

docker run --name local-psql \  # container name
           -v local_psql_data:/SOME/FILE_PATH/ \  # mounting a volume for data persistence
           -p 54320:5432 \  # port mapping
           -e POSTGRES_PASSWORD=my_password \  # superuser password
           -d postgres  # database name

Then you should be able to create the class like this:

p = PostgreSQLPersister.from_values("postgres", "postgres", "my_password",
                                   "localhost", 54320, table_name="burr_state")
__init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)


  • connection – the connection to the PostgreSQL database.

  • table_name – the table name to store things under.


Closes the connection to the database.

create_table(table_name: str)

Helper function to create the table where things are stored.

classmethod from_config(
config: dict,
) PostgreSQLPersister

Creates a new instance of the PostgreSQLPersister from a configuration dictionary.

classmethod from_values(
db_name: str,
user: str,
password: str,
host: str,
port: int,
table_name: str = 'burr_state',

Builds a new instance of the PostgreSQLPersister from the provided values.

  • db_name – the name of the PostgreSQL database.

  • user – the username to connect to the PostgreSQL database.

  • password – the password to connect to the PostgreSQL database.

  • host – the host of the PostgreSQL database.

  • port – the port of the PostgreSQL database.

  • table_name – the table name to store things under.


Creates the table

is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

list_app_ids(partition_key: str, **kwargs) list[str]

Lists the app_ids for a given partition_key.

partition_key: str | None,
app_id: str,
sequence_id: int = None,
) PersistedStateData | None

Loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

  • partition_key

  • app_id

  • sequence_id


partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, and position.

This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.

  • partition_key – The partition key. This could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.



set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_redis.RedisBasePersister(connection, serde_kwargs: dict = None, namespace: str = None)

Main class for Redis persister.

Use this class if you want to directly control injecting the Redis client.

This class is responsible for persisting state data to a Redis database. It inherits from the BaseStatePersister class.

Note: We didn’t create the right constructor for the initial implementation of the RedisPersister class, so this is an attempt to fix that in a backwards compatible way.

__init__(connection, serde_kwargs: dict = None, namespace: str = None)

Initializes the RedisPersister class.

  • connection – the redis connection object.

  • serde_kwargs – serialization and deserialization keyword arguments to pass to state SERDE.

  • namespace – The name of the project to optionally use in the key prefix.


Closes the connection to the database.

create_key(app_id, partition_key, sequence_id)

Create a key for the Redis database.

classmethod from_config(
config: dict,
) RedisBasePersister

Creates a new instance of the RedisBasePersister from a configuration dictionary.

classmethod from_values(
host: str,
port: int,
db: int,
password: str = None,
serde_kwargs: dict = None,
redis_client_kwargs: dict = None,
namespace: str = None,
) RedisBasePersister

Creates a new instance of the RedisBasePersister from passed in values.

list_app_ids(partition_key: str, **kwargs) list[str]

List the app ids for a given partition key.

partition_key: str,
app_id: str,
sequence_id: int = None,
) PersistedStateData | None

Load the state data for a given partition key, app id, and sequence id.

If the sequence id is not given, it will be looked up in the Redis database. If it is not found, None will be returned.

  • partition_key

  • app_id

  • sequence_id

  • kwargs


Value or None.

partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Save the state data to the Redis database.

  • partition_key

  • app_id

  • sequence_id

  • position

  • state

  • status

  • kwargs


set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_pymongo.MongoDBBasePersister(
serde_kwargs: dict = None,

A class used to represent a MongoDB Persister.

Example usage:

persister = MongoDBBasePersister.from_values(uri='mongodb://user:pass@localhost:27017',
    state=state.State({'key': 'value'}),
loaded_state = persister.load(partition_key='example_partition', app_id='example_app', sequence_id=1)
Note: this is called MongoDBBasePersister because we had to change the constructor and wanted to make

this change backwards compatible.

serde_kwargs: dict = None,

Initializes the MongoDBBasePersister class.

  • client – the mongodb client to use

  • db_name – the name of the database to use

  • collection_name – the name of the collection to use

  • serde_kwargs – serializer/deserializer keyword arguments to pass to the state object


Closes the connection to the database.

classmethod from_config(
config: dict,
) MongoDBBasePersister

Creates a new instance of the MongoDBBasePersister from a configuration dictionary.

classmethod from_values(
serde_kwargs: dict = None,
mongo_client_kwargs: dict = None,
) MongoDBBasePersister

Initializes the MongoDBBasePersister class.

list_app_ids(partition_key: str, **kwargs) list[str]

List the app ids for a given partition key.

partition_key: str | None,
app_id: str,
sequence_id: int = None,
) PersistedStateData | None

Loads the state data for a given partition key, app_id, and sequence_id.

This method retrieves the most recent state data for the specified (partition_key, app_id) combination. If a sequence ID is provided, it will attempt to fetch the specific state at that sequence.


partition_key – The partition key. Defaults to None. Note: The partition key defaults to None. If a partition key was used during saving, it must be provided

consistently during retrieval, or no results will be returned. :param app_id: Application UID to read from. :param sequence_id: (Optional) The sequence ID to retrieve a specific state. If not provided,

the latest state is returned.


The state data if found, otherwise None.

partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Save the state data to the MongoDB database.


partition_key – the partition key. Note this could be None, but it’s up to the persistor to whether

that is a valid value it can handle. If a partition key was used during saving, it must be provided consistently during retrieval, or no results will be returned. :param app_id: Application UID to write with. :param sequence_id: Sequence ID of the last executed step. :param position: The action name that was implemented. :param state: The current state of the application. :param status: The status of this state, either “completed” or “failed”. If “failed”, the state is what it was

before the action was applied.


set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

Note that the LocalTrackingClient leverages the BaseStateLoader to allow loading state, although it uses different mechanisms to save state (as it tracks more than just state).

Supported Async Implementations

Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.

class burr.integrations.persisters.b_aiosqlite.AsyncSQLitePersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Class for asynchronous SQLite persistence of state. This is a simple implementation.


The synchronous persister closes the connection on deletion of the class using the __del__ method. In an async context that is not reliable (the event loop may already be closed by the time __del__ gets invoked). Therefore, you are responsible for closing the connection yourself (i.e. manual cleanup). We suggest to use the persister either as a context manager through the async with clause or using the method .cleanup().


SQLite is specifically single-threaded and aiosqlite creates async support through multi-threading. This persister is mainly here for quick prototyping and testing; we suggest to consider a different database with native async support for production.

Note the third-party library aiosqlite, is maintained and considered stable considered stable:

__init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)


NOTE: you are responsible to handle closing of the connection / teardown manually. To help, we provide a close() method.

  • connection – the path the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

async cleanup()

Closes the connection to the database.

async close()

This is deprecated, please use .cleanup()

async create_table_if_not_exists(table_name: str)

Helper function to create the table where things are stored if it doesn’t exist.

async classmethod from_config(
config: dict,
) AsyncSQLitePersister

Creates a new instance of the AsyncSQLitePersister from a configuration dictionary.

The config key:value pair needed are: db_path: str, table_name: str, serde_kwargs: dict, connect_kwargs: dict,

async classmethod from_values(
db_path: str,
table_name: str = 'burr_state',
serde_kwargs: dict = None,
connect_kwargs: dict = None,
) AsyncSQLitePersister

Creates a new instance of the AsyncSQLitePersister from passed in values.

  • db_path – the path the DB will be stored.

  • table_name – the table name to store things under.

  • serde_kwargs – kwargs for state serialization/deserialization.

  • connect_kwargs – kwargs to pass to the aiosqlite.connect method.


async sqlite persister instance with an open connection. You are responsible for closing the connection yourself.

async initialize()

Asynchronously creates the table if it doesn’t exist

async is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

async list_app_ids(partition_key: str | None = None, **kwargs) list[str]

Returns list of app IDs for a given primary key

async load(
partition_key: str | None,
app_id: str | None,
sequence_id: int | None = None,
) PersistedStateData | None

Asynchronously loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

  • partition_key

  • app_id

  • sequence_id


async save(
partition_key: str | None,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Asynchronously saves the state for a given app_id, sequence_id, and position.

This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.

  • partition_key – The partition key. This could be None, but it’s up to the persister to whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.



set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_asyncpg.AsyncPostgreSQLPersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)

Class for async PostgreSQL persistence of state.


The synchronous persister closes the connection on deletion of the class using the __del__ method. In an async context that is not reliable (the event loop may already be closed by the time __del__ gets invoked). Therefore, you are responsible for closing the connection yourself (i.e. manual cleanup). We suggest to use the persister either as a context manager through the async with clause or using the method .cleanup().


The implementation relies on the popular asyncpg library:

MagicStack wrote a blog post: explaining why a new implementation, performance review and comparison to aiopg (async interface of psycopg2).

To try it out locally with docker – here’s a command – change the values as appropriate.

docker run --name local-psql \  # container name
           -v local_psql_data:/SOME/FILE_PATH/ \  # mounting a volume for data persistence
           -p 54320:5432 \  # port mapping
           -e POSTGRES_PASSWORD=my_password \  # superuser password
           -d postgres  # database name

Then you should be able to create the class like this:

p = await AsyncPostgreSQLPersister.from_values("postgres", "postgres", "my_password",
                                   "localhost", 54320, table_name="burr_state")
table_name: str = 'burr_state',
serde_kwargs: dict = None,


  • connection – the connection to the PostgreSQL database.

  • table_name – the table name to store things under.

async cleanup()

Closes the connection to the database.

async create_table(table_name: str)

Helper function to create the table where things are stored.

async classmethod from_config(
config: dict,
) AsyncPostgreSQLPersister

Creates a new instance of the PostgreSQLPersister from a configuration dictionary.

async classmethod from_values(
db_name: str,
user: str,
password: str,
host: str,
port: int,
table_name: str = 'burr_state',
) AsyncPostgreSQLPersister

Builds a new instance of the PostgreSQLPersister from the provided values.

  • db_name – the name of the PostgreSQL database.

  • user – the username to connect to the PostgreSQL database.

  • password – the password to connect to the PostgreSQL database.

  • host – the host of the PostgreSQL database.

  • port – the port of the PostgreSQL database.

  • table_name – the table name to store things under.

async initialize()

Creates the table

async is_initialized() bool

This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.

async list_app_ids(partition_key: str, **kwargs) list[str]

Lists the app_ids for a given partition_key.

async load(
partition_key: str | None,
app_id: str,
sequence_id: int = None,
) PersistedStateData | None

Loads state for a given partition id.

Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.

  • partition_key

  • app_id

  • sequence_id


async save(
partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Saves the state for a given app_id, sequence_id, and position.

This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.

  • partition_key – The partition key. This could be None, but it’s up to the persistor to whether that is a valid value it can handle.

  • app_id – The identifier for the app instance being recorded.

  • sequence_id – The state corresponding to a specific point in time.

  • position – The position in the sequence of states.

  • state – The state to be saved, an instance of the State class.

  • status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.



set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.

class burr.integrations.persisters.b_redis.AsyncRedisBasePersister(connection, serde_kwargs: dict = None, namespace: str = None)

Main class for async Redis persister.


The synchronous persister closes the connection on deletion of the class using the __del__ method. In an async context that is not reliable (the event loop may already be closed by the time __del__ gets invoked). Therefore, you are responsible for closing the connection yourself (i.e. manual cleanup). We suggest to use the persister either as a context manager through the async with clause or using the method .cleanup().

This class is responsible for async persisting state data to a Redis database. It inherits from the AsyncBaseStatePersister class.

__init__(connection, serde_kwargs: dict = None, namespace: str = None)

Initializes the AsyncRedisPersister class.

  • connection – the redis connection object.

  • serde_kwargs – serialization and deserialization keyword arguments to pass to state SERDE.

  • namespace – The name of the project to optionally use in the key prefix.

async cleanup()

Closes the connection to the database.

create_key(app_id, partition_key, sequence_id)

Create a key for the Redis database.

classmethod from_config(
config: dict,
) AsyncRedisBasePersister

Creates a new instance of the RedisBasePersister from a configuration dictionary.

classmethod from_values(
host: str,
port: int,
db: int,
password: str = None,
serde_kwargs: dict = None,
redis_client_kwargs: dict = None,
namespace: str = None,
) AsyncRedisBasePersister

Creates a new instance of the AsyncRedisBasePersister from passed in values.

async list_app_ids(partition_key: str, **kwargs) list[str]

List the app ids for a given partition key.

async load(
partition_key: str,
app_id: str,
sequence_id: int = None,
) PersistedStateData | None

Load the state data for a given partition key, app id, and sequence id.

If the sequence id is not given, it will be looked up in the Redis database. If it is not found, None will be returned.

  • partition_key

  • app_id

  • sequence_id

  • kwargs


Value or None.

async save(
partition_key: str,
app_id: str,
sequence_id: int,
position: str,
state: State,
status: Literal['completed', 'failed'],

Save the state data to the Redis database.

  • partition_key

  • app_id

  • sequence_id

  • position

  • state

  • status

  • kwargs


set_serde_kwargs(serde_kwargs: dict)

Sets the serde_kwargs for the persister.