Source code for nixnet.convert

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import itertools
import typing  # NOQA: F401
import warnings

from nixnet import _frames
from nixnet import _funcs
from nixnet import _props
from nixnet import _utils
from nixnet import constants
from nixnet import errors
from nixnet import types

from nixnet._session import j1939 as session_j1939
from nixnet._session import signals as session_signals


__all__ = [
    "SignalConversionSinglePointSession"]


[docs]class SignalConversionSinglePointSession(object): """Convert NI-XNET signal data to frame data or vice versa. Conversion works similar to Single-Point mode. You specify a set of signals that can span multiple frames. Signal to frame conversion reads a set of values for the signals specified and writes them to the respective frame(s). Frame to signal conversion parses a set of frames and returns the latest signal value read from a corresponding frame. """ def __init__( self, database_name, # type: typing.Text cluster_name, # type: typing.Text signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] ): # type: (...) -> None """Create an XNET session at run time using named references to database objects. Args: database_name(str): XNET database name to use for interface configuration. The database name must use the <alias> or <filepath> syntax (refer to Databases). cluster_name(str): XNET cluster name to use for interface configuration. The name must specify a cluster from the database given in the database_name parameter. If it is left blank, the cluster is extracted from the ``signals`` parameter. signals(list of str): Strings describing signals for the session. The list syntax is as follows: ``signals`` contains one or more XNET Signal names. Each name must be one of the following options, whichever uniquely identifies a signal within the database given: - ``<Signal>`` - ``<Frame>.<Signal>`` - ``<Cluster>.<Frame>.<Signal>`` - ``<PDU>.<Signal>`` - ``<Cluster>.<PDU>.<Signal>`` ``signals`` may also contain one or more trigger signals. For information about trigger signals, refer to Signal Output Single-Point Mode or Signal Input Single-Point Mode. """ flattened_list = _utils.flatten_items(signals) self._handle = None # To satisfy `__del__` in case nx_create_session throws self._handle = _funcs.nx_create_session( database_name, cluster_name, flattened_list, "", constants.CreateSessionMode.SIGNAL_CONVERSION_SINGLE_POINT) self._j1939 = session_j1939.J1939(self._handle) self._signals = session_signals.Signals(self._handle) def __del__(self): if self._handle is not None: warnings.warn( 'Session was not explicitly closed before it was destructed. ' 'Resources on the device may still be reserved.', errors.XnetResourceWarning) def __enter__(self): return self def __exit__(self, exception_type, exception_value, traceback): self.close() def __eq__(self, other): if isinstance(other, self.__class__): return self._handle == typing.cast(SignalConversionSinglePointSession, other)._handle else: return NotImplemented def __ne__(self, other): result = self.__eq__(other) if result is NotImplemented: return result else: return not result def __hash__(self): return hash(self._handle) def __repr__(self): # type: () -> typing.Text return '{}(handle={})'.format(type(self).__name__, self._handle)
[docs] def close(self): # type: () -> None """Close (clear) the XNET session.""" if self._handle is None: warnings.warn( 'Attempting to close NI-XNET session but session was already ' 'closed', errors.XnetResourceWarning) return _funcs.nx_clear(self._handle) self._handle = None
@property def signals(self): # type: () -> session_signals.Signals """:any:`nixnet._session.signals.Signals`: Operate on session's signals""" return self._signals @property def j1939(self): # type: () -> session_j1939.J1939 """:any:`nixnet._session.j1939.J1939`: Returns the J1939 configuration object for the session.""" return self._j1939 @property def application_protocol(self): # type: () -> constants.AppProtocol """:any:`nixnet._enums.AppProtocol`: This property returns the application protocol that the session uses. The database used with the session determines the application protocol. """ return constants.AppProtocol(_props.get_session_application_protocol(self._handle)) @property def cluster_name(self): # type: () -> typing.Text """str: This property returns the cluster (network) name used with the session.""" return _props.get_session_cluster_name(self._handle) @property def database_name(self): # type: () -> typing.Text """str: This property returns the database name used with the session.""" return _props.get_session_database_name(self._handle) @property def mode(self): # type: () -> constants.CreateSessionMode """:any:`nixnet._enums.CreateSessionMode`: This property returns the mode associated with the session. For more information, refer to :any:`nixnet._enums.CreateSessionMode`. """ return constants.CreateSessionMode(_props.get_session_mode(self._handle)) @property def protocol(self): # type: () -> constants.Protocol """:any:`nixnet._enums.Protocol`: This property returns the protocol that the interface in the session uses.""" return constants.Protocol(_props.get_session_protocol(self._handle)) def _convert_bytes_to_signals(self, bytes): # type: (bytes) -> typing.Iterable[typing.Tuple[int, float]] num_signals = len(self.signals) timestamps, values = _funcs.nx_convert_frames_to_signals_single_point(self._handle, bytes, num_signals) for timestamp, value in zip(timestamps, values): yield timestamp.value, value.value
[docs] def convert_frames_to_signals(self, frames): # type: (typing.Iterable[types.Frame]) -> typing.Iterable[typing.Tuple[int, float]] """Convert Frames to signals. The frames passed into the ``frames`` array are read one by one, and the signal values found are written to internal buffers for each signal. Frames are identified by their identifier (FlexRay: slot) field. After all frames in ``frames`` array are processed, the internal signal buffers' status is returned with the corresponding timestamps from the frames where a signal value was found. The signal internal buffers' status is being preserved over multiple calls to this function. This way, for example, data returned from multiple calls of nxFrameRead for a Frame Input Stream Mode session (or any other Frame Input session) can be passed to this function directly. .. note:: Frames unknown to the session are silently ignored. """ units = itertools.chain.from_iterable( _frames.serialize_frame(frame.to_raw()) for frame in frames) bytes = b"".join(units) return self._convert_bytes_to_signals(bytes)
def _convert_signals_to_bytes(self, signals, num_bytes): # type: (typing.Iterable[float], int) -> bytes buffer, number_of_bytes_returned = _funcs.nx_convert_signals_to_frames_single_point( self._handle, list(signals), num_bytes) return buffer[0:number_of_bytes_returned]
[docs] def convert_signals_to_frames(self, signals, frame_type=types.XnetFrame): # type: (typing.Iterable[float], typing.Type[types.FrameFactory]) -> typing.Iterable[types.Frame] """Convert signals to frames. The signal values written to the ``signals`` array are written to a raw frame buffer array. For each frame included in the session, one frame is generated in the array that contains the signal values. Signals not present in the session are written as their respective default values; empty space in the frames that signals do not occupy is written with the frame's default payload. The frame header values are filled with appropriate values so that this function's output can be directly written to a Frame Output session. Args: signals(list of float): Values corresponding to signals configured in this session. frame_type(:any:`nixnet.types.FrameFactory`): A factory for the desired frame formats. Yields: :any:`nixnet.types.Frame` """ from_raw = typing.cast(typing.Callable[[types.RawFrame], types.Frame], frame_type.from_raw) # Unlike some session reads, this should be safe from asking to read too much. num_frames_to_read = 5 while True: try: num_bytes_to_read = num_frames_to_read * _frames.nxFrameFixed_t.size buffer = self._convert_signals_to_bytes(signals, num_bytes_to_read) break except errors.XnetError as e: if e.error_type == constants.Err.BUFFER_TOO_SMALL: num_frames_to_read *= 2 else: raise for frame in _frames.iterate_frames(buffer): yield from_raw(frame)