Single-Point I/O Example

This example uses nixnet.session.SignalInSinglePointSession and nixnet.session.SignalOutSinglePointSession to demonstrate how single-point sessions work.

To adapt this to Frames, just change the sessions to nixnet.session.FrameInSinglePointSession and nixnet.session.FrameOutSinglePointSession with frames instead of signals. Then adjust read/write to take a frame object per frame configured in the session rather than signals.

This works for both CAN and LIN frames. LIN frames also require a call to change_lin_sched, which writes a request for the LIN interface to change the running schedule. See the Queued I/O Example for how to read and write frames.

CAN Single-Point I/O

def main():
    """Write CAN signal values on one interface and read them back on another.

    Opens a single-point signal input session on ``CAN1`` and a single-point
    signal output session on ``CAN2``, asks whether the cable is terminated,
    and then loops: write the signal values, wait one second, print whatever
    the input session read. The loop ends when the user enters ``q``.
    """
    database_name = 'NIXNET_example'
    cluster_name = 'CAN_Cluster'
    input_signals = ['CANEventSignal1', 'CANEventSignal2']
    output_signals = ['CANEventSignal1', 'CANEventSignal2']
    interface1 = 'CAN1'
    interface2 = 'CAN2'
    # Fallback values used whenever the user's entry cannot be used as typed.
    default_values = [24.5343, 77.0129]

    with nixnet.SignalInSinglePointSession(
            interface1,
            database_name,
            cluster_name,
            input_signals) as input_session:
        with nixnet.SignalOutSinglePointSession(
                interface2,
                database_name,
                cluster_name,
                output_signals) as output_session:
            terminated_cable = six.moves.input('Are you using a terminated cable (Y or N)? ')
            # Ignore surrounding whitespace so that e.g. "y " is still
            # recognised instead of silently falling back to 'n'.
            answer = terminated_cable.strip().lower()
            if answer == "y":
                input_session.intf.can_term = constants.CanTerm.ON
                output_session.intf.can_term = constants.CanTerm.OFF
            else:
                # Anything other than 'y' or 'n' is reported and then handled
                # exactly like 'n'.
                if answer != "n":
                    print("Unrecognised input ({}), assuming 'n'".format(terminated_cable))
                input_session.intf.can_term = constants.CanTerm.ON
                output_session.intf.can_term = constants.CanTerm.ON

            # Start the input session manually to make sure that the first
            # signal value sent before the initial read will be received.
            input_session.start()

            user_value = six.moves.input('Enter {} signal values [float, float]: '.format(len(input_signals)))
            try:
                value_buffer = [float(x.strip()) for x in user_value.split(",")]
            except ValueError:
                # Copy the defaults: value_buffer is modified in place below.
                value_buffer = list(default_values)
                print('Unrecognized input ({}). Setting data buffer to {}'.format(user_value, value_buffer))

            if len(value_buffer) != len(input_signals):
                value_buffer = list(default_values)
                print('Invalid number of signal values entered. Setting data buffer to {}'.format(value_buffer))

            print('The same values should be received. Press q to quit')
            i = 0
            while True:
                # Shift every value by the current pass counter so that each
                # write sends data that differs from the previous one.
                for index, value in enumerate(value_buffer):
                    value_buffer[index] = value + i
                output_session.signals.write(value_buffer)
                print('Sent signal values: {}'.format(value_buffer))

                # Wait 1 s and then read the received values.
                # They should be the same as the ones sent.
                time.sleep(1)

                signals = input_session.signals.read()
                for timestamp, value in signals:
                    date = convert_timestamp(timestamp)
                    print('Received signal with timestamp {} and value {}'.format(date, value))

                i += 1
                # Reset the increment once adding it would push the largest
                # value past the biggest finite float.
                if max(value_buffer) + i > sys.float_info.max:
                    i = 0

                inp = six.moves.input('Hit enter to continue (q to quit): ')
                # Tolerate stray whitespace around the quit command.
                if inp.strip().lower() == 'q':
                    break

            print('Data acquisition stopped.')