Source code for nixnet._session.frames

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import itertools
import typing  # NOQA: F401

from nixnet import _frames
from nixnet import _funcs
from nixnet import _props
from nixnet._session import collection
from nixnet import constants
from nixnet import types


class Frames(collection.Collection):
    """Collection of the frames that belong to a session."""

    def _create_item(self, handle, index, name):
        """Build the :any:`Frame` item for one entry of this collection.

        All three arguments are forwarded unchanged to the ``Frame``
        constructor.
        """
        item = Frame(handle, index, name)
        return item

    @property
    def payld_len_max(self):
        # type: () -> int
        """int: Maximum payload length, in bytes (0-254), of all frames in this session.

        The value depends on the kind of session:

        * CAN Stream (Input and Output): determined by the XNET Cluster
          CAN I/O Mode property. With ``constants.CanIoMode.CAN`` the value
          is 8 bytes; with ``constants.CanIoMode.CAN_FD`` or
          ``constants.CanIoMode.CAN_FD_BRS`` it is 64 bytes.
        * LIN Stream (Input and Output): always 8 bytes.
        * FlexRay Stream (Input and Output): equal to the XNET Cluster
          FlexRay Payload Length Maximum property.
        * Queued and Single-Point (Input and Output): the largest payload
          of all frames specified in the List property.
        """
        max_payload_length = _props.get_session_payld_len_max(self._handle)
        return max_payload_length
class InFrames(Frames):
    """Frames in a session, readable with a caller-supplied timeout."""

    def read_bytes(
            self,
            num_bytes,
            timeout=constants.TIMEOUT_NONE):
        # type: (int, float) -> bytes
        """Read frame data as raw bytes.

        The returned bytes encode one or more frames using the Raw Frame
        Format.

        Only complete frames are returned, so fewer than ``num_bytes`` bytes
        may come back even when that many are available from the hardware.
        For example, with ``num_bytes`` of 70 and a ``timeout`` of 10
        seconds, suppose a 24-byte frame and then a 56-byte frame arrive
        (80 bytes in total). The read returns once both frames are received,
        but only the first frame is copied out: copying 46 bytes of the
        second frame (to reach the 70-byte limit) would leave a partial
        frame that is difficult to interpret.

        Args:
            num_bytes(int): The number of bytes to read.
            timeout(float): The time in seconds to wait for the requested
                number of frame bytes to become available.

                If ``timeout`` is positive, this function waits for
                ``num_bytes`` frame bytes to be received, then returns
                complete frames up to that number. If the bytes do not
                arrive prior to the ``timeout``, an error is returned.

                If ``timeout`` is ``constants.TIMEOUT_INFINITE``, this
                function waits indefinitely for ``num_bytes`` frame bytes.

                If ``timeout`` is ``constants.TIMEOUT_NONE``, this function
                does not wait and immediately returns all available frame
                bytes up to the limit ``num_bytes`` specifies.

        Returns:
            bytes: Raw bytes representing the data.
        """
        raw, valid_length = _funcs.nx_read_frame(self._handle, num_bytes, timeout)
        # Only the leading ``valid_length`` bytes of the buffer were filled in.
        return raw[:valid_length]

    def read(
            self,
            num_frames,
            timeout=constants.TIMEOUT_NONE,
            frame_type=types.XnetFrame):
        # type: (int, float, typing.Type[types.FrameFactory]) -> typing.Iterable[types.Frame]
        """Read frames.

        This is a generator: the underlying read is performed when iteration
        starts, not when this method is called.

        Args:
            num_frames(int): Number of frames to read.
            timeout(float): The time in seconds to wait for the requested
                number of frames to become available.

                If ``timeout`` is positive, this function waits for
                ``num_frames`` frames to be received, then returns complete
                frames up to that number. If the frames do not arrive prior
                to the ``timeout``, an error is returned.

                If ``timeout`` is ``constants.TIMEOUT_INFINITE``, this
                function waits indefinitely for ``num_frames`` frames.

                If ``timeout`` is ``constants.TIMEOUT_NONE``, this function
                does not wait and immediately returns all available frames
                up to the limit ``num_frames`` specifies.
            frame_type(:any:`nixnet.types.FrameFactory`): A factory for the
                desired frame formats.

        Yields:
            :any:`nixnet.types.Frame`
        """
        convert = typing.cast(typing.Callable[[types.RawFrame], types.Frame], frame_type.from_raw)
        # NOTE: The byte budget assumes every frame occupies exactly one
        # fixed-size base unit. Frames whose payload exceeds the base unit
        # use more, so fewer than num_frames may be returned.
        base_unit_size = _frames.nxFrameFixed_t.size
        byte_budget = num_frames * base_unit_size
        raw = self.read_bytes(byte_budget, timeout)
        for raw_frame in _frames.iterate_frames(raw):
            yield convert(raw_frame)
