271 lines
8.8 KiB
Python
271 lines
8.8 KiB
Python
|
|
import tkinter as tk
|
|||
|
|
from tkinter import ttk, scrolledtext
|
|||
|
|
import serial.tools.list_ports
|
|||
|
|
import asyncio
|
|||
|
|
import threading
|
|||
|
|
import serial
|
|||
|
|
import websockets
|
|||
|
|
import time
|
|||
|
|
import ssl
|
|||
|
|
|
|||
|
|
# SSL macOS workaround
|
|||
|
|
ssl_context = ssl._create_unverified_context()
|
|||
|
|
|
|||
|
|
ser = None
|
|||
|
|
ws = None
|
|||
|
|
running = False
|
|||
|
|
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
# LOG
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
def log(text, log_widget):
|
|||
|
|
timestamp = time.strftime("%H:%M:%S")
|
|||
|
|
log_widget.configure(state="normal")
|
|||
|
|
log_widget.insert(tk.END, f"[{timestamp}] {text}\n")
|
|||
|
|
log_widget.see(tk.END)
|
|||
|
|
log_widget.configure(state="disabled")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
# ASYNC TASK : WebSocket ↔ Minitel
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
async def websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
|
|||
|
|
global ser, ws, running
|
|||
|
|
|
|||
|
|
STOPBITS_MAP = {
|
|||
|
|
"1": serial.STOPBITS_ONE,
|
|||
|
|
"1.5": serial.STOPBITS_ONE_POINT_FIVE,
|
|||
|
|
"2": serial.STOPBITS_TWO
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# Ouverture port série
|
|||
|
|
log(f"Ouverture du port série {tty} à {speed} bauds…", log_widget)
|
|||
|
|
ser = serial.Serial(
|
|||
|
|
tty,
|
|||
|
|
int(speed),
|
|||
|
|
parity=parity,
|
|||
|
|
bytesize=int(databits),
|
|||
|
|
stopbits=STOPBITS_MAP[stopbits],
|
|||
|
|
timeout=1
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Connexion WebSocket
|
|||
|
|
log(f"Connexion au WebSocket {url}…", log_widget)
|
|||
|
|
|
|||
|
|
if url.startswith("wss://"):
|
|||
|
|
log("Utilisation d’un contexte SSL non vérifié (wss)…", log_widget)
|
|||
|
|
ws = await websockets.connect(url, ssl=ssl_context)
|
|||
|
|
else:
|
|||
|
|
ws = await websockets.connect(url)
|
|||
|
|
|
|||
|
|
log("Connexion établie.", log_widget)
|
|||
|
|
status_label.config(text="Connecté")
|
|||
|
|
|
|||
|
|
# Message test Minitel
|
|||
|
|
ser.write(b"\x07\x0c\x1f\x40\x41connexion\x0a")
|
|||
|
|
ser.write(b"\x1b\x3b\x60\x58\x52")
|
|||
|
|
|
|||
|
|
# WebSocket → Minitel
|
|||
|
|
async def w2m():
|
|||
|
|
while running:
|
|||
|
|
try:
|
|||
|
|
data = await ws.recv()
|
|||
|
|
if isinstance(data, bytes):
|
|||
|
|
ser.write(data)
|
|||
|
|
log(f"[WS→Minitel] {len(data)} octets", log_widget)
|
|||
|
|
else:
|
|||
|
|
ser.write(data.encode("latin1", "replace"))
|
|||
|
|
log(f"[WS→Minitel] {data}", log_widget)
|
|||
|
|
except:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# Minitel → WebSocket
|
|||
|
|
async def m2w():
|
|||
|
|
while running:
|
|||
|
|
if ser.in_waiting > 0:
|
|||
|
|
data = ser.read(ser.in_waiting)
|
|||
|
|
await ws.send(data.decode("latin1", "replace"))
|
|||
|
|
log(f"[Minitel→WS] {data}", log_widget)
|
|||
|
|
else:
|
|||
|
|
await asyncio.sleep(0.05)
|
|||
|
|
|
|||
|
|
await asyncio.gather(w2m(), m2w())
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
log(f"Erreur : {e}", log_widget)
|
|||
|
|
status_label.config(text="Erreur")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
running = False
|
|||
|
|
log("Déconnexion…", log_widget)
|
|||
|
|
try:
|
|||
|
|
if ws:
|
|||
|
|
await ws.close()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
try:
|
|||
|
|
if ser:
|
|||
|
|
ser.close()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
status_label.config(text="Déconnecté")
|
|||
|
|
log("Connexions fermées.", log_widget)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
# THREAD RUNNER
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
def start_async(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
|
|||
|
|
global running
|
|||
|
|
if running:
|
|||
|
|
return
|
|||
|
|
running = True
|
|||
|
|
|
|||
|
|
loop = asyncio.new_event_loop()
|
|||
|
|
threading.Thread(
|
|||
|
|
target=loop.run_until_complete,
|
|||
|
|
args=(websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget),),
|
|||
|
|
daemon=True
|
|||
|
|
).start()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def stop_connection(status_label, log_widget):
|
|||
|
|
global running
|
|||
|
|
running = False
|
|||
|
|
status_label.config(text="Déconnecté")
|
|||
|
|
log("Arrêt demandé.", log_widget)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
# Port detection refresh
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
def update_ports(combo):
|
|||
|
|
current_ports = set(combo["values"])
|
|||
|
|
detected = {p.device for p in serial.tools.list_ports.comports()}
|
|||
|
|
|
|||
|
|
if detected != current_ports:
|
|||
|
|
combo["values"] = list(detected)
|
|||
|
|
if detected:
|
|||
|
|
combo.set(list(detected)[0])
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
# GUI
|
|||
|
|
# ─────────────────────────────────────────────
|
|||
|
|
def build_gui():
|
|||
|
|
root = tk.Tk()
|
|||
|
|
root.title("WebSocket ↔ Minitel")
|
|||
|
|
|
|||
|
|
# LISTE DE SERVEURS
|
|||
|
|
SERVERS = {
|
|||
|
|
"MiniPAVI (officiel)": "wss://go.minipavi.fr:8181",
|
|||
|
|
"Hacker": "ws://mntl.joher.com:2018",
|
|||
|
|
"Annuaire": "ws://3611.re/ws",
|
|||
|
|
"3615": "ws://3615co.de/ws",
|
|||
|
|
"Retrocampus": "wss://bbs.retrocampus.com:8051",
|
|||
|
|
"LABBEJ27": "wss://minitel.labbej.fr:8182",
|
|||
|
|
"Saisie manuelle…": ""
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
tk.Label(root, text="Serveur prédéfini").grid(row=0, column=0)
|
|||
|
|
server_combo = ttk.Combobox(root, values=list(SERVERS.keys()), width=40)
|
|||
|
|
server_combo.set("MiniPAVI (officiel)")
|
|||
|
|
server_combo.grid(row=0, column=1)
|
|||
|
|
|
|||
|
|
# Champ URL modifiable
|
|||
|
|
tk.Label(root, text="Adresse WebSocket").grid(row=1, column=0)
|
|||
|
|
url_entry = tk.Entry(root, width=40)
|
|||
|
|
url_entry.insert(0, SERVERS["MiniPAVI (officiel)"])
|
|||
|
|
url_entry.grid(row=1, column=1)
|
|||
|
|
|
|||
|
|
def on_server_change(event):
|
|||
|
|
url = SERVERS.get(server_combo.get(), "")
|
|||
|
|
url_entry.delete(0, tk.END)
|
|||
|
|
url_entry.insert(0, url)
|
|||
|
|
|
|||
|
|
server_combo.bind("<<ComboboxSelected>>", on_server_change)
|
|||
|
|
|
|||
|
|
# PORT SERIE
|
|||
|
|
tk.Label(root, text="Port série").grid(row=2, column=0)
|
|||
|
|
ports = [p.device for p in serial.tools.list_ports.comports()]
|
|||
|
|
port_combo = ttk.Combobox(root, values=ports, width=20)
|
|||
|
|
if ports:
|
|||
|
|
port_combo.set(ports[0])
|
|||
|
|
port_combo.grid(row=2, column=1)
|
|||
|
|
|
|||
|
|
# VITESSE
|
|||
|
|
tk.Label(root, text="Vitesse").grid(row=3, column=0)
|
|||
|
|
speeds = ["1200", "4800", "9600", "19200"]
|
|||
|
|
speed_combo = ttk.Combobox(root, values=speeds, width=20)
|
|||
|
|
speed_combo.set("1200")
|
|||
|
|
speed_combo.grid(row=3, column=1)
|
|||
|
|
|
|||
|
|
# PARITÉ
|
|||
|
|
tk.Label(root, text="Parité").grid(row=4, column=0)
|
|||
|
|
parity_map = {
|
|||
|
|
"Even (pair)": serial.PARITY_EVEN,
|
|||
|
|
"Odd (impair)": serial.PARITY_ODD,
|
|||
|
|
"None": serial.PARITY_NONE,
|
|||
|
|
"Mark": serial.PARITY_MARK,
|
|||
|
|
"Space": serial.PARITY_SPACE
|
|||
|
|
}
|
|||
|
|
parity_combo = ttk.Combobox(root, values=list(parity_map.keys()), width=20)
|
|||
|
|
parity_combo.set("Even (pair)")
|
|||
|
|
parity_combo.grid(row=4, column=1)
|
|||
|
|
|
|||
|
|
# DATABITS
|
|||
|
|
tk.Label(root, text="Bits de données").grid(row=5, column=0)
|
|||
|
|
databits_combo = ttk.Combobox(root, values=["7", "8"], width=20)
|
|||
|
|
databits_combo.set("7")
|
|||
|
|
databits_combo.grid(row=5, column=1)
|
|||
|
|
|
|||
|
|
# STOPBITS
|
|||
|
|
tk.Label(root, text="Bits de stop").grid(row=6, column=0)
|
|||
|
|
stopbits_combo = ttk.Combobox(root, values=["1", "1.5", "2"], width=20)
|
|||
|
|
stopbits_combo.set("1")
|
|||
|
|
stopbits_combo.grid(row=6, column=1)
|
|||
|
|
|
|||
|
|
# LOG
|
|||
|
|
log_widget = scrolledtext.ScrolledText(root, width=60, height=15, state="disabled")
|
|||
|
|
log_widget.grid(row=8, column=0, columnspan=2, padx=5, pady=5)
|
|||
|
|
|
|||
|
|
# STATUT
|
|||
|
|
status_label = tk.Label(root, text="En attente…")
|
|||
|
|
status_label.grid(row=9, column=0, columnspan=2)
|
|||
|
|
|
|||
|
|
# BOUTONS
|
|||
|
|
tk.Button(
|
|||
|
|
root,
|
|||
|
|
text="Connecter",
|
|||
|
|
command=lambda: start_async(
|
|||
|
|
url_entry.get(),
|
|||
|
|
port_combo.get(),
|
|||
|
|
speed_combo.get(),
|
|||
|
|
parity_map[parity_combo.get()],
|
|||
|
|
databits_combo.get(),
|
|||
|
|
stopbits_combo.get(),
|
|||
|
|
status_label,
|
|||
|
|
log_widget
|
|||
|
|
)
|
|||
|
|
).grid(row=7, column=0)
|
|||
|
|
|
|||
|
|
tk.Button(
|
|||
|
|
root,
|
|||
|
|
text="Déconnecter",
|
|||
|
|
command=lambda: stop_connection(status_label, log_widget)
|
|||
|
|
).grid(row=7, column=1)
|
|||
|
|
|
|||
|
|
# REFRESH PORTS
|
|||
|
|
def refresh_ports():
|
|||
|
|
update_ports(port_combo)
|
|||
|
|
root.after(1000, refresh_ports)
|
|||
|
|
|
|||
|
|
refresh_ports()
|
|||
|
|
|
|||
|
|
root.mainloop()
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
build_gui()
|