marvin.database

Database management for persistence.

This module provides utilities for managing database sessions and migrations.

Constants

ALEMBIC_DIR

ALEMBIC_DIR = MARVIN_DIR / 'migrations'

ALEMBIC_INI

ALEMBIC_INI = MARVIN_DIR / 'alembic.ini'

MARVIN_DIR

MARVIN_DIR = Path(marvin.__file__).parent.parent.parent

Classes

Base

class Base()

Base class for all database models.

DBLLMCall

class DBLLMCall()

Methods:

  • create

    def create(cls, thread_id: str, usage: Usage, prompt_messages: list[DBMessage | Message] | None = None, completion_messages: list[DBMessage | Message] | None = None, session: AsyncSession | None = None) -> DBLLMCall
    

    Create a new LLM call record.

    Args: thread_id: ID of the thread this call belongs to usage: Usage information from the model session: Optional database session. If not provided, a new one will be created.

    Returns: The created DBLLMCall instance

DBLLMCallMessage

class DBLLMCallMessage()

Mapping table between LLM calls and messages.

DBMessage

class DBMessage()

Methods:

  • from_message
    def from_message(cls, thread_id: str, message: PydanticAIMessage, created_at: datetime | None = None) -> DBMessage
    
  • to_message
    def to_message(self) -> Message
    

DBThread

class DBThread()

Methods:

  • create

    def create(cls, session: AsyncSession | None = None, id: str | None = None, parent_thread_id: str | None = None) -> DBThread
    

    Create a new thread record.

    Args: session: Database session to use id: Optional ID to use for the thread. If not provided, a UUID will be generated. parent_thread_id: Optional ID of the parent thread

    Returns: The created DBThread instance

UsageType

class UsageType()

Custom type for Usage objects that stores them as JSON in the database.

Methods:

  • process_bind_param
    def process_bind_param(self, value: Usage | None, dialect: Any) -> dict[str, Any] | None
    
    Convert Usage to JSON before storing in DB.
  • process_result_value
    def process_result_value(self, value: dict[str, Any] | None, dialect: Any) -> Usage | None
    
    Convert JSON back to Usage when loading from DB.

Functions

create_db_and_tables

def create_db_and_tables(force: bool = False)

Create all database tables synchronously.

This is a synchronous alternative to create_db_and_tables() that can be used in contexts where asyncio.run() cannot be called.

Args: force: If True, drops all existing tables before creating new ones.

ensure_db_tables_exist

def ensure_db_tables_exist()

Ensure database tables exist, creating them if necessary.

This function creates all database tables directly without using migrations, which is more reliable than using Alembic migrations.

get_async_engine

def get_async_engine() -> AsyncEngine

Get the SQLAlchemy engine for async operations.

For SQLite databases, this uses aiosqlite. For other databases (e.g. PostgreSQL), this uses the provided URL directly.

The engine is cached by asyncio event loop to maintain compatibility with run_sync.

get_async_session

def get_async_session(session: AsyncSession | None = None) -> AsyncGenerator[AsyncSession, None]

Get an async database session.

This uses the async_sessionmaker pattern for more consistent session management. If a session is provided, it is returned as-is.

Args: session: An optional existing session to use. If provided, this function will yield it directly instead of creating a new one.

Yields: An async SQLAlchemy session

init_database_if_necessary

def init_database_if_necessary()

Initialize the database file if necessary.

This function only handles creating the database file and parent directories, it does not create tables. Tables are created by ensure_db_tables_exist().

is_sqlite

def is_sqlite() -> bool

Check if the configured database is SQLite.

serialize_message

def serialize_message(message: PydanticAIMessage) -> str

The ctx field in the RetryPromptPart is optionally dict[str, Any], which is not always serializable.

set_async_engine

def set_async_engine(engine: AsyncEngine)

Set the SQLAlchemy engine for async operations.

utc_now

def utc_now() -> datetime

Get the current UTC timestamp.


Parent Module: marvin