Reference

Reference for cfei.smap

class cfei.smap.SmapAiohttpInterface(url: str, key: str = '', encoding: str = 'utf-8', buffer_size: int = 65536, max_concurrent_requests: int = 10, cache: bool = False) → None

Bases: cfei.smap.SmapInterface

An interface to sMAP protocol over HTTP.

New in version 1.1.

fetch_everything(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for everything.

Query the sMAP archiver for all streams that satisfy the where condition and return everything.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> everything.
Return type:dict(UUID, object)
fetch_latest_readings(where: str, limit: int = 1, streamlimit: int = 10000, instant: typing.Union[datetime.datetime, NoneType] = None) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for the readings before the current or the specified instant, for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
  • instant (datetime) – Instant (if None, current instant is used)
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)
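The selection described above (the most recent readings before an instant, capped by limit) can be sketched on plain data. This is only an illustration of the semantics, not the library's code: latest_readings is a hypothetical helper, lists of (timestamp, value) pairs stand in for pd.Series, and excluding the instant itself is an assumption.

```python
from datetime import datetime
from typing import List, Optional, Tuple

Reading = Tuple[datetime, float]


def latest_readings(
    readings: List[Reading], limit: int = 1, instant: Optional[datetime] = None
) -> List[Reading]:
    """Return up to `limit` most recent readings before `instant`."""
    if instant is None:
        instant = datetime.now()
    # Assumption: a reading taken exactly at the instant is excluded.
    earlier = sorted(r for r in readings if r[0] < instant)
    if limit == -1:
        return earlier  # -1 means unlimited
    return earlier[-limit:] if limit > 0 else []


sample = [(datetime(2018, 1, 1, 0, m), float(m)) for m in (0, 10, 20, 30)]
last_one = latest_readings(sample, limit=1, instant=datetime(2018, 1, 1, 0, 25))
```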

fetch_metadata(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for metadata.

Query the sMAP archiver for all streams that satisfy the where condition and return their metadata.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> metadata.
Return type:dict(UUID, object)
fetch_properties(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for properties.

Query the sMAP archiver for all streams that satisfy the where condition and return their properties.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> properties.
Return type:dict(UUID, object)
fetch_raw(query: str) → str

Run a raw query.

Query the sMAP archiver using a custom query and return the result.

Parameters:query (str) – Raw query.
Returns:Parsed JSON output.
Return type:Any
fetch_readings(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.
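One way to stay clear of the cap described in this warning is to split a long period into shorter sub-intervals and query each one separately. The sketch below only computes the sub-intervals. split_interval and the one-day step are hypothetical and not part of cfei.smap.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def split_interval(
    start: datetime, end: datetime, step: timedelta
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (chunk_start, chunk_end) pairs covering start..end."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + step, end)
        yield chunk_start, chunk_end
        chunk_start = chunk_end


chunks = list(
    split_interval(datetime(2018, 1, 1), datetime(2018, 1, 3, 12), timedelta(days=1))
)
# Each pair could then be passed as the start and end arguments of fetch_readings.
```

Whether the archiver treats end as inclusive is not stated here, so readings that fall exactly on a chunk boundary may need de-duplication.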

fetch_readings_intertwined(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → pandas.core.series.Series

Query for readings and return a single stream.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are intertwined.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

Intertwined series.

Return type:

pd.Series

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.

fetch_uuids(where: str) → typing.Set[uuid.UUID]

Query for UUIDs.

Query the sMAP archiver for all streams that satisfy the where condition and return their UUIDs.

Parameters:where (str) – Query WHERE condition.
Returns:The UUIDs.
Return type:set(UUID)
post_data(stream: pandas.core.series.Series, uuid: uuid.UUID, source_name: str, path: str, metadata: typing.Union[typing.Dict, NoneType] = None, properties: typing.Union[typing.Dict, NoneType] = None) → None

Post new readings to a stream.

Parameters:
  • stream (pd.Series) – New readings.
  • uuid (UUID) – Stream’s UUID.
  • source_name (str) – Stream’s source name.
  • path (str) – Stream’s path.
  • metadata (dict) – Stream’s metadata
  • properties (dict) – Stream’s properties
post_readings(readings: typing.Union[typing.Dict[uuid.UUID, pandas.core.series.Series], typing.Tuple[uuid.UUID, pandas.core.series.Series]]) → None

Post new readings to existing streams.

The specified streams’ paths will be fetched and cached before posting the readings. All streams must, therefore, exist beforehand.

Parameters:readings (dict|tuple) – Either a dictionary {UUID: pd.Series} or a pair (UUID, pd.Series).
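Because readings may be either a dictionary or a single pair, code that wraps post_readings often wants one uniform shape. A minimal sketch of such a normalization follows. as_mapping is a hypothetical helper, and a list of floats stands in for pd.Series so that the example needs only the standard library.

```python
import uuid
from typing import Dict, List, Tuple, Union

Series = List[float]  # stand-in for pandas.Series
ReadingsArg = Union[Dict[uuid.UUID, Series], Tuple[uuid.UUID, Series]]


def as_mapping(readings: ReadingsArg) -> Dict[uuid.UUID, Series]:
    """Return a {UUID: series} dictionary for either accepted form."""
    if isinstance(readings, dict):
        return dict(readings)
    stream_uuid, series = readings  # a (UUID, series) pair
    return {stream_uuid: series}


stream = uuid.UUID("12345678-1234-5678-1234-567812345678")
from_pair = as_mapping((stream, [1.0, 2.0]))
from_dict = as_mapping({stream: [1.0, 2.0]})
```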
subscribe(where: typing.Union[str, NoneType], callback: typing.Callable[[uuid.UUID, pandas.core.series.Series], typing.Awaitable[NoneType]], timeout: int = 30) → None

Subscribe to sMAP republish interface.

The callback function is called with new readings as soon as the archiver republishes them.

Parameters:
  • where (str) – Query text.
  • callback (SeriesCallbackType) – Callback to process new readings.
  • timeout (int) – Connection timeout in seconds.
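The callback is an awaitable function that takes a UUID and the new readings. The sketch below shows a callback with that shape being driven by a stand-in publisher. fake_republish is hypothetical and plays the role of the archiver, and a list of (timestamp, value) pairs stands in for pd.Series.

```python
import asyncio
import uuid
from typing import Awaitable, Callable, List, Tuple

# Stand-in for pandas.Series so that the sketch needs only the standard library.
Readings = List[Tuple[float, float]]
SeriesCallback = Callable[[uuid.UUID, Readings], Awaitable[None]]

received = []


async def on_readings(stream_uuid: uuid.UUID, readings: Readings) -> None:
    """Callback with the documented shape: (UUID, readings) -> awaitable."""
    received.append((stream_uuid, len(readings)))


async def fake_republish(callback: SeriesCallback) -> None:
    # Hypothetical publisher that plays the role of the archiver.
    stream = uuid.UUID("12345678-1234-5678-1234-567812345678")
    await callback(stream, [(1514764800.0, 21.5), (1514764860.0, 21.7)])


asyncio.run(fake_republish(on_readings))
```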

class cfei.smap.SmapHttpInterface(url: str, key: str = '', encoding: str = 'utf-8', buffer_size: int = 65536, max_concurrent_requests: int = 10, cache: bool = False) → None

Bases: cfei.smap.SmapInterface

An interface to sMAP protocol over HTTP.

Deprecated since version 1.1: Use SmapAiohttpInterface instead.

fetch_everything(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for everything.

Query the sMAP archiver for all streams that satisfy the where condition and return everything.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> everything.
Return type:dict(UUID, object)
fetch_latest_readings(where: str, limit: int = 1, streamlimit: int = 10000, instant: typing.Union[datetime.datetime, NoneType] = None) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for the readings before the current or the specified instant, for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
  • instant (datetime) – Instant (if None, current instant is used)
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

fetch_metadata(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for metadata.

Query the sMAP archiver for all streams that satisfy the where condition and return their metadata.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> metadata.
Return type:dict(UUID, object)
fetch_properties(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for properties.

Query the sMAP archiver for all streams that satisfy the where condition and return their properties.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> properties.
Return type:dict(UUID, object)
fetch_raw(query: str) → str

Run a raw query.

Query the sMAP archiver using a custom query and return the result.

Parameters:query (str) – Raw query.
Returns:Parsed JSON output.
Return type:Any
fetch_readings(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.

fetch_readings_intertwined(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → pandas.core.series.Series

Query for readings and return a single stream.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are intertwined.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

Intertwined series.

Return type:

pd.Series

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.

fetch_uuids(where: str) → typing.Set[uuid.UUID]

Query for UUIDs.

Query the sMAP archiver for all streams that satisfy the where condition and return their UUIDs.

Parameters:where (str) – Query WHERE condition.
Returns:The UUIDs.
Return type:set(UUID)
post_data(stream: pandas.core.series.Series, uuid: uuid.UUID, source_name: str, path: str, metadata: typing.Union[typing.Dict, NoneType] = None, properties: typing.Union[typing.Dict, NoneType] = None) → None

Post new readings to a stream.

Parameters:
  • stream (pd.Series) – New readings.
  • uuid (UUID) – Stream’s UUID.
  • source_name (str) – Stream’s source name.
  • path (str) – Stream’s path.
  • metadata (dict) – Stream’s metadata
  • properties (dict) – Stream’s properties
post_readings(readings: typing.Union[typing.Dict[uuid.UUID, pandas.core.series.Series], typing.Tuple[uuid.UUID, pandas.core.series.Series]]) → None

Post new readings to existing streams.

The specified streams’ paths will be fetched and cached before posting the readings. All streams must, therefore, exist beforehand.

Parameters:readings (dict|tuple) – Either a dictionary {UUID: pd.Series} or a pair (UUID, pd.Series).
subscribe(where: typing.Union[str, NoneType], callback: typing.Callable[[uuid.UUID, pandas.core.series.Series], typing.Awaitable[NoneType]], timeout: int = 30) → None

Subscribe to sMAP republish interface.

The callback function is called with new readings as soon as the archiver republishes them.

Parameters:
  • where (str) – Query text.
  • callback (SeriesCallbackType) – Callback to process new readings.
  • timeout (int) – Connection timeout in seconds.

class cfei.smap.SmapInterface(transport_interface: cfei.smap.transport.TransportInterface, cache: bool = False, loose_validation: bool = False) → None

Bases: object

An interface to sMAP protocol.

Each object has a local cache for metadata, so querying multiple times for a stream results in only one query for its metadata.
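The effect of such a per-object cache can be sketched with a small memoizing wrapper. This is an illustration of the idea only. MetadataCache and fake_query are hypothetical names, and the real class may store and key its cache differently.

```python
import uuid
from typing import Any, Callable, Dict


class MetadataCache:
    """Remember metadata per stream so each UUID is queried at most once."""

    def __init__(self, query: Callable[[uuid.UUID], Any]) -> None:
        self._query = query
        self._entries: Dict[uuid.UUID, Any] = {}

    def get(self, stream_uuid: uuid.UUID) -> Any:
        if stream_uuid not in self._entries:
            self._entries[stream_uuid] = self._query(stream_uuid)
        return self._entries[stream_uuid]


calls = []


def fake_query(stream_uuid: uuid.UUID) -> dict:
    # Hypothetical stand-in for a metadata query to the archiver.
    calls.append(stream_uuid)
    return {"SourceName": "example"}


cache = MetadataCache(fake_query)
stream = uuid.UUID("12345678-1234-5678-1234-567812345678")
first = cache.get(stream)
second = cache.get(stream)
```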

fetch_everything(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for everything.

Query the sMAP archiver for all streams that satisfy the where condition and return everything.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> everything.
Return type:dict(UUID, object)
fetch_latest_readings(where: str, limit: int = 1, streamlimit: int = 10000, instant: typing.Union[datetime.datetime, NoneType] = None) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for the readings before the current or the specified instant, for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
  • instant (datetime) – Instant (if None, current instant is used)
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

fetch_metadata(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for metadata.

Query the sMAP archiver for all streams that satisfy the where condition and return their metadata.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> metadata.
Return type:dict(UUID, object)
fetch_properties(where: str) → typing.Dict[uuid.UUID, typing.Any]

Query for properties.

Query the sMAP archiver for all streams that satisfy the where condition and return their properties.

Parameters:where (str) – Query WHERE condition.
Returns:A mapping UUID -> properties.
Return type:dict(UUID, object)
fetch_raw(query: str) → str

Run a raw query.

Query the sMAP archiver using a custom query and return the result.

Parameters:query (str) – Raw query.
Returns:Parsed JSON output.
Return type:Any
fetch_readings(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Query for readings and return by UUID.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are returned by UUID.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.

fetch_readings_intertwined(start: datetime.datetime, end: datetime.datetime, where: str, limit: int = -1, streamlimit: int = 10000) → pandas.core.series.Series

Query for readings and return a single stream.

Query the sMAP archiver for all readings between start and end for all streams that satisfy the where condition. The results are intertwined.

Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings (-1 for unlimited)
  • streamlimit (int) – Maximum number of streams
Returns:

Intertwined series.

Return type:

pd.Series

Warning

When limit is equal to -1 the returned streams are actually limited to one million readings (possibly depending on the server implementation). Using a higher value for limit works correctly.

fetch_uuids(where: str) → typing.Set[uuid.UUID]

Query for UUIDs.

Query the sMAP archiver for all streams that satisfy the where condition and return their UUIDs.

Parameters:where (str) – Query WHERE condition.
Returns:The UUIDs.
Return type:set(UUID)
post_data(stream: pandas.core.series.Series, uuid: uuid.UUID, source_name: str, path: str, metadata: typing.Union[typing.Dict, NoneType] = None, properties: typing.Union[typing.Dict, NoneType] = None) → None

Post new readings to a stream.

Parameters:
  • stream (pd.Series) – New readings.
  • uuid (UUID) – Stream’s UUID.
  • source_name (str) – Stream’s source name.
  • path (str) – Stream’s path.
  • metadata (dict) – Stream’s metadata
  • properties (dict) – Stream’s properties
post_readings(readings: typing.Union[typing.Dict[uuid.UUID, pandas.core.series.Series], typing.Tuple[uuid.UUID, pandas.core.series.Series]]) → None

Post new readings to existing streams.

The specified streams’ paths will be fetched and cached before posting the readings. All streams must, therefore, exist beforehand.

Parameters:readings (dict|tuple) – Either a dictionary {UUID: pd.Series} or a pair (UUID, pd.Series).
subscribe(where: typing.Union[str, NoneType], callback: typing.Callable[[uuid.UUID, pandas.core.series.Series], typing.Awaitable[NoneType]], timeout: int = 30) → None

Subscribe to sMAP republish interface.

The callback function is called with new readings as soon as the archiver republishes them.

Parameters:
  • where (str) – Query text.
  • callback (SeriesCallbackType) – Callback to process new readings.
  • timeout (int) – Connection timeout in seconds.

cfei.smap.generate_uuid_as_in_java(source_name: str, path: str) → uuid.UUID

Generate a UUID from source name and path

This function uses the same implementation as the Java library, so the same inputs generate the same UUID.

Parameters:
  • source_name (typing.Text) – Source name
  • path (typing.Text) – Path.
Returns:

A UUID.

Return type:

UUID
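The reference does not say which Java routine is mirrored. If it is java.util.UUID.nameUUIDFromBytes (an assumption), the Python equivalent is an MD5 digest with the version 3 bits set, as sketched below. How source_name and path are combined into a single name is also an assumption here (plain concatenation), so this sketch is not guaranteed to reproduce the UUIDs of generate_uuid_as_in_java.

```python
import hashlib
import uuid


def name_uuid_from_bytes(name: bytes) -> uuid.UUID:
    """Name-based (version 3, MD5) UUID without a namespace."""
    digest = hashlib.md5(name).digest()
    # Passing version=3 overwrites the version and variant bits of the digest.
    return uuid.UUID(bytes=digest, version=3)


def sketch_uuid(source_name: str, path: str) -> uuid.UUID:
    # Assumption: the name is the UTF-8 encoding of source_name followed by path.
    return name_uuid_from_bytes((source_name + path).encode("utf-8"))


first = sketch_uuid("My Source", "/building/room/temperature")
second = sketch_uuid("My Source", "/building/room/temperature")
```

The property that matters for interoperability is determinism: the same pair of inputs always maps to the same UUID.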

cfei.smap.intertwine_series(serieses: typing.List[pandas.core.series.Series]) → pandas.core.series.Series

Convert a list of time-series to a single time-series in which the readings are in chronological order.

Parameters:
  • serieses (typing.List[pd.Series]) – List of time-series.
Returns:

A new intertwined time-series.

Return type:

pd.Series
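The idea of intertwining can be sketched with the standard library alone. Here lists of (timestamp, value) pairs stand in for pd.Series, and intertwine is a hypothetical helper that is not the library's implementation.

```python
import heapq
from datetime import datetime
from typing import List, Tuple

Reading = Tuple[datetime, float]


def intertwine(serieses: List[List[Reading]]) -> List[Reading]:
    """Merge time-sorted lists of readings into one time-sorted list."""
    # heapq.merge assumes every input is already sorted by the key.
    return list(heapq.merge(*serieses, key=lambda reading: reading[0]))


a = [(datetime(2018, 1, 1, 0, 0), 1.0), (datetime(2018, 1, 1, 0, 10), 3.0)]
b = [(datetime(2018, 1, 1, 0, 5), 2.0)]
merged = intertwine([a, b])
```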

Reference for cfei.smap.types

cfei.smap.types.CallbackType

alias of typing.Callable

cfei.smap.types.RawCallbackType

alias of typing.Callable

cfei.smap.types.SeriesCallbackType

alias of typing.Callable

Reference for cfei.smap.transport

class cfei.smap.transport.TransportInterface → None

Bases: object

Interface for transport layer.

fetch(query: str) → str

Fetch data by query.

Parameters:query (str) – Query.
Returns:Query result.
Return type:typing.Text
post_data(data: str) → None

Post new data.

Parameters:data (str) – Data to post.
subscribe(where: typing.Union[str, NoneType], callback: typing.Callable[str, typing.Awaitable[NoneType]], timeout: int = 30) → None

Subscribe to republish interface.

The callback function is called with new readings as soon as the archiver republishes them.

Parameters:
  • where (str) – Query text.
  • callback (CallbackType) – Callback to process new readings.
  • timeout (int) – Connection timeout in seconds.
synchronous_fetch(query: str) → str

Synchronously fetch data by query.

Parameters:query (str) – Query.
Returns:Query result.
Return type:typing.Text

Reference for cfei.smap.transport.aiohttp

class cfei.smap.transport.aiohttp.AiohttpTransportInterface(url: str, key: str = '', encoding: str = 'utf-8', buffer_size: int = 65536, max_concurrent_requests: int = 10) → None

Bases: cfei.smap.transport.TransportInterface

Interface for HTTP transport layer.

Changed in version 1.1: This class was previously called cfei.smap.transport.HttpTransportInterface.

fetch(query: str) → str

Fetch data by query.

Parameters:query (str) – Query.
Returns:Query result.
Return type:JsonType
post_data(data: str) → None

Post new data.

Parameters:data (str) – Data to post.
subscribe(where: typing.Union[str, NoneType], callback: typing.Callable[str, typing.Awaitable[NoneType]], timeout: int = 30) → None

Subscribe to republish interface.

The callback function is called with new readings as soon as the archiver republishes them.

Parameters:
  • where (str) – Query text.
  • callback (CallbackType) – Callback to process new readings.
  • timeout (int) – Connection timeout in seconds.
synchronous_fetch(query: str) → str

Synchronously fetch data by query.

Parameters:query (str) – Query.
Returns:Query result.
Return type:JsonType

Reference for cfei.smap.cache

cfei.smap.cache.load_raw_from_cache(query: str) → str

Load raw content from local cache

The cache file name is generated from the entire query. If such a file exists, it is loaded using the pickle library.

Parameters:query (str) – Query.
Returns:Raw data.
Return type:str
Raises:RuntimeError – If cache loading failed.
cfei.smap.cache.load_readings_from_cache(start: datetime.datetime, end: datetime.datetime, where: str, limit: int, streamlimit: int) → typing.Dict[uuid.UUID, pandas.core.series.Series]

Load readings from local cache

The cache file name is generated from the where clause. If such a file exists, it is loaded using the pickle library. Loading fails if:

  • Data contained in cache does not cover the entire period [start, end];
  • Data contained in cache had different limit or streamlimit arguments.
Parameters:
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings
  • streamlimit (int) – Maximum number of streams
Returns:

A mapping UUID -> values.

Return type:

dict(UUID, pd.Series)

Raises:

RuntimeError – If cache loading failed.
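The two failure conditions listed above can be expressed as a small validity check. This is a sketch under assumptions: CacheEntry and check_cache are hypothetical names, and the real cache file may record its query arguments differently.

```python
from datetime import datetime
from typing import NamedTuple


class CacheEntry(NamedTuple):
    # Hypothetical record of the arguments a cache file was saved with.
    start: datetime
    end: datetime
    limit: int
    streamlimit: int


def check_cache(
    entry: CacheEntry, start: datetime, end: datetime, limit: int, streamlimit: int
) -> None:
    """Raise RuntimeError when the entry cannot serve the request."""
    if entry.start > start or entry.end < end:
        raise RuntimeError("cache does not cover the requested period")
    if (entry.limit, entry.streamlimit) != (limit, streamlimit):
        raise RuntimeError("cache was saved with different limits")


entry = CacheEntry(datetime(2018, 1, 1), datetime(2018, 2, 1), -1, 10000)
# A request inside the cached period with the same limits passes silently.
check_cache(entry, datetime(2018, 1, 10), datetime(2018, 1, 20), -1, 10000)
```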

cfei.smap.cache.save_raw_to_cache(data: str, query: str) → None

Save raw data to local cache

Cache file name is generated from the entire query.

Parameters:
  • data (str) – Raw data.
  • query (str) – Query.
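Together with load_raw_from_cache, this function forms a save and load round trip keyed by the query text. The sketch below mimics that round trip with pickle. The cache directory, the SHA-256 file naming, and the names cache_path, save_raw and load_raw are all assumptions for illustration, since the reference does not specify how the file name is derived.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

CACHE_DIR = Path(tempfile.mkdtemp())  # hypothetical cache location


def cache_path(query: str) -> Path:
    # Assumption: the file name is a hash of the entire query text.
    digest = hashlib.sha256(query.encode("utf-8")).hexdigest()
    return CACHE_DIR / (digest + ".pickle")


def save_raw(data: str, query: str) -> None:
    with cache_path(query).open("wb") as file:
        pickle.dump(data, file)


def load_raw(query: str) -> str:
    path = cache_path(query)
    if not path.exists():
        raise RuntimeError("no cache entry for this query")
    with path.open("rb") as file:
        return pickle.load(file)


save_raw("[]", "example query")
loaded = load_raw("example query")
```

Pickle files should only be loaded from a trusted location, because unpickling can execute arbitrary code.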
cfei.smap.cache.save_readings_to_cache(data: typing.Dict[uuid.UUID, typing.Any], start: datetime.datetime, end: datetime.datetime, where: str, limit: int, streamlimit: int) → None

Save readings to local cache

Cache file name is generated from the where clause.

Parameters:
  • data (dict(UUID, typing.Any)) – Readings.
  • start (datetime) – Query start time.
  • end (datetime) – Query end time.
  • where (str) – Query WHERE condition.
  • limit (int) – Maximum number of readings
  • streamlimit (int) – Maximum number of streams