model_connector#

connector#

class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#

Bases: object

connector: ModelConnector#
handle_message(msg: Message)#
handle_message(msg: NewTimeMessage)
handle_message(msg: UpdateMessage)
handle_message(msg: UpdateSeriesMessage)
handle_message(msg: QuitMessage)
initialize()#
stream: Stream[Message]#
class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#

Bases: object

close(message: QuitMessage)#
data_mask: DataMask | None = None#
init_data: InitDataHandler#
initialize() → RegistrationMessage#
model: ModelAdapterBase#
name: str | None = None#
new_time(message: NewTimeMessage)#
update(update: UpdateMessage) → ResultMessage#
update_series(update: UpdateSeriesMessage) → ResultMessage#
updates: UpdateDataClient#
class UpdateDataClient(name: str, home_address: str, sockets: Sockets | None = None)#

Bases: RequestClient

clear()#
counter: Iterator[str]#
get(address: str, key: str, mask: dict | None) → bytes#
home_address: str#
put(data: bytes) → Tuple[str, str]#
reset_counter()#

init_data#

class DirectoryInitDataHandler(root: 'pathlib.Path')#

Bases: InitDataHandler

get(name: str) → Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) → Tuple[FileType, DatasetPath]#
root: Path#
class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#

Bases: RequestClient

get(key: str, mask: dict | None = None) → Path | None#
class InitDataHandler#

Bases: InitDataHandlerBase

ensure_ftype(name: str, ftype: FileType)#
get(name: str) → Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) → Tuple[FileType, DatasetPath]#
class ServicedInitDataHandler(name: 'str', server: 'str')#

Bases: InitDataHandler

client: InitDataClient#
close()#
get(name: str) → Tuple[FileType | None, DatasetPath | None]#
name: str#
server: str#

Module contents#

class ConnectorStreamHandler(connector: 'ModelConnector', stream: 'Stream[Message]')#

Bases: object

connector: ModelConnector#
handle_message(msg: Message)#
handle_message(msg: NewTimeMessage)
handle_message(msg: UpdateMessage)
handle_message(msg: UpdateSeriesMessage)
handle_message(msg: QuitMessage)
initialize()#
stream: Stream[Message]#
class DirectoryInitDataHandler(root: 'pathlib.Path')#

Bases: InitDataHandler

get(name: str) → Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) → Tuple[FileType, DatasetPath]#
root: Path#
class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#

Bases: RequestClient

get(key: str, mask: dict | None = None) → Path | None#
class InitDataHandler#

Bases: InitDataHandlerBase

ensure_ftype(name: str, ftype: FileType)#
get(name: str) → Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) → Tuple[FileType, DatasetPath]#
class ModelConnector(model: 'ModelAdapterBase', updates: 'UpdateDataClient', init_data: 'InitDataHandler', name: 't.Optional[str]' = None)#

Bases: object

close(message: QuitMessage)#
data_mask: DataMask | None = None#
init_data: InitDataHandler#
initialize() → RegistrationMessage#
model: ModelAdapterBase#
name: str | None = None#
new_time(message: NewTimeMessage)#
update(update: UpdateMessage) → ResultMessage#
update_series(update: UpdateSeriesMessage) → ResultMessage#
updates: UpdateDataClient#
class ServicedInitDataHandler(name: 'str', server: 'str')#

Bases: InitDataHandler

client: InitDataClient#
close()#
get(name: str) → Tuple[FileType | None, DatasetPath | None]#
name: str#
server: str#
class UpdateDataClient(name: str, home_address: str, sockets: Sockets | None = None)#

Bases: RequestClient

clear()#
counter: Iterator[str]#
get(address: str, key: str, mask: dict | None) → bytes#
home_address: str#
put(data: bytes) → Tuple[str, str]#
reset_counter()#