class SinglePointInFrames(Frames):
    """Frames in a session, read without waiting."""

    def read_bytes(
            self,
            num_bytes):
        # type: (int) -> bytes
        """Read frame data as raw bytes.

        The read is always issued with ``constants.TIMEOUT_NONE``.

        Args:
            num_bytes(int): Number of bytes to read.

        Returns:
            bytes: Raw bytes representing the data.
        """
        raw, valid_length = _funcs.nx_read_frame(
            self._handle,
            num_bytes,
            constants.TIMEOUT_NONE)
        # Only the leading ``valid_length`` bytes of the buffer were filled in.
        return raw[:valid_length]

    def read(
            self,
            frame_type=types.XnetFrame):
        # type: (typing.Type[types.FrameFactory]) -> typing.Iterable[types.Frame]
        """Read frames.

        This is a generator: the underlying read is performed when iteration
        starts, not when this method is called. The number of frames
        requested is ``len(self)``.

        Args:
            frame_type(:any:`nixnet.types.FrameFactory`): A factory for the
                desired frame formats.

        Yields:
            :any:`nixnet.types.Frame`
        """
        convert = typing.cast(typing.Callable[[types.RawFrame], types.Frame], frame_type.from_raw)
        # NOTE: The byte budget assumes every frame occupies exactly one
        # fixed-size base unit. Frames whose payload exceeds the base unit
        # use more, so fewer than len(self) frames may be returned.
        frame_count = len(self)
        byte_budget = frame_count * _frames.nxFrameFixed_t.size
        raw = self.read_bytes(byte_budget)
        for raw_frame in _frames.iterate_frames(raw):
            yield convert(raw_frame)
class OutFrames(Frames):
    """Frames in a session, writable with a caller-supplied timeout."""

    def write_bytes(
            self,
            frame_bytes,
            timeout=10):
        # type: (bytes, float) -> None
        """Write raw bytes (frame data).

        The raw bytes encode one or more frames using the Raw Frame Format.

        Args:
            frame_bytes(bytes): Frames to transmit.
            timeout(float): The time in seconds to wait for space to become
                available in the queues. Defaults to 10.

                If ``timeout`` is positive, this function waits up to that
                ``timeout`` for space to become available in queues. If the
                space is not available prior to the ``timeout``, a timeout
                error is returned.

                If ``timeout`` is ``constants.TIMEOUT_INFINITE``, this
                function waits indefinitely for space to become available in
                queues.

                If ``timeout`` is ``constants.TIMEOUT_NONE``, this function
                does not wait and immediately returns with a timeout error
                if all data cannot be queued.

                Regardless of the ``timeout`` used, if a timeout error
                occurs, none of the data is queued, so you can attempt to
                call this function again at a later time with the same data.
        """
        _funcs.nx_write_frame(self._handle, bytes(frame_bytes), timeout)

    def write(
            self,
            frames,
            timeout=10):
        # type: (typing.Iterable[types.Frame], float) -> None
        """Write frame data.

        Every frame is converted with ``to_raw()`` and serialized; the
        resulting chunks are concatenated and handed to :meth:`write_bytes`
        in a single call.

        Args:
            frames(iterable of :any:`nixnet.types.Frame`): One or more frame
                objects to be written to the session.
            timeout(float): The time in seconds to wait for space to become
                available in the queues. Defaults to 10.

                If ``timeout`` is positive, this function waits up to that
                ``timeout`` for space to become available in queues. If the
                space is not available prior to the ``timeout``, a timeout
                error is returned.

                If ``timeout`` is ``constants.TIMEOUT_INFINITE``, this
                function waits indefinitely for space to become available in
                queues.

                If ``timeout`` is ``constants.TIMEOUT_NONE``, this function
                does not wait and immediately returns with a timeout error
                if all data cannot be queued.

                Regardless of the ``timeout`` used, if a timeout error
                occurs, none of the data is queued, so you can attempt to
                call this function again at a later time with the same data.
        """
        units = itertools.chain.from_iterable(
            _frames.serialize_frame(frame.to_raw())
            for frame in frames)
        # Named ``frame_bytes`` rather than ``bytes`` so the builtin is not
        # shadowed inside this method.
        frame_bytes = b"".join(units)
        self.write_bytes(frame_bytes, timeout)
class SinglePointOutFrames(Frames):
    """Frames in a session, written without waiting."""

    def write_bytes(
            self,
            frame_bytes):
        # type: (bytes) -> None
        """Write raw bytes (frame data).

        The raw bytes encode one or more frames using the Raw Frame Format.
        The write is always issued with ``constants.TIMEOUT_NONE``.

        Args:
            frame_bytes(bytes): Frames to transmit.
        """
        _funcs.nx_write_frame(self._handle, bytes(frame_bytes), constants.TIMEOUT_NONE)

    def write(
            self,
            frames):
        # type: (typing.Iterable[types.Frame]) -> None
        """Write frame data.

        Every frame is converted with ``to_raw()`` and serialized; the
        resulting chunks are concatenated and handed to :meth:`write_bytes`
        in a single call.

        Args:
            frames(iterable of :any:`nixnet.types.Frame`): One or more frame
                objects to be written to the session.
        """
        units = itertools.chain.from_iterable(
            _frames.serialize_frame(frame.to_raw())
            for frame in frames)
        # Named ``frame_bytes`` rather than ``bytes`` so the builtin is not
        # shadowed inside this method.
        frame_bytes = b"".join(units)
        self.write_bytes(frame_bytes)
