Source code for nixnet._session.base

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ctypes  # type: ignore
import typing  # NOQA: F401
import warnings

from nixnet import _ctypedefs
from nixnet import _errors
from nixnet import _funcs
from nixnet import _props
from nixnet import _utils
from nixnet import constants
from nixnet import errors
from nixnet import types  # NOQA: F401

from nixnet._session import intf as session_intf
from nixnet._session import j1939 as session_j1939


[docs]class SessionBase(object): """Session base object.""" def __init__( self, database_name, # type: typing.Text cluster_name, # type: typing.Text list, # type: typing.Text interface_name, # type: typing.Text mode, # type: constants.CreateSessionMode ): # type: (...) -> None """Create an XNET session at run time using named references to database objects. This function creates a session using the named database objects specified in 'list' from the database named in 'database_name'. This function is intended to be used by session classes that derive from SessionBase; therefore, it is not public. Args: database_name: A string representing the XNET database to use for interface configuration. The database name must use the <alias> or <filepath> syntax (refer to Databases). cluster_name: A string representing the XNET cluster to use for interface configuration. The name must specify a cluster from the database given in the database_name parameter. If it is left blank, the cluster is extracted from the list parameter; this is not allowed for modes of 'constants.CreateSessionMode.FRAME_IN_STREAM' or 'constants.CreateSessionMode.FRAME_OUT_STREAM'. list: A list of strings describing signals or frames for the session. The list syntax depends on the mode. Refer to mode spefic session classes defined below for 'list' syntax. interface_name: A string representing the XNET Interface to use for this session. If Mode is 'constants.CreateSessionMode.SIGNAL_CONVERSION_SINGLE_POINT', this input is ignored. You can set it to an empty string. mode: The session mode. See :any:`nixnet._enums.CreateSessionMode`. Returns: A session base object. """ self._handle = None # To satisfy `__del__` in case nx_create_session throws self._handle = _funcs.nx_create_session(database_name, cluster_name, list, interface_name, mode) self._intf = session_intf.Interface(self._handle) self._j1939 = session_j1939.J1939(self._handle) def __del__(self): if self._handle is not None: warnings.warn( 'Session was not explicitly closed before it was destructed. ' 'Resources on the device may still be reserved.', errors.XnetResourceWarning) def __enter__(self): return self def __exit__(self, exception_type, exception_value, traceback): self.close() def __eq__(self, other): if isinstance(other, self.__class__): return self._handle == typing.cast(SessionBase, other)._handle else: return NotImplemented def __ne__(self, other): result = self.__eq__(other) if result is NotImplemented: return result else: return not result def __hash__(self): return hash(self._handle) def __repr__(self): # type: () -> typing.Text return '{}(handle={})'.format(type(self).__name__, self._handle)
[docs] def close(self): # type: () -> None """Close (clear) the XNET session. This function stops communication for the session and releases all resources the session uses. It internally calls :any:`nixnet._session.base.SessionBase.stop` with normal scope, so if this is the last session using the interface, communication stops. You typically use 'close' when you need to close the existing session to create a new session that uses the same objects. For example, if you create a session for a frame named frame_a using Frame Output Single-Point mode, then you create a second session for frame_a using Frame Output Queued mode, the second call to the session constructor returns an error, because frame_a can be accessed using only one output mode. If you call 'close' before the second constructor call, you can close the previous use of frame_a to create the new session. """ if self._handle is None: warnings.warn( 'Attempting to close NI-XNET session but session was already ' 'closed', errors.XnetResourceWarning) return _funcs.nx_clear(self._handle) self._handle = None
[docs] def start(self, scope=constants.StartStopScope.NORMAL): # type: (constants.StartStopScope) -> None """Start communication for the XNET session. Because the session is started automatically by default, this function is optional. This function is for more advanced applications to start multiple sessions in a specific order. For more information about the automatic start feature, refer to the :any:`nixnet._session.base.SessionBase.auto_start` property. For each physical interface, the NI-XNET hardware is divided into two logical units: Sessions: You can create one or more sessions, each of which contains frames or signals to be transmitted (or received) on the bus. Interface: The interface physically connects to the bus and transmits (or receives) data for the sessions. You can start each logical unit separately. When a session is started, all contained frames or signals are placed in a state where they are ready to communicate. When the interface is started, it takes data from all started sessions to communicate with other nodes on the bus. For a specification of the state models for the session and interface, refer to State Models. If an output session starts before you write data, or you read an input session before it receives a frame, default data is used. For more information, refer to the XNET Frame Default Payload and XNET Signal Default Value properties. Args: scope(:any:`nixnet._enums.StartStopScope`): Describes the impact of this operation on the underlying state models for the session and its interface. """ _funcs.nx_start(self._handle, scope)
[docs] def stop(self, scope=constants.StartStopScope.NORMAL): # type: (constants.StartStopScope) -> None """Stop communication for the XNET session. Because the session is stopped automatically when closed (cleared), this function is optional. For each physical interface, the NI-XNET hardware is divided into two logical units: Sessions: You can create one or more sessions, each of which contains frames or signals to be transmitted (or received) on the bus. Interface: The interface physically connects to the bus and transmits (or receives) data for the sessions. You can stop each logical unit separately. When a session is stopped, all contained frames or signals are placed in a state where they are no longer ready to communicate. When the interface is stopped, it no longer takes data from sessions to communicate with other nodes on the bus. For a specification of the state models for the session and interface, refer to State Models. Args: scope(:any:`nixnet._enums.StartStopScope`): Describes the impact of this operation on the underlying state models for the session and its interface. """ _funcs.nx_stop(self._handle, scope)
[docs] def flush(self): # type: () -> None """Flushes (empties) all XNET session queues. With the exception of single-point modes, all sessions use queues to store frames. For input modes, the queues store frame values (or corresponding signal values) that have been received, but not obtained by calling the read function. For output sessions, the queues store frame values provided to write function, but not transmitted successfully. :any:`nixnet._session.base.SessionBase.start` and :any:`nixnet._session.base.SessionBase.stop` have no effect on these queues. Use 'flush' to discard all values in the session's queues. For example, if you call a write function to write three frames, then immediately call :any:`nixnet._session.base.SessionBase.stop`, then call :any:`nixnet._session.base.SessionBase.start` a few seconds later, the three frames transmit. If you call 'flush' between :any:`nixnet._session.base.SessionBase.stop` and :any:`nixnet._session.base.SessionBase.start`, no frames transmit. As another example, if you receive three frames, then call :any:`nixnet._session.base.SessionBase.stop`, the three frames remains in the queue. If you call :any:`nixnet._session.base.SessionBase.start` a few seconds later, then call a read function, you obtain the three frames received earlier, potentially followed by other frames received after calling :any:`nixnet._session.base.SessionBase.start`. If you call 'flush' between :any:`nixnet._session.base.SessionBase.stop` and :any:`nixnet._session.base.SessionBase.start`, read function returns only frames received after the calling :any:`nixnet._session.base.SessionBase.start`. """ _funcs.nx_flush(self._handle)
[docs] def wait_for_transmit_complete(self, timeout=10): # type: (float) -> None """Wait for transmition to complete. All frames written for the session have been transmitted on the bus. This condition applies to CAN, LIN, and FlexRay. This condition is state based, and the state is Boolean (true/false). Args: timeout(float): The maximum amount of time to wait in seconds. """ _funcs.nx_wait(self._handle, constants.Condition.TRANSMIT_COMPLETE, 0, timeout)
[docs] def wait_for_intf_communicating(self, timeout=10): # type: (float) -> None """Wait for the interface to begin communication on the network. If a start trigger is configured for the interface, this first waits for the trigger. Once the interface is started, this waits for the protocol's communication state to transition to a value that indicates communication with remote nodes. After this wait succeeds, calls to 'read_state' will return: :any:`nixnet._enums.CanCommState`: 'constants.CAN_COMM.ERROR_ACTIVE' :any:`nixnet._enums.CanCommState`: 'constants.CAN_COMM.ERROR_PASSIVE' 'constants.ReadState.TIME_COMMUNICATING': Valid time for communication (invalid time of 0 prior) Args: timeout(float): The maximum amount of time to wait in seconds. """ _funcs.nx_wait(self._handle, constants.Condition.INTF_COMMUNICATING, 0, timeout)
[docs] def wait_for_intf_remote_wakeup(self, timeout=10): # type: (float) -> None """Wait for interface remote wakeup. Wait for the interface to wakeup due to activity by a remote node on the network. This wait is used for CAN, when you set the 'can_tcvr_state' property to 'constants.CanTcvrState.SLEEP'. Although the interface itself is ready to communicate, this places the transceiver into a sleep state. When a remote CAN node transmits a frame, the transceiver wakes up, and communication is restored. This wait detects that remote wakeup. This wait is used for LIN when you set 'lin_sleep' property to 'constants.LinSleep.REMOTE_SLEEP' or 'constants.LinSleep.LOCAL_SLEEP'. When asleep, if a remote LIN ECU transmits the wakeup pattern (break), the XNET LIN interface detects this transmission and wakes up. This wait detects that remote wakeup. Args: timeout(float): The maximum amount of time to wait in seconds. """ _funcs.nx_wait(self._handle, constants.Condition.INTF_REMOTE_WAKEUP, 0, timeout)
[docs] def connect_terminals(self, source, destination): # type: (typing.Text, typing.Text) -> None """Connect terminals on the XNET interface. This function connects a source terminal to a destination terminal on the interface hardware. The XNET terminal represents an external or internal hardware connection point on a National Instruments XNET hardware product. External terminals include PXI Trigger lines for a PXI card, RTSI terminals for a PCI card, or the single external terminal for a C Series module. Internal terminals include timebases (clocks) and logical entities such as a start trigger. The terminal inputs use the Terminal I/O names. Typically, one of the pair is an internal and the other an external. Args: source(str): Connection source name. destination(str): Connection destination name. """ _funcs.nx_connect_terminals(self._handle, source, destination)
[docs] def disconnect_terminals(self, source, destination): # type: (typing.Text, typing.Text) -> None """Disconnect terminals on the XNET interface. This function disconnects a specific pair of source/destination terminals previously connected with :any:`nixnet._session.base.SessionBase.connect_terminals`. When the final session for a given interface is cleared, NI-XNET automatically disconnects all terminal connections for that interface. Therefore, 'disconnect_terminals' is not required for most applications. This function typically is used to change terminal connections dynamically while an application is running. To disconnect a terminal, you first must stop the interface using :any:`nixnet._session.base.SessionBase.stop` with the Interface Only scope. Then you can call 'disconnect_terminals' and :any:`nixnet._session.base.SessionBase.connect_terminals` to adjust terminal connections. Finally, you can call :any:`nixnet._session.base.SessionBase.start` with the Interface Only scope to restart the interface. You can disconnect only a terminal that has been previously connected. Attempting to disconnect a nonconnected terminal results in an error. Args: source(str): Connection source name. destination(str): Connection destination name. """ _funcs.nx_disconnect_terminals(self._handle, source, destination)
[docs] def change_lin_schedule(self, sched_index): # type: (int) -> None """Writes communication states of an XNET session. This function writes a request for the LIN interface to change the running schedule. According to the LIN protocol, only the master executes schedules, not slaves. If the :any:`nixnet._session.intf.Interface.lin_master` property is false (slave), this write function implicitly sets that property to true (master). If the interface currently is running as a slave, this write returns an error, because it cannot change to master while running. Args: sched_index(int): Index to the schedule table that the LIN master executes. The schedule tables are sorted the way they are returned from the database with the `nixnet.database._cluster.Cluster.lin_schedules` property. """ _funcs.nx_write_state(self._handle, constants.WriteState.LIN_SCHEDULE_CHANGE, _ctypedefs.u32(sched_index))
[docs] def change_lin_diagnostic_schedule(self, schedule): # type: (constants.LinDiagnosticSchedule) -> None """Writes communication states of an XNET session. This function writes a request for the LIN interface to change the diagnostic schedule. Args: schedule(:any:`nixnet._enums.LinDiagnosticSchedule`): Diagnostic schedule that the LIN master executes. """ _funcs.nx_write_state(self._handle, constants.WriteState.LIN_DIAGNOSTIC_SCHEDULE_CHANGE, _ctypedefs.u32(schedule.value)) # NOQA: E501
@property def time_current(self): # type: () -> int """int: Current interface time.""" state_value_ctypes = _ctypedefs.nxTimestamp_t() state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.TIME_CURRENT, state_size, ctypes.pointer(state_value_ctypes)) time = state_value_ctypes.value return time @property def time_start(self): # type: () -> int """int: Time the interface was started.""" state_value_ctypes = _ctypedefs.nxTimestamp_t() state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.TIME_START, state_size, ctypes.pointer(state_value_ctypes)) time = state_value_ctypes.value if time == 0: # The interface is not communicating. _errors.check_for_error(constants.Err.SESSION_NOT_STARTED.value) return time @property def time_communicating(self): # type: () -> int """int: Time the interface started communicating. The time is usually later than ``time_start`` because the interface must undergo a communication startup procedure. """ state_value_ctypes = _ctypedefs.nxTimestamp_t() state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.TIME_COMMUNICATING, state_size, ctypes.pointer(state_value_ctypes)) time = state_value_ctypes.value if time == 0: # The interface is not communicating. _errors.check_for_error(constants.Err.SESSION_NOT_STARTED.value) return time @property def state(self): # type: () -> constants.SessionInfoState """:any:`nixnet._enums.SessionInfoState`: Session running state.""" state_value_ctypes = _ctypedefs.u32() state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.SESSION_INFO, state_size, ctypes.pointer(state_value_ctypes)) state = state_value_ctypes.value return constants.SessionInfoState(state) @property def can_comm(self): # type: () -> types.CanComm """:any:`nixnet.types.CanComm`: CAN Communication state""" state_value_ctypes = _ctypedefs.u32() state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.CAN_COMM, state_size, ctypes.pointer(state_value_ctypes)) bitfield = state_value_ctypes.value return _utils.parse_can_comm_bitfield(bitfield) @property def lin_comm(self): # type: () -> types.LinComm """:any:`nixnet.types.LinComm`: LIN Communication state""" state_value_ctypes = (_ctypedefs.u32 * 2)() # type: ignore state_size = ctypes.sizeof(state_value_ctypes) _funcs.nx_read_state( self._handle, constants.ReadState.LIN_COMM, state_size, ctypes.pointer(state_value_ctypes)) first = state_value_ctypes[0].value second = state_value_ctypes[1].value return _utils.parse_lin_comm_bitfield(first, second)
[docs] def check_fault(self): # type: () -> None """Check for an asynchronous fault. A fault is an error that occurs asynchronously to the NI-XNET application calls. The fault cause may be related to network communication, but it also can be related to XNET hardware, such as a fault in the onboard processor. Although faults are extremely rare, nxReadState provides a detection method distinct from the status of NI-XNET function calls, yet easy to use alongside the common practice of checking the communication state. """ state_value_ctypes = _ctypedefs.u32() state_size = ctypes.sizeof(state_value_ctypes) fault = _funcs.nx_read_state( self._handle, constants.ReadState.SESSION_INFO, state_size, ctypes.pointer(state_value_ctypes)) _errors.check_for_error(fault)
@property def intf(self): # type: () -> session_intf.Interface """:any:`nixnet._session.intf.Interface`: Returns the Interface configuration object for the session.""" return self._intf @property def j1939(self): # type: () -> session_j1939.J1939 """:any:`nixnet._session.j1939.J1939`: Returns the J1939 configuration object for the session.""" return self._j1939 @property def application_protocol(self): # type: () -> constants.AppProtocol """:any:`nixnet._enums.AppProtocol`: This property returns the application protocol that the session uses. The database used with the session determines the application protocol. """ return constants.AppProtocol(_props.get_session_application_protocol(self._handle)) @property def auto_start(self): # type: () -> bool """bool: Automatically starts the output session on the first call to the appropriate write function. For input sessions, start always is performed within the first call to the appropriate read function (if not already started using :any:`nixnet._session.base.SessionBase.start`). This is done because there is no known use case for reading a stopped input session. For output sessions, as long as the first call to the appropriate write function contains valid data, you can leave this property at its default value of true. If you need to call the appropriate write function multiple times prior to starting the session, or if you are starting multiple sessions simultaneously, you can set this property to false. After calling the appropriate write function as desired, you can call :any:`nixnet._session.base.SessionBase.start` to start the session(s). When automatic start is performed, it is equivalent to :any:`nixnet._session.base.SessionBase.start` with scope set to Normal. This starts the session itself, and if the interface is not already started, it starts the interface also. """ return _props.get_session_auto_start(self._handle) @auto_start.setter def auto_start(self, value): # type: (bool) -> None _props.set_session_auto_start(self._handle, value) @property def cluster_name(self): # type: () -> typing.Text """str: This property returns the cluster (network) name used with the session.""" return _props.get_session_cluster_name(self._handle) @property def database_name(self): # type: () -> typing.Text """str: This property returns the database name used with the session.""" return _props.get_session_database_name(self._handle) @property def mode(self): # type: () -> constants.CreateSessionMode """:any:`nixnet._enums.CreateSessionMode`: This property returns the mode associated with the session. For more information, refer to :any:`nixnet._enums.CreateSessionMode`. """ return constants.CreateSessionMode(_props.get_session_mode(self._handle)) @property def num_pend(self): # type: () -> int """int: This property returns the number of values (frames or signals) pending for the session. For input sessions, this is the number of frame/signal values available to the appropriate read function. If you call the appropriate read function with number to read of this number and timeout of 0.0, the appropriate read function should return this number of values successfully. For output sessions, this is the number of frames/signal values provided to the appropriate write function but not yet transmitted onto the network. Stream frame sessions using FlexRay or CAN FD protocol may use a variable size of frames. In these cases, this property assumes the largest possible frame size. If you use smaller frames, the real number of pending values might be higher. The largest possible frames sizes are: CAN FD: 64 byte payload. FlexRay: The higher value of the frame size in the static segment and the maximum frame size in the dynamic segment. The XNET Cluster FlexRay Payload Length Maximum property provides this value. """ return _props.get_session_num_pend(self._handle) @property def num_unused(self): # type: () -> int """int: This property returns the number of values (frames or signals) unused for the session. If you get this property prior to starting the session, it provides the size of the underlying queue(s). Contrary to the Queue Size property, this value is in number of frames for Frame I/O, not number of bytes; for Signal I/O, it is the number of signal values in both cases. After start, this property returns the queue size minus the :any:`Number of Values Pending <nixnet._session.base.SessionBase.num_pend>` property. For input sessions, this is the number of frame/signal values unused in the underlying queue(s). For output sessions, this is the number of frame/signal values you can provide to a subsequent call to the appropriate write function. If you call the appropriate write function with this number of values and timeout of 0.0, it should return success. Stream frame sessions using FlexRay or CAN FD protocol may use a variable size of frames. In these cases, this property assumes the largest possible frame size. If you use smaller frames, the real number of pending values might be higher. The largest possible frames sizes are: CAN FD: 64 byte payload. FlexRay: The higher value of the frame size in the static segment and the maximum frame size in the dynamic segment. The XNET Cluster FlexRay Payload Length Maximum property provides this value. """ return _props.get_session_num_unused(self._handle) @property def protocol(self): # type: () -> constants.Protocol """:any:`nixnet._enums.Protocol`: This property returns the protocol that the interface in the session uses.""" return constants.Protocol(_props.get_session_protocol(self._handle)) @property def queue_size(self): # type: () -> int """int: Get or set queue size. For output sessions, queues store data passed to the appropriate write function and not yet transmitted onto the network. For input sessions, queues store data received from the network and not yet obtained using the appropriate read function. For most applications, the default queue sizes are sufficient. You can write to this property to override the default. When you write (set) this property, you must do so prior to the first session start. You cannot set this property again after calling :any:`nixnet._session.base.SessionBase.stop`. For signal I/O sessions, this property is the number of signal values stored. This is analogous to the number of values you use with the appropriate read or write function. For frame I/O sessions, this property is the number of bytes of frame data stored. For standard CAN or LIN frame I/O sessions, each frame uses exactly 24 bytes. You can use this number to convert the Queue Size (in bytes) to/from the number of frame values. For CAN FD and FlexRay frame I/O sessions, each frame value size can vary depending on the payload length. For more information, refer to Raw Frame Format. For Signal I/O XY sessions, you can use signals from more than one frame. Within the implementation, each frame uses a dedicated queue. According to the formulas below, the default queue sizes can be different for each frame. If you read the default Queue Size property for a Signal Input XY session, the largest queue size is returned, so that a call to the appropriate read function of that size can empty all queues. If you read the default Queue Size property for a Signal Output XY session, the smallest queue size is returned, so that a call to the appropriate write function of that size can succeed when all queues are empty. If you write the Queue Size property for a Signal I/O XY session, that size is used for all frames, so you must ensure that it is sufficient for the frame with the fastest transmit time. For Signal I/O Waveform sessions, you can use signals from more than one frame. Within the implementation, each frame uses a dedicated queue. The Queue Size property does not represent the memory in these queues, but rather the amount of time stored. The default queue allocations store Application Time worth of resampled signal values. If you read the default Queue Size property for a Signal I/O Waveform session, it returns Application Time multiplied by the time Resample Rate. If you write the Queue Size property for a Signal I/O Waveform session, that value is translated from a number of samples to a time, and that time is used to allocate memory for each queue. For Single-Point sessions (signal or frame), this property is ignored. Single-Point sessions always use a value of 1 as the effective queue size. """ return _props.get_session_queue_size(self._handle) @queue_size.setter def queue_size(self, value): # type: (int) -> None _props.set_session_queue_size(self._handle, value)