Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import serial
- import time
- import binascii
- import threading
- import struct
- import msvcrt
- PORT = 'COM26'
- BAUDRATE = 9600
- REQUEST_HEX = "010313BA0006E0A9"
- # --- Funkcja CRC16 Modbus ---
- def modbus_crc(data: bytes) -> bytes:
- crc = 0xFFFF
- for pos in data:
- crc ^= pos
- for _ in range(8):
- if crc & 1:
- crc >>= 1
- crc ^= 0xA001
- else:
- crc >>= 1
- return crc.to_bytes(2, byteorder='little')
- # --- Funkcja do konwersji float -> bytes ---
- def convert_float_to_hex(value: float, endian: str = "BE_SWAP") -> list[int]:
- """Zwraca listę 4 bajtów float w podanym kodowaniu endian.
- endian:
- "BE" – standardowy Big Endian (0x42 F6 E6 66)
- "LE" – standardowy Little Endian (0x66 E6 F6 42)
- "BE_SWAP" – Big Endian + byte swap (0xE6 0x66 0x42 0xF6) ← Twój przypadek
- "LE_SWAP" – Little Endian + byte swap
- """
- raw = struct.pack('>f', value) # zawsze zaczynamy od Big Endian (IEEE754)
- b = list(raw)
- if endian == "BE":
- return b
- elif endian == "LE":
- return b[::-1]
- elif endian == "BE_SWAP":
- return [b[2], b[3], b[0], b[1]] # słowo niskie najpierw
- elif endian == "LE_SWAP":
- return [b[1], b[0], b[3], b[2]]
- else:
- raise ValueError("Invalid endian type")
- # --- Funkcja pomocnicza do tworzenia ramki Modbus ---
- def build_response_frame(values, endian="BE_SWAP"):
- """Tworzy ramkę odpowiedzi Modbus z listy wartości float."""
- data = [0x01, 0x03, len(values) * 4 * 1.5] # 6 rejestrów → 12 bajtów
- data[2] = int(len(values) * 4) # liczba bajtów danych
- for val in values:
- data.extend(convert_float_to_hex(val, endian))
- return bytes(data)
- paused = False
- # --- Wątek klawiatury (działa poprawnie) ---
- def keyboard_listener():
- global paused
- while True:
- if msvcrt.kbhit():
- key = msvcrt.getch().decode(errors="ignore").lower()
- if key == 'p':
- paused = not paused
- state = "PAUSED" if paused else "RUNNING"
- print(f"\n>>> Program {state}. Press 'P' again to toggle.")
- time.sleep(0.1)
- # --- Główna pętla ---
- def main():
- global paused
- ser = serial.Serial(PORT, BAUDRATE, bytesize=8, parity='N', stopbits=1, timeout=0.05)
- print(f"Listening on {PORT} at {BAUDRATE} baud...")
- print("Press 'P' to pause/resume reception.")
- expected_request = bytes.fromhex(REQUEST_HEX)
- threading.Thread(target=keyboard_listener, daemon=True).start()
- # --- Ustal dane dynamicznie ---
- RESPONSE_DATA = build_response_frame([543.21, 123.45, 9.87], endian="BE_SWAP")
- try:
- while True:
- if paused:
- time.sleep(0.2)
- continue
- if ser.in_waiting:
- data = ser.read(ser.in_waiting)
- hex_data = binascii.hexlify(data).upper().decode()
- print(f"Received: {hex_data}")
- if data == expected_request:
- crc = modbus_crc(RESPONSE_DATA)
- frame = RESPONSE_DATA + crc
- ser.setRTS(True)
- time.sleep(0.05)
- ser.write(frame)
- ser.flush()
- time.sleep(0.05)
- ser.setRTS(False)
- print(f"Sent response: {binascii.hexlify(frame).upper().decode()}")
- else:
- print("Unexpected frame received.")
- time.sleep(0.05)
- except KeyboardInterrupt:
- print("\nExiting...")
- finally:
- ser.close()
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment