model_connector#
connector#
- class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#
Bases:
object
- connector: ModelConnector#
- handle_message(msg: Message)#
- handle_message(msg: NewTimeMessage)
- handle_message(msg: UpdateMessage)
- handle_message(msg: UpdateSeriesMessage)
- handle_message(msg: QuitMessage)
- initialize()#
- class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#
Bases:
object
- close(message: QuitMessage)#
- init_data: InitDataHandler#
- initialize() -> RegistrationMessage#
- model: ModelAdapterBase#
- name: str | None = None#
- new_time(message: NewTimeMessage)#
- update(update: UpdateMessage) -> ResultMessage#
- update_series(update: UpdateSeriesMessage) -> ResultMessage#
- updates: UpdateDataClient#
init_data#
- class DirectoryInitDataHandler(root: 'pathlib.Path')#
Bases:
InitDataHandler
- root: Path#
- class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#
Bases:
RequestClient
- get(key: str, mask: dict | None = None) -> Path | None#
- class InitDataHandler#
Bases:
InitDataHandlerBase
- class ServicedInitDataHandler(name: 'str', server: 'str')#
Bases:
InitDataHandler
- client: InitDataClient#
- close()#
- name: str#
- server: str#
Module contents#
- class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#
Bases:
object
- connector: ModelConnector#
- handle_message(msg: Message)#
- handle_message(msg: NewTimeMessage)
- handle_message(msg: UpdateMessage)
- handle_message(msg: UpdateSeriesMessage)
- handle_message(msg: QuitMessage)
- initialize()#
- class DirectoryInitDataHandler(root: 'pathlib.Path')#
Bases:
InitDataHandler
- root: Path#
- class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#
Bases:
RequestClient
- get(key: str, mask: dict | None = None) -> Path | None#
- class InitDataHandler#
Bases:
InitDataHandlerBase
- class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#
Bases:
object
- close(message: QuitMessage)#
- init_data: InitDataHandler#
- initialize() -> RegistrationMessage#
- model: ModelAdapterBase#
- name: str | None = None#
- new_time(message: NewTimeMessage)#
- update(update: UpdateMessage) -> ResultMessage#
- update_series(update: UpdateSeriesMessage) -> ResultMessage#
- updates: UpdateDataClient#
- class ServicedInitDataHandler(name: 'str', server: 'str')#
Bases:
InitDataHandler
- client: InitDataClient#
- close()#
- name: str#
- server: str#