marvin.engine.orchestrator

Constants

T

T = TypeVar('T')

Classes

Orchestrator

class Orchestrator(tasks: list[Task[Any]], thread: Thread | str | None = None, handlers: list[Handler | AsyncHandler] | None = None)

Methods:

  • end_turn

    def end_turn(self, result: AgentRunResult, actor: Actor)
    
  • get_all_tasks

    def get_all_tasks(self, filter: Literal['incomplete', 'ready'] | None = None) -> list[Task[Any]]
    

    Get all tasks, optionally filtered by status.

    Filters:

    • incomplete: tasks that are not yet complete
    • ready: tasks that are ready to be run

  • get_current

    def get_current(cls) -> Optional[Orchestrator]
    

    Class method. Get the current orchestrator from context, or None if no orchestrator is active.

  • handle_event

    def handle_event(self, event: Event)
    
  • run

    def run(self, raise_on_failure: bool = True, max_turns: int | float | None = None) -> list[AgentRunResult]
    
  • run_once

    def run_once(self, actor: Actor | None = None) -> AgentRunResult
    
  • start_turn

    def start_turn(self, actor: Actor)
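
Example (illustrative sketch for get_all_tasks):

The filter values documented for get_all_tasks can be pictured with a small standalone sketch. It does not import Marvin. SketchTask, its fields, and filter_tasks are hypothetical names made up for illustration, and the reading of "ready" as "incomplete with all dependencies complete" is an assumption, not something this page states.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Literal


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task; this is not Marvin's Task class."""

    name: str
    complete: bool = False
    depends_on: list[SketchTask] = field(default_factory=list)

    def is_ready(self) -> bool:
        # Assumption: a task is "ready" when it is incomplete and
        # every task it depends on is already complete.
        return not self.complete and all(d.complete for d in self.depends_on)


def filter_tasks(
    tasks: list[SketchTask],
    filter: Literal["incomplete", "ready"] | None = None,
) -> list[SketchTask]:
    """Return all tasks, optionally narrowed by status (sketch only)."""
    if filter is None:
        return list(tasks)
    if filter == "incomplete":
        return [t for t in tasks if not t.complete]
    if filter == "ready":
        return [t for t in tasks if t.is_ready()]
    raise ValueError(f"unknown filter: {filter!r}")


# Three tasks in a chain: fetch is done, parse waits on fetch, report waits on parse.
fetch = SketchTask("fetch", complete=True)
parse = SketchTask("parse", depends_on=[fetch])
report = SketchTask("report", depends_on=[parse])
tasks = [fetch, parse, report]

incomplete_names = [t.name for t in filter_tasks(tasks, "incomplete")]
ready_names = [t.name for t in filter_tasks(tasks, "ready")]
```

Under these assumptions, "ready" is a subset of "incomplete": report is incomplete but not ready, because parse has not finished.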
    

SystemPrompt

class SystemPrompt(source: str | Path = Path('system.jinja'), actor: Actor, instructions: list[str], tasks: list[Task])

Functions

get_current_orchestrator

def get_current_orchestrator() -> Orchestrator | None

Get the currently active orchestrator from context.

Returns: The current Orchestrator instance or None if no orchestrator is active.
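
Example (illustrative sketch):

A "current orchestrator from context" lookup, as described for get_current_orchestrator and Orchestrator.get_current, is commonly built on Python's contextvars module. The sketch below shows that general pattern only. Whether Marvin implements it this way is an assumption, and SketchOrchestrator, _current, active, and get_current_sketch are hypothetical names, not part of the library.

```python
from __future__ import annotations

from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator


class SketchOrchestrator:
    """Hypothetical stand-in; this is not Marvin's Orchestrator."""

    def __init__(self, name: str) -> None:
        self.name = name


# Holds the active orchestrator for the current context; None when nothing is active.
_current: ContextVar[SketchOrchestrator | None] = ContextVar(
    "current_orchestrator", default=None
)


@contextmanager
def active(orchestrator: SketchOrchestrator) -> Iterator[SketchOrchestrator]:
    """Mark an orchestrator as current for the duration of the with-block."""
    token = _current.set(orchestrator)
    try:
        yield orchestrator
    finally:
        # Restore whatever was current before, even if the block raised.
        _current.reset(token)


def get_current_sketch() -> SketchOrchestrator | None:
    """Return the active orchestrator, or None if no orchestrator is active."""
    return _current.get()


before = get_current_sketch()
with active(SketchOrchestrator("demo")) as orch:
    inside = get_current_sketch()
after = get_current_sketch()
```

In this sketch, before and after are None and inside is the orchestrator passed to active, which mirrors the documented "instance or None" return value.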