class Frame(collection.Item):
    """Per-frame configuration within a session."""

    def set_can_start_time_off(self, offset):
        # type: (float) -> None
        """Set CAN Start Time Offset.

        This gives more control over the schedule of frames on the bus and
        more determinism, by letting cyclic frames be spaced evenly.

        If this function is not called, or is given a negative number,
        NI-XNET chooses the start time offset based on the arbitration
        identifier and periodic transmit time.

        ``offset`` takes effect whenever a session is started. If you stop a
        session and restart it, the start time offset is re-evaluated.

        Args:
            offset(float): The amount of time that must elapse between the
                session being started and the time that the first frame is
                transmitted across the bus. This is different than the
                cyclic rate, which determines the time between subsequent
                frame transmissions.
        """
        session_handle = self._handle
        frame_index = self._index
        _props.set_session_can_start_time_off(session_handle, frame_index, offset)

    def set_can_tx_time(self, time):
        # type: (float) -> None
        """Set CAN Transmit Time.

        If this function is called while a frame object is currently
        started, the frame object is stopped, the cyclic rate updated, and
        then the frame object is restarted. Because of the stopping and
        starting, the frame's start time offset is re-evaluated.

        The first time a queued frame object is started, the XNET frame's
        transmit time determines the object's default queue size. Changing
        this rate has no impact on the queue size. Depending on how you
        change the rate, the queue may not be sufficient to store data for
        an extended period of time. You can mitigate this by setting the
        session Queue Size property to provide sufficient storage for all
        rates you use. If you are using a single-point session, this is not
        relevant.

        Args:
            time(float): Frame's transmit time while the session is running.
                The transmit time is the amount of time that must elapse
                between subsequent transmissions of a cyclic frame. The
                default value of this property comes from the database (the
                XNET Frame CAN Transmit Time property).
        """
        session_handle = self._handle
        frame_index = self._index
        _props.set_session_can_tx_time(session_handle, frame_index, time)

    def set_skip_n_cyclic_frames(self, n):
        # type: (int) -> None
        """Set Skip N Cyclic Frames.

        When the frame's transmission time arrives and the skip count is
        nonzero, a frame value is dequeued (if this is not a single-point
        session), and the skip count is decremented, but the frame actually
        is not transmitted across the bus. When the skip count decrements to
        zero, subsequent cyclic transmissions resume.

        This function is useful for testing of ECU behavior when a cyclic
        frame is expected, but is missing for N cycles.

        .. note:: Only CAN interfaces currently support this function.

        .. note:: This property is valid only for output sessions and frames
           with cyclic timing (that is, not event-based frames).

        Args:
            n(int): Skip the next N cyclic frames when nonzero.
        """
        session_handle = self._handle
        frame_index = self._index
        _props.set_session_skip_n_cyclic_frames(session_handle, frame_index, n)

    def set_lin_tx_n_corrupted_chksums(self, n):
        # type: (int) -> None
        """Set LIN Transmit N Corrupted Checksums.

        When set to a nonzero value, this function causes the next N
        checksums to be corrupted. The checksum is corrupted by negating the
        value calculated per the database: (EnhancedValue * -1) or
        (ClassicValue * -1).

        If the frame is transmitted in an unconditional or sporadic schedule
        slot, N is always decremented for each frame transmission. If the
        frame is transmitted in an event-triggered slot and a collision
        occurs, N is not decremented. In that case, N is decremented only
        when the collision resolving schedule is executed and the frame is
        successfully transmitted. If the frame is the only one to transmit
        in the event-triggered slot (no collision), N is decremented at
        event-triggered slot time.

        This function is useful for testing ECU behavior when a corrupted
        checksum is transmitted.

        .. note:: This function is valid only for output sessions.

        Args:
            n(int): Number of checksums to be corrupted.
        """
        session_handle = self._handle
        frame_index = self._index
        _props.set_session_lin_tx_n_corrupted_chksums(session_handle, frame_index, n)

    def set_j1939_addr_filter(self, address=""):
        # type: (typing.Union[typing.Text, int]) -> None
        """Set J1939 Address Filter.

        Define a filter for the source address of the PGN transmitting node.
        You can use it when multiple nodes with different addresses are
        transmitting the same PGN.

        If the filter is active, the session accepts only frames transmitted
        by a node with the defined address. All other frames with the same
        PGN but transmitted by other nodes are ignored.

        .. note:: You can use this function in input sessions only.

        Args:
            address(str or int): Decimal value of the address. Leave blank
                to reset the filter.
        """
        session_handle = self._handle
        frame_index = self._index
        # The property setter is always given text, so integers are converted.
        address_text = str(address)
        _props.set_session_j1939_addr_filter(session_handle, frame_index, address_text)