diff --git a/example.py b/example.py index 6a20967..e89ef7b 100644 --- a/example.py +++ b/example.py @@ -10,17 +10,20 @@ from ppk2_api.ppk2_api import PPK2_API ppk2s_connected = PPK2_API.list_devices() -if(len(ppk2s_connected) == 1): - ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') +if len(ppk2s_connected) == 1: + ppk2_port = ppk2s_connected[0][0] + ppk2_serial = ppk2s_connected[0][1] + print(f"Found PPK2 at {ppk2_port} with serial number {ppk2_serial}") else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') + print(f"Too many connected PPK2's: {ppk2s_connected}") exit() -ppk2_test = PPK2_API(ppk2_port) +ppk2_test = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) ppk2_test.get_modifiers() -ppk2_test.use_ampere_meter() # set ampere meter mode -ppk2_test.toggle_DUT_power("OFF") # disable DUT power +ppk2_test.set_source_voltage(3300) + +ppk2_test.use_source_meter() # set source meter mode +ppk2_test.toggle_DUT_power("ON") # enable DUT power ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes @@ -30,18 +33,38 @@ for i in range(0, 1000): read_data = ppk2_test.get_data() if read_data != b'': - samples = ppk2_test.get_samples(read_data) + samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") + + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() time.sleep(0.01) -ppk2_test.toggle_DUT_power("ON") +ppk2_test.toggle_DUT_power("OFF") # disable DUT power + +ppk2_test.use_ampere_meter() # set ampere meter mode ppk2_test.start_measuring() for i in range(0, 1000): read_data = ppk2_test.get_data() if read_data != b'': - samples = ppk2_test.get_samples(read_data) + samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period -ppk2_test.stop_measuring() \ No newline at end of file + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period + +ppk2_test.stop_measuring() diff --git a/example_mp.py b/example_mp.py index bb084b4..523dcd9 100644 --- a/example_mp.py +++ b/example_mp.py @@ -1,49 +1,78 @@ """ -Basic usage of PPK2 Python API - multiprocessing version +Basic usage of PPK2 Python API - multiprocessing version. The basic ampere mode sequence is: 1. read modifiers 2. set ampere mode 3. read stream of data """ import time -from ppk2_api.ppk2_api import PPK2_MP +from ppk2_api.ppk2_api import PPK2_MP as PPK2_API -ppk2s_connected = PPK2_MP.list_devices() -if(len(ppk2s_connected) == 1): - ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') +ppk2s_connected = PPK2_API.list_devices() +if len(ppk2s_connected) == 1: + ppk2_port = ppk2s_connected[0][0] + ppk2_serial = ppk2s_connected[0][1] + print(f"Found PPK2 at {ppk2_port} with serial number {ppk2_serial}") else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') + print(f"Too many connected PPK2's: {ppk2s_connected}") exit() -ppk2_test = PPK2_MP(ppk2_port) +ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True) ppk2_test.get_modifiers() -ppk2_test.use_ampere_meter() # set ampere meter mode -ppk2_test.toggle_DUT_power("OFF") # disable DUT power +ppk2_test.set_source_voltage(3300) -ppk2_test.start_measuring() # start measuring +""" +Source mode example +""" +ppk2_test.use_source_meter() # set source meter mode +ppk2_test.toggle_DUT_power("ON") # enable DUT power +ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes -# multiprocessing variant starts a process in the background which constantly -# polls the device in order to prevent losing samples. It will buffer the -# last 10s (by default) of data so get_data() can be called less frequently. -for i in range(0, 10): +# the number of measurements in one sampling period depends on the wait between serial reads +# it appears the maximum number of bytes received is 1024 +# the sampling rate of the PPK2 is 100 samples per millisecond +while True: read_data = ppk2_test.get_data() if read_data != b'': - samples = ppk2_test.get_samples(read_data) + samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.5) -ppk2_test.toggle_DUT_power("ON") + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + + time.sleep(0.001) + +ppk2_test.toggle_DUT_power("OFF") # disable DUT power +ppk2_test.stop_measuring() + +""" +Ampere mode example +""" +ppk2_test.use_ampere_meter() # set ampere meter mode ppk2_test.start_measuring() -for i in range(0, 10): +while True: read_data = ppk2_test.get_data() if read_data != b'': - samples = ppk2_test.get_samples(read_data) + samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.5) # lower time between sampling -> less samples read in one sampling period -ppk2_test.stop_measuring() + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period +ppk2_test.stop_measuring() diff --git a/setup.py b/setup.py index 850f849..bab7e24 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def read(*names, **kwargs): setup( name="ppk2-api", - version="0.0.2", + version="0.9.2", description="API for Nordic Semiconductor's Power Profiler Kit II (PPK 2).", url="https://github.com/IRNAS/ppk2-api-python", packages=find_packages("src"), diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 18dae01..4b39a69 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -10,7 +10,7 @@ import logging import os import queue -import multiprocessing +import threading class PPK2_Command(): """Serial command opcodes""" @@ -46,10 +46,14 @@ class PPK2_Modes(): class PPK2_API(): - def __init__(self, port): + def __init__(self, port: str, **kwargs): + ''' + port - port where PPK2 is connected + **kwargs - keyword arguments to pass to the pySerial constructor + ''' self.ser = None - self.ser = serial.Serial(port) + self.ser = serial.Serial(port, **kwargs) self.ser.baudrate = 9600 self.modifiers = { @@ -94,6 +98,9 @@ def __init__(self, port): def __del__(self): """Destructor""" try: + # reset device + self._write_serial((PPK2_Command.RESET,)) + if self.ser: self.ser.close() except Exception as e: @@ -185,11 +192,8 @@ def _generate_mask(self, bits, pos): mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} - def _get_masked_value(self, value, meas): + def _get_masked_value(self, value, meas, is_bits=False): masked_value = (value & meas["mask"]) >> meas["pos"] - if meas["pos"] == 24: - if masked_value == 255: - masked_value = -1 return masked_value def _handle_raw_data(self, adc_value): @@ -201,19 +205,28 @@ def _handle_raw_data(self, adc_value): bits = self._get_masked_value(adc_value, self.MEAS_LOGIC) analog_value = self.get_adc_result( current_measurement_range, adc_result) * 10**6 - return analog_value + return analog_value, bits except Exception as e: print("Measurement outside of range!") - return None + return None, None @staticmethod def list_devices(): import serial.tools.list_ports + ports = serial.tools.list_ports.comports() - if os.name == 'nt': - devices = [port.device for port in ports if port.description.startswith("nRF Connect USB CDC ACM")] + if os.name == "nt": + devices = [ + (port.device, port.serial_number[:8]) + for port in ports + if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1") + ] else: - devices = [port.device for port in ports if port.product == 'PPK2'] + devices = [ + (port.device, port.serial_number[:8]) + for port in ports + if port.product == "PPK2" and port.location.endswith("1") + ] return devices def get_data(self): @@ -323,6 +336,26 @@ def _digital_to_analog(self, adc_value): """Convert discrete value to analog value""" return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value + def digital_channels(self, bits): + """ + Convert raw digital data to digital channels. + + Returns a 2d matrix with 8 rows (one for each channel). Each row contains HIGH and LOW values for the selected channel. + """ + + # Prepare 2d matrix with 8 rows (one for each channel) + digital_channels = [[], [], [], [], [], [], [], []] + for sample in bits: + digital_channels[0].append((sample & 1) >> 0) + digital_channels[1].append((sample & 2) >> 1) + digital_channels[2].append((sample & 4) >> 2) + digital_channels[3].append((sample & 8) >> 3) + digital_channels[4].append((sample & 16) >> 4) + digital_channels[5].append((sample & 32) >> 5) + digital_channels[6].append((sample & 64) >> 6) + digital_channels[7].append((sample & 128) >> 7) + return digital_channels + def get_samples(self, buf): """ Returns list of samples read in one sampling period. @@ -334,13 +367,16 @@ def get_samples(self, buf): sample_size = 4 # one analog value is 4 bytes in size offset = self.remainder["len"] samples = [] + raw_digital_output = [] first_reading = ( self.remainder["sequence"] + buf[0:sample_size-offset])[:4] adc_val = self._digital_to_analog(first_reading) - measurement = self._handle_raw_data(adc_val) + measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: samples.append(measurement) + if bits is not None: + raw_digital_output.append(bits) offset = sample_size - offset @@ -348,19 +384,23 @@ def get_samples(self, buf): next_val = buf[offset:offset + sample_size] offset += sample_size adc_val = self._digital_to_analog(next_val) - measurement = self._handle_raw_data(adc_val) + measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: samples.append(measurement) + if bits is not None: + raw_digital_output.append(bits) self.remainder["sequence"] = buf[offset:len(buf)] self.remainder["len"] = len(buf)-offset - return samples # return list of samples, handle those lists in PPK2 API wrapper + # return list of samples and raw digital outputs + # handle those lists in PPK2 API wrapper + return samples, raw_digital_output -class PPK_Fetch(multiprocessing.Process): +class PPK_Fetch(threading.Thread): ''' - Background process for polling the data in multiprocessing variant + Background process for polling the data in multi-threaded variant ''' def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): super().__init__() @@ -380,7 +420,7 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): if self._buffer_chunk % 4 != 0: self._buffer_chunk = (self._buffer_chunk // 4) * 4 - self._buffer_q = multiprocessing.Queue() + self._buffer_q = queue.Queue() def run(self): s = 0 @@ -397,7 +437,6 @@ def run(self): self._buffer_q.get() local_buffer = local_buffer[self._buffer_chunk:] self._last_timestamp = tm_now - #print(len(d), len(local_buffer), self._buffer_q.qsize()) # calculate stats s += len(d) @@ -423,7 +462,7 @@ def get_data(self): count = 0 while True: try: - ret += self._buffer_q.get(timeout=0.01) # get_nowait sometimes skips a chunk for some reason + ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason count += 1 except queue.Empty: break @@ -435,15 +474,19 @@ class PPK2_MP(PPK2_API): Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() ''' - def __init__(self, port, buffer_seconds=10): + def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs): ''' port - port where PPK2 is connected - buffer_seconds - how many seconds of data to keep in the buffer + buffer_max_size_seconds - how many seconds of data to keep in the buffer + buffer_chunk_seconds - how many seconds of data to put in the queue at once + **kwargs - keyword arguments to pass to the pySerial constructor ''' - super().__init__(port) + super().__init__(port, **kwargs) + self._fetcher = None - self._quit_evt = multiprocessing.Event() - self._buffer_seconds = buffer_seconds + self._quit_evt = threading.Event() + self._buffer_max_size_seconds = buffer_max_size_seconds + self._buffer_chunk_seconds = buffer_chunk_seconds def __del__(self): """Destructor""" @@ -467,12 +510,12 @@ def start_measuring(self): if self._fetcher is not None: return - self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_seconds) + self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) self._fetcher.start() def stop_measuring(self): PPK2_API.stop_measuring(self) - PPK2_API.get_data(self) # flush the serial buffer (to prevent unicode error on next command) + self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() if self._fetcher is not None: self._fetcher.join() # join() will block if the queue isn't empty