KarBob

Untitled

Nov 7th, 2025
201
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.75 KB | None | 0 0
  1. import serial
  2. import time
  3. import binascii
  4. import threading
  5. import struct
  6. import msvcrt
  7.  
  8. PORT = 'COM26'
  9. BAUDRATE = 9600
  10. REQUEST_HEX = "010313BA0006E0A9"
  11.  
  12. # --- Funkcja CRC16 Modbus ---
  13. def modbus_crc(data: bytes) -> bytes:
  14.     crc = 0xFFFF
  15.     for pos in data:
  16.         crc ^= pos
  17.         for _ in range(8):
  18.             if crc & 1:
  19.                 crc >>= 1
  20.                 crc ^= 0xA001
  21.             else:
  22.                 crc >>= 1
  23.     return crc.to_bytes(2, byteorder='little')
  24.  
  25.  
  26. # --- Funkcja do konwersji float -> bytes ---
  27. def convert_float_to_hex(value: float, endian: str = "BE_SWAP") -> list[int]:
  28.     """Zwraca listę 4 bajtów float w podanym kodowaniu endian.
  29.  
  30.    endian:
  31.        "BE"      – standardowy Big Endian (0x42 F6 E6 66)
  32.        "LE"      – standardowy Little Endian (0x66 E6 F6 42)
  33.        "BE_SWAP" – Big Endian + byte swap (0xE6 0x66 0x42 0xF6)  ← Twój przypadek
  34.        "LE_SWAP" – Little Endian + byte swap
  35.    """
  36.     raw = struct.pack('>f', value)  # zawsze zaczynamy od Big Endian (IEEE754)
  37.     b = list(raw)
  38.  
  39.     if endian == "BE":
  40.         return b
  41.     elif endian == "LE":
  42.         return b[::-1]
  43.     elif endian == "BE_SWAP":
  44.         return [b[2], b[3], b[0], b[1]]  # słowo niskie najpierw
  45.     elif endian == "LE_SWAP":
  46.         return [b[1], b[0], b[3], b[2]]
  47.     else:
  48.         raise ValueError("Invalid endian type")
  49.  
  50.  
  51. # --- Funkcja pomocnicza do tworzenia ramki Modbus ---
  52. def build_response_frame(values, endian="BE_SWAP"):
  53.     """Tworzy ramkę odpowiedzi Modbus z listy wartości float."""
  54.     data = [0x01, 0x03, len(values) * 4 * 1.5]  # 6 rejestrów → 12 bajtów
  55.     data[2] = int(len(values) * 4)  # liczba bajtów danych
  56.     for val in values:
  57.         data.extend(convert_float_to_hex(val, endian))
  58.     return bytes(data)
  59.  
  60.  
  61. paused = False
  62.  
  63.  
  64. # --- Wątek klawiatury (działa poprawnie) ---
  65. def keyboard_listener():
  66.     global paused
  67.     while True:
  68.         if msvcrt.kbhit():
  69.             key = msvcrt.getch().decode(errors="ignore").lower()
  70.             if key == 'p':
  71.                 paused = not paused
  72.                 state = "PAUSED" if paused else "RUNNING"
  73.                 print(f"\n>>> Program {state}. Press 'P' again to toggle.")
  74.         time.sleep(0.1)
  75.  
  76.  
  77. # --- Główna pętla ---
  78. def main():
  79.     global paused
  80.  
  81.     ser = serial.Serial(PORT, BAUDRATE, bytesize=8, parity='N', stopbits=1, timeout=0.05)
  82.     print(f"Listening on {PORT} at {BAUDRATE} baud...")
  83.     print("Press 'P' to pause/resume reception.")
  84.  
  85.     expected_request = bytes.fromhex(REQUEST_HEX)
  86.     threading.Thread(target=keyboard_listener, daemon=True).start()
  87.  
  88.     # --- Ustal dane dynamicznie ---
  89.     RESPONSE_DATA = build_response_frame([543.21, 123.45, 9.87], endian="BE_SWAP")
  90.  
  91.     try:
  92.         while True:
  93.             if paused:
  94.                 time.sleep(0.2)
  95.                 continue
  96.  
  97.             if ser.in_waiting:
  98.                 data = ser.read(ser.in_waiting)
  99.                 hex_data = binascii.hexlify(data).upper().decode()
  100.                 print(f"Received: {hex_data}")
  101.  
  102.                 if data == expected_request:
  103.                     crc = modbus_crc(RESPONSE_DATA)
  104.                     frame = RESPONSE_DATA + crc
  105.  
  106.                     ser.setRTS(True)
  107.                     time.sleep(0.05)
  108.  
  109.                     ser.write(frame)
  110.                     ser.flush()
  111.  
  112.                     time.sleep(0.05)
  113.                     ser.setRTS(False)
  114.  
  115.                     print(f"Sent response: {binascii.hexlify(frame).upper().decode()}")
  116.                 else:
  117.                     print("Unexpected frame received.")
  118.  
  119.             time.sleep(0.05)
  120.  
  121.     except KeyboardInterrupt:
  122.         print("\nExiting...")
  123.     finally:
  124.         ser.close()
  125.  
  126.  
  127. if __name__ == "__main__":
  128.     main()
  129.  
Advertisement
Add Comment
Please, Sign In to add comment