Acquire real-time data from NI-USB 6002 via the nidaqmx package in Python


I have a problem and am wondering whether somebody has a solution for it. I am working with a National Instruments card (NI-USB 6002) and the Python package nidaqmx. What I want is to capture two analog inputs at a sampling rate of 6 kHz and to read a specified number of samples as soon as they are available. To do that, I added a condition: the loop continuously checks the number of available samples and, once there are enough, reads them and processes them further. After a while, however, I run into a problem. Because I am constantly polling the number of available samples, I first suspected a CPU or memory problem, but I am no longer sure. The software runs fine for a while, but at some point it stops reading the samples regularly, and I don't know why. Here is my code:

# Import different modules
import nidaqmx
from nidaqmx.constants import AcquisitionType, TerminalConfiguration, WaitMode
import numpy as np
import time
from collections import deque
from scipy import signal
import pandas as pd
from ticktock import tick
import logging
from datetime import datetime 



''' Main code '''
if __name__ == "__main__":
    
    # Configure the logging
    logging.basicConfig(
        format='[%(asctime)s.%(msecs)03d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO)
    
    # Define the parameters for the input and for the calculation
    sample_rate = 6000      # sample rate for the analog input [Hz]
    window_time = 66e-3     # Timeslot for one window [s]
    overlap = 0.5           # overlap between old and new data for one window
    
    # Create an empty DataFrame to store the data
    columns = ["Input_ai0", "Input_ai1"]
    data_df = pd.DataFrame(columns=columns)
    
    # Define the input and output channels
    input_channels = ["Dev1/ai0", "Dev1/ai1"]
    output_channels = ["Dev1/ao0", "Dev1/ao1"]
    
    # Calculate window_size (samples per channel in one window for the
    # ml-algorithm) and samples_per_channel (samples per channel read at once).
    # The sample clock rate applies to each channel, so it is not divided by
    # the number of channels.
    window_size = int(window_time * sample_rate)
    samples_per_channel = int(window_time * sample_rate * overlap)
    
    # Define the queue and the maximum number of samples in it (window size)
    data_queue_ai0 = deque(maxlen=window_size)
    data_queue_ai1 = deque(maxlen=window_size)
    
    # Create a task for analog input and output
    with nidaqmx.Task() as input_task, nidaqmx.Task() as output_task:
        
            # Configure analog input and output channels
            for channel in input_channels:
                input_task.ai_channels.add_ai_voltage_chan(channel, terminal_config=nidaqmx.constants.TerminalConfiguration.RSE)
            for channel in output_channels:
                output_task.ao_channels.add_ao_voltage_chan(channel)
            
            # Configure the sample rate and the number of samples per channel,
            # start the input task and initialize the output with 0V.
            input_task.timing.cfg_samp_clk_timing(sample_rate, sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS)
            input_task.start()
            output_task.write([0.0, 0.0])
            
            try:
                while True:
                    
                    # Check the available samples in the DAQ
                    available_samples = input_task.in_stream.avail_samp_per_chan
                    logging.info(f"Available samples: {available_samples}")
                    
                    # If there are enough samples available, read the
                    # necessary ones and add them to the data_queue (window).
                    if available_samples >= samples_per_channel:
                        input_data = input_task.read(number_of_samples_per_channel=samples_per_channel)
                        logging.info(f"Have read {len(input_channels)} * {len(input_data[0])} samples.")
                        
                        data_queue_ai0.extend(input_data[0])
                        data_queue_ai1.extend(input_data[1])
                        
                        # Append the data to the DataFrame
                        new_row = {
                            "Input_ai0": input_data[0],
                            "Input_ai1": input_data[1],
                            }
                        # DataFrame.append was removed in pandas 2.0; use pd.concat
                        data_df = pd.concat([data_df, pd.DataFrame([new_row])], ignore_index=True)

            except KeyboardInterrupt:
                print("Data acquisition stopped (you pressed Ctrl+C).")
                    
            except nidaqmx.DaqError as e:
                print(f"Data acquisition error: {e}")
    

    # Save data in txt-file
    data_out = "data_output_two_channels.txt"
    data_df.to_csv(data_out, sep='\t', index = None)
    print(f"Acquired data saved to '{data_out}'")
    
    print("Analog input completed.")
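
The window arithmetic in the script can be checked without any hardware. The sketch below assumes that the DAQmx sample clock rate applies to each channel, so a 66 ms window at 6 kHz holds 396 samples per channel and a 50 % overlap means reading 198 new samples per step. The helper names `window_params` and `sliding_windows` are hypothetical and only illustrate the idea.

```python
from collections import deque


def window_params(sample_rate_hz, window_time_s, overlap):
    """Return (window_size, hop_size) in samples per channel."""
    # round() avoids losing a sample to floating-point error.
    window_size = round(sample_rate_hz * window_time_s)
    # hop_size is the number of new samples needed per window step.
    hop_size = round(window_size * (1.0 - overlap))
    return window_size, hop_size


def sliding_windows(samples, window_size, hop_size):
    """Feed samples in hop-sized chunks and collect each full window."""
    window = deque(maxlen=window_size)  # old samples fall out automatically
    windows = []
    for start in range(0, len(samples), hop_size):
        window.extend(samples[start:start + hop_size])
        if len(window) == window_size:
            windows.append(list(window))
    return windows


window_size, hop_size = window_params(6000, 66e-3, 0.5)
demo = sliding_windows(list(range(8)), window_size=4, hop_size=2)
```

With these numbers the deque needs `maxlen=396` while each read fetches 198 samples per channel, so consecutive windows share half of their samples.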

Attached is a screenshot of what the output looks like when the script runs. After a while, Python seems to hang.
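
One possible contributor to this slowdown, which is an assumption and not something confirmed here, is that the loop grows `data_df` on every read. Adding a row to a pandas DataFrame returns a new object that contains a copy of all rows collected so far, so the work per iteration increases the longer the script runs. The sketch below only counts row copies to show the difference between rebuilding a container on every append and buffering rows in a list; the function names are hypothetical.

```python
def copies_when_rebuilding(n_appends):
    """Row copies if every append builds a new container with all rows."""
    total = 0
    size = 0
    for _ in range(n_appends):
        size += 1       # one new row arrives
        total += size   # the new container copies every row, old and new
    return total


def copies_when_buffering(n_appends):
    """Row copies if rows are appended to a list and converted once later."""
    return n_appends


# Roughly 30 reads per second for about half a minute of acquisition.
rebuild_cost = copies_when_rebuilding(1000)
buffer_cost = copies_when_buffering(1000)
```

Collecting the chunks in a plain list inside the loop and building the DataFrame once after the loop ends would keep the per-iteration cost constant.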

Does anybody have a solution for this?

I tried several things, but I always get the same problem. It seems like it is not able to read that often.
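
The polling idea described in this question (check how many samples are available, then read a fixed chunk once enough have arrived) can be tried without hardware. The sketch below uses a hypothetical `FakeInputTask` class, which is not part of nidaqmx, and pauses briefly between polls so the loop does not spin and log at full speed. With a real task, `read(number_of_samples_per_channel=n)` waits until `n` samples are available (up to its timeout), so the explicit availability check may not be needed at all.

```python
import time


class FakeInputTask:
    """Hypothetical stand-in for a DAQ input task; NOT a nidaqmx class."""

    def __init__(self, samples_per_poll, n_channels=2):
        self.available = 0
        self.samples_per_poll = samples_per_poll
        self.n_channels = n_channels

    def poll_available(self):
        # Pretend that a fixed number of samples arrived since the last poll.
        self.available += self.samples_per_poll
        return self.available

    def read(self, n):
        if n > self.available:
            raise ValueError("not enough samples buffered")
        self.available -= n
        # One list of n samples per channel, like a multi-channel read.
        return [[0.0] * n for _ in range(self.n_channels)]


def acquire_chunks(task, chunk_size, n_chunks, poll_interval_s=0.005):
    """Read n_chunks chunks, sleeping between polls instead of spinning."""
    chunks = []
    polls = 0
    while len(chunks) < n_chunks:
        polls += 1
        if task.poll_available() >= chunk_size:
            chunks.append(task.read(chunk_size))
        else:
            time.sleep(poll_interval_s)  # yield the CPU until more data arrives
    return chunks, polls


chunks, polls = acquire_chunks(FakeInputTask(66), 198, 3, poll_interval_s=0.0)
```

At 6 kHz a chunk of 198 samples takes 33 ms to arrive, so a pause of a few milliseconds between polls leaves enough margin.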
