model_connector#
connector#
- class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#
Bases:
object- connector: ModelConnector#
- handle_message(msg: Message)#
- handle_message(msg: NewTimeMessage)
- handle_message(msg: UpdateMessage)
- handle_message(msg: UpdateSeriesMessage)
- handle_message(msg: QuitMessage)
- initialize()#
- class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#
Bases:
object- close(message: QuitMessage)#
- init_data: InitDataHandler#
- initialize() RegistrationMessage#
- model: ModelAdapterBase#
- name: Optional[str] = None#
- new_time(message: NewTimeMessage)#
- update(update: UpdateMessage) ResultMessage#
- update_series(update: UpdateSeriesMessage) ResultMessage#
- updates: UpdateDataClient#
init_data#
- class DirectoryInitDataHandler(root: 'pathlib.Path')#
Bases:
InitDataHandler- root: Path#
- class InitDataClient(name: str, server: str, sockets: Optional[Sockets] = None)#
Bases:
RequestClient- get(key: str, mask: Optional[dict] = None) Optional[Path]#
- class InitDataHandler#
Bases:
InitDataHandlerBase
- class ServicedInitDataHandler(name: 'str', server: 'str')#
Bases:
InitDataHandler- client: InitDataClient#
- close()#
- name: str#
- server: str#
Module contents#
- class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#
Bases:
object- connector: ModelConnector#
- handle_message(msg: Message)#
- handle_message(msg: NewTimeMessage)
- handle_message(msg: UpdateMessage)
- handle_message(msg: UpdateSeriesMessage)
- handle_message(msg: QuitMessage)
- initialize()#
- class DirectoryInitDataHandler(root: 'pathlib.Path')#
Bases:
InitDataHandler- root: Path#
- class InitDataClient(name: str, server: str, sockets: Optional[Sockets] = None)#
Bases:
RequestClient- get(key: str, mask: Optional[dict] = None) Optional[Path]#
- class InitDataHandler#
Bases:
InitDataHandlerBase
- class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#
Bases:
object- close(message: QuitMessage)#
- init_data: InitDataHandler#
- initialize() RegistrationMessage#
- model: ModelAdapterBase#
- name: Optional[str] = None#
- new_time(message: NewTimeMessage)#
- update(update: UpdateMessage) ResultMessage#
- update_series(update: UpdateSeriesMessage) ResultMessage#
- updates: UpdateDataClient#
- class ServicedInitDataHandler(name: 'str', server: 'str')#
Bases:
InitDataHandler- client: InitDataClient#
- close()#
- name: str#
- server: str#