import serial import struct import threading import time from typing import Callable, Optional, Union, List from construct import Container from dps150.packets.cmd import * from dps150.packets.base import ( packet, float_response, float3_response, byte_response, all_data, ) class DPS150: def __init__(self, serial_device: str, callback: Optional[Callable] = None) -> None: """ Initialize DPS150 power supply controller. Args: serial_device: Serial port path (e.g., '/dev/ttyACM0' or 'COM3') callback: Optional callback function to receive data updates """ self.serial_device = serial_device self.callback = callback if callback else lambda x: None self._device: Optional[serial.Serial] = None self._reader_thread: Optional[threading.Thread] = None self._running = False self._buffer = bytearray() async def _sleep(self, n: float): """Sleep helper (async-like using time.sleep)""" time.sleep(n / 1000.0) def start(self) -> None: """Start communication with the device.""" print(f'start {self.serial_device}') self._device = serial.Serial( port=self.serial_device, baudrate=115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, write_timeout=1 ) if not self._device.is_open: raise Exception(f'Can\'t open serial port! ({self.serial_device})') self._running = True self._start_reader() self._init_command() def stop(self) -> None: """Stop communication with the device.""" print('stop') if self._device and self._device.is_open: self.send_command(HEADER_OUT, CMD_SESSION, 0, 0) self._running = False if self._reader_thread: self._reader_thread.join(timeout=2) self._device.close() def _start_reader(self) -> None: """Start background thread for reading data.""" print('reading...') self._reader_thread = threading.Thread(target=self._read_loop, daemon=True) self._reader_thread.start() def _read_loop(self) -> None: """Main reading loop running in background thread.""" buffer = bytearray() while self._running and self._device and self._device.is_open: try: if self._device.in_waiting > 0: data = self._device.read(self._device.in_waiting) buffer.extend(data) # Parse packets using construct i = 0 while i < len(buffer) - 6: if buffer[i] == HEADER_IN and buffer[i + 1] == CMD_GET: # Try to parse packet try: # Check if we have enough data if i + 4 >= len(buffer): break length = buffer[i + 3] if i + 4 + length + 1 > len(buffer): break # Not enough data yet # Parse packet packet_data = bytes(buffer[i:i + 4 + length + 1]) parsed = packet.parse(packet_data) # Validate checksum if not parsed.checksum_valid: # Checksum error, skip i += 1 continue # Remove processed packet from buffer buffer = buffer[i + len(packet_data):] i = 0 # Process parsed packet self._parse_packet(parsed) except Exception as e: # Parse error, skip byte i += 1 else: i += 1 # Keep remaining buffer if i < len(buffer): buffer = buffer[i:] else: buffer = bytearray() time.sleep(0.01) # Small delay to prevent CPU spinning except Exception as error: print(f'Read error: {error}') if not self._running: break time.sleep(0.1) def _init_command(self) -> None: """Initialize device with session and baud rate.""" # Start session self.send_command(HEADER_OUT, CMD_SESSION, 0, 1) time.sleep(0.1) # Set baud rate (115200 = index 4 in [9600, 19200, 38400, 57600, 115200]) self.send_command(HEADER_OUT, CMD_BAUD, 0, 5) time.sleep(0.1) # Get device info self.send_command(HEADER_OUT, CMD_GET, MODEL_NAME, 0) time.sleep(0.1) self.send_command(HEADER_OUT, CMD_GET, HARDWARE_VERSION, 0) time.sleep(0.1) self.send_command(HEADER_OUT, CMD_GET, FIRMWARE_VERSION, 0) time.sleep(0.1) self.get_all() def send_command(self, c1: int, c2: int, c3: int, c5: Union[int, List[int], bytes]) -> None: """ Send command to device. Args: c1: Header (HEADER_IN=240 or HEADER_OUT=241) c2: Command (CMD_GET=161, CMD_SET=177, etc.) c3: Type/parameter c5: Data (int, list of ints, or bytes) """ if isinstance(c5, int): c5 = bytes([c5]) elif isinstance(c5, list): c5 = bytes(c5) elif not isinstance(c5, bytes): c5 = bytes(c5) c4 = len(c5) c6 = (c3 + c4) % 0x100 for val in c5: c6 = (c6 + val) % 0x100 # Build packet using construct packet_container = Container( header=c1, cmd=c2, type=c3, length=c4, data=c5, checksum=c6, ) command = packet.build(packet_container) self._send_command_raw(command) def send_command_float(self, c1: int, c2: int, c3: int, value: float) -> None: """Send command with float value.""" float_bytes = struct.pack(' None: """Send raw command bytes to device.""" if self._device and self._device.is_open: self._device.write(command) time.sleep(0.05) # 50ms delay as in JS version def _parse_packet(self, parsed: Container) -> None: """Parse incoming data packet using construct structures.""" if parsed.length == 0: return try: c3 = parsed.type c5 = parsed.data if c3 == 192: # Input voltage data = float_response.parse(c5) self.callback({'inputVoltage': data.value}) elif c3 == 195: # Output voltage, current, power data = float3_response.parse(c5) self.callback({ 'outputVoltage': data.value1, 'outputCurrent': data.value2, 'outputPower': data.value3, }) elif c3 == 196: # Temperature data = float_response.parse(c5) self.callback({'temperature': data.value}) elif c3 == 217: # Output capacity data = float_response.parse(c5) self.callback({'outputCapacity': data.value}) elif c3 == 218: # Output energy data = float_response.parse(c5) self.callback({'outputEnergy': data.value}) elif c3 == 219: # Output closed/enabled data = byte_response.parse(c5) self.callback({'outputClosed': data.value == 1}) elif c3 == 220: # Protection state data = byte_response.parse(c5) state_idx = data.value if state_idx < len(PROTECTION_STATES): self.callback({'protectionState': PROTECTION_STATES[state_idx]}) elif c3 == 221: # CC=0 or CV=1 data = byte_response.parse(c5) self.callback({'mode': 'CC' if data.value == 0 else 'CV'}) elif c3 == 222: # Model name # String response needs length from parent context try: model_name = c5.decode('ascii', errors='ignore').rstrip('\x00') self.callback({'modelName': model_name}) except: pass elif c3 == 223: # Hardware version try: hw_version = c5.decode('ascii', errors='ignore').rstrip('\x00') self.callback({'hardwareVersion': hw_version}) except: pass elif c3 == 224: # Firmware version try: fw_version = c5.decode('ascii', errors='ignore').rstrip('\x00') self.callback({'firmwareVersion': fw_version}) except: pass elif c3 == 225: # Unknown data = byte_response.parse(c5) print(f'Unknown data type 225: {data.value}') elif c3 == 226: # Upper limit voltage data = float_response.parse(c5) self.callback({'upperLimitVoltage': data.value}) elif c3 == 227: # Upper limit current data = float_response.parse(c5) self.callback({'upperLimitCurrent': data.value}) elif c3 == 255: # All data if len(c5) >= 139: # Full packet size data = all_data.parse(c5) # Debug unknown data if len(c5) > 139: print(f'Packet length: {len(c5)}, unknown: protectionStateRaw={data.protectionStateRaw}, ' f'unknown1={data.unknown1}, unknownVoltage={data.unknownVoltage}, ' f'unknownCurrent={data.unknownCurrent}') protection_state = PROTECTION_STATES[data.protectionStateRaw] if data.protectionStateRaw < len(PROTECTION_STATES) else "" self.callback({ 'inputVoltage': data.inputVoltage, 'setVoltage': data.setVoltage, 'setCurrent': data.setCurrent, 'outputVoltage': data.outputVoltage, 'outputCurrent': data.outputCurrent, 'outputPower': data.outputPower, 'temperature': data.temperature, 'group1setVoltage': data.group1setVoltage, 'group1setCurrent': data.group1setCurrent, 'group2setVoltage': data.group2setVoltage, 'group2setCurrent': data.group2setCurrent, 'group3setVoltage': data.group3setVoltage, 'group3setCurrent': data.group3setCurrent, 'group4setVoltage': data.group4setVoltage, 'group4setCurrent': data.group4setCurrent, 'group5setVoltage': data.group5setVoltage, 'group5setCurrent': data.group5setCurrent, 'group6setVoltage': data.group6setVoltage, 'group6setCurrent': data.group6setCurrent, 'overVoltageProtection': data.overVoltageProtection, 'overCurrentProtection': data.overCurrentProtection, 'overPowerProtection': data.overPowerProtection, 'overTemperatureProtection': data.overTemperatureProtection, 'lowVoltageProtection': data.lowVoltageProtection, 'brightness': data.brightness, 'volume': data.volume, 'meteringClosed': data.meteringClosed, 'outputCapacity': data.outputCapacity, 'outputEnergy': data.outputEnergy, 'outputClosed': data.outputClosed, 'protectionState': protection_state, 'mode': 'CC' if data.modeRaw == 0 else 'CV', 'upperLimitVoltage': data.upperLimitVoltage, 'upperLimitCurrent': data.upperLimitCurrent, }) except Exception as e: print(f'Parse error: {e}') import traceback traceback.print_exc() def get_all(self) -> None: """Request all device data.""" self.send_command(HEADER_OUT, CMD_GET, ALL, 0) def set_float_value(self, type_val: int, value: float) -> None: """Set float parameter value.""" self.send_command_float(HEADER_OUT, CMD_SET, type_val, value) def set_byte_value(self, type_val: int, value: int) -> None: """Set byte parameter value.""" self.send_command(HEADER_OUT, CMD_SET, type_val, value) def enable(self) -> None: """Enable output.""" self.set_byte_value(OUTPUT_ENABLE, 1) def disable(self) -> None: """Disable output.""" self.set_byte_value(OUTPUT_ENABLE, 0) def start_metering(self) -> None: """Start metering (accumulate capacity/energy).""" self.set_byte_value(METERING_ENABLE, 1) def stop_metering(self) -> None: """Stop metering.""" self.set_byte_value(METERING_ENABLE, 0) # Legacy methods for compatibility def connect(self) -> None: """Alias for start().""" self.start() def disconnect(self) -> None: """Alias for stop().""" self.stop() def read(self) -> bytes: """Legacy read method (not used, data comes via callback).""" if self._device and self._device.in_waiting > 0: return self._device.read_all() return b'' def write(self, data: bytes) -> None: """Legacy write method.""" if self._device and self._device.is_open: self._device.write(data)