marvin.engine.events
Classes
ActorEndTurnEvent
class ActorEndTurnEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor)
Event for agent turn end.
ActorMessageDeltaEvent
class ActorMessageDeltaEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, delta: TextPartDelta, snapshot: TextPart)
Event for delta updates to agent messages during streaming.
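The relationship between `delta` and `snapshot` can be illustrated with a minimal sketch. It assumes each delta carries a text fragment and the snapshot holds the text accumulated so far. `SketchTextDelta`, `SketchTextSnapshot`, and `apply_delta` are hypothetical stand-ins written for this example, not the real `TextPartDelta` and `TextPart` types.

```python
from dataclasses import dataclass


@dataclass
class SketchTextDelta:
    """Hypothetical stand-in for a streamed text fragment."""
    content_delta: str


@dataclass
class SketchTextSnapshot:
    """Hypothetical stand-in for the text accumulated so far."""
    content: str


def apply_delta(snapshot: SketchTextSnapshot, delta: SketchTextDelta) -> SketchTextSnapshot:
    """Return a new snapshot with the delta's text appended."""
    return SketchTextSnapshot(content=snapshot.content + delta.content_delta)


# Simulate a stream of three fragments and record the snapshot after each one.
snapshot = SketchTextSnapshot(content="")
history = []
for chunk in ["Hel", "lo, ", "world"]:
    snapshot = apply_delta(snapshot, SketchTextDelta(content_delta=chunk))
    history.append(snapshot.content)

print(history)
```

Under this assumption, a consumer that only needs the latest text can read the snapshot and ignore the individual deltas.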
ActorMessageEvent
class ActorMessageEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, message: TextPart)
Event for complete text messages from an agent.
ActorStartTurnEvent
class ActorStartTurnEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor)
Event for agent turn start.
EndTurnToolCallEvent
class EndTurnToolCallEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, event: FinalResultEvent, tool_call_id: str, tool: EndTurn)
Event that fires as soon as we know that an end turn tool call has been made.
EndTurnToolResultEvent
class EndTurnToolResultEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, result: FinalResult, tool_call_id: str, tool: EndTurn)
Event for the final result from an end turn tool.
Event
class Event(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)))
Base class for all events in the system.
OrchestratorEndEvent
class OrchestratorEndEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)))
Event for orchestrator end.
OrchestratorErrorEvent
class OrchestratorErrorEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), error: str)
Event for orchestrator exceptions.
OrchestratorStartEvent
class OrchestratorStartEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)))
Event for orchestrator start.
ToolCallDeltaEvent
class ToolCallDeltaEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, delta: ToolCallPartDelta, snapshot: ToolCallPart, tool_call_id: str, tool: Callable[..., Any] | None)
Event for delta updates to tool calls during streaming.
Methods:
args_dict
def args_dict(self) -> dict[str, Any]
Return the args as a dictionary.
ToolCallEvent
class ToolCallEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), actor: Actor, message: ToolCallPart, tool_call_id: str, tool: Callable[..., Any] | None)
Event for complete tool calls ready to be executed.
Methods:
args_dict
def args_dict(self) -> dict[str, Any]
Return the args as a dictionary.
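As a rough illustration of what "args as a dictionary" can involve, the sketch below assumes tool-call arguments may arrive either as a JSON string or as an already-parsed mapping. `SketchToolCall` and `sketch_args_dict` are hypothetical names for this example; the actual `args_dict` implementation may differ.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union


@dataclass
class SketchToolCall:
    """Hypothetical stand-in for a tool call whose args may be JSON text or a dict."""
    tool_name: str
    args: Optional[Union[str, Dict[str, Any]]]


def sketch_args_dict(call: SketchToolCall) -> Dict[str, Any]:
    """Normalize the call's arguments to a plain dictionary."""
    if call.args is None or call.args == "":
        return {}
    if isinstance(call.args, dict):
        # Copy so callers cannot mutate the original arguments by accident.
        return dict(call.args)
    parsed = json.loads(call.args)
    if not isinstance(parsed, dict):
        raise TypeError("tool call arguments must decode to a JSON object")
    return parsed


from_json = sketch_args_dict(SketchToolCall("add", '{"a": 1, "b": 2}'))
from_dict = sketch_args_dict(SketchToolCall("add", {"a": 1, "b": 2}))
empty = sketch_args_dict(SketchToolCall("noop", None))
print(from_json, from_dict, empty)
```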
ToolResultEvent
class ToolResultEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), message: ToolReturnPart)
Event for tool return values.
ToolRetryEvent
class ToolRetryEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), message: RetryPromptPart)
Event for tool retry requests.
UserMessageEvent
class UserMessageEvent(type: EventType, id: uuid.UUID = field(default_factory=lambda: uuid.uuid4()), timestamp: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)), message: UserPromptPart)
Event for user messages.
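Because every class above derives from a common base, a consumer can branch on the concrete event class. The sketch below shows one way to do that with `isinstance` checks. All class names, fields, and type strings in it are hypothetical stand-ins, and it does not show how Marvin itself delivers events to handlers.

```python
from dataclasses import dataclass


@dataclass
class SketchEvent:
    """Hypothetical base event carrying only a type tag."""
    type: str


@dataclass
class SketchUserMessage(SketchEvent):
    text: str


@dataclass
class SketchOrchestratorError(SketchEvent):
    error: str


def describe(event: SketchEvent) -> str:
    """Produce a one-line description, falling back to the type tag."""
    if isinstance(event, SketchUserMessage):
        return f"user said: {event.text}"
    if isinstance(event, SketchOrchestratorError):
        return f"orchestrator failed: {event.error}"
    return f"unhandled event: {event.type}"


events = [
    SketchUserMessage(type="sketch-user-message", text="hi"),
    SketchOrchestratorError(type="sketch-error", error="timeout"),
    SketchEvent(type="sketch-start"),
]
lines = [describe(e) for e in events]
print("\n".join(lines))
```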
Functions
get_text_from_parts
def get_text_from_parts(parts: list[ModelResponsePart]) -> str
Extract text content from a list of ModelResponseParts.
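A minimal sketch of the idea, assuming text parts expose a `content` string and non-text parts are skipped. `SketchTextPart`, `SketchToolCallPart`, and `sketch_text_from_parts` are hypothetical stand-ins, and joining without a separator is an assumption of this example, not documented behavior of `get_text_from_parts`.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class SketchTextPart:
    """Hypothetical stand-in for a text response part."""
    content: str


@dataclass
class SketchToolCallPart:
    """Hypothetical stand-in for a non-text response part."""
    tool_name: str


def sketch_text_from_parts(parts: List[Union[SketchTextPart, SketchToolCallPart]]) -> str:
    """Concatenate the content of text parts, ignoring everything else."""
    return "".join(p.content for p in parts if isinstance(p, SketchTextPart))


parts = [
    SketchTextPart("The answer "),
    SketchToolCallPart("calculator"),
    SketchTextPart("is 42."),
]
text = sketch_text_from_parts(parts)
print(text)
```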
Parent Module: engine