Files

271 lines
8.8 KiB
Python
Raw Permalink Normal View History

2025-12-13 01:18:34 +01:00
import tkinter as tk
from tkinter import ttk, scrolledtext
import serial.tools.list_ports
import asyncio
import threading
import serial
import websockets
import time
import ssl
# SSL macOS workaround
ssl_context = ssl._create_unverified_context()
ser = None
ws = None
running = False
# ─────────────────────────────────────────────
# LOG
# ─────────────────────────────────────────────
def log(text, log_widget):
timestamp = time.strftime("%H:%M:%S")
log_widget.configure(state="normal")
log_widget.insert(tk.END, f"[{timestamp}] {text}\n")
log_widget.see(tk.END)
log_widget.configure(state="disabled")
# ─────────────────────────────────────────────
# ASYNC TASK : WebSocket ↔ Minitel
# ─────────────────────────────────────────────
async def websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
global ser, ws, running
STOPBITS_MAP = {
"1": serial.STOPBITS_ONE,
"1.5": serial.STOPBITS_ONE_POINT_FIVE,
"2": serial.STOPBITS_TWO
}
try:
# Ouverture port série
log(f"Ouverture du port série {tty} à {speed} bauds…", log_widget)
ser = serial.Serial(
tty,
int(speed),
parity=parity,
bytesize=int(databits),
stopbits=STOPBITS_MAP[stopbits],
timeout=1
)
# Connexion WebSocket
log(f"Connexion au WebSocket {url}", log_widget)
if url.startswith("wss://"):
log("Utilisation dun contexte SSL non vérifié (wss)…", log_widget)
ws = await websockets.connect(url, ssl=ssl_context)
else:
ws = await websockets.connect(url)
log("Connexion établie.", log_widget)
status_label.config(text="Connecté")
# Message test Minitel
ser.write(b"\x07\x0c\x1f\x40\x41connexion\x0a")
ser.write(b"\x1b\x3b\x60\x58\x52")
# WebSocket → Minitel
async def w2m():
while running:
try:
data = await ws.recv()
if isinstance(data, bytes):
ser.write(data)
log(f"[WS→Minitel] {len(data)} octets", log_widget)
else:
ser.write(data.encode("latin1", "replace"))
log(f"[WS→Minitel] {data}", log_widget)
except:
break
# Minitel → WebSocket
async def m2w():
while running:
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
await ws.send(data.decode("latin1", "replace"))
log(f"[Minitel→WS] {data}", log_widget)
else:
await asyncio.sleep(0.05)
await asyncio.gather(w2m(), m2w())
except Exception as e:
log(f"Erreur : {e}", log_widget)
status_label.config(text="Erreur")
finally:
running = False
log("Déconnexion…", log_widget)
try:
if ws:
await ws.close()
except:
pass
try:
if ser:
ser.close()
except:
pass
status_label.config(text="Déconnecté")
log("Connexions fermées.", log_widget)
# ─────────────────────────────────────────────
# THREAD RUNNER
# ─────────────────────────────────────────────
def start_async(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
global running
if running:
return
running = True
loop = asyncio.new_event_loop()
threading.Thread(
target=loop.run_until_complete,
args=(websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget),),
daemon=True
).start()
def stop_connection(status_label, log_widget):
global running
running = False
status_label.config(text="Déconnecté")
log("Arrêt demandé.", log_widget)
# ─────────────────────────────────────────────
# Port detection refresh
# ─────────────────────────────────────────────
def update_ports(combo):
current_ports = set(combo["values"])
detected = {p.device for p in serial.tools.list_ports.comports()}
if detected != current_ports:
combo["values"] = list(detected)
if detected:
combo.set(list(detected)[0])
# ─────────────────────────────────────────────
# GUI
# ─────────────────────────────────────────────
def build_gui():
root = tk.Tk()
root.title("WebSocket ↔ Minitel")
# LISTE DE SERVEURS
SERVERS = {
"MiniPAVI (officiel)": "wss://go.minipavi.fr:8181",
"Hacker": "ws://mntl.joher.com:2018",
"Annuaire": "ws://3611.re/ws",
"3615": "ws://3615co.de/ws",
"Retrocampus": "wss://bbs.retrocampus.com:8051",
"LABBEJ27": "wss://minitel.labbej.fr:8182",
"Saisie manuelle…": ""
}
tk.Label(root, text="Serveur prédéfini").grid(row=0, column=0)
server_combo = ttk.Combobox(root, values=list(SERVERS.keys()), width=40)
server_combo.set("MiniPAVI (officiel)")
server_combo.grid(row=0, column=1)
# Champ URL modifiable
tk.Label(root, text="Adresse WebSocket").grid(row=1, column=0)
url_entry = tk.Entry(root, width=40)
url_entry.insert(0, SERVERS["MiniPAVI (officiel)"])
url_entry.grid(row=1, column=1)
def on_server_change(event):
url = SERVERS.get(server_combo.get(), "")
url_entry.delete(0, tk.END)
url_entry.insert(0, url)
server_combo.bind("<<ComboboxSelected>>", on_server_change)
# PORT SERIE
tk.Label(root, text="Port série").grid(row=2, column=0)
ports = [p.device for p in serial.tools.list_ports.comports()]
port_combo = ttk.Combobox(root, values=ports, width=20)
if ports:
port_combo.set(ports[0])
port_combo.grid(row=2, column=1)
# VITESSE
tk.Label(root, text="Vitesse").grid(row=3, column=0)
speeds = ["1200", "4800", "9600", "19200"]
speed_combo = ttk.Combobox(root, values=speeds, width=20)
speed_combo.set("1200")
speed_combo.grid(row=3, column=1)
# PARITÉ
tk.Label(root, text="Parité").grid(row=4, column=0)
parity_map = {
"Even (pair)": serial.PARITY_EVEN,
"Odd (impair)": serial.PARITY_ODD,
"None": serial.PARITY_NONE,
"Mark": serial.PARITY_MARK,
"Space": serial.PARITY_SPACE
}
parity_combo = ttk.Combobox(root, values=list(parity_map.keys()), width=20)
parity_combo.set("Even (pair)")
parity_combo.grid(row=4, column=1)
# DATABITS
tk.Label(root, text="Bits de données").grid(row=5, column=0)
databits_combo = ttk.Combobox(root, values=["7", "8"], width=20)
databits_combo.set("7")
databits_combo.grid(row=5, column=1)
# STOPBITS
tk.Label(root, text="Bits de stop").grid(row=6, column=0)
stopbits_combo = ttk.Combobox(root, values=["1", "1.5", "2"], width=20)
stopbits_combo.set("1")
stopbits_combo.grid(row=6, column=1)
# LOG
log_widget = scrolledtext.ScrolledText(root, width=60, height=15, state="disabled")
log_widget.grid(row=8, column=0, columnspan=2, padx=5, pady=5)
# STATUT
status_label = tk.Label(root, text="En attente…")
status_label.grid(row=9, column=0, columnspan=2)
# BOUTONS
tk.Button(
root,
text="Connecter",
command=lambda: start_async(
url_entry.get(),
port_combo.get(),
speed_combo.get(),
parity_map[parity_combo.get()],
databits_combo.get(),
stopbits_combo.get(),
status_label,
log_widget
)
).grid(row=7, column=0)
tk.Button(
root,
text="Déconnecter",
command=lambda: stop_connection(status_label, log_widget)
).grid(row=7, column=1)
# REFRESH PORTS
def refresh_ports():
update_ports(port_combo)
root.after(1000, refresh_ports)
refresh_ports()
root.mainloop()
if __name__ == "__main__":
build_gui()