orchestrator#
context#
- class BaseModelState(context: T)#
Bases:
State[ConnectedModel], ABC
- next_state: Type[BaseModelState] = None#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = ()#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
- class Busy(context: T)#
Bases:
WaitingForMessage
Base class for states in which the model is busy and a response is required. While busy, one or more commands may come in that need to be processed later. These are stored until the model returns, after which they can be processed
- process_quit(msg: QuitMessage)#
- process_update(msg: UpdateMessage)#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.ErrorMessage'>,)#
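The deferral described for Busy can be pictured with a small stand-in. This is a minimal sketch, not the library's implementation: `SketchModel`, `SketchBusy`, and the two empty message classes are hypothetical names. Only the idea comes from the description above: updates and a quit that arrive while the model is busy are stored and handled once it has responded.

```python
from dataclasses import dataclass, field
from typing import List, Optional


class UpdateMessage:
    """Hypothetical stand-in, not the library class."""


class QuitMessage:
    """Hypothetical stand-in, not the library class."""


@dataclass
class SketchModel:
    """Hypothetical, reduced version of the per-model bookkeeping."""
    busy: bool = True
    pending_updates: List[UpdateMessage] = field(default_factory=list)
    quit: Optional[QuitMessage] = None


class SketchBusy:
    """Accept only a fixed set of command types and store them for later."""

    valid_commands = (UpdateMessage, QuitMessage)

    def __init__(self, context: SketchModel):
        self.context = context

    def process_command(self, msg) -> None:
        # Reject anything that is not listed in valid_commands.
        if not isinstance(msg, self.valid_commands):
            raise ValueError(f"unexpected command: {type(msg).__name__}")
        if isinstance(msg, QuitMessage):
            self.context.quit = msg  # remember the quit for later
        else:
            self.context.pending_updates.append(msg)  # defer the update


model = SketchModel()
state = SketchBusy(model)
state.process_command(UpdateMessage())
state.process_command(UpdateMessage())
state.process_command(QuitMessage())
```

After these three calls the stand-in model holds two deferred updates and one stored quit, which a later state could pick up.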
- class ConnectedModel(name: str, timeline: ~movici_simulation_core.services.orchestrator.context.TimelineController, send: ~typing.Callable[[~movici_simulation_core.messages.Message], None], logger: ~logging.Logger = <factory>, publishes_to: ~typing.List[~movici_simulation_core.services.orchestrator.context.ConnectedModel] | None = <factory>, subscribed_to: ~typing.List[~movici_simulation_core.services.orchestrator.context.ConnectedModel] | None = <factory>, timer: ~movici_simulation_core.services.orchestrator.stopwatch.Stopwatch | None = None, pub: dict | None = <factory>, sub: dict | None = <factory>)#
Bases:
object
Holds connection state and other data concerning a (connected) model
- ack: bool = False#
- busy: bool = False#
- failed: bool = False#
- fsm: FSM[ConnectedModel]#
- logger: logging.Logger#
- name: str#
- next_time: t.Optional[int] = None#
- pending_updates: t.List[UpdateMessage]#
- pub: t.Optional[dict]#
- publishes_to: t.Optional[t.List[ConnectedModel]]#
- quit: t.Optional[QuitMessage] = None#
- send_command(message: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage) None #
If there are any messages in the queue, send the first one, start the timer, and start waiting
- sub: t.Optional[dict]#
- subscribed_to: t.Optional[t.List[ConnectedModel]]#
- timeline: TimelineController#
- class Context(models: 'ModelCollection', timeline: 'TimelineController', global_timer: 'Stopwatch' = None, phase_timer: 'Stopwatch' = None, logger: 'logging.Logger' = <factory>)#
Bases:
object
- property failed#
- finalize()#
- log_elapsed_global_time(seconds: float)#
- log_elapsed_phase_time(seconds: float)#
- log_finalize_message()#
- log_interconnectivity_matrix()#
- log_new_phase(phase: str)#
- log_new_time()#
- logger: Logger#
- models: ModelCollection#
- timeline: TimelineController#
- class Done(context: T)#
Bases:
BaseModelState
The model is either done or has failed; ignore any further messages
- run()#
- class Finalizing(context: T)#
Bases:
Busy
A QuitMessage has been sent to the model, which needs to acknowledge it and shut down. All commands are ignored
- process_command(msg: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage)#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.NewTimeMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.AcknowledgeMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
- class Idle(context: T)#
Bases:
WaitingForMessage
Base class for states in which the model is awaiting further instructions
- process_new_time(msg: NewTimeMessage)#
- process_quit(msg: QuitMessage)#
- process_update(msg: UpdateMessage)#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.NewTimeMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
- class ModelCollection#
Bases:
dict, Dict[bytes, ConnectedModel]
- property busy#
- determine_interdependency()#
Calculate the subscribers for every model based on the pub/sub mask
- property failed#
- property next_time#
- queue_models_for_next_time()#
Queue an update message to the model(s) that have the specified next_time
- reset_model_timers()#
- property waiting_for#
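To illustrate what determine_interdependency is described as doing, the sketch below links publishers to subscribers by comparing masks. It rests on loud assumptions: the mask layout (a flat dict from dataset name to a set of attribute names), the overlap rule, and the names `SketchModel`, `masks_overlap`, and `determine_interdependency_sketch` are all hypothetical. The real pub/sub mask format and matching rules are not documented on this page.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Sequence, Set


@dataclass(eq=False)
class SketchModel:
    """Hypothetical stand-in carrying only the fields this sketch needs."""
    name: str
    pub: Dict[str, Set[str]] = field(default_factory=dict)
    sub: Dict[str, Set[str]] = field(default_factory=dict)
    publishes_to: List["SketchModel"] = field(default_factory=list)
    subscribed_to: List["SketchModel"] = field(default_factory=list)


def masks_overlap(pub: Dict[str, Set[str]], sub: Dict[str, Set[str]]) -> bool:
    # Assumed rule: a match exists if any dataset shares at least one attribute.
    return any(pub[dataset] & sub[dataset] for dataset in pub.keys() & sub.keys())


def determine_interdependency_sketch(models: Sequence[SketchModel]) -> None:
    for publisher in models:
        for subscriber in models:
            if publisher is subscriber:
                continue  # a model does not subscribe to itself in this sketch
            if masks_overlap(publisher.pub, subscriber.sub):
                publisher.publishes_to.append(subscriber)
                subscriber.subscribed_to.append(publisher)


a = SketchModel("a", pub={"roads": {"speed"}})
b = SketchModel("b", sub={"roads": {"speed", "length"}})
c = SketchModel("c", sub={"water": {"level"}})
determine_interdependency_sketch([a, b, c])
```

Here only `b` ends up subscribed to `a`, because `c` asks for a dataset that `a` does not publish.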
- class NewTime(context: T)#
Bases:
Busy
A NewTimeMessage has been sent to the model, which needs to acknowledge it
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.AcknowledgeMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
- class NoUpdateMessage#
Bases:
Message
A NoUpdateMessage is sent to a subscribed (dependent) ConnectedModel to indicate that its dependency has finished calculating (but has not produced any data) so that the subscribed ConnectedModel can determine whether to send out any pending updates
- class PendingMoreUpdates(context: T)#
Bases:
Idle
The model has one or more dependencies that are still calculating. We wait until a dependency returns before re-evaluating whether we can send the update to the model
- process_update(msg: UpdateMessage)#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = ()#
- class ProcessPendingQuit(context: T)#
Bases:
BaseModelState
While the model was Busy, a QuitMessage came in which needs to be processed. This state doesn’t wait for messages
- process_pending_quit()#
- run()#
- class ProcessPendingUpdates(context: T)#
Bases:
BaseModelState
While the model was Busy, one or more updates came in which need to be processed. This state doesn’t wait for messages. If the model has one or more dependencies that are still calculating, we can’t send the update yet but have to wait until all dependencies are finished
- process_pending_updates()#
- run()#
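The decision described for ProcessPendingUpdates can be sketched as a three-way choice. This is an illustration under stated assumptions rather than the library's logic: `SketchModel`, `next_action`, and the returned strings are hypothetical, and pending updates are represented by plain strings.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(eq=False)
class SketchModel:
    """Hypothetical stand-in carrying only the fields this sketch needs."""
    name: str
    busy: bool = False
    pending_updates: List[str] = field(default_factory=list)
    subscribed_to: List["SketchModel"] = field(default_factory=list)


def next_action(model: SketchModel) -> str:
    if not model.pending_updates:
        return "idle"  # nothing stored while the model was busy
    if any(dependency.busy for dependency in model.subscribed_to):
        return "wait"  # at least one dependency is still calculating
    return "send"  # all dependencies are finished, the update can go out


upstream = SketchModel("upstream", busy=True)
downstream = SketchModel(
    "downstream", pending_updates=["update"], subscribed_to=[upstream]
)

while_busy = next_action(downstream)
upstream.busy = False
after_return = next_action(downstream)
```

With the upstream model still calculating the sketch answers "wait"; once it has returned, the same call answers "send".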
- class Registration(context: T)#
Bases:
Busy
A RegistrationMessage is expected from the model
- on_enter()#
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.QuitMessage'>,)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.RegistrationMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
- class TimelineController(start: 'int', end: 'int', current_time: 'int' = None)#
Bases:
object
- current_time: int = None#
- end: int#
- queue_for_next_time(models: ModelCollection)#
- set_model_to_start(model: ConnectedModel)#
- set_next_time(model: ConnectedModel, next_time: int | None = None)#
- start: int#
- class Updating(context: T)#
Bases:
Busy
The model is processing an UpdateMessage and calculating; a ResultMessage is expected
- valid_commands: Tuple[Type[NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage], ...] = (<class 'movici_simulation_core.messages.UpdateMessage'>, <class 'movici_simulation_core.services.orchestrator.context.NoUpdateMessage'>, <class 'movici_simulation_core.messages.QuitMessage'>)#
- valid_responses: Tuple[Type[RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage], ...] = (<class 'movici_simulation_core.messages.ResultMessage'>, <class 'movici_simulation_core.messages.ErrorMessage'>)#
- class WaitingForMessage(context: T)#
Bases:
BaseModelState
- handle_response(msg: RegistrationMessage | AcknowledgeMessage | ResultMessage | ErrorMessage)#
- notify_subscribers(command: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage | None = None)#
- process_command(msg: NewTimeMessage | UpdateMessage | NoUpdateMessage | UpdateSeriesMessage | QuitMessage)#
- process_new_time(msg: NewTimeMessage)#
- process_no_update(msg: NoUpdateMessage)#
- process_quit(msg: QuitMessage)#
- process_update(msg: UpdateMessage)#
- run()#
fsm#
- class Event#
Bases:
object
- class FSM(initial_state: Type[State], context: T | None = None, raise_on_done=True)#
Bases:
Generic[T, E]
- send(event: E)#
- start()#
- transition()#
- exception FSMError#
Bases:
Exception
- fsm_conditional_raise(func=None, attribute: str | None = None, exc: Type[Exception] | None = None)#
- not_done(func=None, *, attribute: str = 'done', exc: t.Type[Exception] = <class 'movici_simulation_core.services.orchestrator.fsm.FSMDone'>)#
- not_started(func=None, *, attribute: str = 'started', exc: t.Type[Exception] = <class 'movici_simulation_core.services.orchestrator.fsm.FSMStarted'>)#
- send_silent(coro: Generator, value: Any)#
interconnectivity#
- class IPubSubFilter(*args, **kwargs)#
Bases:
Protocol
- name: str#
- pub: dict#
- sub: dict#
- subscribers: List[IPubSubFilter]#
- format_matrix(models: Sequence[IPubSubFilter], title='', match='X')#
title     |0|1|2|3|
0|model_0 | |X|X|X|
1|model_1 | | | |X|
2|model_2 | | | |X|
3|model_10| | | | |
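A matrix of this shape can be produced with a few lines of string formatting. The sketch below is a hypothetical reimplementation, not the library function: `SketchModel` and `format_matrix_sketch` are made-up names, and it assumes that each row is a publisher and that `match` marks the columns of that publisher's subscribers.

```python
from dataclasses import dataclass, field
from typing import List, Sequence


@dataclass(eq=False)
class SketchModel:
    """Hypothetical stand-in with the two attributes the table needs."""
    name: str
    subscribers: List["SketchModel"] = field(default_factory=list)


def format_matrix_sketch(
    models: Sequence[SketchModel], title: str = "", match: str = "X"
) -> str:
    # Column alignment is only correct for up to ten models (single-digit indices).
    index_width = len(str(len(models) - 1)) if models else 1
    name_width = max([len(m.name) for m in models], default=0)
    label_width = index_width + 1 + name_width  # "<index>|<name>"
    columns = "|".join(str(i) for i in range(len(models)))
    lines = [title.ljust(label_width) + "|" + columns + "|"]
    for i, publisher in enumerate(models):
        # Membership uses identity because the dataclass is declared with eq=False.
        cells = [match if other in publisher.subscribers else " " for other in models]
        label = f"{str(i).rjust(index_width)}|{publisher.name.ljust(name_width)}"
        lines.append(label + "|" + "|".join(cells) + "|")
    return "\n".join(lines)


m0, m1, m2, m3 = (SketchModel(n) for n in ("model_0", "model_1", "model_2", "model_10"))
m0.subscribers = [m1, m2, m3]
m1.subscribers = [m3]
m2.subscribers = [m3]
table = format_matrix_sketch([m0, m1, m2, m3], title="title")
print(table)
```

Reading a row left to right shows which models receive that publisher's output; an empty row, such as the last one, means no other model subscribes to it.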
service#
- class Orchestrator#
Bases:
Service
The class that manages the timeline and acts as a broker between models
- classmethod install(sim: Simulation)#
- logger: Logger#
- make_send(identifier: str)#
Create a send function that can be used to send a message to a specific client connected to the ZMQ Router
- run()#
- timeline: TimelineController#
states#
- class AllModelsDone(context: T)#
Bases:
OrchestratorCondition
- met() bool #
- class AllModelsReady(context: T)#
Bases:
OrchestratorCondition
- met() bool #
- class EndFinalizingPhase(context: T)#
Bases:
OrchestratorState
- run()#
- class Failed(context: T)#
Bases:
OrchestratorCondition
- met() bool #
- class FinalizingWaitForModels(context: T)#
Bases:
WaitForModels
- class ModelsRegistration(context: T)#
Bases:
WaitForModels
- class NewTime(context: T)#
Bases:
OrchestratorState
- run()#
- class StartFinalizingPhase(context: T)#
Bases:
OrchestratorState
- run()#
- class StartInitializingPhase(context: T)#
Bases:
OrchestratorState
- run()#
- transitions()#
- class StartRunningPhase(context: T)#
Bases:
OrchestratorState
- run()#
- class WaitForModels(context: T)#
Bases:
OrchestratorState, ABC
- run()#
- class WaitForResults(context: T)#
Bases:
WaitForModels
stopwatch#
- class ReportingStopwatch(on_stop: ~typing.Callable[[float], None] | None = None, on_reset: ~typing.Callable[[float], None] | None = None, now_func: ~typing.Callable[[], float] = <built-in function monotonic>, ignore_errors=False)#
Bases:
Stopwatch
- reset() float #
Returns Stopwatch.total_elapsed
- stop() float #
Returns Stopwatch.elapsed
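The constructor signature suggests a stopwatch that reports durations through callbacks and reads time from an injectable clock. The following is a minimal sketch of that pattern, assuming this behaviour; `SketchStopwatch` and its internals are hypothetical and do not reproduce the library's Stopwatch or its `ignore_errors` handling.

```python
import time
from typing import Callable, Optional


class SketchStopwatch:
    """Hypothetical stopwatch that reports elapsed times via callbacks."""

    def __init__(
        self,
        on_stop: Optional[Callable[[float], None]] = None,
        on_reset: Optional[Callable[[float], None]] = None,
        now_func: Callable[[], float] = time.monotonic,
    ):
        self.on_stop = on_stop
        self.on_reset = on_reset
        self.now = now_func
        self.total_elapsed = 0.0
        self._started_at: Optional[float] = None

    def start(self) -> None:
        self._started_at = self.now()

    def stop(self) -> float:
        if self._started_at is None:
            raise RuntimeError("stopwatch is not running")
        elapsed = self.now() - self._started_at
        self._started_at = None
        self.total_elapsed += elapsed
        if self.on_stop is not None:
            self.on_stop(elapsed)  # report the duration of this run
        return elapsed

    def reset(self) -> float:
        total = self.total_elapsed
        self.total_elapsed = 0.0
        self._started_at = None
        if self.on_reset is not None:
            self.on_reset(total)  # report the accumulated duration
        return total


# A fake clock makes the example deterministic.
ticks = iter([10.0, 12.5, 20.0, 21.0])
reported = []
watch = SketchStopwatch(
    on_stop=reported.append, on_reset=reported.append, now_func=lambda: next(ticks)
)
watch.start()
first = watch.stop()
watch.start()
second = watch.stop()
total = watch.reset()
```

Injecting `now_func` is what makes such a timer testable: the fake clock above replaces `time.monotonic` without touching the stopwatch itself.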
Module contents#
- class Orchestrator#
Bases:
Service
The class that manages the timeline and acts as a broker between models
- classmethod install(sim: Simulation)#
- logger: Logger#
- make_send(identifier: str)#
Create a send function that can be used to send a message to a specific client connected to the ZMQ Router
- run()#
- timeline: TimelineController#