Single-Point I/O Example¶
This example uses nixnet.session.SignalInSinglePointSession
and
nixnet.session.SignalOutSinglePointSession
to demonstrate how single-point sessions
work.
To adapt this to Frames, just change the sessions to
nixnet.session.FrameInSinglePointSession
and
nixnet.session.FrameOutSinglePointSession
with frames instead of
signals. Then adjust read
/write
to take a frame object per frame
configured in the session rather than signals.
This works for both CAN and LIN
frames. LIN frames also require change_lin_sched
to write a request for the
LIN interface to change the running schedule. See Queued I/O Example to see how
to read and write frames.
CAN Single-Point I/O¶
def main():
database_name = 'NIXNET_example'
cluster_name = 'CAN_Cluster'
input_signals = ['CANEventSignal1', 'CANEventSignal2']
output_signals = ['CANEventSignal1', 'CANEventSignal2']
interface1 = 'CAN1'
interface2 = 'CAN2'
with nixnet.SignalInSinglePointSession(
interface1,
database_name,
cluster_name,
input_signals) as input_session:
with nixnet.SignalOutSinglePointSession(
interface2,
database_name,
cluster_name,
output_signals) as output_session:
terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ')
if terminated_cable.lower() == "y":
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.OFF
elif terminated_cable.lower() == "n":
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.ON
else:
print("Unrecognised input ({}), assuming 'n'".format(terminated_cable))
input_session.intf.can_term = constants.CanTerm.ON
output_session.intf.can_term = constants.CanTerm.ON
# Start the input session manually to make sure that the first
# signal value sent before the initial read will be received.
input_session.start()
user_value = six.moves.input('Enter {} signal values [float, float]: '.format(len(input_signals)))
try:
value_buffer = [float(x.strip()) for x in user_value.split(",")]
except ValueError:
value_buffer = [24.5343, 77.0129]
print('Unrecognized input ({}). Setting data buffer to {}'.format(user_value, value_buffer))
if len(value_buffer) != len(input_signals):
value_buffer = [24.5343, 77.0129]
print('Invalid number of signal values entered. Setting data buffer to {}'.format(value_buffer))
print('The same values should be received. Press q to quit')
i = 0
while True:
for index, value in enumerate(value_buffer):
value_buffer[index] = value + i
output_session.signals.write(value_buffer)
print('Sent signal values: {}'.format(value_buffer))
# Wait 1 s and then read the received values.
# They should be the same as the ones sent.
time.sleep(1)
signals = input_session.signals.read()
for timestamp, value in signals:
date = convert_timestamp(timestamp)
print('Received signal with timestamp {} and value {}'.format(date, value))
i += 1
if max(value_buffer) + i > sys.float_info.max:
i = 0
inp = six.moves.input('Hit enter to continue (q to quit): ')
if inp.lower() == 'q':
break
print('Data acquisition stopped.')