movici_simulation_core#

Subpackages#

attributes#

This module contains AttributeSpec objects for common, generic attribute types that can be used in datasets

csr#

assert_numeric_array(arr)#
csr_binop(data, row_ptr, operand, operator)#

Perform the binary operation operator row-wise on a csr array. The operand must be a 1d array with a length equal to the number of rows in the csr array
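
All csr helpers in this module share one encoding: a flat data array plus a row_ptr array of row boundaries, where row i spans data[row_ptr[i]:row_ptr[i+1]]. A minimal sketch of that layout, assuming these functions are importable from movici_simulation_core.csr and, for row_wise_sum, that empty rows reduce to 0:

    import numpy as np
    from movici_simulation_core.csr import get_row, row_wise_sum

    data = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
    row_ptr = np.array([0, 2, 2, 6])  # three rows: [1, 2], [] and [3, 4, 5, 6]

    print(get_row(data, row_ptr, 2))    # [3. 4. 5. 6.]
    print(row_wise_sum(data, row_ptr))  # [ 3.  0. 18.]

csr_binop applies its operator in the same layout, pairing row i with operand[i].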

generate_update(data, row_ptr, mask, changed, undefined)#
get_new_csr_array(row_lengths, dtype, secondary_shape)#
get_row(data, row_ptr, index)#
isclose(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)#

Versatile function to determine whether two arrays, or an array and a value, are close. Uses np.isclose for numeric values and arrays, and custom implementations for string and unicode arrays. Unicode arrays are converted so that they are of uniform size and can be properly used in numba jit-compiled functions
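
A short illustration of both code paths, assuming a boolean result as with np.isclose:

    import numpy as np
    from movici_simulation_core.csr import isclose

    print(isclose(np.array([1.0, 2.0]), np.array([1.0, 2.0 + 1e-9])))  # [ True  True]
    print(isclose(np.array(["a", "b"]), "a"))  # [ True False], string comparison path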

isclose_numba(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)#
reduce_rows(data, row_ptr, func, *args)#
remove_undefined_csr(data: ndarray, row_ptr: ndarray, indices: ndarray, undefined, num_undefined, new_data_shape) Tuple[ndarray, ndarray, ndarray]#
row_wise_max(data, row_ptr, empty_row=None)#
row_wise_min(data, row_ptr, empty_row=None)#
row_wise_sum(data, row_ptr)#
rows_contain(data, row_ptr, val, rtol=1e-05, atol=1e-08, equal_nan=False)#
rows_equal(data, row_ptr, row, rtol=1e-05, atol=1e-08, equal_nan=False)#
rows_intersect(data, row_ptr, vals, rtol=1e-05, atol=1e-08, equal_nan=False)#
set_row(data, row_ptr, index, new_row)#

Set a new row at the specific index of the csr array. WARNING: the length of the new row must already be allocated in the data array, otherwise this function may overwrite other rows

slice_csr_array(data, row_ptr, indices)#
update_csr_array(data, row_ptr, upd_data, upd_row_ptr, upd_indices, changes=None, rtol=1e-05, atol=1e-08, equal_nan=False)#

Update a csr array (data and row_ptr) in place with an update csr array (upd_data and upd_row_ptr) at the locations upd_indices. data and upd_data must be of the same dtype and may only differ in shape in the first dimension. Changes can optionally be tracked through the changes output argument: a boolean array of zeros with a length equal to the number of rows of the data csr array (len(row_ptr) - 1). When tracking changes, rtol, atol and equal_nan mean the same as in np.isclose
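
A sketch of such an in-place update with change tracking; note that each update row must fit the space already allocated for its target row (here both keep length 2):

    import numpy as np
    from movici_simulation_core.csr import update_csr_array

    data = np.array([1.0, 2.0, 3.0, 4.0])
    row_ptr = np.array([0, 2, 4])        # two rows: [1, 2] and [3, 4]
    upd_data = np.array([9.0, 9.0])
    upd_row_ptr = np.array([0, 2])       # one update row: [9, 9]
    upd_indices = np.array([1])          # targets row 1

    changes = np.zeros(len(row_ptr) - 1, dtype=bool)
    update_csr_array(data, row_ptr, upd_data, upd_row_ptr, upd_indices, changes=changes)
    print(data)     # [1. 2. 9. 9.]
    print(changes)  # [False  True]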

exceptions#

exception InvalidMessage#

Bases: SimulationException

exception NotReady#

Bases: SimulationException

exception OrchestratorException#

Bases: SimulationException

exception SimulationException#

Bases: Exception

exception SimulationExit#

Bases: OrchestratorException

exception StartupFailure#

Bases: SimulationException

exception StreamDone#

Bases: SimulationException

messages#

class AcknowledgeMessage#

Bases: Message

Response to a NewTimeMessage or QuitMessage

class BaseUpdateMessage#

Bases: object

address: str | None#
property has_data#
key: str | None#
origin: str | None#
class ClearDataMessage(prefix: 'str')#

Bases: Message

prefix: str#
class DataMessage(data: 'bytes')#

Bases: Message

data: bytes#
classmethod from_bytes(raw_message: Sequence[bytes]) Message#
size: int#
to_bytes() Sequence[bytes]#
class ErrorMessage(error: 't.Optional[str]' = None)#

Bases: Message

error: str | None = None#
class GetDataMessage(key: 'str', mask: 't.Optional[dict]' = None)#

Bases: Message

key: str#
mask: dict | None = None#
class Message#

Bases: object

classmethod from_bytes(raw_message: Sequence[bytes]) Message#
classmethod from_dict(dict_: dict)#
to_bytes() Sequence[bytes]#
class NewTimeMessage(timestamp: 'int')#

Bases: Message

timestamp: int#
class PathMessage(path: 't.Optional[Path]')#

Bases: Message

classmethod from_bytes(raw_message: Sequence[bytes]) Message#
path: Path | None#
to_bytes() Sequence[bytes]#
class PutDataMessage(key: 'str', data: 'bytes')#

Bases: Message

data: bytes#
classmethod from_bytes(raw_message: Sequence[bytes]) Message#
key: str#
size: int#
to_bytes() Sequence[bytes]#
class QuitMessage#

Bases: Message

class RegistrationMessage(pub: 't.Optional[dict]', sub: 't.Optional[dict]')#

Bases: Message

pub: dict | None#
sub: dict | None#
class ResultMessage(key: str | None = None, address: str | None = None, next_time: int | None = None, origin: str | None = None)#

Bases: Message, BaseUpdateMessage

Response to an UpdateMessage

address: str | None = None#
key: str | None = None#
next_time: int | None = None#
origin: str | None = None#
class UpdateMessage(timestamp: 'int', key: 't.Optional[str]' = None, address: 't.Optional[str]' = None, origin: 't.Optional[str]' = None)#

Bases: Message, BaseUpdateMessage

address: str | None = None#
key: str | None = None#
origin: str | None = None#
timestamp: int#
class UpdateSeriesMessage(updates: 't.List[UpdateMessage]')#

Bases: Message

classmethod from_bytes(raw_message: Sequence[bytes]) Message#
classmethod from_dict(dict_: dict)#
property timestamp#
to_bytes() Sequence[bytes]#
updates: List[UpdateMessage]#
dump_message(message: Message) Sequence[bytes]#
load_message(msg_type: bytes, *payload: bytes) Message#
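
dump_message and load_message convert between Message objects and multipart byte frames (e.g. for sending over zmq sockets). A round-trip sketch, assuming the module path follows this section heading and the first frame produced by dump_message is the msg_type frame consumed by load_message:

    from movici_simulation_core.messages import UpdateMessage, dump_message, load_message

    msg = UpdateMessage(timestamp=10, key="some_key", origin="some_model")
    frames = dump_message(msg)        # Sequence[bytes], suitable for a multipart send
    restored = load_message(*frames)  # frames[0] is the message type, the rest is payload
    assert restored == msg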

settings#

class Settings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, data_dir: Path = '.', log_level: str = 'INFO', log_format: str = '[{asctime}] [{levelname:8s}] {name:17s}: {message}', name: str = '', storage: Literal['api'] | Literal['disk'] = 'disk', storage_dir: Path | None = None, temp_dir: Path = '/tmp', reference: float = 0, time_scale: float = 1, start_time: int = 0, duration: int = 0, datasets: List[dict] = None, model_names: List[str] = None, models: List[dict] = None, service_types: List[str] = None, scenario_config: dict | None = None, service_discovery: Dict[str, str] = None)#

Bases: BaseSettings

class Config#

Bases: object

env_prefix = 'movici_'#
fields = {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}#
apply_scenario_config(config: dict)#
data_dir: DirectoryPath#
datasets: t.List[dict]#
duration: int#
log_format: str#
log_level: str#
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'movici_', 'extra': 'forbid', 'fields': {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}#
model_fields: ClassVar[dict[str, FieldInfo]] = {'data_dir': FieldInfo(annotation=Path, required=False, default='.', metadata=[PathType(path_type='dir')]), 'datasets': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'duration': FieldInfo(annotation=int, required=False, default=0), 'log_format': FieldInfo(annotation=str, required=False, default='[{asctime}] [{levelname:8s}] {name:17s}: {message}'), 'log_level': FieldInfo(annotation=str, required=False, default='INFO'), 'model_names': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'models': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'name': FieldInfo(annotation=str, required=False, default=''), 'reference': FieldInfo(annotation=float, required=False, default=0), 'scenario_config': FieldInfo(annotation=Union[dict, NoneType], required=False, json_schema_extra={'env': ''}), 'service_discovery': FieldInfo(annotation=Dict[str, str], required=False, default_factory=dict, json_schema_extra={'env': ''}), 'service_types': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'start_time': FieldInfo(annotation=int, required=False, default=0), 'storage': FieldInfo(annotation=Union[Literal[str], Literal[str]], required=False, default='disk'), 'storage_dir': FieldInfo(annotation=Union[Path, NoneType], required=False), 'temp_dir': FieldInfo(annotation=Path, required=False, default='/tmp', metadata=[PathType(path_type='dir')]), 'time_scale': FieldInfo(annotation=float, required=False, default=1)}#
model_names: t.List[str]#
models: t.List[dict]#
name: str#
reference: float#
scenario_config: t.Optional[dict]#
service_discovery: t.Dict[str, str]#
service_types: t.List[str]#
start_time: int#
storage: t.Union[t.Literal['api'], t.Literal['disk']]#
storage_dir: t.Optional[Path]#
temp_dir: DirectoryPath#
time_scale: float#
property timeline_info#
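
Settings is a pydantic BaseSettings subclass: fields can be passed directly or are picked up from environment variables with the movici_ prefix (case-insensitive). A minimal sketch:

    import os
    from movici_simulation_core import Settings

    os.environ["MOVICI_LOG_LEVEL"] = "DEBUG"

    settings = Settings(name="my_simulation", duration=3600)
    print(settings.log_level)  # DEBUG, read from the environment
    print(settings.duration)   # 3600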

simulation#

class ActiveModuleInfo(name: str)#

Bases: object

name: str#
process: Process | None = None#
class ModelFromInstanceInfo(name: str, instance: movici_simulation_core.core.types.Model)#

Bases: ModelInfo

instance: Model#
class ModelFromTypeInfo(name: str, cls: Type[movici_simulation_core.core.types.Model], config: dict | None = None)#

Bases: ModelInfo

cls: Type[Model]#
config: dict | None = None#
class ModelInfo(name: str)#

Bases: ActiveModuleInfo

daemon: bool = None#
class ModelRunner(model_info: ModelInfo, settings: Settings, strategies: List[type] | None = None, schema: AttributeSchema | None = None)#

Bases: Runner

Provides logic for:

  • Creating a Process (daemon=False) that runs a Model. Using a wrapping function, this subprocess will:

    • create the model with its model adapter

    • create a (dealer) socket

    • run the model

    • catch exceptions from the model and send an ERROR message

    • raise exceptions that do not come directly from the model

  • Filling the ModelInfo object

By creating the process with daemon=False, models can spawn their own subprocesses

close()#
entry_point()#
init_data_handler: ServicedInitDataHandler | None = None#
socket: MessageDealerSocket | None = None#
start()#
update_handler: UpdateDataClient | None = None#
class ModelTypeInfo(identifier: str, cls: Type[movici_simulation_core.core.types.Model])#

Bases: ModuleTypeInfo

cls: Type[Model]#
class ModuleTypeInfo(identifier: str)#

Bases: object

identifier: str#
class ProcessInfo(*args, **kwargs)#

Bases: Protocol

daemon: bool#
process: Process | None#
class Runner(strategies: List[type], schema: AttributeSchema | None = None)#

Bases: object

ctx = <multiprocessing.context.ForkContext object>#
prepare_subprocess()#
class ServiceInfo(name: str, cls: Type[movici_simulation_core.core.types.Service], address: str | None = None, daemon: bool = True)#

Bases: ActiveModuleInfo

address: str | None = None#
cls: Type[Service]#
daemon: bool = True#
fill_service_discovery(svc_discovery: Dict[str, str])#
set_port(port: int)#
class ServiceRunner(service: ServiceInfo, settings: Settings, strategies: List[type] | None = None, schema: AttributeSchema | None = None)#

Bases: Runner

Provides logic for:

  • Creating a Pipe that the Service can use to announce its port

  • Creating a Process (daemon=True) that runs the Service. Using a wrapping function, this subprocess will:

    • create the service

    • create a (router) socket

    • announce the port

    • run the Service

    • raise an exception on failure

  • Filling the ServiceInfo object

  • Raising an exception if the service fails to announce its port in time

By creating the process with daemon=True, services cannot spawn their own subprocesses, but they can be easily terminated

TIMEOUT = 5#
entry_point(conn: Connection)#
start()#
class ServiceTypeInfo(identifier: str, cls: Type[movici_simulation_core.core.types.Service], auto_use: bool, daemon: bool = True)#

Bases: ModuleTypeInfo

auto_use: bool#
cls: Type[Service]#
daemon: bool = True#
class Simulation(use_global_plugins=True, debug=False, **settings)#

Bases: Extensible

Main class for starting a simulation. A simulation can be configured from a scenario config using Simulation.configure or manually using the Simulation.add_model and Simulation.set_timeline_info methods. A simulation can then be started using Simulation.run. Every model and service runs in its own subprocess (multiprocessing.Process) for parallelism.
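
A minimal, manually configured simulation might look as follows; MyModel and its config are hypothetical placeholders for an actual Model subclass:

    from movici_simulation_core import Simulation, TimelineInfo
    from my_package import MyModel  # hypothetical Model subclass

    sim = Simulation()
    sim.set_timeline_info(TimelineInfo(reference=0, time_scale=1, start_time=0))
    sim.add_model("my_model", MyModel, config={"some_setting": 42})
    exit_code = sim.run()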

property active_models#
active_modules: Dict[str, ProcessInfo]#
property active_services#
add_model(name: str, model: Model | Type[Model], config=None)#

Manually add a model to a Simulation. A model can be added as an instance or as a class. When added as a class, instantiation of the model is done inside its subprocess, which, depending on the model, could help avoid certain forking issues

Parameters:
  • name – the model name; model names must be unique within a simulation

  • model – the model class (or instance)

  • config – the model config dictionary used to instantiate the model when it is given as a class

configure(config: dict)#

Configure a simulation by scenario config. All model types and additional services that are present in the simulation must first be registered as a plugin (see Simulation.use).

exit_code: int = None#
model_types: Dict[str, ModelTypeInfo]#
register_attributes(attributes: Iterable[AttributeSpec])#

Register attributes for this Simulation.

Parameters:

attributes – an iterable of AttributeSpec objects

register_model_type(identifier: str, model_type: Type[Model])#

Register a Model type to use in a simulation. Upon registration, this method also registers any attributes (i.e. AttributeSpec objects) from the model's Model.get_schema_attributes method.

Parameters:
  • identifier – A unique identifier for a model type. When configuring the Simulation using Simulation.configure, this identifier must match the type key of the model config

  • model_type – The Model subclass to register

register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#

Register a Service for this Simulation. After registration, a service can either be used automatically or activated (ie. used in this Simulation) through the Simulation.configure method.

Parameters:
  • identifier – A unique name to identify the Service by

  • service – The service class that will be used when this service is activated

  • auto_use – When a service is registered as auto_use, an instance of this Service is always available in the Simulation

  • daemon – Services can be either daemonic or not. Daemonic services are run as fire-and-forget and will be terminated/killed once the simulation has ended. Non-daemonic services are joined before exiting the simulation (and must have some way to exit). Non-daemonic services have the benefit that they can spawn their own subprocesses

run() int#

Starts up services from the config and auto_use registrations using ServiceRunner and collects their addresses. Then starts up models from the config using ModelRunner, passing the service addresses for discovery. Tracks the models and services and terminates them when necessary.

schema: AttributeSchema#
service_types: Dict[str, ServiceTypeInfo]#
set_default_strategies()#
set_strategy(strat)#
set_timeline_info(timeline_info: TimelineInfo)#

When configuring the Simulation manually, use this method to add timeline information to the simulation.

Parameters:

timeline_info – the TimelineInfo object for this simulation

timeline_info: TimelineInfo | None = None#
use(plugin: Type[Plugin])#

Using a plugin allows a model_type or service to register itself for availability. This method calls Plugin.install with the Simulation as its argument. The plugins can then use the methods Simulation.register_service, Simulation.register_model_type and Simulation.register_attributes.
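
A plugin simply bundles such registrations. A sketch of a custom plugin; MyModel and MY_ATTRIBUTES are hypothetical placeholders:

    from movici_simulation_core import Plugin, Simulation

    class MyPlugin(Plugin):
        @classmethod
        def install(cls, obj):
            # obj is the Extensible being configured, here the Simulation
            obj.register_model_type("my_model_type", MyModel)
            obj.register_attributes(MY_ATTRIBUTES)

    sim = Simulation()
    sim.use(MyPlugin)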

types#

class CSRAttributeData#

Bases: TypedDict

data: ndarray#
ind_ptr: ndarray | None#
indptr: ndarray | None#
row_ptr: ndarray | None#
class DataMask#

Bases: TypedDict

pub: dict | None#
sub: dict | None#
class ExternalSerializationStrategy(schema, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#

Bases: object

dumps(data, type: FileType)#
loads(raw_data, type: FileType)#
supported_file_type_or_raise(filetype: FileType)#
supported_file_types() Sequence[FileType]#
class FileType(value)#

Bases: Enum

An enumeration.

CSV = ('.csv',)#
JSON = ('.json',)#
MSGPACK = ('.msgpack',)#
NETCDF = ('.nc',)#
OTHER = ()#
classmethod from_extension(ext)#
class InternalSerializationStrategy#

Bases: object

dumps(data)#
loads(raw_data)#
class UniformAttributeData#

Bases: TypedDict

data: ndarray#
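
These TypedDicts describe the array-based attribute data format. A sketch of both shapes; in practice only one of the three row-pointer aliases is present in a CSRAttributeData dict (an assumption based on the aliases declared above):

    import numpy as np

    uniform = {"data": np.array([1.0, 2.0, 3.0])}  # UniformAttributeData

    csr = {  # CSRAttributeData encoding a ragged attribute
        "data": np.array([1.0, 2.0, 3.0]),
        "row_ptr": np.array([0, 1, 3]),  # row 0: [1], row 1: [2, 3]
    }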

validate#

class AttributeSchemaLookup(dataset_names: Sequence[str] | None = None, schema: AttributeSchema | None = None)#

Bases: MoviciTypeLookup

attribute(attribute_type)#
dataset(dataset_name)#
class ConfigVersion#

Bases: TypedDict

convert_from: Dict[str, Callable[[dict], dict]]#
schema: dict#
class FromDictLookup(datasets: List[dict] | None = None, entity_types: list | None = None, attribute_types: list | None = None, validate_dataset_types: bool = True)#

Bases: MoviciTypeLookup

attribute(attribute_type)#
dataset(dataset_name)#
dataset_type(dataset_name, required_type)#
entity_group(entity_type)#
class MoviciDataRefInfo(path: 't.Tuple[t.Union[str, int], ...]', movici_type: "t.Literal['dataset', 'entityGroup', 'attribute']", value: 'str')#

Bases: object

property json_path#
movici_type: Literal['dataset', 'entityGroup', 'attribute']#
path: Tuple[int | str, ...]#
set_value(obj)#
unset_value(obj)#
value: str#
class MoviciTypeLookup#

Bases: object

Class for looking up whether a specific dataset, entity group or attribute exists, or whether a dataset is of a specific type. Used alongside validate_and_process. This class can be subclassed to provide custom logic for determining whether these objects exist

attribute(attribute_type) bool#
dataset(dataset_name) bool#
dataset_type(dataset_name, required_type) bool#
entity_group(entity_type) bool#
exception MoviciTypeReport(movici_type: str, instance: str)#

Bases: ValidationError

Indicates the existence of a movici.type field in the instance. By deriving from exceptions.ValidationError, we hook into the existing jsonschema code that sets the location of the fields. In our own code we process and drop these “errors” so that they are not raised as actual errors

asinfo()#
anyOf(validator, anyOf, instance, schema)#
ensure_schema(schema_identifier: dict | str | Path, add_name_and_type=True)#
ensure_valid_config(config: dict, target_version: str, versions: Dict[str, ConfigVersion], add_name_and_type=True)#
extract_reports(errors)#
get_validation_errors(config, schema)#
has_dataset_type(instance: str, dataset_type: str, lookup)#
movici_dataset_type(lookup)#
movici_type(lookup)#
movici_validator(schema, lookup: ~movici_simulation_core.validate.MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>)#
oneOf(validator, oneOf, instance, schema)#
validate_and_process(instance: ~typing.Any, schema: dict, lookup: ~movici_simulation_core.validate.MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>, return_errors=False) List[MoviciDataRefInfo] | Tuple[List[MoviciDataRefInfo], List[ValidationError]]#

Extension of jsonschema.validators.validate that strips out and processes MoviciTypeReports

validate_movici_type(instance, movici_type, lookup)#
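
A sketch of validate_and_process with a FromDictLookup; the schema below is hypothetical and assumes dataset references are marked with the custom movici.type schema keyword processed by this module:

    from movici_simulation_core.validate import FromDictLookup, validate_and_process

    schema = {
        "type": "object",
        "properties": {"dataset": {"type": "string", "movici.type": "dataset"}},
    }
    config = {"dataset": "road_network"}
    lookup = FromDictLookup(datasets=[{"name": "road_network", "type": "transport_network"}])

    refs = validate_and_process(config, schema, lookup=lookup)
    for ref in refs:
        print(ref.movici_type, ref.json_path, ref.value)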

Module contents#

class AttributeOptions(special: 't.Optional[T]' = None, enum_name: 't.Optional[str]' = None, enum_values: 't.Optional[t.List[str]]' = None)#

Bases: Generic[T]

enum_name: str | None = None#
enum_values: List[str] | None = None#
get_enumeration()#
special: T | None = None#
class AttributeSchema(attributes: Iterable[AttributeSpec] | None = None)#

Bases: Extensible

add_attribute(attr: AttributeSpec)#
add_attributes(attributes: Iterable[AttributeSpec])#
add_from_namespace(ns)#
get(key, default=None)#
get_spec(name: str | Tuple[str | None, str], default_data_type: DataType | Callable[[], DataType] | None = None, cache=False)#
register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
use(plugin)#
class AttributeSpec(name: str, data_type: movici_simulation_core.core.data_type.DataType, enum_name: str | None = None)#

Bases: object

data_type: DataType#
enum_name: str | None = None#
name: str#
class CSRAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

property csr: TrackedCSRArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value will be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(value: TrackedCSRArray, indices: ndarray) Tuple[TrackedCSRArray, ndarray]#
to_dict()#
update(value: CSRAttributeData | TrackedCSRArray | Tuple[ndarray, ndarray], indices: ndarray, process_undefined=False)#
class DataType(py_type: Type[T], unit_shape: Tuple[int, ...] = (), csr: bool = False)#

Bases: Generic[T]

csr: bool = False#
is_undefined(val)#
property np_type#
py_type: Type[T]#
property undefined#
unit_shape: Tuple[int, ...] = ()#
class DirectoryInitDataHandler(root: 'pathlib.Path')#

Bases: InitDataHandler

get(name: str) Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) Tuple[FileType, DatasetPath]#
root: Path#
class EntityGroup(name: str | None = None)#

Bases: object

classmethod all_attributes() Dict[str, AttributeField]#
attributes: Dict[str, AttributeField] = {}#
property dataset_name#
get_attribute(identifier: str)#
get_indices(ids: Sequence[int]) ndarray#
property index: Index#
is_similiar(other: EntityGroup)#
register(state: StateProxy)#
register_attribute(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#
state: StateProxy = None#
class EntityInitDataFormat(schema: AttributeSchema | None = None, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#

Bases: ExternalSerializationStrategy

dump_dict(dataset: dict)#
dumps(dataset: dict, filetype: FileType | None = FileType.JSON, **kwargs) str#
load_attribute(attr_data: list, name: str) dict#
load_bytes(raw: str | bytes, **kwargs)#
load_data_section(data: dict) dict#
load_entity_group(entity_group: dict)#
load_json(obj: dict)#
loads(raw_data, type: FileType)#
schema: AttributeSchema#
supported_file_types() Sequence[FileType]#
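
EntityInitDataFormat parses entity-based init data into numpy-backed attribute arrays. A sketch with hypothetical dataset and attribute names, assuming the usual movici dataset layout of {"name": ..., "data": {entity_group: {attribute: [...]}}}:

    from movici_simulation_core import EntityInitDataFormat

    fmt = EntityInitDataFormat()  # optionally pass an AttributeSchema
    dataset = fmt.load_json(
        {
            "name": "road_network",
            "data": {
                "road_segment_entities": {
                    "id": [1, 2, 3],
                    "shape.length": [100.0, 150.0, 200.0],
                }
            },
        }
    )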
class Extensible#

Bases: object

register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#
set_strategy(tp)#
class Index(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes] | None = None, raise_on_invalid=False)#

Bases: object

add_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes]) None#
block_count()#
ensure_unique(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
ids: ndarray | None = None#
params: IndexParams#
query_idx(item: int)#
query_indices(item: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
set_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
class InitDataClient(name: str, server: str, sockets: Sockets | None = None)#

Bases: RequestClient

get(key: str, mask: dict | None = None) Path | None#
class InitDataHandler#

Bases: InitDataHandlerBase

ensure_ftype(name: str, ftype: FileType)#
get(name: str) Tuple[FileType | None, DatasetPath | None]#
get_type_and_path(path) Tuple[FileType, DatasetPath]#
class Model(model_config: dict)#

Bases: Plugin

get_adapter() Type[ModelAdapterBase]#
classmethod get_schema_attributes() Iterable[AttributeSpec]#
classmethod install(obj: Extensible)#
class Moment(timestamp: int, timeline_info: movici_simulation_core.core.moment.TimelineInfo | None = None)#

Bases: object

classmethod assert_timeline_info(timeline_info: TimelineInfo | None = None)#
classmethod from_datetime(dt: datetime, timeline_info: TimelineInfo | None = None)#
classmethod from_seconds(seconds: float, timeline_info: TimelineInfo | None = None)#
classmethod from_string(datetime_str: str, timeline_info: TimelineInfo | None = None, **kwargs)#
is_at_beginning()#
property seconds#
timeline_info: TimelineInfo | None = None#
timestamp: int#
property world_time#
class Plugin#

Bases: object

classmethod install(obj: Extensible)#
class Service#

Bases: Plugin

classmethod install(obj: Extensible)#
logger: Logger#
run()#
setup(*, settings: Settings, stream: Stream, logger: Logger, socket: MessageRouterSocket)#
class ServicedInitDataHandler(name: 'str', server: 'str')#

Bases: InitDataHandler

client: InitDataClient#
close()#
get(name: str) Tuple[FileType | None, DatasetPath | None]#
name: str#
server: str#
class Settings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, data_dir: Path = '.', log_level: str = 'INFO', log_format: str = '[{asctime}] [{levelname:8s}] {name:17s}: {message}', name: str = '', storage: Literal['api'] | Literal['disk'] = 'disk', storage_dir: Path | None = None, temp_dir: Path = '/tmp', reference: float = 0, time_scale: float = 1, start_time: int = 0, duration: int = 0, datasets: List[dict] = None, model_names: List[str] = None, models: List[dict] = None, service_types: List[str] = None, scenario_config: dict | None = None, service_discovery: Dict[str, str] = None)#

Bases: BaseSettings

class Config#

Bases: object

env_prefix = 'movici_'#
fields = {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}#
apply_scenario_config(config: dict)#
data_dir: DirectoryPath#
datasets: t.List[dict]#
duration: int#
log_format: str#
log_level: str#
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'movici_', 'extra': 'forbid', 'fields': {'log_format': {'env': ['movici_log_format', 'movici_logformat']}, 'log_level': {'env': ['movici_log_level', 'movici_loglevel']}}, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}#
model_fields: ClassVar[dict[str, FieldInfo]] = {'data_dir': FieldInfo(annotation=Path, required=False, default='.', metadata=[PathType(path_type='dir')]), 'datasets': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'duration': FieldInfo(annotation=int, required=False, default=0), 'log_format': FieldInfo(annotation=str, required=False, default='[{asctime}] [{levelname:8s}] {name:17s}: {message}'), 'log_level': FieldInfo(annotation=str, required=False, default='INFO'), 'model_names': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'models': FieldInfo(annotation=List[dict], required=False, default_factory=list, json_schema_extra={'env': ''}), 'name': FieldInfo(annotation=str, required=False, default=''), 'reference': FieldInfo(annotation=float, required=False, default=0), 'scenario_config': FieldInfo(annotation=Union[dict, NoneType], required=False, json_schema_extra={'env': ''}), 'service_discovery': FieldInfo(annotation=Dict[str, str], required=False, default_factory=dict, json_schema_extra={'env': ''}), 'service_types': FieldInfo(annotation=List[str], required=False, default_factory=list, json_schema_extra={'env': ''}), 'start_time': FieldInfo(annotation=int, required=False, default=0), 'storage': FieldInfo(annotation=Union[Literal[str], Literal[str]], required=False, default='disk'), 'storage_dir': FieldInfo(annotation=Union[Path, NoneType], required=False), 'temp_dir': FieldInfo(annotation=Path, required=False, default='/tmp', metadata=[PathType(path_type='dir')]), 'time_scale': FieldInfo(annotation=float, required=False, default=1)}#
model_names: t.List[str]#
models: t.List[dict]#
name: str#
reference: float#
scenario_config: t.Optional[dict]#
service_discovery: t.Dict[str, str]#
service_types: t.List[str]#
start_time: int#
storage: t.Union[t.Literal['api'], t.Literal['disk']]#
storage_dir: t.Optional[Path]#
temp_dir: DirectoryPath#
time_scale: float#
property timeline_info#
class SimpleModel(model_config: dict)#

Bases: Model

close(message: QuitMessage)#
get_adapter() Type[ModelAdapterBase]#
initialize(settings: Settings, schema: AttributeSchema, init_data_handler: InitDataHandler, logger: Logger) DataMask#
new_time(new_time: Moment, message: NewTimeMessage)#
update(moment: Moment, data: dict | None, message: UpdateMessage) Tuple[dict | None, Moment | None]#
update_series(moment: Moment, data: Iterable[dict | None], message: UpdateSeriesMessage) Tuple[dict | None, Moment | None]#
class Simulation(use_global_plugins=True, debug=False, **settings)#

Bases: Extensible

Main class for starting a simulation. A simulation can be configured from a scenario config using Simulation.configure or manually using the Simulation.add_model and Simulation.set_timeline_info methods. A simulation can then be started using Simulation.run. Every model and service runs in its own subprocess (multiprocessing.Process) for parallelism.

property active_models#
active_modules: Dict[str, ProcessInfo]#
property active_services#
add_model(name: str, model: Model | Type[Model], config=None)#

Manually add a model to a Simulation. A model can be added as an instance or as a class. When added as a class, instantiation of the model is done inside its subprocess, which, depending on the model, could help avoid certain forking issues

Parameters:
  • name – the model name; model names must be unique within a simulation

  • model – the model class (or instance)

  • config – the model config dictionary used to instantiate the model when it is given as a class

configure(config: dict)#

Configure a simulation by scenario config. All model types and additional services that are present in the simulation must first be registered as a plugin (see Simulation.use).

exit_code: int = None#
model_types: Dict[str, ModelTypeInfo]#
register_attributes(attributes: Iterable[AttributeSpec])#

Register attributes for this Simulation.

Parameters:

attributes – an iterable of AttributeSpec objects

register_model_type(identifier: str, model_type: Type[Model])#

Register a Model type to use in a simulation. Upon registration, this method also registers any attributes (i.e. AttributeSpec objects) from the model's Model.get_schema_attributes method.

Parameters:
  • identifier – A unique identifier for a model type. When configuring the Simulation using Simulation.configure, this identifier must match the type key of the model config

  • model_type – The Model subclass to register

register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#

Register a Service for this Simulation. After registration, a service can either be used automatically or activated (ie. used in this Simulation) through the Simulation.configure method.

Parameters:
  • identifier – A unique name to identify the Service by

  • service – The service class that will be used when this service is activated

  • auto_use – When a service is registered as auto_use, an instance of this Service is always available in the Simulation

  • daemon – Services can be either daemonic or not. Daemonic services are run as fire-and-forget and will be terminated/killed once the simulation has ended. Non-daemonic services are joined before exiting the simulation (and must have some way to exit). Non-daemonic services have the benefit that they can spawn their own subprocesses

run() int#

Starts up services from the config and auto_use registrations using ServiceRunner and collects their addresses. Then starts up models from the config using ModelRunner, passing the service addresses for discovery. Tracks the models and services and terminates them when necessary.

schema: AttributeSchema#
service_types: Dict[str, ServiceTypeInfo]#
set_default_strategies()#
set_strategy(strat)#
set_timeline_info(timeline_info: TimelineInfo)#

When configuring the Simulation manually, use this method to add timeline information to the simulation.

Parameters:

timeline_info – the TimelineInfo object for this simulation

strategies: List[type]#
timeline_info: TimelineInfo | None = None#
use(plugin: Type[Plugin])#

Using a plugin allows a model_type or service to register itself for availability. This method calls Plugin.install with the Simulation as its argument. The plugins can then use the methods Simulation.register_service, Simulation.register_model_type and Simulation.register_attributes.

class TimelineInfo(reference: float, time_scale: float = 1, start_time: int = 0, duration: int = 0)#

Bases: object

datetime_to_timestamp(dt: datetime) int#
duration: int = 0#
property end_time: int#
is_at_beginning(timestamp: int)#
reference: float#
seconds_to_timestamp(seconds: float) int#
start_time: int = 0#
string_to_timestamp(dt_string: str, **kwargs)#
time_scale: float = 1#
timestamp_to_datetime(timestamp: int)#
timestamp_to_seconds(timestamp: int) float#
timestamp_to_unix_time(timestamp: int) float#
unix_time_to_timestamp(unix_time: float) int#
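
TimelineInfo maps integer simulation timestamps to world time: reference is the unix time of timestamp 0 and time_scale is the number of seconds per timestamp unit. The conversions below follow from that assumption:

    from movici_simulation_core import TimelineInfo

    info = TimelineInfo(reference=1_600_000_000, time_scale=60, start_time=0)

    print(info.timestamp_to_seconds(10))               # 600.0
    print(info.timestamp_to_unix_time(10))             # 1600000600.0
    print(info.unix_time_to_timestamp(1_600_000_600))  # 10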
class TrackedArray(input_array, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: ndarray

astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
atol: float#
property changed#
diff() Tuple[ndarray, ndarray]#
equal_nan: bool#
reset()#
rtol: float#
class TrackedCSRArray(data, row_ptr, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: object

as_matrix()#
astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
changed: ndarray#
copy()#
data: ndarray#
get_row(index)#
reset()#
row_ptr: ndarray#
rows_contain(val)#

Return a boolean array indicating which rows of the csr array contain the val argument

rows_equal(row)#

Return a boolean array indicating which rows of the csr array equal the row argument

rows_intersect(vals)#

Return a boolean array indicating which rows of the csr array contain any of the vals arguments

size: int#
slice(indices)#
update(updates: TrackedCSRArray, indices: ndarray)#

Update the CSRArray in place

update_from_matrix(matrix: ndarray)#

Update the csr array from a 2D matrix. The matrix's number of rows must match the csr array's number of rows
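
A small sketch of change tracking on a TrackedCSRArray, assuming the update row fits the space already allocated for its target row:

    import numpy as np
    from movici_simulation_core import TrackedCSRArray

    arr = TrackedCSRArray(np.array([1.0, 2.0, 3.0]), np.array([0, 1, 3]))
    print(arr.get_row(1))  # [2. 3.]

    upd = TrackedCSRArray(np.array([9.0]), np.array([0, 1]))
    arr.update(upd, np.array([0]))  # replace row 0 in place
    print(arr.changed)              # per-row change flags; row 0 is now flagged
    arr.reset()                     # clear the tracking information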

class TrackedModel(model_config: dict)#

Bases: Model

To work with a TrackedState, a model developer could create their own TrackedState() object and work with it directly to track changes and produce updates of changed data. However, it is also possible to extend this TrackedModel class and let the TrackedModelAdapter manage the TrackedState

Attributes:

auto_reset – By default, the TrackedModelAdapter resets the state's tracking information for PUB and/or SUB attributes at the appropriate time, so that the model receives a SUB update only once and PUB attributes are published only once. By setting auto_reset to 0, PUB, SUB or PUB|SUB, a model can limit this automatic behaviour and gain full control over which attributes are reset and when. However, when overriding the default behaviour, a model must be very careful to implement this appropriately.

auto_reset = 10#
get_adapter() Type[ModelAdapterBase]#
initialize(state: TrackedState)#

The initialize method is called when all of the state's INIT attribute arrays are filled with data. This may be during the model engine's initialization phase or during t=0. Data that a model requires to initialize may be published in another model's t0-update, and the TrackedModelAdapter can wait for this to happen before calling initialize. When the simulation progresses to t>0 before the model's INIT attributes have been filled, an Exception is raised, indicating that the model was not ready yet.

Model.initialize may raise NotReady to indicate that it does not have its required input data yet. This is useful, for example, if a model has a number of OPTional attributes of which at least one must be set; the model would check whether this is the case and raise NotReady if it is not. Once a model has successfully run its initialize method, this method will not be called again for the duration of the simulation.

Parameters:

state – The model’s TrackedState object, managed by the TrackedModelAdapter

new_time(state: TrackedState, time_stamp: Moment)#

Called for every change of timestamp during a simulation run. This method is called before checking whether the state is ready for INIT or PUB and may be called before the initialize and update methods have been called the first time.

abstract setup(state: TrackedState, settings: Settings, schema: AttributeSchema, init_data_handler: InitDataHandler, logger: Logger)#

In setup, a model receives a state object, its config and other parameters. The goal of setup is to prepare the state by telling it which attributes, from which datasets, the model needs to track, either by subscribing (INIT/SUB/OPT) to them or by publishing (PUB) them. These attributes may be grouped together in EntityGroup classes or created directly. The main entry points for registering are:

  • state.add_dataset() for registering multiple EntityGroup classes for a certain dataset name at once

  • state.add_entity_group() for registering a single EntityGroup class (or instance) for a dataset name

  • state.register_attribute() for registering a single attribute in a dataset/entity_group combination

During setup there is no data available in the state yet; data will be downloaded automatically by the TrackedModelAdapter. However, additional datasets may be requested directly through the init_data_handler parameter. A sketch of a complete setup follows the parameter list below.

Parameters:
  • state – The model’s TrackedState object, managed by the TrackedModelAdapter

  • settings – global settings

  • schema – The AttributeSchema with all registered attributes

  • init_data_handler – an InitDataHandler that may be used to retrieve additional datasets

  • logger – a logging.Logger instance
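
A minimal sketch of a TrackedModel built on these entry points; the dataset, entity group and attribute names are hypothetical, and the INIT/PUB flags are assumed to be importable from the top-level package:

    from movici_simulation_core import (
        INIT,
        PUB,
        AttributeSpec,
        DataType,
        TrackedModel,
    )

    # hypothetical attribute specs for illustration
    LENGTH = AttributeSpec("shape.length", data_type=DataType(float))
    COST = AttributeSpec("maintenance.cost", data_type=DataType(float))

    class MaintenanceCostModel(TrackedModel):
        def setup(self, state, settings, schema, init_data_handler, logger):
            # subscribe to the input attribute and declare the output attribute
            self.length = state.register_attribute(
                "road_network", "road_segment_entities", LENGTH, flags=INIT
            )
            self.cost = state.register_attribute(
                "road_network", "road_segment_entities", COST, flags=PUB
            )

        def initialize(self, state):
            # called once all INIT arrays are filled with data
            self.cost[:] = 2.0 * self.length.array

        def update(self, state, moment):
            return None  # no next wakeup requested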

shutdown(state: TrackedState)#

Called when a simulation ends (either due to it being finished or one of the models raises an exception). The model may implement this method to clean up local resources. This method may be called before the initialize and update methods have been called the first time

abstract update(state: TrackedState, moment: Moment) Moment | None#

The update method is called for every update coming from the model engine. However, it is only called the first time once all SUB attributes have their arrays filled with data. When the simulation progresses to t>0 before the model's SUB attributes have been filled, an Exception is raised, indicating that the model was not ready yet.

Parameters:
  • state – The model’s TrackedState object, managed by the TrackedModelAdapter

  • moment – The current simulation Moment

Returns:

an optional Moment indicating the next time the model wants to be woken up, as per the model engine's protocol

class TrackedState(schema: AttributeSchema | None = None, logger: Logger | None = None, track_unknown=0)#

Bases: object

all_attributes()#
attributes: Dict[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]#
generate_update(flags=8)#
get_attribute(dataset_name: str, entity_type: str, name: str)#
get_data_mask()#
get_index(dataset_name: str, entity_type: str)#
has_changes() bool#
index: Dict[str, Dict[str, Index]]#
is_ready_for(flag: int)#

flag: one of SUB, INIT

iter_attributes() Iterable[Tuple[str, str, str, UniformAttribute | CSRAttribute]]#
iter_datasets() Iterable[Tuple[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]]#
iter_entities() Iterable[Tuple[str, str, Dict[str, UniformAttribute | CSRAttribute]]]#
log(level, message)#
process_general_section(dataset_name: str, general_section: dict)#
receive_update(update: Dict, is_initial=False, process_undefined=False)#
register_attribute(dataset_name: str, entity_name: str, spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08) UniformAttribute | CSRAttribute#
register_dataset(dataset_name: str, entities: Sequence[Type[EntityGroup] | EntityGroup]) List[EntityGroup]#
register_entity_group(dataset_name, entity: Type[EntityGroupT] | EntityGroupT) EntityGroupT#
Return type:

object

reset_tracked_changes(flags)#
to_dict()#
track_unknown: int#
class UniformAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

The underlying data can be accessed through the UniformAttribute().array attribute. When updating data using indexing ("[]") notation, it is recommended to use UniformAttribute()[index] = value. When dealing with string (i.e. unicode) arrays, this ensures that the array's itemsize grows when strings larger than the current itemsize are added.
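
For example, a sketch assuming a string DataType:

    import numpy as np
    from movici_simulation_core import DataType, UniformAttribute

    attr = UniformAttribute(np.array(["aa", "bb"]), DataType(str))
    attr[0] = "a-much-longer-string"  # the unicode itemsize grows as needed
    print(attr.array)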

property array: TrackedArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value should be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(key, value)#
to_dict()#
update(value: ndarray | UniformAttributeData, indices: ndarray, process_undefined=False)#
class UpdateDataClient(name: str, home_address: str, sockets: Sockets | None = None)#

Bases: RequestClient

clear()#
counter: Iterator[str]#
get(address: str, key: str, mask: dict | None) bytes#
home_address: str#
put(data: bytes) Tuple[str, str]#
reset_counter()#
class UpdateDataFormat#

Bases: InternalSerializationStrategy

CURRENT_VERSION = 1#
classmethod decode_numpy_array(obj)#
dumps(data: dict)#
classmethod encode_numpy_array(obj)#
loads(raw_bytes: bytes)#
field#

alias of AttributeField

get_global_schema()#
get_timeline_info() TimelineInfo | None#
set_timeline_info(info_or_reference: float | TimelineInfo | None, time_scale: float | None = None, start_time: int | None = None)#
validate_and_process(instance: ~typing.Any, schema: dict, lookup: ~movici_simulation_core.validate.MoviciTypeLookup = <movici_simulation_core.validate.FromDictLookup object>, return_errors=False) List[MoviciDataRefInfo] | Tuple[List[MoviciDataRefInfo], List[ValidationError]]#

Extension of jsonschema.validators.validate that strips out and processes MoviciTypeReports