orchestrator#

context#

class BaseModelState(context: T)#

Bases: State[ConnectedModel], ABC

next_state: Type[BaseModelState] = None#
transitions() List[Tuple[Type[Condition], Type[State]]]#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = ()#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
class Busy(context: T)#

Bases: WaitingForMessage

Base class for states in which a model is doing something and a response is required. While busy, one or more commands may come in that need to be processed later. These are stored until the model returns, after which they can be processed.

process_quit(msg: QuitMessage)#
process_update(msg: UpdateMessage)#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.ErrorMessage'>,)#
class ConnectedModel(name: str, timeline: ~movici_simulation_core.services.orchestrator.context.TimelineController, send: ~typing.Callable[[~movici_simulation_core.messages.Message], None], logger: ~logging.Logger = <factory>, publishes_to: ~typing.List[~movici_simulation_core.services.orchestrator.context.ConnectedModel] | None = <factory>, subscribed_to: ~typing.List[~movici_simulation_core.services.orchestrator.context.ConnectedModel] | None = <factory>, timer: ~movici_simulation_core.services.orchestrator.stopwatch.Stopwatch | None = None, pub: dict | None = <factory>, sub: dict | None = <factory>)#

Bases: object

Holds connection state and other data concerning a (connected) model

ack: bool = False#
busy: bool = False#
failed: bool = False#
fsm: FSM[ConnectedModel]#
log_invalid(message, valid_messages: Iterable[Type[Message]])#
logger: logging.Logger#
name: str#
next_time: t.Optional[int] = None#
pending_updates: t.List[UpdateMessage]#
pub: t.Optional[dict]#
publishes_to: t.Optional[t.List[ConnectedModel]]#
quit: t.Optional[QuitMessage] = None#
recv_event(event: Message)#
send: t.Callable[[Message], None]#
send_command(message: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage) None#

If there are any messages in the queue, send the first one, start the timer, and start waiting.

sub: t.Optional[dict]#
subscribed_to: t.Optional[t.List[ConnectedModel]]#
timeline: TimelineController#
timer: Stopwatch = None#
class Context(models: 'ModelCollection', timeline: 'TimelineController', global_timer: 'Stopwatch' = None, phase_timer: 'Stopwatch' = None, logger: 'logging.Logger' = <factory>)#

Bases: object

property failed#
finalize()#
global_timer: Stopwatch = None#
log_elapsed_global_time(seconds: float)#
log_elapsed_phase_time(seconds: float)#
log_finalize_message()#
log_interconnectivity_matrix()#
log_new_phase(phase: str)#
log_new_time()#
logger: Logger#
models: ModelCollection#
phase_timer: Stopwatch = None#
timeline: TimelineController#
class Done(context: T)#

Bases: BaseModelState

The model is either done or has failed; any further messages are ignored.

run()#
class Finalizing(context: T)#

Bases: Busy

A QuitMessage has been sent to the model, which needs to acknowledge it and shut down. All commands are ignored.

process_command(msg: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage)#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.NewTimeMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.AcknowledgeMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
class Idle(context: T)#

Bases: WaitingForMessage

Base class for states in which a model is awaiting further instructions.

process_new_time(msg: NewTimeMessage)#
process_quit(msg: QuitMessage)#
process_update(msg: UpdateMessage)#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.NewTimeMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
class ModelCollection#

Bases: dict, Dict[bytes, ConnectedModel]

property busy#
determine_interdependency()#

Calculate the subscribers for every model based on the pub/sub mask.

property failed#
property next_time#
queue_all(message: Message)#

Add a message to the queue of all models.

queue_models_for_next_time()#

Queue an update message to the model(s) that have the specified next_time

reset_model_timers()#
property waiting_for#
class NewTime(context: T)#

Bases: Busy

A NewTime message has been sent to the model, and it needs to Acknowledge that

valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.AcknowledgeMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
class NoUpdateMessage#

Bases: Message

A NoUpdateMessage is sent to a subscribed (dependent) ConnectedModel to indicate that its dependency has finished calculating (but has not produced any data), so that the subscribed ConnectedModel can determine whether to send out any pending updates.

class PendingMoreUpdates(context: T)#

Bases: Idle

The model has one or more dependencies that are still calculating. We wait until a dependency returns before re-evaluating whether we can send the update to the model

process_update(msg: UpdateMessage)#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
class ProcessPendingQuit(context: T)#

Bases: BaseModelState

While the model was Busy, a QuitMessage came in which needs to be processed. This state doesn’t wait for messages.

process_pending_quit()#
run()#
class ProcessPendingUpdates(context: T)#

Bases: BaseModelState

While the model was Busy, one or more updates came in which need to be processed. This state doesn’t wait for messages. If the model has one or more dependencies that are still calculating, we can’t send the update yet but have to wait until all dependencies are finished.

process_pending_updates()#
run()#
class Registration(context: T)#

Bases: Busy

A RegistrationMessage is expected from the model

on_enter()#
valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.QuitMessage'>,)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.RegistrationMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
class TimelineController(start: 'int', end: 'int', current_time: 'int' = None)#

Bases: object

current_time: int = None#
end: int#
queue_for_next_time(models: ModelCollection)#
set_model_to_start(model: ConnectedModel)#
set_next_time(model: ConnectedModel, next_time: int | None = None)#
start: int#
class Updating(context: T)#

Bases: Busy

The model is processing an UpdateMessage and calculating; a ResultMessage is expected.

valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.ResultMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
class WaitingForMessage(context: T)#

Bases: BaseModelState

handle_invalid(msg: Message, valid_messages: Iterable[Type[Message]])#
handle_response(msg: RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage)#
notify_subscribers(command: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage | None = None)#
process_command(msg: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage)#
process_new_time(msg: NewTimeMessage)#
process_no_update(msg: NoUpdateMessage)#
process_quit(msg: QuitMessage)#
process_update(msg: UpdateMessage)#
recv_message(msg: Message)#
run()#

fsm#

class Always(context: T)#

Bases: Condition

met()#
class Condition(context: T)#

Bases: ABC, Generic[T]

abstract met() bool#
class Event#

Bases: object

class FSM(initial_state: Type[State], context: T | None = None, raise_on_done=True)#

Bases: Generic[T, E]

send(event: E)#
start()#
transition()#
exception FSMDone#

Bases: FSMError

exception FSMError#

Bases: Exception

exception FSMStarted#

Bases: FSMError

class State(context: T)#

Bases: ABC, Generic[T]

on_enter()#
abstract run()#
transitions() List[Tuple[Type[Condition], Type[State]]]#
fsm_conditional_raise(func=None, attribute: str | None = None, exc: Type[Exception] | None = None)#
next_state(state: State) Type[State] | None#
not_done(func=None, *, attribute: str = 'done', exc: t.Type[Exception] = <class 'movici_simulation_core.services.orchestrator.fsm.FSMDone'>)#
not_started(func=None, *, attribute: str = 'started', exc: t.Type[Exception] = <class 'movici_simulation_core.services.orchestrator.fsm.FSMStarted'>)#
send_silent(coro: Generator, value: Any)#

interconnectivity#

class IPubSubFilter(*args, **kwargs)#

Bases: Protocol

name: str#
pub: dict#
sub: dict#
subscribers: List[IPubSubFilter]#
format_matrix(models: Sequence[IPubSubFilter], title='', match='X')#
title     |0|1|2|3|
0|model_0 | |X|X|X|
1|model_1 | | | |X|
2|model_2 | | | |X|
3|model_10| | | | |

service#

class Orchestrator#

Bases: Service

The class that manages the timeline and acts as a broker between models

context: Context#
fsm: FSM[Context, Tuple[str, Message]]#
classmethod install(sim: Simulation)#
logger: Logger#
make_send(identifier: str)#

Create a send function that can be used to send a message to a specific client connected to the ZMQ Router.

run()#
settings: Settings#
setup(*, settings: Settings, stream: Stream, logger: Logger, **_)#
stream: Stream#
timeline: TimelineController#

states#

class AllModelsDone(context: T)#

Bases: OrchestratorCondition

met() bool#
class AllModelsReady(context: T)#

Bases: OrchestratorCondition

met() bool#
class EndFinalizingPhase(context: T)#

Bases: OrchestratorState

run()#
transitions() List[Tuple[Type[Condition], Type[State]]]#
class Failed(context: T)#

Bases: OrchestratorCondition

met() bool#
class FinalizingWaitForModels(context: T)#

Bases: WaitForModels

transitions() List[Tuple[Type[Condition], Type[State]]]#
class ModelsRegistration(context: T)#

Bases: WaitForModels

transitions() List[Tuple[Type[Condition], Type[State]]]#
class NewTime(context: T)#

Bases: OrchestratorState

run()#
transitions() List[Tuple[Type[Condition], Type[State]]]#
class OrchestratorCondition(context: T)#

Bases: Condition[Context], ABC

class OrchestratorState(context: T)#

Bases: State[Context]

class StartFinalizingPhase(context: T)#

Bases: OrchestratorState

run()#
transitions() List[Tuple[Type[Condition], Type[State]]]#
class StartInitializingPhase(context: T)#

Bases: OrchestratorState

run()#
transitions()#
class StartRunningPhase(context: T)#

Bases: OrchestratorState

run()#
transitions() List[Tuple[Type[Condition], Type[State]]]#
class WaitForModels(context: T)#

Bases: OrchestratorState, ABC

run()#
class WaitForResults(context: T)#

Bases: WaitForModels

transitions() List[Tuple[Type[Condition], Type[State]]]#

stopwatch#

class ReportingStopwatch(on_stop: ~typing.Callable[[float], None] | None = None, on_reset: ~typing.Callable[[float], None] | None = None, now_func: ~typing.Callable[[], float] = <built-in function monotonic>, ignore_errors=False)#

Bases: Stopwatch

reset() float#

returns Stopwatch.total_elapsed

stop() float#

returns Stopwatch.elapsed

class Stopwatch(now_func: Callable[[], float] | None = None, ignore_errors=False)#

Bases: object

property elapsed#
reset() float#

returns Stopwatch.total_elapsed

restart() float#
property running#
start()#
stop() float#

returns Stopwatch.elapsed

property total_elapsed#

Module contents#

class Orchestrator#

Bases: Service

The class that manages the timeline and acts as a broker between models

context: Context#
fsm: FSM[Context, Tuple[str, Message]]#
classmethod install(sim: Simulation)#
logger: Logger#
make_send(identifier: str)#

Create a send function that can be used to send a message to a specific client connected to the ZMQ Router.

run()#
settings: Settings#
setup(*, settings: Settings, stream: Stream, logger: Logger, **_)#
stream: Stream#
timeline: TimelineController#