data_collector#

concurrent#

class LimitedThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())#

Bases: ThreadPoolExecutor

Similar to ThreadPoolExecutor, but submit() blocks when all workers are busy.

submit(function, *args, **kwargs)#
exception MultipleException(exceptions: Sequence[Exception])#

Bases: Exception

static format_exception(e: Exception)#
class MultipleFutures(iterable: Iterable[Future] = ())#

Bases: object

Keep track of multiple concurrent.futures.Future objects.

add(fut: Future)#
done()#
exception()#
wait()#

data_collector#

class DataCollector(model_config: dict)#

Bases: SimpleModel

classmethod add_storage_strategy(name, strategy: Type[StorageStrategy])#
aggregate: bool = False#
close(**_)#
flush(moment: Moment, origin: str | None)#
get_storage_strategy(settings: Settings, logger: Logger)#
initialize(settings: Settings, logger: Logger, **_) -> DataMask#
maybe_flush(moment: Moment, origin, trigger)#
new_time(new_time: Moment, **_)#
state: TrackedState | None = None#
strategies: Dict[str, Type[StorageStrategy]] = {'disk': <class 'movici_simulation_core.models.data_collector.data_collector.LocalStorageStrategy'>}#
strategy: StorageStrategy#
submit(fn, *args, **kwargs)#
update(moment: Moment, data: dict | None, message: UpdateMessage) -> Tuple[dict | None, Moment | None]#
class LocalStorageStrategy(directory: Path, filename_template='t{timestamp}_{iteration}_{name}')#

Bases: StorageStrategy

classmethod choose(model_config: dict, settings: Settings, **_) -> StorageStrategy#
initialize()#
reset_iterations(model: DataCollector)#
store(info: UpdateInfo)#
class StorageStrategy#

Bases: object

classmethod choose(model_config: dict, settings: Settings, logger: Logger) -> StorageStrategy#
initialize()#
reset_iterations(model: DataCollector)#
store(info: UpdateInfo)#
class UpdateInfo(name: str, timestamp: int, iteration: int, data: dict, origin: str | None = None)#

Bases: object

data: dict#
full_data()#
iteration: int#
name: str#
origin: str | None = None#
timestamp: int#

Module contents#

class DataCollector(model_config: dict)#

Bases: SimpleModel

classmethod add_storage_strategy(name, strategy: Type[StorageStrategy])#
aggregate: bool = False#
close(**_)#
flush(moment: Moment, origin: str | None)#
get_storage_strategy(settings: Settings, logger: Logger)#
initialize(settings: Settings, logger: Logger, **_) -> DataMask#
maybe_flush(moment: Moment, origin, trigger)#
new_time(new_time: Moment, **_)#
state: TrackedState | None = None#
strategies: Dict[str, Type[StorageStrategy]] = {'disk': <class 'movici_simulation_core.models.data_collector.data_collector.LocalStorageStrategy'>}#
strategy: StorageStrategy#
submit(fn, *args, **kwargs)#
update(moment: Moment, data: dict | None, message: UpdateMessage) -> Tuple[dict | None, Moment | None]#
class StorageStrategy#

Bases: object

classmethod choose(model_config: dict, settings: Settings, logger: Logger) -> StorageStrategy#
initialize()#
reset_iterations(model: DataCollector)#
store(info: UpdateInfo)#