Source code for nixnet.types

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import collections
import typing  # NOQA: F401

import six

from nixnet import _cconsts
from nixnet import _errors
from nixnet import _py2
from nixnet import constants

__all__ = [

DriverVersion_ = collections.namedtuple(
    ['major', 'minor', 'update', 'phase', 'build'])

[docs]class DriverVersion(DriverVersion_): """Driver Version The arguments align with the following fields: ``[major].[minor].[update][phase][build]``. Attributes: major (int): minor (int): update (int): phase (:any:`nixnet._enums.Phase`): build (int): """
CanComm_ = collections.namedtuple( 'CanComm_', ['state', 'tcvr_err', 'sleep', 'last_err', 'tx_err_count', 'rx_err_count'])
[docs]class CanComm(CanComm_): """CAN Communication State. Attributes: state (:any:`nixnet._enums.CanCommState`): Communication State tcvr_err (bool): Transceiver Error. Transceiver error indicates whether an error condition exists on the physical transceiver. This is typically referred to as the transceiver chip NERR pin. False indicates normal operation (no error), and true indicates an error. sleep (bool): Sleep. Sleep indicates whether the transceiver and communication controller are in their sleep state. False indicates normal operation (awake), and true indicates sleep. last_err (:any:`nixnet._enums.CanLastErr`): Last Error. Last error specifies the status of the last attempt to receive or transmit a frame tx_err_count (int): Transmit Error Counter. The transmit error counter begins at 0 when communication starts on the CAN interface. The counter increments when an error is detected for a transmitted frame and decrements when a frame transmits successfully. The counter increases more for an error than it is decreased for success. This ensures that the counter generally increases when a certain ratio of frames (roughly 1/8) encounter errors. When communication state transitions to Bus Off, the transmit error counter no longer is valid. rx_err_count (int): Receive Error Counter. The receive error counter begins at 0 when communication starts on the CAN interface. The counter increments when an error is detected for a received frame and decrements when a frame is received successfully. The counter increases more for an error than it is decreased for success. This ensures that the counter generally increases when a certain ratio of frames (roughly 1/8) encounter errors. """ pass
LinComm_ = collections.namedtuple( 'LinComm_', ['sleep', 'state', 'last_err', 'err_received', 'err_expected', 'err_id', 'tcvr_rdy', 'sched_index'])
[docs]class LinComm(LinComm_): """CAN Communication State. Attributes: sleep (bool): Sleep. Indicates whether the transceiver and communication controller are in their sleep state. False indicates normal operation (awake), and true indicates sleep. state (:any:`nixnet._enums.LinCommState`): Communication State last_err (:any:`nixnet._enums.LinLastErr`): Last Error. Last error specifies the status of the last attempt to receive or transmit a frame err_received (int): Returns the value received from the network when last error occurred. When ``last_err`` is ``READBACK``, this is the value read back. When ``last_err`` is ``CHECKSUM``, this is the received checksum. err_expected (int): Returns the value that the LIN interface expected to see (instead of last received). When ``last_err`` is ``READBACK``, this is the value transmitted. When ``last_err`` is ``CHECKSUM``, this is the calculated checksum. err_id (int): Returns the frame identifier in which the last error occurred. This is not applicable when ``last_err`` is ``NONE`` or ``UNKNOWN_ID``. tcvr_rdy (bool): Indicates whether the LIN transceiver is powered from the bus. True indicates the bus power exists, so it is safe to start communication on the LIN interface. If this value is false, you cannot start communication successfully. Wire power to the LIN transceiver and run your application again. sched_index (int): Indicates the LIN schedule that the interface currently is running. This index refers to a LIN schedule that you requested using the :any:`nixnet._session.base.SessionBase.change_lin_schedule` function. It indexes the array of schedules represented in the :any:`nixnet._session.intf.Interface.lin_sched_names`. This index applies only when the LIN interface is running as a master. If the LIN interface is running as a slave only, this element should be ignored. """ pass
PduProperties_ = collections.namedtuple( 'PDU_PROPERTIES_', ['pdu', 'start_bit', 'update_bit'])
[docs]class PduProperties(PduProperties_): """Properties that map a PDU onto a frame. Mapping PDUs to a frame requires setting three frame properties that are combined into this tuple. Attributes: pdu (:any:`Pdu`): Defines the sequence of values for the other two properties. start_bit (int): Defines the start bit of the PDU inside the frame. update_bit (int): Defines the update bit for the PDU inside the frame. If the update bit is not used, set the value to ``-1``. """
[docs]class CanIdentifier(object): """CAN frame arbitration identifier. Attributes: identifier(int): CAN frame arbitration identifier extended(bool): If the identifier is extended """ _FRAME_ID_MASK = 0x000007FF _EXTENDED_FRAME_ID_MASK = 0x1FFFFFFF def __init__(self, identifier, extended=False): # type: (int, bool) -> None self.identifier = identifier self.extended = extended
[docs] @classmethod def from_raw(cls, raw): # type: (int) -> CanIdentifier """Parse a raw frame identifier into a CanIdentifier Args: raw(int): A raw frame identifier Returns: CanIdentifier: parsed value >>> CanIdentifier.from_raw(0x1) CanIdentifier(0x1) >>> CanIdentifier.from_raw(0x20000001) CanIdentifier(0x1, extended=True) """ extended = bool(raw & _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED) if extended: identifier = raw & cls._EXTENDED_FRAME_ID_MASK else: identifier = raw & cls._FRAME_ID_MASK return cls(identifier, extended)
def __int__(self): """Convert CanIdentifier into a raw frame identifier >>> hex(int(CanIdentifier(1))) '0x1' >>> hex(int(CanIdentifier(1, True))) '0x20000001' """ identifier = self.identifier if self.extended: if identifier != (identifier & self._EXTENDED_FRAME_ID_MASK): _errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID) identifier |= _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED else: if identifier != (identifier & self._FRAME_ID_MASK): _errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID) return identifier def __eq__(self, other): if isinstance(other, CanIdentifier): other_id = typing.cast(CanIdentifier, other) return all(( self.identifier == other_id.identifier, self.extended == other_id.extended)) else: return NotImplemented def __ne__(self, other): result = self.__eq__(other) if result is NotImplemented: return result else: return not result def __repr__(self): """CanIdentifier debug representation. >>> CanIdentifier(1) CanIdentifier(0x1) >>> CanIdentifier(1, True) CanIdentifier(0x1, extended=True) """ if self.extended: return "{}(0x{:x}, extended={})".format( type(self).__name__, self.identifier, self.extended) else: return "{}(0x{:x})".format( type(self).__name__, self.identifier)
[docs]@six.add_metaclass(abc.ABCMeta) class FrameFactory(object): """ABC for creating :any:`nixnet.types.Frame` objects.""" __slots__ = ()
[docs] @_py2.abstractclassmethod def from_raw(cls, frame): # NOQA: N805 can't detect abstractclassmethod # No type annotation because mypy doesn't understand # abstractclassmethod is the same as classmethod """Convert from RawFrame.""" pass
[docs]@six.add_metaclass(abc.ABCMeta) class Frame(FrameFactory): """ABC for frame objects.""" __slots__ = ()
[docs] @abc.abstractmethod def to_raw(self): # type: () -> RawFrame """Convert to RawFrame.""" pass
@abc.abstractproperty def type(self): # type: () -> constants.FrameType """:any:`nixnet._enums.FrameType`: Frame format.""" pass @abc.abstractmethod def __eq__(self, other): pass def __ne__(self, other): result = self.__eq__(other) if result is NotImplemented: return result else: return not result @abc.abstractmethod def __repr__(self): pass
[docs]class RawFrame(Frame): """Raw Frame. Attributes: timestamp(int): Absolute time the XNET interface received the end-of-frame. identifier(int): Frame identifier. type(:any:`nixnet._enums.FrameType`): Frame type. flags(int): Flags that qualify the type. info(int): Info that qualify the type. payload(bytes): Payload. """ __slots__ = [ "timestamp", "identifier", "_type", "flags", "info", "payload"] def __init__(self, timestamp, identifier, type, flags=0, info=0, payload=b""): # type: (int, int, constants.FrameType, int, int, bytes) -> None self.timestamp = timestamp self.identifier = identifier self._type = type self.flags = flags = info self.payload = payload
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame.""" return frame
[docs] def to_raw(self): """Convert to RawFrame.""" return self
@property def type(self): return self._type def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(RawFrame, other) return all(( self.timestamp == other_frame.timestamp, self.identifier == other_frame.identifier, self.type == other_frame.type, self.flags == other_frame.flags, ==, self.payload == other_frame.payload)) else: return NotImplemented def __repr__(self): # type: () -> typing.Text """RawFrame debug representation. >>> RawFrame(1, 2, constants.FrameType.CAN_DATA, 3, 4) RawFrame(timestamp=0x1, identifier=0x2, type=FrameType.CAN_DATA, flags=0x3, info=0x4) """ optional = [] if self.flags != 0: optional.append('flags=0x{:x}'.format(self.flags)) if != 0: optional.append('info=0x{:x}'.format( if self.payload: optional.append('len(payload)={}'.format(len(self.payload))) if optional: optional_params = ', {}'.format(", ".join(optional)) else: optional_params = '' return "{}(timestamp=0x{:x}, identifier=0x{:x}, type={}{})".format( type(self).__name__, self.timestamp, self.identifier, self.type, optional_params)
[docs]class CanFrame(Frame): """CAN Frame. Attributes: identifier(:any:`nixnet.types.CanIdentifier`): CAN frame arbitration identifier. echo(bool): If the frame is an echo of a successful transmit rather than being received from the network. type(:any:`nixnet._enums.FrameType`): Frame type. timestamp(int): Absolute time the XNET interface received the end-of-frame. payload(bytes): Payload. """ __slots__ = [ "identifier", "echo", "_type", "timestamp", "payload"] def __init__(self, identifier, type=constants.FrameType.CAN_DATA, payload=b""): # type: (typing.Union[CanIdentifier, int], constants.FrameType, bytes) -> None if isinstance(identifier, int): self.identifier = CanIdentifier(identifier) else: self.identifier = identifier self.echo = False # Used only for Read self._type = type self.timestamp = 0 # Used only for Read self.payload = payload
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(5, 0x20000001, constants.FrameType.CAN_DATA, _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO, 0, b'') >>> CanFrame.from_raw(raw) CanFrame(CanIdentifier(0x1, extended=True), echo=True, timestamp=0x5) """ identifier = CanIdentifier.from_raw(frame.identifier) can_frame = CanFrame(identifier, constants.FrameType(frame.type), frame.payload) can_frame.timestamp = frame.timestamp can_frame.echo = bool(frame.flags & _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO) return can_frame
[docs] def to_raw(self): """Convert to RawFrame. >>> CanFrame(CanIdentifier(1, True), constants.FrameType.CAN_DATA).to_raw() RawFrame(timestamp=0x0, identifier=0x20000001, type=FrameType.CAN_DATA) >>> c = CanFrame(CanIdentifier(1, True), constants.FrameType.CAN_DATA) >>> c.echo = True >>> c.to_raw() RawFrame(timestamp=0x0, identifier=0x20000001, type=FrameType.CAN_DATA, flags=0x80) """ identifier = int(self.identifier) flags = 0 if self.echo: flags |= _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO return RawFrame(self.timestamp, identifier, self.type, flags, 0, self.payload)
@property def type(self): return self._type def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(CanFrame, other) return all(( self.identifier == other_frame.identifier, self.echo == other_frame.echo, self.type == other_frame.type, self.timestamp == other_frame.timestamp, self.payload == other_frame.payload)) else: return NotImplemented def __repr__(self): # type: () -> typing.Text """CanFrame debug representation. >>> CanFrame(1) CanFrame(CanIdentifier(0x1)) >>> CanFrame(1, constants.FrameType.CANFD_DATA, b'\x01') CanFrame(CanIdentifier(0x1), type=FrameType.CANFD_DATA, len(payload)=1) """ optional = [] if self.echo: optional.append('echo={}'.format(self.echo)) if self.type != constants.FrameType.CAN_DATA: optional.append('type={}'.format(self.type)) if self.timestamp != 0: optional.append('timestamp=0x{:x}'.format(self.timestamp)) if self.payload: optional.append('len(payload)={}'.format(len(self.payload))) if optional: optional_params = ', {}'.format(", ".join(optional)) else: optional_params = '' return "{}({}{})".format( type(self).__name__, self.identifier, optional_params)
[docs]class CanBusErrorFrame(Frame): """Error detected on hardware bus of a :any:`nixnet.session.FrameInStreamSession`. .. note:: This requires enabling :any:`nixnet._session.intf.Interface.bus_err_to_in_strm`. See also :any:`nixnet.types.CanComm`. Attributes: timestamp(int): Absolute time when the bus error occurred. state (:any:`nixnet._enums.CanCommState`): Communication State tcvr_err (bool): Transceiver Error. bus_err (:any:`nixnet._enums.CanLastErr`): Last Error. tx_err_count (int): Transmit Error Counter. rx_err_count (int): Receive Error Counter. """ __slots__ = [ "timestamp", "state", "tcvr_err", "bus_err", "tx_err_count", "rx_err_count"] def __init__(self, timestamp, state, tcvr_err, bus_err, tx_err_count, rx_err_count): # type: (int, constants.CanCommState, bool, constants.CanLastErr, int, int) -> None self.timestamp = timestamp self.state = state self.tcvr_err = tcvr_err self.bus_err = bus_err self.tx_err_count = tx_err_count self.rx_err_count = rx_err_count
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(0x64, 0x0, constants.FrameType.CAN_BUS_ERROR, 0, 0, b'\\x00\\x01\\x02\\x03\\x04') >>> CanBusErrorFrame.from_raw(raw) CanBusErrorFrame(0x64, CanCommState.ERROR_ACTIVE, True, CanLastErr.ACK, 1, 2) """ timestamp = frame.timestamp state = constants.CanCommState(six.indexbytes(frame.payload, 0)) tx_err_count = six.indexbytes(frame.payload, 1) rx_err_count = six.indexbytes(frame.payload, 2) bus_err = constants.CanLastErr(six.indexbytes(frame.payload, 3)) tcvr_err = six.indexbytes(frame.payload, 4) != 0 return CanBusErrorFrame(timestamp, state, tcvr_err, bus_err, tx_err_count, rx_err_count)
[docs] def to_raw(self): """Convert to RawFrame. >>> CanBusErrorFrame(100, constants.CanCommState.BUS_OFF, True, constants.CanLastErr.STUFF, 1, 2).to_raw() RawFrame(timestamp=0x64, identifier=0x0, type=FrameType.CAN_BUS_ERROR, len(payload)=5) """ identifier = 0 flags = 0 info = 0 payload_data = [ self.state.value, self.tx_err_count, self.rx_err_count, self.bus_err.value, 1 if self.tcvr_err else 0, ] payload = bytes(bytearray(payload_data)) return RawFrame(self.timestamp, identifier, self.type, flags, info, payload)
@property def type(self): return constants.FrameType.CAN_BUS_ERROR def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(CanBusErrorFrame, other) return all(( self.timestamp == other_frame.timestamp, self.state == other_frame.state, self.tcvr_err == other_frame.tcvr_err, self.bus_err == other_frame.bus_err, self.tx_err_count == other_frame.tx_err_count, self.rx_err_count == other_frame.rx_err_count)) else: return NotImplemented def __repr__(self): # type: () -> typing.Text """CanBusErrorFrame debug representation. >>> CanBusErrorFrame(100, constants.CanCommState.BUS_OFF, True, constants.CanLastErr.STUFF, 1, 2) CanBusErrorFrame(0x64, CanCommState.BUS_OFF, True, CanLastErr.STUFF, 1, 2) """ return "{}(0x{:x}, {}, {}, {}, {}, {})".format( type(self).__name__, self.timestamp, self.state, self.tcvr_err, self.bus_err, self.tx_err_count, self.rx_err_count)
[docs]class LinFrame(object): """LIN Frame. Attributes: identifier(int): LIN frame arbitration identifier. echo(bool): If the frame is an echo of a successful transmit rather than being received from the network. type(:any:`nixnet._enums.FrameType`): Frame type. timestamp(int): Absolute time the XNET interface received the end-of-frame. eventslot(bool): Whether the frame was received within an event-triggered slot or an unconditional or sporadic slot. eventid(int): Identifier for an event-triggered slot. payload(bytes): A byte string representing the payload. """ __slots__ = [ "identifier", "echo", "type", "timestamp", "eventslot", "eventid", "payload"] _FRAME_ID_MASK = 0x0000003F def __init__(self, identifier, type=constants.FrameType.LIN_DATA, payload=b""): # type: (int, constants.FrameType, bytes) -> None self.identifier = identifier self.echo = False # Used only for Read self.type = type self.timestamp = 0 # Used only for Read self.eventslot = False # Used only for Read self.eventid = 0 # Used only for Read self.payload = payload
[docs] @classmethod def from_raw(cls, frame): # type: (RawFrame) -> LinFrame """Convert from RawFrame. >>> raw = RawFrame(5, 2, constants.FrameType.LIN_DATA, 0x81, 1, b'\x01') >>> LinFrame.from_raw(raw) LinFrame(identifier=0x2, echo=True, timestamp=0x5, eventslot=True, eventid=1, len(payload)=1) >>> raw = RawFrame(5, 2, constants.FrameType.LIN_DATA, _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO, 0, b'\x01') >>> LinFrame.from_raw(raw) LinFrame(identifier=0x2, echo=True, timestamp=0x5, len(payload)=1) """ identifier = frame.identifier & cls._FRAME_ID_MASK lin_frame = LinFrame(identifier, constants.FrameType(frame.type), frame.payload) lin_frame.timestamp = frame.timestamp lin_frame.echo = bool(frame.flags & _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO) lin_frame.eventslot = bool(frame.flags & _cconsts.NX_FRAME_FLAGS_LIN_EVENT_SLOT) if lin_frame.eventslot: lin_frame.eventid = else: lin_frame.eventid = 0 return lin_frame
[docs] def to_raw(self): # type: () -> RawFrame """Convert to RawFrame. >>> LinFrame(2, constants.FrameType.LIN_DATA).to_raw() RawFrame(timestamp=0x0, identifier=0x2, type=FrameType.LIN_DATA) >>> l = LinFrame(2, constants.FrameType.LIN_DATA) >>> l.echo = True >>> l.eventslot = True >>> l.eventid = 1 >>> l.to_raw() RawFrame(timestamp=0x0, identifier=0x2, type=FrameType.LIN_DATA, flags=0x81, info=0x1) """ if self.identifier != (self.identifier & self._FRAME_ID_MASK): _errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID) flags = 0 info = 0 if self.echo: flags |= _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO if self.eventslot: flags |= _cconsts.NX_FRAME_FLAGS_LIN_EVENT_SLOT info |= self.eventid return RawFrame(self.timestamp, self.identifier, self.type, flags, info, self.payload)
def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(LinFrame, other) return all(( self.identifier == other_frame.identifier, self.echo == other_frame.echo, self.type == other_frame.type, self.timestamp == other_frame.timestamp, self.eventslot == other_frame.eventslot, self.eventid == other_frame.eventid, self.payload == other_frame.payload)) else: return NotImplemented def __repr__(self): # type: () -> typing.Text """LinFrame debug representation. >>> LinFrame(2) LinFrame(identifier=0x2) >>> LinFrame(2, constants.FrameType.LIN_NO_RESPONSE, b'\x01') LinFrame(identifier=0x2, type=FrameType.LIN_NO_RESPONSE, len(payload)=1) """ optional = [] if self.echo: optional.append('echo={}'.format(self.echo)) if self.type != constants.FrameType.LIN_DATA: optional.append('type={}'.format(self.type)) if self.timestamp != 0: optional.append('timestamp=0x{:x}'.format(self.timestamp)) if self.eventslot: optional.append('eventslot={}'.format(self.eventslot)) if self.eventid != 0: optional.append('eventid={}'.format(self.eventid)) if self.payload: optional.append('len(payload)={}'.format(len(self.payload))) if optional: optional_params = ', {}'.format(", ".join(optional)) else: optional_params = '' return "{}(identifier=0x{:x}{})".format( type(self).__name__, self.identifier, optional_params)
[docs]class LinBusErrorFrame(Frame): """Error detected on hardware bus of a :any:`nixnet.session.FrameInStreamSession`. .. note:: This requires enabling :any:`nixnet._session.intf.Interface.bus_err_to_in_strm`. See also :any:`nixnet.types.LinComm`. Attributes: timestamp(int): Absolute time when the bus error occurred. state (:any:`nixnet._enums.LinCommState`): Communication State. bus_err (:any:`nixnet._enums.LinLastErr`): Last Error. err_id (int): Identifier on bus. err_received (int): Received byte on bus err_expected (int): Expected byte on bus """ __slots__ = [ "timestamp", "state", "bus_err", "err_id", "err_received", "err_expected"] def __init__(self, timestamp, state, bus_err, err_id, err_received, err_expected): # type: (int, constants.LinCommState, constants.LinLastErr, int, int, int) -> None self.timestamp = timestamp self.state = state self.bus_err = bus_err self.err_id = err_id self.err_received = err_received self.err_expected = err_expected
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(0x64, 0x0, constants.FrameType.LIN_BUS_ERROR, 0, 0, b'\\x00\\x01\\x02\\x03\\x04') >>> LinBusErrorFrame.from_raw(raw) LinBusErrorFrame(0x64, LinCommState.IDLE, LinLastErr.UNKNOWN_ID, 0x2, 3, 4) """ timestamp = frame.timestamp state = constants.LinCommState(six.indexbytes(frame.payload, 0)) bus_err = constants.LinLastErr(six.indexbytes(frame.payload, 1)) err_id = six.indexbytes(frame.payload, 2) err_received = six.indexbytes(frame.payload, 3) err_expected = six.indexbytes(frame.payload, 4) return LinBusErrorFrame(timestamp, state, bus_err, err_id, err_received, err_expected)
[docs] def to_raw(self): """Convert to RawFrame. >>> LinBusErrorFrame(100, constants.LinCommState.INACTIVE, constants.LinLastErr.UNKNOWN_ID, 2, 3, 4).to_raw() RawFrame(timestamp=0x64, identifier=0x0, type=FrameType.LIN_BUS_ERROR, len(payload)=5) """ identifier = 0 flags = 0 info = 0 payload_data = [ self.state.value, self.bus_err.value, self.err_id, self.err_received, self.err_expected, ] payload = bytes(bytearray(payload_data)) return RawFrame(self.timestamp, identifier, self.type, flags, info, payload)
@property def type(self): return constants.FrameType.LIN_BUS_ERROR def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(LinBusErrorFrame, other) return all(( self.timestamp == other_frame.timestamp, self.state == other_frame.state, self.bus_err == other_frame.bus_err, self.err_id == other_frame.err_id, self.err_received == other_frame.err_received, self.err_expected == other_frame.err_expected)) else: return NotImplemented def __repr__(self): # type: () -> typing.Text """LinBusErrorFrame debug representation. >>> LinBusErrorFrame(100, constants.LinCommState.INACTIVE, constants.LinLastErr.CRC, 1, 2, 3) LinBusErrorFrame(0x64, LinCommState.INACTIVE, LinLastErr.CRC, 0x1, 2, 3) """ return "{}(0x{:x}, {}, {}, 0x{:x}, {}, {})".format( type(self).__name__, self.timestamp, self.state, self.bus_err, self.err_id, self.err_received, self.err_expected)
[docs]class DelayFrame(Frame): """Delay hardware when DelayFrame is outputted. .. note:: This requires :any:`nixnet._session.intf.Interface.out_strm_timng` to be in replay mode. Attributes: offset(int): Time to delay in milliseconds. """ __slots__ = [ "offset"] def __init__(self, offset): # type: (int) -> None self.offset = offset
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(5, 0, constants.FrameType.SPECIAL_DELAY, 0, 0, b'') >>> DelayFrame.from_raw(raw) DelayFrame(5) """ return DelayFrame(frame.timestamp)
[docs] def to_raw(self): """Convert to RawFrame. >>> DelayFrame(250).to_raw() RawFrame(timestamp=0xfa, identifier=0x0, type=FrameType.SPECIAL_DELAY) """ identifier = 0 flags = 0 info = 0 payload = b'' return RawFrame(self.offset, identifier, self.type, flags, info, payload)
@property def type(self): return constants.FrameType.SPECIAL_DELAY def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(DelayFrame, other) return self.offset == other_frame.offset else: return NotImplemented def __repr__(self): # type: () -> typing.Text """DelayFrame debug representation. >>> DelayFrame(250) DelayFrame(250) """ return "{}({})".format(type(self).__name__, self.offset)
[docs]class LogTriggerFrame(Frame): """Timestamp of when a trigger occurred. This frame is generated on input sessions when a rising edge is detected on an external connection. .. note:: This requires using :any:`nixnet._session.base.SessionBase.connect_terminals` to connect an external connection to the internal ``LogTrigger`` terminal. Attributes: timestamp(int): Absolute time that the trigger occurred. """ __slots__ = [ "timestamp"] def __init__(self, timestamp): # type: (int) -> None self.timestamp = timestamp
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(5, 0, constants.FrameType.SPECIAL_LOG_TRIGGER, 0, 0, b'') >>> LogTriggerFrame.from_raw(raw) LogTriggerFrame(0x5) """ return LogTriggerFrame(frame.timestamp)
[docs] def to_raw(self): """Convert to RawFrame. >>> LogTriggerFrame(250).to_raw() RawFrame(timestamp=0xfa, identifier=0x0, type=FrameType.SPECIAL_LOG_TRIGGER) """ identifier = 0 flags = 0 info = 0 payload = b'' return RawFrame(self.timestamp, identifier, self.type, flags, info, payload)
@property def type(self): return constants.FrameType.SPECIAL_LOG_TRIGGER def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(LogTriggerFrame, other) return self.timestamp == other_frame.timestamp else: return NotImplemented def __repr__(self): # type: () -> typing.Text """LogTriggerFrame debug representation. >>> LogTriggerFrame(250) LogTriggerFrame(0xfa) """ return "{}(0x{:x})".format(type(self).__name__, self.timestamp)
[docs]class StartTriggerFrame(Frame): """Timestamp of :any:`nixnet.session.FrameInStreamSession` start. .. note:: This requires enabling :any:`nixnet._session.intf.Interface.start_trig_to_in_strm`. Attributes: timestamp(int): Absolute time that the trigger occurred. """ __slots__ = [ "timestamp"] def __init__(self, timestamp): # type: (int) -> None self.timestamp = timestamp
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame. >>> raw = RawFrame(5, 0, constants.FrameType.SPECIAL_START_TRIGGER, 0, 0, b'') >>> StartTriggerFrame.from_raw(raw) StartTriggerFrame(0x5) """ return StartTriggerFrame(frame.timestamp)
[docs] def to_raw(self): """Convert to RawFrame. >>> StartTriggerFrame(250).to_raw() RawFrame(timestamp=0xfa, identifier=0x0, type=FrameType.SPECIAL_START_TRIGGER) """ identifier = 0 flags = 0 info = 0 payload = b'' return RawFrame(self.timestamp, identifier, self.type, flags, info, payload)
@property def type(self): return constants.FrameType.SPECIAL_START_TRIGGER def __eq__(self, other): if isinstance(other, self.__class__): other_frame = typing.cast(StartTriggerFrame, other) return self.timestamp == other_frame.timestamp else: return NotImplemented def __repr__(self): # type: () -> typing.Text """StartTriggerFrame debug representation. >>> StartTriggerFrame(250) StartTriggerFrame(0xfa) """ return "{}(0x{:x})".format(type(self).__name__, self.timestamp)
[docs]class XnetFrame(FrameFactory): """Create `Frame` based on `RawFrame` content.""" __slots__ = ()
[docs] @classmethod def from_raw(cls, frame): """Convert from RawFrame.""" frame_type = { constants.FrameType.CAN_DATA: CanFrame, constants.FrameType.CAN20_DATA: CanFrame, constants.FrameType.CANFD_DATA: CanFrame, constants.FrameType.CANFDBRS_DATA: CanFrame, constants.FrameType.CAN_REMOTE: CanFrame, constants.FrameType.CAN_BUS_ERROR: CanBusErrorFrame, constants.FrameType.LIN_DATA: LinFrame, constants.FrameType.SPECIAL_DELAY: DelayFrame, constants.FrameType.SPECIAL_LOG_TRIGGER: LogTriggerFrame, constants.FrameType.SPECIAL_START_TRIGGER: StartTriggerFrame, }.get(frame.type) if frame_type is None: raise NotImplementedError("Unsupported frame type", frame.type) return frame_type.from_raw(frame)