core#

arrays#

class TrackedArray(input_array, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: ndarray

astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
atol: float#
property changed#
diff() Tuple[ndarray, ndarray]#
equal_nan: bool#
reset()#
rtol: float#
class TrackedCSRArray(data, row_ptr, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: object

as_matrix()#
astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
changed: ndarray#
copy()#
data: ndarray#
get_row(index)#
reset()#
row_ptr: ndarray#
rows_contain(val)#

Return a boolean array indicating, for each row of the csr array, whether it contains the val argument

rows_equal(row)#

Return a boolean array indicating, for each row of the csr array, whether it equals the row argument

rows_intersect(vals)#

Return a boolean array indicating, for each row of the csr array, whether it contains any of the values in the vals argument

size: int#
slice(indices)#
update(updates: TrackedCSRArray, indices: ndarray)#

Update the CSRArray in place

update_from_matrix(matrix: ndarray)#

Update the csr-array from a 2D matrix. The matrix's number of rows must match the csr-array's number of rows

matrix_to_csr(matrix: ndarray)#

convert a 2d array to a TrackedCSRArray

attribute#

class Attribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: ABC

property changed#
get_enumeration()#
has_changes() bool#
has_data()#
has_data_or_raise()#
initialize(length)#
is_initialized()#
abstract is_special()#
abstract is_undefined()#
abstract reset()#
resize(new_size: int)#
abstract slice(item)#
abstract to_dict()#
class AttributeField(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#

Bases: object

property key#
property name#
class AttributeOptions(special: 't.Optional[T]' = None, enum_name: 't.Optional[str]' = None, enum_values: 't.Optional[t.List[str]]' = None)#

Bases: Generic[T]

enum_name: str | None = None#
enum_values: List[str] | None = None#
get_enumeration()#
special: T | None = None#
class CSRAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

property csr: TrackedCSRArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value will be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(value: TrackedCSRArray, indices: ndarray) Tuple[TrackedCSRArray, ndarray]#
to_dict()#
update(value: CSRAttributeData | TrackedCSRArray | Tuple[ndarray, ndarray], indices: ndarray, process_undefined=False)#
class FlagInfo(initialize: 'bool', subscribe: 'bool', required: 'bool', publish: 'bool')#

Bases: object

initialize: bool#
publish: bool#
required: bool#
subscribe: bool#
class UniformAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

The underlying data can be accessed through the UniformAttribute().array attribute. When updating data using indexing ("[]") notation, it is recommended to use UniformAttribute()[index] = value. When dealing with string (i.e. unicode) arrays, this feature ensures that the array itemsize will grow if strings larger than the current itemsize are added.

property array: TrackedArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value should be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(key, value)#
to_dict()#
update(value: ndarray | UniformAttributeData, indices: ndarray, process_undefined=False)#
attribute_max(attr: AttributeObject, *, func: callable = <function nanmax>) t.Union[None, bool, int, float]#
attribute_min(attr: AttributeObject, *, func: callable = <function nanmin>) t.Union[None, bool, int, float]#
convert_nested_list_to_csr(nested_list: List[List[object]], data_type: DataType | None = None)#
create_empty_attribute(data_type, length=None, rtol=1e-05, atol=1e-08, options=None)#
create_empty_attribute_for_data(data: UniformAttributeData | CSRAttributeData, length: int)#
ensure_array(data: list | ndarray, data_type: DataType | None = None)#
ensure_csr_data(value: dict | TrackedCSRArray | Tuple[ndarray, ndarray] | List[list], data_type: DataType | None = None) TrackedCSRArray#
ensure_uniform_data(value: dict | ndarray | list, data_type: DataType | None = None) TrackedArray#
field#

alias of AttributeField

flag_info(flag: int)#
get_array_aggregate(array, func, exclude=None)#
get_attribute_aggregate(attr: UniformAttribute | CSRAttribute, func: callable) None | bool | int | float#
get_undefined_array(data_type: DataType, length: int, rtol=1e-05, atol=1e-08, override_dtype=None) TrackedArray | TrackedCSRArray#

attribute_spec#

class AttributeSpec(name: str, data_type: movici_simulation_core.core.data_type.DataType, enum_name: str | None = None)#

Bases: object

data_type: DataType#
enum_name: str | None = None#
name: str#

data_format#

class EntityInitDataFormat(schema: AttributeSchema | None = None, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#

Bases: ExternalSerializationStrategy

dump_dict(dataset: dict)#
dumps(dataset: dict, filetype: FileType | None = FileType.JSON, **kwargs) str#
load_attribute(attr_data: list, name: str) dict#
load_bytes(raw: str | bytes, **kwargs)#
load_data_section(data: dict) dict#
load_entity_group(entity_group: dict)#
load_json(obj: dict)#
loads(raw_data, type: FileType)#
schema: AttributeSchema#
supported_file_types() Sequence[FileType]#
attribute_is_undefined(val)#
create_array(uniform_data: list, data_type: DataType)#
data_key_candidates(update_or_init_data, ignore_keys=('general',))#
data_keys(update_or_init_data, ignore_keys=('general',))#
dump_attribute(attribute_dict: dict, data_type)#
dump_csr_attribute(attribute_dict, data_type)#
dump_dataset_data(dataset_data: dict) dict#
dump_tracked_csr_array(csr: TrackedCSRArray, data_type=None)#
dump_uniform_attribute(attribute_dict, data_type: DataType)#
extract_dataset_data(update_or_init_data, ignore_keys=('general',))#
is_undefined_csr(csr_array, data_type)#
is_undefined_uniform(data, data_type)#
load_from_json(data, schema: AttributeSchema | None = None, non_data_dict_keys=('general',), cache_inferred_attributes=False)#
parse_csr_list(data: List[list], data_type: DataType) UniformAttributeData | CSRAttributeData#
parse_list(data: list, data_type: DataType) UniformAttributeData | CSRAttributeData#
parse_uniform_list(data: list, data_type: DataType) UniformAttributeData | CSRAttributeData#

data_type#

class DataType(py_type: Type[T], unit_shape: Tuple[int, ...] = (), csr: bool = False)#

Bases: Generic[T]

csr: bool = False#
is_undefined(val)#
property np_type#
py_type: Type[T]#
property undefined#
unit_shape: Tuple[int, ...] = ()#
get_undefined(dtype)#

entity_group#

class EntityGroup(name: str | None = None)#

Bases: object

classmethod all_attributes() Dict[str, AttributeField]#
attributes: Dict[str, AttributeField] = {}#
property dataset_name#
get_attribute(identifier: str)#
get_indices(ids: Sequence[int]) ndarray#
property index: Index#
is_similiar(other: EntityGroup)#
register(state: StateProxy)#
register_attribute(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#
state: StateProxy = None#

index#

class Index(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes] | None = None, raise_on_invalid=False)#

Bases: object

add_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes]) None#
block_count()#
ensure_unique(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
ids: ndarray | None = None#
params: IndexParams#
query_idx(item: int)#
query_indices(item: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
set_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
class IndexParams(block_from: numpy.ndarray, block_to: numpy.ndarray, block_offset: numpy.ndarray)#

Bases: object

block_count()#
block_from: ndarray#
block_offset: ndarray#
block_to: ndarray#
build_index(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#

Builds indexing parameters for an ids array. For every block of contiguous ids, it records the range of ids in that block and the starting position of that block in the id array.

query_idx(block_from, block_to, block_offset, ident)#
query_indices(block_from, block_to, block_offset, ids)#

moment#

class Moment(timestamp: int, timeline_info: movici_simulation_core.core.moment.TimelineInfo | None = None)#

Bases: object

classmethod assert_timeline_info(timeline_info: TimelineInfo | None = None)#
classmethod from_datetime(dt: datetime, timeline_info: TimelineInfo | None = None)#
classmethod from_seconds(seconds: float, timeline_info: TimelineInfo | None = None)#
classmethod from_string(datetime_str: str, timeline_info: TimelineInfo | None = None, **kwargs)#
is_at_beginning()#
property seconds#
timeline_info: TimelineInfo | None = None#
timestamp: int#
property world_time#
class TimelineInfo(reference: float, time_scale: float = 1, start_time: int = 0, duration: int = 0)#

Bases: object

datetime_to_timestamp(dt: datetime) int#
duration: int = 0#
property end_time: int#
is_at_beginning(timestamp: int)#
reference: float#
seconds_to_timestamp(seconds: float) int#
start_time: int = 0#
string_to_timestamp(dt_string: str, **kwargs)#
time_scale: float = 1#
timestamp_to_datetime(timestamp: int)#
timestamp_to_seconds(timestamp: int) float#
timestamp_to_unix_time(timestamp: int) float#
unix_time_to_timestamp(unix_time: float) int#
get_timeline_info() TimelineInfo | None#
set_timeline_info(info_or_reference: float | TimelineInfo | None, time_scale: float | None = None, start_time: int | None = None)#

numba_extensions#

disable_jit()#
generated_jit(func=None, **kwargs)#

Custom decorator that replaces numba.generated_jit and also works when the jit compiler is disabled

np_isclose(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)#

Custom implementation of np.isclose until numba has native support. See also: https://github.com/numba/numba/issues/5977

schema#

class AttributeSchema(attributes: Iterable[AttributeSpec] | None = None)#

Bases: Extensible

add_attribute(attr: AttributeSpec)#
add_attributes(attributes: Iterable[AttributeSpec])#
add_from_namespace(ns)#
get(key, default=None)#
get_spec(name: str | Tuple[str | None, str], default_data_type: DataType | Callable[[], DataType] | None = None, cache=False)#
register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
use(plugin)#
attribute_plugin_from_dict(d: dict)#
attributes_from_dict(d: dict)#
get_global_schema()#
get_rowptr(d: dict)#
has_rowptr_key(d: dict)#
infer_data_type_from_array(attr_data: dict | ndarray | TrackedCSRArray)#

Given array data, either as an np.ndarray, a TrackedCSRArray, or a "data"/"row_ptr" dictionary, infer the DataType of that array data

infer_data_type_from_list(data: list)#

serialization#

class UpdateDataFormat#

Bases: InternalSerializationStrategy

CURRENT_VERSION = 1#
classmethod decode_numpy_array(obj)#
dumps(data: dict)#
classmethod encode_numpy_array(obj)#
loads(raw_bytes: bytes)#
dump_update(data: dict)#
load_update(raw_bytes: bytes)#

state#

class EntityDataHandler(attributes: Dict[str, UniformAttribute | CSRAttribute], index: Index, track_unknown: int | bool = 0, process_undefined=False)#

Bases: object

generate_update(flags=8)#
initialize(data: Dict[str, UniformAttributeData | CSRAttributeData])#
receive_update(entity_data: Dict[str, UniformAttributeData | CSRAttributeData], is_initial=False)#

Update the entity state with new external update data. The first time this is called, it will initialize the entity group and set all the entity ids. Subsequent updates must not contain any additional entities

to_dict()#
class StateProxy(state: 'TrackedState', dataset_name: 'str', entity_type: 'str')#

Bases: object

dataset_name: str#
entity_type: str#
get_attribute(name: str)#
get_index()#
register_attribute(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#
state: TrackedState#
class TrackedState(schema: AttributeSchema | None = None, logger: Logger | None = None, track_unknown=0)#

Bases: object

all_attributes()#
attributes: Dict[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]#
generate_update(flags=8)#
get_attribute(dataset_name: str, entity_type: str, name: str)#
get_data_mask()#
get_index(dataset_name: str, entity_type: str)#
has_changes() bool#
index: Dict[str, Dict[str, Index]]#
is_ready_for(flag: int)#

flag: one of SUB, INIT

iter_attributes() Iterable[Tuple[str, str, str, UniformAttribute | CSRAttribute]]#
iter_datasets() Iterable[Tuple[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]]#
iter_entities() Iterable[Tuple[str, str, Dict[str, UniformAttribute | CSRAttribute]]]#
log(level, message)#
process_general_section(dataset_name: str, general_section: dict)#
receive_update(update: Dict, is_initial=False, process_undefined=False)#
register_attribute(dataset_name: str, entity_name: str, spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08) UniformAttribute | CSRAttribute#
register_dataset(dataset_name: str, entities: Sequence[Type[EntityGroup] | EntityGroup]) List[EntityGroup]#
register_entity_group(dataset_name, entity: Type[EntityGroupT] | EntityGroupT) EntityGroupT#
Return type:

object

reset_tracked_changes(flags)#
to_dict()#
track_unknown: int#
ensure_path(d: dict, path: Sequence[str])#
filter_attrs(attributes: FilterAttrT, flags: int = 0) FilterAttrT#

Return the attributes for which any of the given flags match one of the attribute's flags

parse_special_values(general_section: dict, special_keys: Iterable = ('special', 'no_data')) Dict[str, Dict[str, int | float | bool | str]]#
reset_tracked_changes(attributes: Iterable[UniformAttribute | CSRAttribute], flags: int | None = None)#

types#

class Extensible#

Bases: object

register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#
set_strategy(tp)#
class InitDataHandlerBase#

Bases: object

ensure_ftype(name: str, ftype: FileType)#
get(name: str) Tuple[FileType | None, DatasetPath | None]#
class Model(model_config: dict)#

Bases: Plugin

get_adapter() Type[ModelAdapterBase]#
classmethod get_schema_attributes() Iterable[AttributeSpec]#
classmethod install(obj: Extensible)#
class ModelAdapterBase(model: Model, settings: Settings, logger: Logger)#

Bases: ABC

abstract close(message: QuitMessage)#
abstract initialize(init_data_handler: InitDataHandlerBase) DataMask#
logger: Logger#
model: Model#
abstract new_time(message: NewTimeMessage)#
set_schema(schema)#
settings: Settings#
abstract update(message: UpdateMessage, data: bytes | None) Tuple[bytes | None, int | None]#
abstract update_series(message: UpdateSeriesMessage, data: Iterable[bytes | None]) Tuple[bytes | None, int | None]#
class Plugin#

Bases: object

classmethod install(obj: Extensible)#
class Service#

Bases: Plugin

classmethod install(obj: Extensible)#
logger: Logger#
run()#
setup(*, settings: Settings, stream: Stream, logger: Logger, socket: MessageRouterSocket)#

utils#

configure_global_plugins(app: Extensible, key='movici.plugins', ignore_missing_imports=True)#

Module contents#

class AttributeField(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#

Bases: object

property key#
property name#
class AttributeOptions(special: 't.Optional[T]' = None, enum_name: 't.Optional[str]' = None, enum_values: 't.Optional[t.List[str]]' = None)#

Bases: Generic[T]

enum_name: str | None = None#
enum_values: List[str] | None = None#
get_enumeration()#
special: T | None = None#
class AttributeSchema(attributes: Iterable[AttributeSpec] | None = None)#

Bases: Extensible

add_attribute(attr: AttributeSpec)#
add_attributes(attributes: Iterable[AttributeSpec])#
add_from_namespace(ns)#
get(key, default=None)#
get_spec(name: str | Tuple[str | None, str], default_data_type: DataType | Callable[[], DataType] | None = None, cache=False)#
register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
use(plugin)#
class AttributeSpec(name: str, data_type: movici_simulation_core.core.data_type.DataType, enum_name: str | None = None)#

Bases: object

data_type: DataType#
enum_name: str | None = None#
name: str#
class CSRAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

property csr: TrackedCSRArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value will be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(value: TrackedCSRArray, indices: ndarray) Tuple[TrackedCSRArray, ndarray]#
to_dict()#
update(value: CSRAttributeData | TrackedCSRArray | Tuple[ndarray, ndarray], indices: ndarray, process_undefined=False)#
class DataType(py_type: Type[T], unit_shape: Tuple[int, ...] = (), csr: bool = False)#

Bases: Generic[T]

csr: bool = False#
is_undefined(val)#
property np_type#
py_type: Type[T]#
property undefined#
unit_shape: Tuple[int, ...] = ()#
class EntityGroup(name: str | None = None)#

Bases: object

classmethod all_attributes() Dict[str, AttributeField]#
attributes: Dict[str, AttributeField] = {}#
property dataset_name#
get_attribute(identifier: str)#
get_indices(ids: Sequence[int]) ndarray#
property index: Index#
is_similiar(other: EntityGroup)#
register(state: StateProxy)#
register_attribute(spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08)#
state: StateProxy = None#
class EntityInitDataFormat(schema: AttributeSchema | None = None, non_data_dict_keys: Container[str] = ('general',), cache_inferred_attributes: bool = False)#

Bases: ExternalSerializationStrategy

dump_dict(dataset: dict)#
dumps(dataset: dict, filetype: FileType | None = FileType.JSON, **kwargs) str#
load_attribute(attr_data: list, name: str) dict#
load_bytes(raw: str | bytes, **kwargs)#
load_data_section(data: dict) dict#
load_entity_group(entity_group: dict)#
load_json(obj: dict)#
loads(raw_data, type: FileType)#
schema: AttributeSchema#
supported_file_types() Sequence[FileType]#
class Extensible#

Bases: object

register_attributes(attributes: Iterable[AttributeSpec])#
register_model_type(identifier: str, model_type: Type[Model])#
register_service(identifier: str, service: Type[Service], auto_use=False, daemon=True)#
set_strategy(tp)#
class Index(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes] | None = None, raise_on_invalid=False)#

Bases: object

add_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes]) None#
block_count()#
ensure_unique(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
ids: ndarray | None = None#
params: IndexParams#
query_idx(item: int)#
query_indices(item: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
set_ids(ids: _SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes])#
class InitDataHandlerBase#

Bases: object

ensure_ftype(name: str, ftype: FileType)#
get(name: str) Tuple[FileType | None, DatasetPath | None]#
class Model(model_config: dict)#

Bases: Plugin

get_adapter() Type[ModelAdapterBase]#
classmethod get_schema_attributes() Iterable[AttributeSpec]#
classmethod install(obj: Extensible)#
class ModelAdapterBase(model: Model, settings: Settings, logger: Logger)#

Bases: ABC

abstract close(message: QuitMessage)#
abstract initialize(init_data_handler: InitDataHandlerBase) DataMask#
logger: Logger#
model: Model#
abstract new_time(message: NewTimeMessage)#
set_schema(schema)#
settings: Settings#
abstract update(message: UpdateMessage, data: bytes | None) Tuple[bytes | None, int | None]#
abstract update_series(message: UpdateSeriesMessage, data: Iterable[bytes | None]) Tuple[bytes | None, int | None]#
class Moment(timestamp: int, timeline_info: movici_simulation_core.core.moment.TimelineInfo | None = None)#

Bases: object

classmethod assert_timeline_info(timeline_info: TimelineInfo | None = None)#
classmethod from_datetime(dt: datetime, timeline_info: TimelineInfo | None = None)#
classmethod from_seconds(seconds: float, timeline_info: TimelineInfo | None = None)#
classmethod from_string(datetime_str: str, timeline_info: TimelineInfo | None = None, **kwargs)#
is_at_beginning()#
property seconds#
timeline_info: TimelineInfo | None = None#
timestamp: int#
property world_time#
class Plugin#

Bases: object

classmethod install(obj: Extensible)#
class Service#

Bases: Plugin

classmethod install(obj: Extensible)#
logger: Logger#
run()#
setup(*, settings: Settings, stream: Stream, logger: Logger, socket: MessageRouterSocket)#
class TimelineInfo(reference: float, time_scale: float = 1, start_time: int = 0, duration: int = 0)#

Bases: object

datetime_to_timestamp(dt: datetime) int#
duration: int = 0#
property end_time: int#
is_at_beginning(timestamp: int)#
reference: float#
seconds_to_timestamp(seconds: float) int#
start_time: int = 0#
string_to_timestamp(dt_string: str, **kwargs)#
time_scale: float = 1#
timestamp_to_datetime(timestamp: int)#
timestamp_to_seconds(timestamp: int) float#
timestamp_to_unix_time(timestamp: int) float#
unix_time_to_timestamp(unix_time: float) int#
class TrackedArray(input_array, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: ndarray

astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
atol: float#
property changed#
diff() Tuple[ndarray, ndarray]#
equal_nan: bool#
reset()#
rtol: float#
class TrackedCSRArray(data, row_ptr, rtol=1e-05, atol=1e-08, equal_nan=False)#

Bases: object

as_matrix()#
astype(dtype, order='K', casting='unsafe', subok=True, copy=True)#
changed: ndarray#
copy()#
data: ndarray#
get_row(index)#
reset()#
row_ptr: ndarray#
rows_contain(val)#

Return a boolean array indicating, for each row of the csr array, whether it contains the val argument

rows_equal(row)#

Return a boolean array indicating, for each row of the csr array, whether it equals the row argument

rows_intersect(vals)#

Return a boolean array indicating, for each row of the csr array, whether it contains any of the values in the vals argument

size: int#
slice(indices)#
update(updates: TrackedCSRArray, indices: ndarray)#

Update the CSRArray in place

update_from_matrix(matrix: ndarray)#

Update the csr-array from a 2D matrix. The matrix's number of rows must match the csr-array's number of rows

class TrackedState(schema: AttributeSchema | None = None, logger: Logger | None = None, track_unknown=0)#

Bases: object

all_attributes()#
attributes: Dict[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]#
generate_update(flags=8)#
get_attribute(dataset_name: str, entity_type: str, name: str)#
get_data_mask()#
get_index(dataset_name: str, entity_type: str)#
has_changes() bool#
index: Dict[str, Dict[str, Index]]#
is_ready_for(flag: int)#

flag: one of SUB, INIT

iter_attributes() Iterable[Tuple[str, str, str, UniformAttribute | CSRAttribute]]#
iter_datasets() Iterable[Tuple[str, Dict[str, Dict[str, UniformAttribute | CSRAttribute]]]]#
iter_entities() Iterable[Tuple[str, str, Dict[str, UniformAttribute | CSRAttribute]]]#
log(level, message)#
process_general_section(dataset_name: str, general_section: dict)#
receive_update(update: Dict, is_initial=False, process_undefined=False)#
register_attribute(dataset_name: str, entity_name: str, spec: AttributeSpec, flags: int = 0, rtol=1e-05, atol=1e-08) UniformAttribute | CSRAttribute#
register_dataset(dataset_name: str, entities: Sequence[Type[EntityGroup] | EntityGroup]) List[EntityGroup]#
register_entity_group(dataset_name, entity: Type[EntityGroupT] | EntityGroupT) EntityGroupT#
Return type:

object

reset_tracked_changes(flags)#
to_dict()#
track_unknown: int#
class UniformAttribute(data, data_type: DataType, flags: int = 0, rtol=1e-05, atol=1e-08, options: AttributeOptions | None = None, index: Index | None = None)#

Bases: Attribute

The underlying data can be accessed through the UniformAttribute().array attribute. When updating data using indexing ("[]") notation, it is recommended to use UniformAttribute()[index] = value. When dealing with string (i.e. unicode) arrays, this feature ensures that the array itemsize will grow if strings larger than the current itemsize are added.

property array: TrackedArray#
generate_update(mask=None)#
Parameters:

mask – a boolean array signifying which indices should be returned. If there are no changes for a specific index, its value should be self.data_type.undefined

Returns:

is_special()#
is_undefined()#
reset()#
slice(item)#
strip_undefined(key, value)#
to_dict()#
update(value: ndarray | UniformAttributeData, indices: ndarray, process_undefined=False)#
class UpdateDataFormat#

Bases: InternalSerializationStrategy

CURRENT_VERSION = 1#
classmethod decode_numpy_array(obj)#
dumps(data: dict)#
classmethod encode_numpy_array(obj)#
loads(raw_bytes: bytes)#
attribute_max(attr: AttributeObject, *, func: callable = <function nanmax>) t.Union[None, bool, int, float]#
attribute_min(attr: AttributeObject, *, func: callable = <function nanmin>) t.Union[None, bool, int, float]#
attribute_plugin_from_dict(d: dict)#
attributes_from_dict(d: dict)#
configure_global_plugins(app: Extensible, key='movici.plugins', ignore_missing_imports=True)#
dump_update(data: dict)#
field#

alias of AttributeField

flag_info(flag: int)#
get_global_schema()#
get_rowptr(d: dict)#
get_timeline_info() TimelineInfo | None#
has_rowptr_key(d: dict)#
infer_data_type_from_array(attr_data: dict | ndarray | TrackedCSRArray)#

Given array data, either as an np.ndarray, a TrackedCSRArray, or a "data"/"row_ptr" dictionary, infer the DataType of that array data

infer_data_type_from_list(data: list)#
load_update(raw_bytes: bytes)#
matrix_to_csr(matrix: ndarray)#

convert a 2d array to a TrackedCSRArray

set_timeline_info(info_or_reference: float | TimelineInfo | None, time_scale: float | None = None, start_time: int | None = None)#