State Persistence¶
Burr provides a set of tools to make loading and saving state easy. These are functions that will be used by lifecycle hooks to save and load state.
If you want to implement your own state persister (to bridge it with a database), you should implement the BaseStatePersister interface.
- class burr.core.persistence.BaseStatePersister¶
Bases: BaseStateLoader, BaseStateSaver
Utility interface for a state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.
- __init__()¶
- initialize()¶
Initializes the app for saving. Set up any databases or other resources you need here.
- is_initialized() bool ¶
Check if the persister has been initialized appropriately.
- abstract list_app_ids(partition_key: str, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- abstract load(partition_key: str, app_id: str | None, sequence_id: int | None = None, **kwargs)
Loads the state for a given app_id.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the identifier for the app instance being recorded.
sequence_id – optional; identifies the state at a specific point in time, namely the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.
- Returns:
PersistedStateData or None
- abstract save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
(partition_key, app_id, sequence_id, position) uniquely identifies the state. Why not just (partition_key, app_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position, so we might as well make it a quadruple.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the application UID to write with.
sequence_id – the sequence ID of the last executed step.
position – the name of the action that was executed.
state – the current state of the application.
status – the status of this state, either “completed” or “failed”. If “failed”, the state is what it was before the action was applied.
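As a rough illustration of the contract described above, here is a minimal in-memory sketch. It is not part of Burr: InMemoryPersisterSketch is a hypothetical name, a plain dict stands in for both State and PersistedStateData, and the class deliberately does not subclass BaseStatePersister so that it runs with the standard library alone. A real implementation would extend BaseStatePersister instead.

```python
from typing import Any, Dict, List, Literal, Optional, Tuple

# (partition_key, app_id, sequence_id, position) identifies one saved record.
Key = Tuple[Optional[str], str, int, str]


class InMemoryPersisterSketch:
    """Hypothetical stand-in that mirrors the list_app_ids/load/save contract."""

    def __init__(self) -> None:
        self._records: Dict[Key, Dict[str, Any]] = {}

    def list_app_ids(self, partition_key: Optional[str], **kwargs) -> List[str]:
        # Distinct app IDs with at least one record under this partition key.
        return sorted({key[1] for key in self._records if key[0] == partition_key})

    def save(
        self,
        partition_key: Optional[str],
        app_id: str,
        sequence_id: int,
        position: str,
        state: Dict[str, Any],  # assumption: a dict stands in for Burr's State
        status: Literal["completed", "failed"],
        **kwargs,
    ) -> None:
        self._records[(partition_key, app_id, sequence_id, position)] = {
            "partition_key": partition_key,
            "app_id": app_id,
            "sequence_id": sequence_id,
            "position": position,
            "state": dict(state),  # copy so later mutation does not alter the record
            "status": status,
        }

    def load(
        self,
        partition_key: Optional[str],
        app_id: str,
        sequence_id: Optional[int] = None,
        **kwargs,
    ) -> Optional[Dict[str, Any]]:
        matches = [
            record
            for record in self._records.values()
            if record["partition_key"] == partition_key and record["app_id"] == app_id
        ]
        if sequence_id is not None:
            matches = [r for r in matches if r["sequence_id"] == sequence_id]
        else:
            # No sequence_id given: only fully completed actions qualify.
            matches = [r for r in matches if r["status"] == "completed"]
        if not matches:
            return None
        return max(matches, key=lambda r: r["sequence_id"])


persister = InMemoryPersisterSketch()
persister.save("user-1", "app-a", 0, "greet", {"count": 1}, "completed")
persister.save("user-1", "app-a", 1, "count", {"count": 1}, "failed")
latest = persister.load("user-1", "app-a")  # skips the failed record
```

Keying the store on the full quadruple keeps a failed and a completed record for the same step from overwriting each other, at the cost of filtering on load.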
Internally, this interface combines the following two interfaces:
- class burr.core.persistence.BaseStateLoader¶
Base class for state initialization. This goes together with a BaseStateSaver to form the database for your application.
- __init__()¶
- abstract list_app_ids(partition_key: str, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- abstract load(partition_key: str, app_id: str | None, sequence_id: int | None = None, **kwargs)
Loads the state for a given app_id.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the identifier for the app instance being recorded.
sequence_id – optional; identifies the state at a specific point in time, namely the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.
- Returns:
PersistedStateData or None
- class burr.core.persistence.BaseStateSaver¶
Bases: ABC
Base class for state writing. This goes together with a BaseStateLoader to form the database for your application.
- __init__()¶
- initialize()¶
Initializes the app for saving. Set up any databases or other resources you need here.
- is_initialized() bool ¶
Check if the persister has been initialized appropriately.
- abstract save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
(partition_key, app_id, sequence_id, position) uniquely identifies the state. Why not just (partition_key, app_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position, so we might as well make it a quadruple.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the application UID to write with.
sequence_id – the sequence ID of the last executed step.
position – the name of the action that was executed.
state – the current state of the application.
status – the status of this state, either “completed” or “failed”. If “failed”, the state is what it was before the action was applied.
Note we also have the corresponding async implementations:
- class burr.core.persistence.AsyncBaseStatePersister¶
Bases: AsyncBaseStateLoader, AsyncBaseStateSaver
Utility interface for an asynchronous state reader/writer. This both persists and initializes state. Extend this class if you want an easy way to implement custom state storage.
- __init__()¶
- async initialize()¶
Initializes the app for saving. Set up any databases or other resources you need here.
- async is_initialized() bool ¶
Check if the persister has been initialized appropriately.
- abstract async list_app_ids(partition_key: str, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- abstract async load(partition_key: str, app_id: str | None, sequence_id: int | None = None, **kwargs)
Loads the state for a given app_id.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the identifier for the app instance being recorded.
sequence_id – optional; identifies the state at a specific point in time, namely the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.
- Returns:
PersistedStateData or None
- abstract async save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
(partition_key, app_id, sequence_id, position) uniquely identifies the state. Why not just (partition_key, app_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position, so we might as well make it a quadruple.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the application UID to write with.
sequence_id – the sequence ID of the last executed step.
position – the name of the action that was executed.
state – the current state of the application.
status – the status of this state, either “completed” or “failed”. If “failed”, the state is what it was before the action was applied.
Internally, this interface combines the following two interfaces:
- class burr.core.persistence.AsyncBaseStateLoader¶
Asynchronous base class for state initialization. This goes together with an AsyncBaseStateSaver to form the database for your application.
- __init__()¶
- abstract async list_app_ids(partition_key: str, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- abstract async load(partition_key: str, app_id: str | None, sequence_id: int | None = None, **kwargs)
Loads the state for a given app_id.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the identifier for the app instance being recorded.
sequence_id – optional; identifies the state at a specific point in time, namely the state at the end of the action with this sequence_id. If sequence_id is not provided, the persister should return the state from the latest fully completed action.
- Returns:
PersistedStateData or None
- class burr.core.persistence.AsyncBaseStateSaver¶
Bases: ABC
Asynchronous base class for state writing. This goes together with an AsyncBaseStateLoader to form the database for your application.
- __init__()¶
- async initialize()¶
Initializes the app for saving. Set up any databases or other resources you need here.
- async is_initialized() bool ¶
Check if the persister has been initialized appropriately.
- abstract async save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
(partition_key, app_id, sequence_id, position) uniquely identifies the state. Why not just (partition_key, app_id, sequence_id)? Because we’re over-engineering this here. We’re always going to have a position, so we might as well make it a quadruple.
- Parameters:
partition_key – the partition key. Note that this could be None; it is up to the persister whether that is a value it can handle.
app_id – the application UID to write with.
sequence_id – the sequence ID of the last executed step.
position – the name of the action that was executed.
state – the current state of the application.
status – the status of this state, either “completed” or “failed”. If “failed”, the state is what it was before the action was applied.
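The asynchronous interfaces follow the same shape, with every method awaited. The sketch below is only an illustration under stated assumptions: AsyncInMemoryPersisterSketch and demo are hypothetical names, a dict stands in for State and PersistedStateData, and nothing is imported from Burr so the example runs on its own. It shows the is_initialized/initialize handshake followed by a save and a load.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class AsyncInMemoryPersisterSketch:
    """Hypothetical stand-in mirroring the async initialize/load/save contract."""

    def __init__(self) -> None:
        self._records: Dict[Tuple[Optional[str], str, int, str], Dict[str, Any]] = {}
        self._initialized = False

    async def initialize(self) -> None:
        # A real persister would create tables or collections here.
        self._initialized = True

    async def is_initialized(self) -> bool:
        return self._initialized

    async def list_app_ids(self, partition_key: Optional[str], **kwargs) -> List[str]:
        return sorted({key[1] for key in self._records if key[0] == partition_key})

    async def save(self, partition_key, app_id, sequence_id, position, state, status, **kwargs) -> None:
        self._records[(partition_key, app_id, sequence_id, position)] = {
            "sequence_id": sequence_id,
            "position": position,
            "state": dict(state),
            "status": status,
        }

    async def load(self, partition_key, app_id, sequence_id=None, **kwargs):
        matches = []
        for key, record in self._records.items():
            if key[0] != partition_key or key[1] != app_id:
                continue
            if sequence_id is not None and record["sequence_id"] != sequence_id:
                continue
            if sequence_id is None and record["status"] != "completed":
                continue  # default load only considers completed actions
            matches.append(record)
        return max(matches, key=lambda r: r["sequence_id"]) if matches else None


async def demo() -> Optional[Dict[str, Any]]:
    persister = AsyncInMemoryPersisterSketch()
    if not await persister.is_initialized():
        await persister.initialize()
    await persister.save("user-1", "app-a", 0, "greet", {"count": 1}, "completed")
    return await persister.load("user-1", "app-a")


loaded = asyncio.run(demo())
```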
Supported Sync Implementations¶
Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.
- class burr.core.persistence.SQLitePersister(db_path: str, table_name: str = 'burr_state', serde_kwargs: dict = None, connect_kwargs: dict = None)
Class for SQLite persistence of state. This is a simple implementation.
- __init__(db_path: str, table_name: str = 'burr_state', serde_kwargs: dict = None, connect_kwargs: dict = None)
Constructor
- Parameters:
db_path – the path where the DB will be stored.
table_name – the table name to store things under.
serde_kwargs – kwargs for state serialization/deserialization.
connect_kwargs – kwargs to pass to the sqlite3.connect method. Use check_same_thread=False to enable use in a multithreaded context.
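To show what check_same_thread=False changes, the following sketch uses only the standard-library sqlite3 module rather than SQLitePersister itself. The table name demo, the lock, and the temporary path are illustrative assumptions; the connect_kwargs dict is the same kind of mapping you would pass to the constructor.

```python
import os
import sqlite3
import tempfile
import threading

db_path = os.path.join(tempfile.mkdtemp(), "burr_sketch.db")

# The same keyword arguments a caller would place in connect_kwargs.
connect_kwargs = {"check_same_thread": False}
connection = sqlite3.connect(db_path, **connect_kwargs)
connection.execute("CREATE TABLE IF NOT EXISTS demo (value TEXT)")

# Disabling the thread check does not serialize access for you,
# so this sketch guards writes with its own lock.
lock = threading.Lock()


def worker() -> None:
    with lock:
        connection.execute("INSERT INTO demo (value) VALUES (?)", ("from worker",))
        connection.commit()


thread = threading.Thread(target=worker)
thread.start()
thread.join()

rows = connection.execute("SELECT value FROM demo").fetchall()
connection.close()
```

Without check_same_thread=False, the insert in the worker thread would raise sqlite3.ProgrammingError because the connection was created in the main thread.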
- create_table_if_not_exists(table_name: str)¶
Helper function to create the table where things are stored if it doesn’t exist.
- initialize()¶
Creates the table if it doesn’t exist
- is_initialized() bool ¶
This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.
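The check described above can be sketched with the standard-library sqlite3 module. This is an assumption about the general approach, not the library's actual code: TableCheckSketch and its _initialized attribute are hypothetical names, and the query against sqlite_master is one common way to test whether a table exists.

```python
import sqlite3


class TableCheckSketch:
    """Hypothetical helper: cached flag first, then a lookup in sqlite_master."""

    def __init__(self, connection: sqlite3.Connection, table_name: str = "burr_state") -> None:
        self.connection = connection
        self.table_name = table_name
        self._initialized = False

    def is_initialized(self) -> bool:
        if self._initialized:
            return True  # cached result, no query needed
        row = self.connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
            (self.table_name,),
        ).fetchone()
        # Remember a positive answer so later calls skip the query.
        self._initialized = row is not None
        return self._initialized


conn = sqlite3.connect(":memory:")
check = TableCheckSketch(conn)
before = check.is_initialized()
conn.execute("CREATE TABLE burr_state (app_id TEXT)")
after = check.is_initialized()
```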
- list_app_ids(partition_key: str | None, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- load(partition_key: str | None, app_id: str | None, sequence_id: int | None = None, **kwargs)
Loads state for a given partition id.
Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.
- Parameters:
partition_key
app_id
sequence_id
- Returns:
- save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.
- Parameters:
partition_key – The partition key. This could be None; it is up to the persister whether that is a value it can handle.
app_id – The identifier for the app instance being recorded.
sequence_id – The sequence ID identifying the specific point in time this state corresponds to.
position – The position in the sequence of states.
state – The state to be saved, an instance of the State class.
status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.
- Returns:
None
- class burr.integrations.persisters.postgresql.PostgreSQLPersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)¶
Class for PostgreSQL persistence of state. This is a simple implementation.
To try it out locally with docker – here’s a command – change the values as appropriate.
    # --name: container name; -v: volume mounted for data persistence; -p: host:container port mapping
    # -e POSTGRES_PASSWORD: superuser password; -d: run detached; postgres: image to run
    docker run --name local-psql \
        -v local_psql_data:/SOME/FILE_PATH/ \
        -p 54320:5432 \
        -e POSTGRES_PASSWORD=my_password \
        -d postgres
Then you should be able to create the class like this:
p = PostgreSQLPersister.from_values("postgres", "postgres", "my_password", "localhost", 54320, table_name="burr_state")
- __init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)¶
Constructor
- Parameters:
connection – the connection to the PostgreSQL database.
table_name – the table name to store things under.
- create_table(table_name: str)¶
Helper function to create the table where things are stored.
- classmethod from_config(config: dict)
Creates a new instance of the PostgreSQLPersister from a configuration dictionary.
- classmethod from_values(db_name: str, user: str, password: str, host: str, port: int, table_name: str = 'burr_state')
Builds a new instance of the PostgreSQLPersister from the provided values.
- Parameters:
db_name – the name of the PostgreSQL database.
user – the username to connect to the PostgreSQL database.
password – the password to connect to the PostgreSQL database.
host – the host of the PostgreSQL database.
port – the port of the PostgreSQL database.
table_name – the table name to store things under.
- initialize()¶
Creates the table
- is_initialized() bool ¶
This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.
- list_app_ids(partition_key: str, **kwargs) list[str] ¶
Lists the app_ids for a given partition_key.
- load(partition_key: str | None, app_id: str, sequence_id: int = None, **kwargs)
Loads state for a given partition id.
Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.
- Parameters:
partition_key
app_id
sequence_id
- Returns:
- save(partition_key: str, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Saves the state for a given app_id, sequence_id, and position.
This method connects to the PostgreSQL database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.
- Parameters:
partition_key – The partition key. This could be None; it is up to the persister whether that is a value it can handle.
app_id – The identifier for the app instance being recorded.
sequence_id – The sequence ID identifying the specific point in time this state corresponds to.
position – The position in the sequence of states.
state – The state to be saved, an instance of the State class.
status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.
- Returns:
None
- set_serde_kwargs(serde_kwargs: dict)¶
Sets the serde_kwargs for the persister.
- class burr.integrations.persisters.b_redis.RedisBasePersister(connection, serde_kwargs: dict = None, namespace: str = None)¶
Main class for Redis persister.
Use this class if you want to directly control injecting the Redis client.
This class is responsible for persisting state data to a Redis database. It inherits from the BaseStatePersister class.
Note: We didn’t create the right constructor for the initial implementation of the RedisPersister class, so this is an attempt to fix that in a backwards compatible way.
- __init__(connection, serde_kwargs: dict = None, namespace: str = None)¶
Initializes the RedisPersister class.
- Parameters:
connection – the redis connection object.
serde_kwargs – serialization and deserialization keyword arguments to pass to state SERDE.
namespace – The name of the project to optionally use in the key prefix.
- create_key(app_id, partition_key, sequence_id)¶
Create a key for the Redis database.
- classmethod from_values(host: str, port: int, db: int, password: str = None, serde_kwargs: dict = None, redis_client_kwargs: dict = None, namespace: str = None)
Creates a new instance of the RedisBasePersister from passed in values.
- list_app_ids(partition_key: str, **kwargs) list[str] ¶
List the app ids for a given partition key.
- load(partition_key: str, app_id: str, sequence_id: int = None, **kwargs)
Load the state data for a given partition key, app id, and sequence id.
If the sequence id is not given, it will be looked up in the Redis database. If it is not found, None will be returned.
- Parameters:
partition_key
app_id
sequence_id
kwargs
- Returns:
Value or None.
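The lookup behaviour described for load can be pictured with a small sketch in which a plain dict stands in for the Redis client. Everything here is hypothetical: the key layout in make_key, the "latest" pointer entry, and the helper names are illustrative assumptions and may not match what create_key or the persister actually do.

```python
from typing import Any, Dict, Optional


def make_key(app_id: str, partition_key: str, sequence_id: Any, namespace: Optional[str] = None) -> str:
    # Hypothetical layout; the real create_key format may differ.
    base = f"{partition_key}:{app_id}:{sequence_id}"
    return f"{namespace}:{base}" if namespace else base


def save(store: Dict[str, Any], partition_key: str, app_id: str, sequence_id: int,
         state: Dict[str, Any], namespace: Optional[str] = None) -> None:
    store[make_key(app_id, partition_key, sequence_id, namespace)] = state
    # Pointer entry so load() can find the most recent sequence_id.
    store[make_key(app_id, partition_key, "latest", namespace)] = sequence_id


def load(store: Dict[str, Any], partition_key: str, app_id: str,
         sequence_id: Optional[int] = None, namespace: Optional[str] = None) -> Optional[Dict[str, Any]]:
    if sequence_id is None:
        # No sequence_id given: look up the latest one first.
        sequence_id = store.get(make_key(app_id, partition_key, "latest", namespace))
        if sequence_id is None:
            return None  # nothing was ever saved for this app
    return store.get(make_key(app_id, partition_key, sequence_id, namespace))


store: Dict[str, Any] = {}  # stand-in for a Redis client, not the real API
save(store, "user-1", "app-a", 0, {"count": 1}, namespace="demo")
save(store, "user-1", "app-a", 1, {"count": 2}, namespace="demo")
newest = load(store, "user-1", "app-a", namespace="demo")
```

Prefixing keys with a namespace lets several projects share one Redis database without their keys colliding.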
- class burr.integrations.persisters.b_mongodb.MongoDBBasePersister(client, db_name='mydatabase', collection_name='mystates', serde_kwargs: dict = None)
A class used to represent a MongoDB Persister.
Example usage:
    from burr.core import state
    from burr.integrations.persisters.b_mongodb import MongoDBBasePersister

    persister = MongoDBBasePersister.from_values(
        uri='mongodb://user:pass@localhost:27017',
        db_name='mydatabase',
        collection_name='mystates',
    )
    persister.save(
        partition_key='example_partition',
        app_id='example_app',
        sequence_id=1,
        position='example_position',
        state=state.State({'key': 'value'}),
        status='completed',
    )
    loaded_state = persister.load(partition_key='example_partition', app_id='example_app', sequence_id=1)
    print(loaded_state)
Note: this is called MongoDBBasePersister because we had to change the constructor and wanted to make this change backwards compatible.
- __init__(client, db_name='mydatabase', collection_name='mystates', serde_kwargs: dict = None)
Initializes the MongoDBBasePersister class.
- Parameters:
client – the mongodb client to use
db_name – the name of the database to use
collection_name – the name of the collection to use
serde_kwargs – serializer/deserializer keyword arguments to pass to the state object
- classmethod from_values(uri='mongodb://localhost:27017', db_name='mydatabase', collection_name='mystates', serde_kwargs: dict = None, mongo_client_kwargs: dict = None)
Initializes the MongoDBBasePersister class.
- list_app_ids(partition_key: str, **kwargs) list[str] ¶
List the app ids for a given partition key.
- load(partition_key: str, app_id: str, sequence_id: int = None, **kwargs)
Load the state data for a given partition key, app id, and sequence id.
Note that the LocalTrackingClient leverages the BaseStateLoader to allow loading state, although it uses different mechanisms to save state (as it tracks more than just state).
Supported Async Implementations¶
Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly.
- class burr.integrations.persisters.b_aiosqlite.AsyncSQLitePersister(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)¶
Class for asynchronous SQLite persistence of state. This is a simple implementation.
SQLite is specifically single-threaded and aiosqlite creates async support through multi-threading. This persister is mainly here for quick prototyping and testing; we suggest considering a different database with native async support for production.
Note that the third-party library aiosqlite is maintained and considered stable: https://github.com/omnilib/aiosqlite/issues/309.
- __init__(connection, table_name: str = 'burr_state', serde_kwargs: dict = None)¶
Constructor.
NOTE: you are responsible for closing the connection and handling teardown manually. To help, we provide a close() method.
- Parameters:
connection – the open aiosqlite connection to the database.
table_name – the table name to store things under.
serde_kwargs – kwargs for state serialization/deserialization.
- async create_table_if_not_exists(table_name: str)¶
Helper function to create the table where things are stored if it doesn’t exist.
- async classmethod from_values(db_path: str, table_name: str = 'burr_state', serde_kwargs: dict = None, connect_kwargs: dict = None)
Creates a new instance of the AsyncSQLitePersister from passed in values.
- Parameters:
db_path – the path where the DB will be stored.
table_name – the table name to store things under.
serde_kwargs – kwargs for state serialization/deserialization.
connect_kwargs – kwargs to pass to the aiosqlite.connect method.
- Returns:
async sqlite persister instance with an open connection. You are responsible for closing the connection yourself.
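Because the caller owns the connection, a try/finally block is one way to make sure it is closed even if the application raises. The sketch below uses a hypothetical stand-in class, FakeAsyncPersister, so that it runs without aiosqlite installed. It assumes, without confirmation from the text above, that close() is awaitable; check the actual method before copying this pattern.

```python
import asyncio


class FakeAsyncPersister:
    """Hypothetical stand-in: opened by from_values, closed explicitly by the caller."""

    def __init__(self) -> None:
        self.closed = False

    @classmethod
    async def from_values(cls, db_path: str) -> "FakeAsyncPersister":
        # A real persister would open the database connection here.
        return cls()

    async def close(self) -> None:
        self.closed = True


async def main() -> FakeAsyncPersister:
    persister = await FakeAsyncPersister.from_values("sketch.db")
    try:
        # Placeholder for building and running the application.
        await asyncio.sleep(0)
    finally:
        # Runs on success and on error, so the connection is not leaked.
        await persister.close()
    return persister


finished = asyncio.run(main())
```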
- async initialize()¶
Asynchronously creates the table if it doesn’t exist
- async is_initialized() bool ¶
This checks to see if the table has been created in the database or not. It defaults to using the initialized field, else queries the database to see if the table exists. It then sets the initialized field to True if the table exists.
- async list_app_ids(partition_key: str | None = None, **kwargs) list[str] ¶
Returns the list of app IDs for a given partition key.
- async load(partition_key: str | None, app_id: str | None, sequence_id: int | None = None, **kwargs)
Asynchronously loads state for a given partition id.
Depending on the parameters, this will return the last thing written, the last thing written for a given app_id, or a specific sequence_id for a given app_id.
- Parameters:
partition_key
app_id
sequence_id
- Returns:
- async save(partition_key: str | None, app_id: str, sequence_id: int, position: str, state: State, status: Literal['completed', 'failed'], **kwargs)
Asynchronously saves the state for a given app_id, sequence_id, and position.
This method connects to the SQLite database, converts the state to a JSON string, and inserts a new record into the table with the provided partition_key, app_id, sequence_id, position, and state. After the operation, it commits the changes and closes the connection to the database.
- Parameters:
partition_key – The partition key. This could be None; it is up to the persister whether that is a value it can handle.
app_id – The identifier for the app instance being recorded.
sequence_id – The sequence ID identifying the specific point in time this state corresponds to.
position – The position in the sequence of states.
state – The state to be saved, an instance of the State class.
status – The status of this state, either “completed” or “failed”. If “failed” the state is what it was before the action was applied.
- Returns:
None