movici_simulation_core#
Subpackages#
attributes#
This module contains AttributeSpec objects for common, generic attribute types that can be used in datasets.
csr#
- assert_numeric_array(arr)#
- csr_binop(data, row_ptr, operand, operator)#
Perform the binary operation operator row-wise on a csr array. The operand must be a 1d array of length equal to the number of rows in the csr array.
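As a rough illustration of what a row-wise binary operation on a csr layout means, the sketch below uses plain Python lists as stand-ins for the NumPy arrays. The name csr_binop_sketch is hypothetical, and whether the library function returns a new array or writes in place is not stated above; this version returns a new list.

```python
import operator as op


def csr_binop_sketch(data, row_ptr, operand, operator):
    """Apply operator(value, operand[i]) to every value stored in row i."""
    n_rows = len(row_ptr) - 1
    if len(operand) != n_rows:
        raise ValueError("operand must have exactly one entry per row")
    result = list(data)
    for i in range(n_rows):
        # row i occupies data[row_ptr[i]:row_ptr[i + 1]]
        for j in range(row_ptr[i], row_ptr[i + 1]):
            result[j] = operator(data[j], operand[i])
    return result


# Three rows: [1, 2], [] and [3, 4, 5]
data = [1, 2, 3, 4, 5]
row_ptr = [0, 2, 2, 5]
added = csr_binop_sketch(data, row_ptr, [10, 20, 30], op.add)
```

Here row_ptr has one more entry than there are rows, so an empty row shows up as two equal consecutive pointers.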
- generate_update(data, row_ptr, mask, changed, undefined)#
- get_new_csr_array(row_lengths, dtype, secondary_shape)#
- get_row(data, row_ptr, index)#
- isclose(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)#
Versatile function to determine whether two arrays, or an array and a value are close. Uses np.isclose for numeric values and arrays, and custom implementations for string and unicode arrays. This converts unicode arrays so that they are of uniform size and can be properly used in numba jit-compiled functions
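For scalar values, the tolerance test that np.isclose applies is abs(a - b) <= atol + rtol * abs(b). The sketch below restates that rule in plain Python. The function name is hypothetical, and treating strings with exact equality is an assumption about the custom string handling mentioned above, not a description of the library's code.

```python
def isclose_sketch(a, b, rtol=1e-05, atol=1e-08, equal_nan=False):
    """Scalar version of the closeness test described above."""
    if isinstance(a, str) or isinstance(b, str):
        # Assumption: strings are compared for exact equality
        return a == b
    a_is_nan = a != a
    b_is_nan = b != b
    if a_is_nan or b_is_nan:
        # NaN only matches NaN, and only when equal_nan is requested
        return equal_nan and a_is_nan and b_is_nan
    return abs(a - b) <= atol + rtol * abs(b)
```

Note that the test is not symmetric in a and b, because rtol scales with abs(b) only.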
- isclose_numba(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)#
- reduce_rows(data, row_ptr, func, *args)#
- remove_undefined_csr(data: ndarray, row_ptr: ndarray, indices: ndarray, undefined, num_undefined, new_data_shape) Tuple[ndarray, ndarray, ndarray] #
- row_wise_max(data, row_ptr, empty_row=None)#
- row_wise_min(data, row_ptr, empty_row=None)#
- row_wise_sum(data, row_ptr)#
- rows_contain(data, row_ptr, val, rtol=1e-05, atol=1e-08, equal_nan=False)#
- rows_equal(data, row_ptr, row, rtol=1e-05, atol=1e-08, equal_nan=False)#
- rows_intersect(data, row_ptr, vals, rtol=1e-05, atol=1e-08, equal_nan=False)#
- set_row(data, row_ptr, index, new_row)#
Set a new row at the specified index of the csr array. WARNING: the length of the new row must already be allocated in the data array, otherwise this function may overwrite other rows.
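The warning can be made concrete with a small sketch that uses plain Python lists in place of NumPy arrays. The function name set_row_sketch is hypothetical; it only mimics the write-at-row-start behaviour that the warning implies.

```python
def set_row_sketch(data, row_ptr, index, new_row):
    """Write new_row into data starting at the first slot of row `index`."""
    start = row_ptr[index]
    for offset, value in enumerate(new_row):
        data[start + offset] = value


# Two rows: [1, 2] and [3, 4, 5]
row_ptr = [0, 2, 5]

fits = [1, 2, 3, 4, 5]
set_row_sketch(fits, row_ptr, 0, [7, 8])  # same length as row 0

spills = [1, 2, 3, 4, 5]
set_row_sketch(spills, row_ptr, 0, [9, 9, 9])  # one element too long
```

In the second call the third value lands in the first slot of row 1, because row_ptr is left unchanged and no extra space was allocated for row 0.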
- slice_csr_array(data, row_ptr, indices)#
- update_csr_array(data, row_ptr, upd_data, upd_row_ptr, upd_indices, changes=None, rtol=1e-05, atol=1e-08, equal_nan=False)#
Update a csr array (data and row_ptr) in place with an update csr array (upd_data and upd_row_ptr) at the locations upd_indices. data and upd_data must be of the same dtype and may only differ in shape in the first dimension. Changes can optionally be tracked through the changes output argument, a boolean array of zeros with length equal to the number of rows in the data csr array (len(row_ptr) - 1). When tracking changes, rtol, atol and equal_nan mean the same as in np.isclose.
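The update semantics can be sketched with plain Python lists. This is a simplified stand-in, not the library routine: the name update_csr_sketch is hypothetical, it returns new lists rather than working in place, and it compares rows with exact equality instead of the rtol/atol tolerances.

```python
def update_csr_sketch(data, row_ptr, upd_data, upd_row_ptr, upd_indices, changes=None):
    """Replace the rows listed in upd_indices and optionally flag changed rows."""
    rows = [data[row_ptr[i]:row_ptr[i + 1]] for i in range(len(row_ptr) - 1)]
    for k, target in enumerate(upd_indices):
        new_row = upd_data[upd_row_ptr[k]:upd_row_ptr[k + 1]]
        if changes is not None and rows[target] != new_row:
            changes[target] = True
        rows[target] = new_row
    # Rebuild the flat data list and the row pointers from the updated rows
    new_data = [value for row in rows for value in row]
    new_row_ptr = [0]
    for row in rows:
        new_row_ptr.append(new_row_ptr[-1] + len(row))
    return new_data, new_row_ptr


# Rows [1, 2], [3] and [4, 5]; row 1 becomes [6, 7], row 2 is rewritten unchanged
changes = [False, False, False]
new_data, new_row_ptr = update_csr_sketch(
    [1, 2, 3, 4, 5], [0, 2, 3, 5],
    upd_data=[6, 7, 4, 5], upd_row_ptr=[0, 2, 4], upd_indices=[1, 2],
    changes=changes,
)
```

Only row 1 is flagged as changed, since row 2 received the values it already held.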
exceptions#
- exception InvalidMessage#
Bases:
SimulationException
- exception NotReady#
Bases:
SimulationException
- exception OrchestratorException#
Bases:
SimulationException
- exception SimulationException#
Bases:
Exception
- exception SimulationExit#
Bases:
OrchestratorException
- exception StartupFailure#
Bases:
SimulationException
- exception StreamDone#
Bases:
SimulationException
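The base classes listed above form a small hierarchy, which determines which except clause catches what. The sketch below rebuilds part of that hierarchy with empty stand-in classes (so it runs without the package installed); the classify helper is hypothetical.

```python
class SimulationException(Exception):
    pass


class OrchestratorException(SimulationException):
    pass


class SimulationExit(OrchestratorException):
    pass


class NotReady(SimulationException):
    pass


def classify(exc):
    """Report which handler in the chain catches the given exception."""
    try:
        raise exc
    except OrchestratorException:
        return "orchestrator"
    except SimulationException:
        return "simulation"
    except Exception:
        return "other"
```

Because SimulationExit derives from OrchestratorException, the more specific handler has to come first; a leading `except SimulationException` would catch every exception in this module.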
messages#
- class BaseUpdateMessage#
Bases:
object
- address: str | None#
- property has_data#
- key: str | None#
- origin: str | None#
- class DataMessage(data: 'bytes')#
Bases:
Message
- data: bytes#
- size: int#
- to_bytes() Sequence[bytes] #
- class GetDataMessage(key: 'str', mask: 't.Optional[dict]' = None)#
Bases:
Message
- key: str#
- mask: dict | None = None#
- class PathMessage(path: 't.Optional[Path]')#
Bases:
Message
- path: Path | None#
- to_bytes() Sequence[bytes] #
- class PutDataMessage(key: 'str', data: 'bytes')#
Bases:
Message
- data: bytes#
- key: str#
- size: int#
- to_bytes() Sequence[bytes] #
- class RegistrationMessage(pub: 't.Optional[dict]', sub: 't.Optional[dict]')#
Bases:
Message
- pub: dict | None#
- sub: dict | None#
- class ResultMessage(key: str | None = None, address: str | None = None, next_time: int | None = None, origin: str | None = None)#
Bases:
Message, BaseUpdateMessage
Response to an UpdateMessage
- address: str | None = None#
- key: str | None = None#
- next_time: int | None = None#
- origin: str | None = None#
- class UpdateMessage(timestamp: 'int', key: 't.Optional[str]' = None, address: 't.Optional[str]' = None, origin: 't.Optional[str]' = None)#
Bases:
Message, BaseUpdateMessage
- address: str | None = None#
- key: str | None = None#
- origin: str | None = None#
- timestamp: int#
settings#
- class Settings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, data_dir: Path = '.', log_level: str = 'INFO', log_format: str = '[{asctime}] [{levelname:8s}] {name:17s}: {message}', name: str = '', storage: Literal['api'] | Literal['disk'] = 'disk', storage_dir: Path | None = None, temp_dir: Path = '/tmp', reference: float = 0, time_scale: float = 1, start_time: int = 0, duration: int = 0, datasets: List[dict] = None, model_names: List[str] = None, models: List[dict] = None, service_types: List[str] = None, scenario_config: dict | None = None, service_discovery: Dict[str, str] = None)#
Bases:
BaseSettings
- class Config#
Bases:
object
- env_prefix = 'movici_'#
- fields = {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}#
- apply_scenario_config(config: dict)#
- data_dir: DirectoryPath#
- datasets: t.List[dict]#
- duration: int#
- log_format: str#
- log_level: str#
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'movici_', 'extra': 'forbid', 'fields': {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}#
- model_fields: ClassVar[dict[str, FieldInfo]] = {'data_dir': FieldInfo(annotation=Path, required=False, default='.', metadata=[PathType(path_type='dir')]), 'datasets': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'duration': FieldInfo(annotation=int, required=False, default=0), 'log_format': FieldInfo(annotation=str, required=False, default='[{asctime}] [{levelname:8s}] {name:17s}: {message}'), 'log_level': FieldInfo(annotation=str, required=False, default='INFO'), 'model_names': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'models': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'name': FieldInfo(annotation=str, required=False, default=''), 'reference': FieldInfo(annotation=float, required=False, default=0), 'scenario_config': FieldInfo(annotation=Union[dict, NoneType], required=False, json_schema_extra={'env': ''}), 'service_discovery': FieldInfo(annotation=Dict[str, str], required=False, default_factory=dict, json_schema_extra={'env': ''}), 'service_types': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'start_time': FieldInfo(annotation=int, required=False, default=0), 'storage': FieldInfo(annotation=Union[Literal[str], Literal[str]], required=False, default='disk'), 'storage_dir': FieldInfo(annotation=Union[Path, NoneType], required=False), 'temp_dir': FieldInfo(annotation=Path, required=False, default='/tmp', metadata=[PathType(path_type='dir')]), 'time_scale': FieldInfo(annotation=float, required=False, default=1)}#
- model_names: t.List[str]#
- models: t.List[dict]#
- name: str#
- reference: float#
- scenario_config: t.Optional[dict]#
- service_discovery: t.Dict[str, str]#
- service_types: t.List[str]#
- start_time: int#
- storage: t.Union[t.Literal['api'], t.Literal['disk']]#
- storage_dir: t.Optional[Path]#
- temp_dir: DirectoryPath#
- time_scale: float#
- property timeline_info#
simulation#
- class ModelFromInstanceInfo(name: str, instance: movici_simulation_core.core.types.Model)#
Bases:
ModelInfo
- class ModelFromTypeInfo(name: str, cls: Type[movici_simulation_core.core.types.Model], config: dict | None = None)#
Bases:
ModelInfo
- config: dict | None = None#
- class ModelInfo(name: str)#
Bases:
ActiveModuleInfo
- daemon: bool = None#
- class ModelRunner(model_info: ModelInfo, settings: Settings, strategies: List[type] | None = None, schema: AttributeSchema | None = None)#
Bases:
Runner
Provides logic for:
- Creating a Process (daemon=False) that runs a Model. Using a wrapping function, this subprocess will:
  - create the model with its model adapter
  - create a (dealer) socket
  - run the model
  - catch exceptions from the model and send an ERROR message
  - raise exceptions that do not come directly from the model
- Filling the ModelInfo object
By creating the process with daemon=False, models can spawn their own subprocesses.
- close()#
- entry_point()#
- init_data_handler: ServicedInitDataHandler | None = None#
- socket: MessageDealerSocket | None = None#
- start()#
- update_handler: UpdateDataClient | None = None#
- class ModelTypeInfo(identifier: str, cls: Type[movici_simulation_core.core.types.Model])#
Bases:
ModuleTypeInfo
- class Runner(strategies: List[type], schema: AttributeSchema | None = None)#
Bases:
object
- ctx = <multiprocessing.context.ForkContext object>#
- prepare_subprocess()#
- class ServiceInfo(name: str, cls: Type[movici_simulation_core.core.types.Service], address: str | None = None, daemon: bool = True)#
Bases:
ActiveModuleInfo
- address: str | None = None#
- daemon: bool = True#
- fill_service_discovery(svc_discovery: Dict[str, str])#
- set_port(port: int)#
- class ServiceRunner(service: ServiceInfo, settings: Settings, strategies: List[type] | None = None, schema: AttributeSchema | None = None)#
Bases:
Runner
Provides logic for:
- Creating a Pipe that the Service can use to announce its port
- Creating a Process (daemon=True) that runs the Service. Using a wrapping function, this subprocess will:
  - create the service
  - create a (router) socket
  - announce the port
  - run the Service
  - raise an exception on failure
- Filling the ServiceInfo object
- Raising an exception if the Service fails to announce its port in time
By creating the process with daemon=True, services cannot spawn their own subprocesses, but they can be easily terminated.
- TIMEOUT = 5#
- entry_point(conn: Connection)#
- start()#
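The announce-the-port handshake described for ServiceRunner can be sketched with the standard multiprocessing module. This is an illustration under assumptions, not the library's implementation: the function names are hypothetical, the port is a fixed placeholder number rather than a real bound socket, and the fork start method (matching the ForkContext shown for Runner.ctx) is only available on Unix-like systems.

```python
import multiprocessing

TIMEOUT = 5  # seconds, same value as ServiceRunner.TIMEOUT above


def service_entry_point(conn, port):
    """Runs in the subprocess: announce the port over the pipe."""
    conn.send(port)
    conn.close()


def start_service(port=5555):
    """Start a daemonic subprocess and wait for it to announce its port."""
    ctx = multiprocessing.get_context("fork")
    # duplex=False gives a receive-only end and a send-only end
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(
        target=service_entry_point, args=(child_conn, port), daemon=True
    )
    proc.start()
    child_conn.close()  # the parent only reads
    try:
        if not parent_conn.poll(TIMEOUT):
            raise TimeoutError("service did not announce its port in time")
        return parent_conn.recv()
    finally:
        proc.join(TIMEOUT)
        parent_conn.close()
```

Calling start_service(5555) returns the announced port, or raises TimeoutError if nothing arrives within TIMEOUT seconds.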
- class ServiceTypeInfo(identifier: str, cls: Type[movici_simulation_core.core.types.Service], auto_use: bool, daemon: bool = True)#
Bases:
ModuleTypeInfo
- auto_use: bool#
- daemon: bool = True#
- class Simulation(use_global_plugins=True, debug=False, **settings)#
Bases:
Extensible
Main class for starting a simulation. A simulation can be configured from a scenario config using Simulation.configure or manually using the Simulation.add_model and Simulation.set_timeline_info methods. A simulation can then be started using Simulation.run. Every model and service runs in its own subprocess (multiprocessing.Process) for parallelism.
- property active_models#
- active_modules: Dict[str, ProcessInfo]#
- property active_services#
- add_model(name: str, model: Model | Type[Model], config=None)#
Manually add a model to a Simulation. A model can be added as an instance or as a class. When added as a class, instantiation of the model is done inside its subprocess, which, depending on the model, could help with certain forking issues.
- Parameters:
name – the model name, a model name must be unique within a simulation
model – the model class (or instance)
config – the model config dictionary to instantiate the model, when the model is given as a class
- configure(config: dict)#
Configure a simulation by scenario config. All model types and additional services that are present in the simulation must first be registered as a plugin (see Simulation.use).
- exit_code: int = None#
- model_types: Dict[str, ModelTypeInfo]#
- register_attributes(attributes: Iterable[AttributeSpec])#
Register attributes for this Simulation.
- Parameters:
attributes – an iterable of AttributeSpec objects
- register_model_type(identifier: str, model_type: Type[Model])#
Register a Model type to use in a simulation. Upon registration, this method also registers any attributes (i.e. AttributeSpec objects) from the model's Model.get_schema_attributes method.
- Parameters:
identifier – A unique identifier for a model type. When configuring the Simulation using Simulation.configure, this identifier must match the type key of the model config
model_type – The Model subclass to register
- register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#
Register a Service for this Simulation. After registration, a service can either be used automatically or activated (i.e. used in this Simulation) through the Simulation.configure method.
- Parameters:
identifier – A unique name to identify the Service by
service – The service class that will be used when this service is activated
auto_use – When a service is registered as auto_use, an instance of this Service is always available in the Simulation
daemon – Services can be either daemonic or not. Daemonic services are run as fire-and-forget and will be terminated/killed once the simulation has ended. Non-daemonic services are joined before exiting the simulation (and must have some way to exit). Non-daemonic services have the benefit that they can spawn their own subprocesses
- run() int #
Starts up services from config and auto_use using ServiceRunner and collects the service addresses. Starts up models from config, with the service addresses for discovery, using ModelRunner. Tracks models and services and terminates them when necessary. (Open question in the source: when do we terminate everything, and when does the orchestrator take over exception handling?)
- schema: AttributeSchema#
- service_types: Dict[str, ServiceTypeInfo]#
- set_default_strategies()#
- set_strategy(strat)#
- set_timeline_info(timeline_info: TimelineInfo)#
When configuring the Simulation manually, use this method to add timeline information to the simulation.
- Parameters:
timeline_info – the TimelineInfo object for this simulation
- timeline_info: TimelineInfo | None = None#
- use(plugin: Type[Plugin])#
Using a plugin allows a model_type or service to register itself for availability. This method calls Plugin.install with the Simulation as its argument. The plugins can then use the methods Simulation.register_service, Simulation.register_model_type and Simulation.register_attributes.
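The plugin handshake described for Simulation.use can be illustrated with small stand-in classes, so the sketch runs without the package. MiniSimulation and EchoModel are hypothetical names, and the stand-ins implement only the two calls mentioned above (use calling Plugin.install, and install calling register_model_type).

```python
class Plugin:
    """Stand-in for the documented Plugin base class."""

    @classmethod
    def install(cls, obj):
        raise NotImplementedError


class MiniSimulation:
    """Stand-in that keeps only the model type registry of a Simulation."""

    def __init__(self):
        self.model_types = {}

    def register_model_type(self, identifier, model_type):
        self.model_types[identifier] = model_type

    def use(self, plugin):
        # The plugin decides what it registers on the simulation
        plugin.install(self)


class EchoModel(Plugin):
    """Hypothetical model type that registers itself under 'echo'."""

    @classmethod
    def install(cls, obj):
        obj.register_model_type("echo", cls)


sim = MiniSimulation()
sim.use(EchoModel)
```

After sim.use(EchoModel), the identifier "echo" is available for lookup, which mirrors how a model config's type key is matched against registered model types.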
types#
- class CSRAttributeData#
Bases:
TypedDict
- data: ndarray#
- ind_ptr: ndarray | None#
- indptr: ndarray | None#
- row_ptr: ndarray | None#
- class ExternalSerializationStrategy(schema, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#
Bases:
object
validate#
- class AttributeSchemaLookup(dataset_names: Sequence[str] | None = None, schema: AttributeSchema | None = None)#
Bases:
MoviciTypeLookup
- attribute(attribute_type)#
- dataset(dataset_name)#
- class ConfigVersion#
Bases:
TypedDict
- convert_from: Dict[str, Callable[[dict], dict]]#
- schema: dict#
- class FromDictLookup(datasets: List[dict] | None = None, entity_types: list | None = None, attribute_types: list | None = None, validate_dataset_types: bool = True)#
Bases:
MoviciTypeLookup
- attribute(attribute_type)#
- dataset(dataset_name)#
- dataset_type(dataset_name, required_type)#
- entity_group(entity_type)#
- class MoviciDataRefInfo(path: 't.Tuple[t.Union[str, int], ...]', movici_type: "t.Literal['dataset', 'entityGroup', 'attribute']", value: 'str')#
Bases:
object
- property json_path#
- movici_type: Literal['dataset', 'entityGroup', 'attribute']#
- path: Tuple[int | str, ...]#
- set_value(obj)#
- unset_value(obj)#
- value: str#
- class MoviciTypeLookup#
Bases:
object
Class for looking up whether a specific dataset, entity_group or attribute exists, or whether a dataset is of a specific type. Used alongside validate_and_process. This class can be subclassed to provide custom logic for determining whether these objects exist.
- attribute(attribute_type) bool #
- dataset(dataset_name) bool #
- dataset_type(dataset_name, required_type) bool #
- entity_group(entity_type) bool #
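A custom lookup only needs to answer the four boolean questions listed above. The sketch below defines a stand-in base class with those four methods and a subclass backed by a dict and two sets. DictBackedLookup and all dataset, entity and attribute names in the example are hypothetical.

```python
class MoviciTypeLookup:
    """Stand-in for the documented base class: four boolean lookups."""

    def dataset(self, dataset_name) -> bool:
        raise NotImplementedError

    def entity_group(self, entity_type) -> bool:
        raise NotImplementedError

    def attribute(self, attribute_type) -> bool:
        raise NotImplementedError

    def dataset_type(self, dataset_name, required_type) -> bool:
        raise NotImplementedError


class DictBackedLookup(MoviciTypeLookup):
    def __init__(self, dataset_types, entity_types, attribute_types):
        self.dataset_types = dict(dataset_types)  # dataset name -> dataset type
        self.entity_types = set(entity_types)
        self.attribute_types = set(attribute_types)

    def dataset(self, dataset_name) -> bool:
        return dataset_name in self.dataset_types

    def entity_group(self, entity_type) -> bool:
        return entity_type in self.entity_types

    def attribute(self, attribute_type) -> bool:
        return attribute_type in self.attribute_types

    def dataset_type(self, dataset_name, required_type) -> bool:
        return self.dataset_types.get(dataset_name) == required_type


lookup = DictBackedLookup(
    {"roads": "transport_network"}, ["road_segments"], ["capacity"]
)
```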
- exception MoviciTypeReport(movici_type: str, instance: str)#
Bases:
ValidationError
Indicates the existence of a movici.type field in the instance. By deriving from exceptions.ValidationError, we hook into the existing jsonschema code that sets the location of the fields. In our own code we process and drop these “errors” so that they are not raised as actual errors.
- asinfo()#
- anyOf(validator, anyOf, instance, schema)#
- ensure_schema(schema_identifier: dict | str | Path, add_name_and_type=True)#
- ensure_valid_config(config: dict, target_version: str, versions: Dict[str, ConfigVersion], add_name_and_type=True)#
- extract_reports(errors)#
- get_validation_errors(config, schema)#
- has_dataset_type(instance: str, dataset_type: str, lookup)#
- movici_dataset_type(lookup)#
- movici_type(lookup)#
- movici_validator(schema, lookup: ~movici_simulation_core.validate.MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>)#
- oneOf(validator, oneOf, instance, schema)#
- validate_and_process(instance: ~typing.Any, schema: dict, lookup: ~movici_simulation_core.validate.MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>, return_errors=False) List[MoviciDataRefInfo] | Tuple[List[MoviciDataRefInfo], List[ValidationError]] #
Extension of jsonschema.validators.validate that strips out and processes MoviciTypeReports.
- validate_movici_type(instance, movici_type, lookup)#
Module contents#
- class AttributeOptions(special: 't.Optional[T]' = None, enum_name: 't.Optional[str]' = None, enum_values: 't.Optional[t.List[str]]' = None)#
Bases:
Generic[T]
- enum_name: str | None = None#
- enum_values: List[str] | None = None#
- get_enumeration()#
- special: T | None = None#
- class AttributeSchema(attributes: Iterable[AttributeSpec] | None = None)#
Bases:
Extensible
- add_attribute(attr: AttributeSpec)#
- add_attributes(attributes: Iterable[AttributeSpec])#
- add_from_namespace(ns)#
- get(key, default=None)#
- get_spec(name: str | Tuple[str | None, str], default_data_type: DataType | Callable[[], DataType] | None = None, cache=False)#
- register_attributes(attributes: Iterable[AttributeSpec])#
- use(plugin)#
- class AttributeSpec(name: str, data_type: movici_simulation_core.core.data_type.DataType, enum_name: str | None = None)#
Bases:
object
- enum_name: str | None = None#
- name: str#
- class CSRAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#
Bases:
Attribute
- property csr: TrackedCSRArray#
- generate_update(mask=None)#
- Parameters:
mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value will be self.data_type.undefined
- Returns:
- is_special()#
- is_undefined()#
- reset()#
- slice(item)#
- strip_undefined(value: TrackedCSRArray, indices: ndarray) Tuple[TrackedCSRArray, ndarray] #
- to_dict()#
- update(value: CSRAttributeData | TrackedCSRArray | Tuple[ndarray, ndarray], indices: ndarray, process_undefined=False)#
- class DataType(py_type: Type[T], unit_shape: Tuple[int, ...] = (), csr: bool = False)#
Bases:
Generic[T]
- csr: bool = False#
- is_undefined(val)#
- property np_type#
- py_type: Type[T]#
- property undefined#
- unit_shape: Tuple[int, ...] = ()#
- class DirectoryInitDataHandler(root: 'pathlib.Path')#
Bases:
InitDataHandler
- root: Path#
- class EntityGroup(name: str | None = None)#
Bases:
object
- classmethod all_attributes() Dict[str, AttributeField] #
- attributes: Dict[str, AttributeField] = {}#
- property dataset_name#
- get_attribute(identifier: str)#
- get_indices(ids: Sequence[int]) ndarray #
- is_similiar(other: EntityGroup)#
- register(state: StateProxy)#
- register_attribute(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#
- state: StateProxy = None#
- class EntityInitDataFormat(schema: AttributeSchema | None = None, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#
Bases:
ExternalSerializationStrategy
- dump_dict(dataset: dict)#
- load_attribute(attr_data: list, name: str) dict #
- load_bytes(raw: str | bytes, **kwargs)#
- load_data_section(data: dict) dict #
- load_entity_group(entity_group: dict)#
- load_json(obj: dict)#
- schema: AttributeSchema#
- class Extensible#
Bases:
object
- register_attributes(attributes: Iterable[AttributeSpec])#
- set_strategy(tp)#
- class Index(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes] | None = None, raise_on_invalid=False)#
Bases:
object
- add_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes]) None #
- block_count()#
- ensure_unique(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
- ids: ndarray | None = None#
- params: IndexParams#
- query_idx(item: int)#
- query_indices(item: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
- set_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
- class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#
Bases:
RequestClient
- get(key: str, mask: dict | None = None) Path | None #
- class InitDataHandler#
Bases:
InitDataHandlerBase
- class Model(model_config: dict)#
Bases:
Plugin
- get_adapter() Type[ModelAdapterBase] #
- classmethod get_schema_attributes() Iterable[AttributeSpec] #
- classmethod install(obj: Extensible)#
- class Moment(timestamp: int, timeline_info: movici_simulation_core.core.moment.TimelineInfo | None = None)#
Bases:
object
- classmethod assert_timeline_info(timeline_info: TimelineInfo | None = None)#
- classmethod from_datetime(dt: datetime, timeline_info: TimelineInfo | None = None)#
- classmethod from_seconds(seconds: float, timeline_info: TimelineInfo | None = None)#
- classmethod from_string(datetime_str: str, timeline_info: TimelineInfo | None = None, **kwargs)#
- is_at_beginning()#
- property seconds#
- timeline_info: TimelineInfo | None = None#
- timestamp: int#
- property world_time#
- class Plugin#
Bases:
object
- classmethod install(obj: Extensible)#
- class Service#
Bases:
Plugin
- classmethod install(obj: Extensible)#
- logger: Logger#
- run()#
- setup(*, settings: Settings, stream: Stream, logger: Logger, socket: MessageRouterSocket)#
- class ServicedInitDataHandler(name: 'str', server: 'str')#
Bases:
InitDataHandler
- client: InitDataClient#
- close()#
- name: str#
- server: str#
- class Settings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, data_dir: Path = '.', log_level: str = 'INFO', log_format: str = '[{asctime}] [{levelname:8s}] {name:17s}: {message}', name: str = '', storage: Literal['api'] | Literal['disk'] = 'disk', storage_dir: Path | None = None, temp_dir: Path = '/tmp', reference: float = 0, time_scale: float = 1, start_time: int = 0, duration: int = 0, datasets: List[dict] = None, model_names: List[str] = None, models: List[dict] = None, service_types: List[str] = None, scenario_config: dict | None = None, service_discovery: Dict[str, str] = None)#
Bases:
BaseSettings
- class Config#
Bases:
object
- env_prefix = 'movici_'#
- fields = {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}#
- apply_scenario_config(config: dict)#
- data_dir: DirectoryPath#
- datasets: t.List[dict]#
- duration: int#
- log_format: str#
- log_level: str#
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'movici_', 'extra': 'forbid', 'fields': {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}#
- model_fields: ClassVar[dict[str, FieldInfo]] = {'data_dir': FieldInfo(annotation=Path, required=False, default='.', metadata=[PathType(path_type='dir')]), 'datasets': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'duration': FieldInfo(annotation=int, required=False, default=0), 'log_format': FieldInfo(annotation=str, required=False, default='[{asctime}] [{levelname:8s}] {name:17s}: {message}'), 'log_level': FieldInfo(annotation=str, required=False, default='INFO'), 'model_names': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'models': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'name': FieldInfo(annotation=str, required=False, default=''), 'reference': FieldInfo(annotation=float, required=False, default=0), 'scenario_config': FieldInfo(annotation=Union[dict, NoneType], required=False, json_schema_extra={'env': ''}), 'service_discovery': FieldInfo(annotation=Dict[str, str], required=False, default_factory=dict, json_schema_extra={'env': ''}), 'service_types': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'start_time': FieldInfo(annotation=int, required=False, default=0), 'storage': FieldInfo(annotation=Union[Literal[str], Literal[str]], required=False, default='disk'), 'storage_dir': FieldInfo(annotation=Union[Path, NoneType], required=False), 'temp_dir': FieldInfo(annotation=Path, required=False, default='/tmp', metadata=[PathType(path_type='dir')]), 'time_scale': FieldInfo(annotation=float, required=False, default=1)}#
- model_names: t.List[str]#
- models: t.List[dict]#
- name: str#
- reference: float#
- scenario_config: t.Optional[dict]#
- service_discovery: t.Dict[str, str]#
- service_types: t.List[str]#
- start_time: int#
- storage: t.Union[t.Literal['api'], t.Literal['disk']]#
- storage_dir: t.Optional[Path]#
- temp_dir: DirectoryPath#
- time_scale: float#
- property timeline_info#
- class SimpleModel(model_config: dict)#
Bases:
Model
- close(message: QuitMessage)#
- get_adapter() Type[ModelAdapterBase] #
- initialize(settings: Settings, schema: AttributeSchema, init_data_handler: InitDataHandler, logger: Logger) DataMask #
- new_time(new_time: Moment, message: NewTimeMessage)#
- update(moment: Moment, data: dict | None, message: UpdateMessage) Tuple[dict | None, Moment | None] #
- update_series(moment: Moment, data: Iterable[dict | None], message: UpdateSeriesMessage) Tuple[dict | None, Moment | None] #
- class Simulation(use_global_plugins=True, debug=False, **settings)#
Bases:
Extensible
Main class for starting a simulation. A simulation can be configured from a scenario config using Simulation.configure or manually using the Simulation.add_model and Simulation.set_timeline_info methods. A simulation can then be started using Simulation.run. Every model and service runs in its own subprocess (multiprocessing.Process) for parallelism.
- property active_models#
- active_modules: Dict[str, ProcessInfo]#
- property active_services#
- add_model(name: str, model: Model | Type[Model], config=None)#
Manually add a model to a Simulation. A model can be added as an instance or as a class. When added as a class, instantiation of the model is done inside its subprocess, which, depending on the model, could help with certain forking issues.
- Parameters:
name – the model name, a model name must be unique within a simulation
model – the model class (or instance)
config – the model config dictionary to instantiate the model, when the model is given as a class
- configure(config: dict)#
Configure a simulation by scenario config. All model types and additional services that are present in the simulation must first be registered as a plugin (see Simulation.use).
- exit_code: int = None#
- model_types: Dict[str, ModelTypeInfo]#
- register_attributes(attributes: Iterable[AttributeSpec])#
Register attributes for this Simulation.
- Parameters:
attributes – an iterable of AttributeSpec objects
- register_model_type(identifier: str, model_type: Type[Model])#
Register a Model type to use in a simulation. Upon registration, this method also registers any attributes (i.e. AttributeSpec objects) from the model's Model.get_schema_attributes method.
- Parameters:
identifier – A unique identifier for a model type. When configuring the Simulation using Simulation.configure, this identifier must match the type key of the model config
model_type – The Model subclass to register
- register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#
Register a Service for this Simulation. After registration, a service can either be used automatically or activated (i.e. used in this Simulation) through the Simulation.configure method.
- Parameters:
identifier – A unique name to identify the Service by
service – The service class that will be used when this service is activated
auto_use – When a service is registered as auto_use, an instance of this Service is always available in the Simulation
daemon – Services can be either daemonic or not. Daemonic services are run as fire-and-forget and will be terminated/killed once the simulation has ended. Non-daemonic services are joined before exiting the simulation (and must have some way to exit). Non-daemonic services have the benefit that they can spawn their own subprocesses
- run() int #
Starts up services from config and auto_use using ServiceRunner and collects the service addresses. Starts up models from config, with the service addresses for discovery, using ModelRunner. Tracks models and services and terminates them when necessary. (Open question in the source: when do we terminate everything, and when does the orchestrator take over exception handling?)
- schema: AttributeSchema#
- service_types: Dict[str, ServiceTypeInfo]#
- set_default_strategies()#
- set_strategy(strat)#
- set_timeline_info(timeline_info: TimelineInfo)#
When configuring the Simulation manually, use this method to add timeline information to the simulation.
- Parameters:
timeline_info – the TimelineInfo object for this simulation
- strategies: List[type]#
- timeline_info: TimelineInfo | None = None#
- use(plugin: Type[Plugin])#
Using a plugin allows a model_type or service to register itself for availability. This method calls Plugin.install with the Simulation as its argument. The plugins can then use the methods Simulation.register_service, Simulation.register_model_type and Simulation.register_attributes.
- class TimelineInfo(reference: float, time_scale: float = 1, start_time: int = 0, duration: int = 0)#
Bases:
object
- datetime_to_timestamp(dt: datetime) int #
- duration: int = 0#
- property end_time: int#
- is_at_beginning(timestamp: int)#
- reference: float#
- seconds_to_timestamp(seconds: float) int #
- start_time: int = 0#
- string_to_timestamp(dt_string: str, **kwargs)#
- time_scale: float = 1#
- timestamp_to_datetime(timestamp: int)#
- timestamp_to_seconds(timestamp: int) float #
- timestamp_to_unix_time(timestamp: int) float #
- unix_time_to_timestamp(unix_time: float) int #
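The conversion methods above carry no descriptions, so the following is only a guess at the arithmetic their names suggest: it assumes that one timestamp tick lasts time_scale seconds, that reference is the Unix time of timestamp 0, and that end_time is start_time plus duration. TimelineSketch is a hypothetical stand-in, not the library class.

```python
from dataclasses import dataclass


@dataclass
class TimelineSketch:
    reference: float
    time_scale: float = 1
    start_time: int = 0
    duration: int = 0

    @property
    def end_time(self) -> int:
        # Assumption: the simulation ends `duration` ticks after start_time
        return self.start_time + self.duration

    def timestamp_to_seconds(self, timestamp: int) -> float:
        # Assumption: one tick lasts time_scale seconds
        return timestamp * self.time_scale

    def seconds_to_timestamp(self, seconds: float) -> int:
        return int(seconds / self.time_scale)

    def timestamp_to_unix_time(self, timestamp: int) -> float:
        # Assumption: reference is the Unix time of timestamp 0
        return self.reference + self.timestamp_to_seconds(timestamp)

    def unix_time_to_timestamp(self, unix_time: float) -> int:
        return self.seconds_to_timestamp(unix_time - self.reference)


timeline = TimelineSketch(reference=1_600_000_000, time_scale=10, duration=360)
```

Under these assumptions, timestamp 6 corresponds to 60 seconds of simulated time after the reference.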
- class TrackedArray(input_array, rtol=1e-05, atol=1e-08, equal_nan=False)#
Bases:
ndarray
- astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
- atol: float#
- property changed#
- diff() Tuple[ndarray, ndarray] #
- equal_nan: bool#
- reset()#
- rtol: float#
- class TrackedCSRArray(data, row_ptr, rtol=1e-05, atol=1e-08, equal_nan=False)#
Bases:
object
- as_matrix()#
- astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
- changed: ndarray#
- copy()#
- data: ndarray#
- get_row(index)#
- reset()#
- row_ptr: ndarray#
- rows_contain(val)#
return a boolean array where the rows of csr contain the val argument
- rows_equal(row)#
return a boolean array where the rows of csr equal the row argument
- rows_intersect(vals)#
return a boolean array where the rows of csr contain any of the vals arguments
- size: int#
- slice(indices)#
- update(updates: TrackedCSRArray, indices: ndarray)#
Update the CSRArray in place
- update_from_matrix(matrix: ndarray)#
Update the csr-array from a 2D matrix. The number of rows in the matrix must match the number of rows in the csr-array
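The data and row_ptr layout behind these methods can be sketched in plain Python. CSRSketch is a hypothetical stand-in that uses lists and exact equality; the real class works on ndarrays and, per the module-level csr functions, compares with rtol, atol and equal_nan.

```python
class CSRSketch:
    """Hypothetical stand-in showing the data/row_ptr layout only."""

    def __init__(self, data, row_ptr):
        self.data = list(data)
        self.row_ptr = list(row_ptr)

    @property
    def n_rows(self):
        # row_ptr has one more entry than there are rows.
        return len(self.row_ptr) - 1

    def get_row(self, index):
        # Row i occupies data[row_ptr[i]:row_ptr[i + 1]].
        return self.data[self.row_ptr[index]:self.row_ptr[index + 1]]

    def rows_contain(self, val):
        return [val in self.get_row(i) for i in range(self.n_rows)]

    def rows_intersect(self, vals):
        return [
            any(v in self.get_row(i) for v in vals) for i in range(self.n_rows)
        ]


# Three rows of different length: [1, 2], [] and [3, 4, 5]
csr = CSRSketch(data=[1, 2, 3, 4, 5], row_ptr=[0, 2, 2, 5])
```

An empty row shows up as two equal consecutive entries in row_ptr, which is why rows of varying length can share one flat data array.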
- class TrackedModel(model_config: dict)#
Bases:
Model
To work with a TrackedState, a model developer could create their own TrackedState() object and work with it directly to track changes and produce updates of changed data. However, it is also possible to extend this TrackedModel class and let the TrackedModelAdapter manage the TrackedState
- Attributes:
- auto_reset: By default, the TrackedModelAdapter resets tracking information of the state for PUB and/or SUB attributes at the appropriate time, so that the model receives a SUB update only once, and PUB attributes are published only once. By setting auto_reset to 0, PUB, SUB or PUB|SUB, a model can limit this automatic behaviour and gain full control over which attributes are reset and when. However, when overriding the default behaviour, a model must take great care to reset the tracking information itself at the right moments.
- auto_reset = 10#
- get_adapter() Type[ModelAdapterBase] #
- initialize(state: TrackedState)#
The initialize method is called when all of the state’s INIT attribute arrays are filled with data. This may be during the model engine’s initialization phase or during t=0. Data that is required for the model to initialize may be published in another model’s t0-update, and the TrackedModelAdapter can wait for this to happen before calling initialize. When the simulation progresses to t>0 before the model’s INIT attributes have been filled, an Exception is raised, indicating that the model was not ready yet.
Model.initialize may raise NotReady to indicate that it does not have its required input data yet. This is useful, for example, if a model has a number of OPT (optional) attributes of which at least one must be set. The model would check whether this is the case, and raise NotReady if it is not. Once a model has successfully run its initialize method, this method will not be called again for the duration of the simulation.
- Parameters:
state – The model’s TrackedState object, managed by the TrackedModelAdapter
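The NotReady pattern described above can be sketched with stand-ins. Everything here is hypothetical: NotReady is redefined locally, the state is a plain dict rather than a TrackedState, and try_initialize only imitates what an adapter might do between updates.

```python
class NotReady(Exception):
    """Local stand-in for the NotReady exception mentioned above."""


class OptionalInputModel:
    """Hypothetical model that needs at least one of two optional inputs."""

    def __init__(self):
        self.initialized = False

    def initialize(self, state):
        # `state` is a plain dict here, not a real TrackedState.
        if state.get("speed") is None and state.get("travel_time") is None:
            raise NotReady()
        self.initialized = True


def try_initialize(model, state):
    """Sketch of an adapter step: call initialize until it has succeeded once."""
    if model.initialized:
        return True
    try:
        model.initialize(state)
    except NotReady:
        return False
    return True


model = OptionalInputModel()
first_attempt = try_initialize(model, {"speed": None, "travel_time": None})
second_attempt = try_initialize(model, {"speed": None, "travel_time": [12.5]})
```

The first attempt fails because neither optional input is set; the second succeeds once one of them has data, after which initialize is not called again.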
- new_time(state: TrackedState, time_stamp: Moment)#
Called for every change of timestamp during a simulation run. This method is called before checking whether the state is ready for INIT or PUB and may be called before the initialize and update methods have been called the first time.
- abstract setup(state: TrackedState, settings: Settings, schema: AttributeSchema, init_data_handler: InitDataHandler, logger: Logger)#
In setup, a model receives a state object, its config and other parameters. The goal of setup is to prepare the state by telling it which attributes it needs to track (by subscribing (INIT/SUB/OPT) or publishing (PUB) attributes) and from which datasets. These attributes may be grouped together in EntityGroup classes or created directly. The main entry points for registering are:
state.register_dataset() for registering a number of EntityGroup classes for a certain dataset name at once
state.register_entity_group() for registering a single EntityGroup class (or instance) for a dataset name
state.register_attribute() for registering a single attribute in a dataset/entity_group combination
During setup there is no data available in the state. These will be downloaded automatically by the TrackedModelAdapter. However, additional datasets may be requested directly through the init_data_handler parameter.
- Parameters:
state – The model’s TrackedState object, managed by the TrackedModelAdapter
settings – global settings
schema – The AttributeSchema with all registered attributes
init_data_handler – an InitDataHandler that may be used to retrieve additional datasets
logger – a logging.Logger instance
- shutdown(state: TrackedState)#
Called when a simulation ends (either because it has finished or because one of the models raised an exception). The model may implement this method to clean up local resources. This method may be called before the initialize and update methods have been called the first time
- abstract update(state: TrackedState, moment: Moment) Moment | None #
The update method is called for every update coming from the model engine. However, it is only called for the first time once all SUB attributes have their arrays filled with data. When the simulation progresses to t>0 before the model’s SUB attributes have been filled, an Exception is raised, indicating that the model was not ready yet.
- Parameters:
state – The model’s TrackedState object, managed by the TrackedModelAdapter
moment – The current simulation Moment
- Returns:
an optional Moment indicating the next time the model wants to be woken up, as per the model engine’s protocol
- class TrackedState(schema: AttributeSchema | None = None, logger: Logger | None = None, track_unknown=0)#
Bases:
object
- all_attributes()#
- attributes: Dict[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]#
- generate_update(flags=8)#
- get_attribute(dataset_name: str, entity_type: str, name: str)#
- get_data_mask()#
- get_index(dataset_name: str, entity_type: str)#
- has_changes() bool #
- is_ready_for(flag: int)#
flag: one of SUB, INIT
- iter_attributes() Iterable[Tuple[str, str, str, UniformAttribute | CSRAttribute]] #
- iter_datasets() Iterable[Tuple[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]] #
- iter_entities() Iterable[Tuple[str, str, Dict[str, UniformAttribute | CSRAttribute]]] #
- log(level, message)#
- process_general_section(dataset_name: str, general_section: dict)#
- receive_update(update: Dict, is_initial=False, process_undefined=False)#
- register_attribute(dataset_name: str, entity_name: str, spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08) UniformAttribute | CSRAttribute #
- register_dataset(dataset_name: str, entities: Sequence[Type[EntityGroup] | EntityGroup]) List[EntityGroup] #
- register_entity_group(dataset_name, entity: Type[EntityGroupT] | EntityGroupT) EntityGroupT #
- Return type:
object
- reset_tracked_changes(flags)#
- to_dict()#
- track_unknown: int#
- class UniformAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#
Bases:
Attribute
The underlying data can be accessed through the UniformAttribute().array attribute. When updating data using indexing (“[]”) notation, it is recommended to use UniformAttribute()[index]=value. When dealing with string (i.e. unicode) arrays, this makes sure that the array itemsize grows when adding strings that are larger than the current itemsize.
- property array: TrackedArray#
- generate_update(mask=None)#
- Parameters:
mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value should be self.data_type.undefined
- Returns:
- is_special()#
- is_undefined()#
- reset()#
- slice(item)#
- strip_undefined(key, value)#
- to_dict()#
- update(value: ndarray | UniformAttributeData, indices: ndarray, process_undefined=False)#
- class UpdateDataClient(name: str, home_address: str, sockets: Sockets | None = None)#
Bases:
RequestClient
- clear()#
- counter: Iterator[str]#
- get(address: str, key: str, mask: dict | None) bytes #
- home_address: str#
- put(data: bytes) Tuple[str, str] #
- reset_counter()#
- class UpdateDataFormat#
Bases:
InternalSerializationStrategy
- CURRENT_VERSION = 1#
- classmethod decode_numpy_array(obj)#
- dumps(data: dict)#
- classmethod encode_numpy_array(obj)#
- loads(raw_bytes: bytes)#
- field#
alias of
AttributeField
- get_global_schema()#
- get_timeline_info() TimelineInfo | None #
- set_timeline_info(info_or_reference: float | TimelineInfo | None, time_scale: float | None = None, start_time: int | None = None)#
- validate_and_process(instance: Any, schema: dict, lookup: MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>, return_errors=False) List[MoviciDataRefInfo] | Tuple[List[MoviciDataRefInfo], List[ValidationError]] #
Extension of jsonschema.validators.validate that strips out and processes MoviciTypeReports