Files
minitel-slideshow-tool/minitel_slideshow_tool.py
T
2025-12-13 00:28:40 +01:00

530 lines
23 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Minitel Slideshow Tool
- GUI Tkinter pour convertir des images en .vdt (encodeur intégré)
- Serveur WebSocket local (start/stop/restartables)
- Client WebSocket ↔ port série (keepalive)
- Choix de serveurs prédéfinis + saisie manuelle
- Temps d'affichage paramétrable
- Aperçu (hex dump) optionnel via log (preview binaire)
"""
from pathlib import Path
from io import BytesIO
from typing import Iterable, Generator, Any, Optional
import warnings, time, threading, asyncio, ssl
import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog as fd
from PIL import Image
import serial, serial.tools.list_ports
import websockets
# SSL workaround
ssl_context = ssl._create_unverified_context()
# ------------------------
# GLOBALS / ETAT
# ------------------------
ser = None
ws = None
running = False
# Slideshow server state
slideshow_server = None # websockets.server.Serve object
slideshow_server_running = False
server_loop: Optional[asyncio.AbstractEventLoop] = None
server_thread: Optional[threading.Thread] = None
# ------------------------
# SERVERS
# ------------------------
SERVERS = {
"Localhost (Photos)": "ws://localhost:8765",
"MiniPAVI": "wss://go.minipavi.fr:8181",
"Hacker": "ws://mntl.joher.com:2018",
"Annuaire": "ws://3611.re/ws",
"3615": "ws://3615co.de/ws",
"Retrocampus": "wss://bbs.retrocampus.com:8051",
"LABBEJ27": "wss://minitel.labbej.fr:8182",
"Saisie manuelle…": ""
}
# ------------------------
# UTIL: LOG
# ------------------------
def log(text: str, log_widget: scrolledtext.ScrolledText):
ts = time.strftime("%H:%M:%S")
log_widget.configure(state="normal")
log_widget.insert(tk.END, f"[{ts}] {text}\n")
log_widget.see(tk.END)
log_widget.configure(state="disabled")
# ------------------------
# ENCODE (essentiel, allégé mais compatible pic2jpeg2vdt)
# ------------------------
def bytescat(*data: 'Iterable[bytes | int] | int') -> bytes:
r = bytearray()
for e in data:
if isinstance(e, (bytes, bytearray)):
r.extend(e)
elif isinstance(e, int):
r.append(e)
else:
r.extend(bytescat(*e))
return bytes(r)
SCREEN_WIDTH = 320
SCREEN_HEIGHT = 240
# Attributes (short names)
PSA = bytescat(0x20); PDA = bytescat(0x21); SPA = bytescat(0x22); SSA = bytescat(0x23); SCA = bytescat(0x24); TCA = bytescat(0x25)
RTD = bytescat(PSA, 0x30)
PDA_LOC = bytescat(PDA, 0x32); PDA_PAS = bytescat(PDA, 0x33); PDA_PPL = bytescat(PDA, 0x34); PDA_CPA = bytescat(PDA, 0x35)
SCA_JPG = bytescat(SCA, 0x30); TME = bytescat(TCA, 0x30)
PM = 0x23; PI = 0x40
QTABLE_0 = [16,11,10,16,24,40,51,61,12,12,14,19,26,58,60,55,14,13,16,24,40,57,69,56,14,17,22,29,51,87,80,62,18,22,37,56,68,109,103,77,24,35,55,64,81,104,113,92,49,64,78,87,103,121,120,101,72,92,95,98,112,100,103,99]
QTABLE_1 = [17,18,24,47,99,99,99,99,18,21,26,66,99,99,99,99,24,26,56,99,99,99,99,99,47,66,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99,99]
def iter_jpeg_sections(data: bytes) -> 'Generator[tuple[int,int], None, None]':
pos = 0
if data[pos:pos+2] != b'\xFF\xD8': raise ValueError("Invalid JPEG")
pos += 2
while data[pos] == 0xFF and data[pos+1] != 0xDA:
length = (data[pos+2] << 8) | data[pos+3]
yield (pos, pos+length+2)
pos += length + 2
if data[pos:pos+2] != b'\xFF\xDA' or data[-2:] != b'\xFF\xD9':
raise ValueError("Invalid JPEG")
yield (pos, len(data))
def trim_jpeg(data: bytes) -> bytes:
res = bytearray(b'\xFF\xD8')
for b,e in iter_jpeg_sections(data):
if data[b:b+2] in (b'\xFF\xDA', b'\xFF\xC0', b'\xFF\xDB'):
res.extend(data[b:e])
return bytes(res)
def has_qtables(data: bytes) -> bool:
return any(data[b:b+2] == b'\xFF\xDB' for b,e in iter_jpeg_sections(data))
def encode_length(value: int) -> bytes:
out = bytearray()
first = True
while value != 0:
tmp = value & 31
tmp |= (1 << 6)
if not first:
tmp |= (1 << 5)
else:
first = False
value >>= 5
out.append(tmp)
out.append(0xFF)
out.reverse()
return bytes(out)
def encode_integer(number: int, raw: bool = False) -> bytes:
# minimal integer encoding used by header
nbits = 7
while not -(1 << (nbits-1)) <= number < (1 << (nbits-1)):
nbits += 7
if number < 0:
number += (1 << nbits)
data = bytearray()
if not raw:
data.append(0x40)
# length in bytes (nbits//7)
data.extend(bytes([nbits//7]))
for i in reversed(range(0, nbits, 7)):
data.append((number >> i) & 0x7F)
return bytes(data)
def encode_normalized(value: float) -> bytes:
nbytes = 4
if not (-1 <= value <= 1): raise ValueError
v = round(value * (1 << ((7*nbytes)-2)))
if v < 0:
v = v + (1 << (7*nbytes)) - (1 << ((7*nbytes)-2))
return bytescat(0x42, nbytes, [(v >> (7*i)) & 0x7F for i in reversed(range(nbytes))])
def encode_boolean(v: bool) -> bytes:
return bytescat(0x45, 0x01, 0x01 if v else 0x00)
def resize_image(im: Image.Image) -> Image.Image:
ratio = min(SCREEN_WIDTH / im.width, SCREEN_HEIGHT / im.height, 1)
w = round(im.width * ratio); w = ((w + 4)//8)*8
h = round(im.height * ratio); h = ((h + 5)//10)*10
if im.width == w and im.height == h: return im
return im.resize((w,h), Image.LANCZOS)
def encode_header(x:int,y:int,width:int,height:int, *, clear:bool=True, translation:bool=False, reset:bool=True, quantization:bool=False) -> bytes:
h = bytearray()
if reset: h.extend(bytescat(RTD, encode_boolean(True)))
h.extend(bytescat(PDA_LOC, encode_normalized(x/SCREEN_WIDTH), encode_normalized((SCREEN_HEIGHT-height-y)/SCREEN_HEIGHT*0.75)))
h.extend(bytescat(PDA_PAS, encode_normalized(width/SCREEN_WIDTH), encode_normalized(height/SCREEN_HEIGHT*0.75)))
h.extend(bytescat(PDA_PPL, encode_integer(0), encode_integer(0), encode_normalized(0), encode_normalized(height/SCREEN_HEIGHT*0.75)))
if clear: h.extend(bytescat(PDA_CPA, encode_boolean(clear)))
if quantization: h.extend(bytescat(ETM:=bytescat(SCA,0x31), bytescat(0x44,0x01,1), encode_integer(0x7F), bytescat(0x44,0x01,0x03)))
if translation: h.extend(bytescat(TME, bytescat(0x44,0x01,0x02)))
return bytes(h)
def export_image(im: Image.Image, *, subsampling: str="4:2:2", quality: Optional[int]=None) -> bytes:
if im.mode not in ("YCbCr","RGB","L"): im = im.convert("YCbCr")
kwargs = {"subsampling":subsampling,"optimize":False,"progressive":False,"keep_rgb":False,"restart_marker_blocks":False,"restart_marker_rows":False}
if quality is None:
kwargs["qtables"]=[QTABLE_0,QTABLE_1]; kwargs["streamtype"]=2
else:
kwargs["quality"]=quality; kwargs["streamtype"]=0
buf = BytesIO(); im.save(buf,"JPEG",**kwargs)
return trim_jpeg(buf.getvalue())
def split_chunks(data: bytes, chunk_size: int) -> 'Generator[tuple[bytes,bool], None, None]':
if chunk_size <= 0 or not data:
yield (data, True); return
for pos in range(0, len(data), chunk_size):
chunk = data[pos:pos+chunk_size]; final = (pos+len(chunk) >= len(data))
yield (chunk, final)
def translate_data(data: bytes) -> bytes:
res = bytearray(); pos = 0
while pos < len(data):
chunk = data[pos:pos+3]; b0 = (1<<6)
for idx, bt in enumerate(chunk): b0 |= ((bt>>6) << (4 - (idx<<1)))
res.append(b0); res.extend((1<<6)|(bt&0x3F) for bt in chunk); pos+=3
return bytes(res)
def encode_image(header: bytes, image: bytes, chunk_size: int=0x100, *, translation: bool=False) -> list[bytes]:
out = []
for chunk, final in split_chunks(header, chunk_size-1):
code = 0x51 if final else 0x50
out.append(bytescat(0x1B,0x70,PM,PI,encode_length(len(chunk)+1), code, chunk))
if translation:
chunk_size = (((chunk_size-1)*3)//4) + 1
for chunk, final in split_chunks(image, chunk_size):
if translation: chunk = translate_data(chunk)
code = 0x53 if final else 0x52
out.append(bytescat(0x1B,0x70,PM,PI,encode_length(len(chunk)+1), code, chunk))
return out
# ------------------------
# Conversion (thread-safe caller)
# ------------------------
def convert_images(input_folder: str, output_folder: str, log_widget: scrolledtext.ScrolledText, preview: bool=False):
inp = Path(input_folder or ""); out = Path(output_folder or "")
if not inp.exists() or not inp.is_dir():
log("Dossier d'entrée introuvable.", log_widget); return
out.mkdir(parents=True, exist_ok=True)
imgs = sorted([p for p in inp.iterdir() if p.suffix.lower() in (".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff")])
if not imgs:
log("Aucune image trouvée.", log_widget); return
log(f"Conversion {len(imgs)} images…", log_widget)
for p in imgs:
try:
im = Image.open(p)
im = resize_image(im)
jpeg = export_image(im, quality=78)
header = encode_header(0,0,im.width,im.height, clear=True, translation=False, reset=True, quantization=has_qtables(jpeg))
chunks = encode_image(header, jpeg, chunk_size=0x100, translation=False)
outp = out / (p.stem + ".vdt")
with open(outp, "wb") as f:
for c in chunks: f.write(c)
log(f"Converti: {p.name} -> {outp.name}", log_widget)
if preview:
# small hex summary in log (first 64 bytes)
sample = (outp.read_bytes()[:64]).hex()
log(f"Preview hex (first 64 bytes): {sample}", log_widget)
except Exception as e:
log(f"Erreur conversion {p.name}: {e}", log_widget)
log("Conversion terminée.", log_widget)
# ------------------------
# Slideshow server (architecture STANDARD - thread per server loop)
# ------------------------
async def slideshow_handler(ws_conn, path, vdt_files, delay):
INIT_SEQUENCE = b"\x1B@" + b"\x1B:" + b"\x1B[25l"
CLEAR = b"\x1B[2J\x1B[H" # Clear + home
await ws_conn.send(INIT_SEQUENCE)
await asyncio.sleep(0.1)
try:
while True:
for v in vdt_files:
await ws_conn.send(CLEAR)
await asyncio.sleep(0.02)
data = v.read_bytes()
await ws_conn.send(data)
await asyncio.sleep(delay)
except websockets.exceptions.ConnectionClosed:
return
async def start_slideshow_server(host: str, port: int, vdt_folder: str, delay: float, log_widget: scrolledtext.ScrolledText):
global slideshow_server, slideshow_server_running
slideshow_server_running = True
vdt_files = sorted(Path(vdt_folder).glob("*.vdt"))
if not vdt_files:
log("Aucun .vdt dans dossier.", log_widget); return
slideshow_server = await websockets.serve(lambda ws, path=None: slideshow_handler(ws, path, vdt_files, delay), host, port)
log(f"Serveur slideshow en écoute sur {host}:{port}", log_widget)
try:
while slideshow_server_running:
await asyncio.sleep(0.1)
finally:
slideshow_server.close()
await slideshow_server.wait_closed()
log("Serveur slideshow fermé (loop interne).", log_widget)
def launch_slideshow_server(host: str, port: int, vdt_folder: str, delay: float, log_widget: scrolledtext.ScrolledText):
global server_loop, server_thread, slideshow_server_running
if slideshow_server_running:
log("Slideshow server déjà démarré.", log_widget); return
if not Path(vdt_folder).exists():
log("Dossier VDT introuvable.", log_widget); return
log(f"Démarrage slideshow server {host}:{port} (delay={delay}s)…", log_widget)
server_loop = asyncio.new_event_loop()
def _runner():
try:
server_loop.run_until_complete(start_slideshow_server(host, port, vdt_folder, delay, log_widget))
except Exception as e:
# loop stopped or error
log(f"Serveur loop terminé: {e}", log_widget)
finally:
# ensure loop closed
try: server_loop.run_until_complete(server_loop.shutdown_asyncgens())
except: pass
server_loop.close()
server_thread = threading.Thread(target=_runner, daemon=True)
server_thread.start()
def stop_slideshow_server(log_widget: scrolledtext.ScrolledText):
global slideshow_server_running, server_loop, server_thread, slideshow_server
if not slideshow_server_running:
log("Aucun slideshow server en cours.", log_widget); return
log("Arrêt du slideshow server demandé…", log_widget)
slideshow_server_running = False
if server_loop and slideshow_server:
# close server safely in its loop
async def _close_and_stop():
try:
slideshow_server.close()
await slideshow_server.wait_closed()
except Exception:
pass
finally:
server_loop.stop()
try:
asyncio.run_coroutine_threadsafe(_close_and_stop(), server_loop)
except Exception:
pass
# wait a short time for thread to finish
if server_thread:
server_thread.join(timeout=2)
slideshow_server = None
server_loop = None
log("Slideshow server arrêté (commande émise).", log_widget)
# ------------------------
# send_vdt helper
# ------------------------
async def send_vdt(ws_conn, data: bytes):
lines = data.split(b"\n")
chunk = bytearray(); count = 0
for line in lines:
chunk.extend(line + b"\n"); count += 1
if count >= 10:
await ws_conn.send(chunk); await asyncio.sleep(0.06); chunk.clear(); count = 0
if chunk:
await ws_conn.send(chunk)
# ------------------------
# Client WebSocket <-> Serial (thread + loop)
# ------------------------
async def websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
global ser, ws, running
STOPBITS = {"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO}
try:
log(f"Ouverture port série {tty} @ {speed}", log_widget)
ser = serial.Serial(tty, int(speed), parity=parity, bytesize=int(databits), stopbits=STOPBITS[stopbits], timeout=1)
except Exception as e:
log(f"Erreur ouverture série: {e}", log_widget)
status_label.config(text="Erreur port série")
return
try:
log(f"Connexion WS -> {url}", log_widget)
if url.startswith("wss://"):
ws = await websockets.connect(url, ssl=ssl_context)
else:
ws = await websockets.connect(url)
status_label.config(text="Connecté")
log("WS connecté.", log_widget)
# small greeting
try:
ser.write(b"\x07\x0c\x1f\x40\x41connexion\x0a")
ser.write(b"\x1b\x3b\x60\x58\x52")
except Exception:
pass
async def w2m():
while running:
try:
data = await ws.recv()
if isinstance(data, bytes):
ser.write(data)
log(f"[WS→Minitel] {len(data)} bytes", log_widget)
else:
ser.write(data.encode("latin1","replace"))
log(f"[WS→Minitel] text", log_widget)
except Exception:
break
async def m2w():
while running:
try:
if ser.in_waiting > 0:
d = ser.read(ser.in_waiting)
await ws.send(d.decode("latin1","replace"))
log(f"[Minitel→WS] {len(d)} bytes", log_widget)
else:
await asyncio.sleep(0.05)
except Exception:
break
async def keepalive():
while running:
try:
await ws.ping()
except Exception:
break
await asyncio.sleep(20)
await asyncio.gather(w2m(), m2w(), keepalive())
except Exception as e:
log(f"Erreur WS/IO: {e}", log_widget)
status_label.config(text="Erreur")
finally:
running = False
log("Fermeture client WS/Serial…", log_widget)
try:
if ws:
await ws.close()
except: pass
try:
if ser:
ser.close()
except: pass
status_label.config(text="Déconnecté")
log("Client arrêté.", log_widget)
def start_async(url, tty, speed, parity, databits, stopbits, status_label, log_widget):
global running
if running:
log("Déjà connecté.", log_widget); return
running = True
loop = asyncio.new_event_loop()
def _run():
try:
loop.run_until_complete(websocket_task(url, tty, speed, parity, databits, stopbits, status_label, log_widget))
except Exception as e:
log(f"Client loop ended: {e}", log_widget)
finally:
try: loop.run_until_complete(loop.shutdown_asyncgens())
except: pass
loop.close()
threading.Thread(target=_run, daemon=True).start()
def stop_connection(status_label, log_widget):
global running
running = False
status_label.config(text="Déconnecté")
log("Arrêt demandé (client série/ws).", log_widget)
# ------------------------
# UTIL: refresh ports
# ------------------------
def update_ports(combo):
current = set(combo["values"])
detected = {p.device for p in serial.tools.list_ports.comports()}
if detected != current:
combo["values"] = list(detected)
if detected:
combo.set(list(detected)[0])
# ------------------------
# GUI (minimal, single-file)
# ------------------------
def build_gui():
root = tk.Tk(); root.title("Minitel Slideshow Tool — All-in-one (standard)")
padx = 6; pady = 4
tk.Label(root, text="Serveur").grid(row=0, column=0, sticky="w", padx=padx, pady=pady)
server_combo = ttk.Combobox(root, values=list(SERVERS.keys()), width=40); server_combo.set("Localhost (Photos)"); server_combo.grid(row=0,column=1,sticky="w",padx=padx,pady=pady)
tk.Label(root, text="Adresse WebSocket").grid(row=1, column=0, sticky="w", padx=padx, pady=pady)
url_entry = tk.Entry(root, width=44); url_entry.insert(0, SERVERS["Localhost (Photos)"]); url_entry.grid(row=1,column=1,sticky="w",padx=padx,pady=pady)
def update_manual(e=None):
choice = server_combo.get(); url_entry.delete(0,tk.END); url_entry.insert(0, SERVERS.get(choice,"") or "")
server_combo.bind("<<ComboboxSelected>>", update_manual)
tk.Label(root, text="Port série").grid(row=2,column=0,sticky="w",padx=padx,pady=pady)
ports = [p.device for p in serial.tools.list_ports.comports()]
port_combo = ttk.Combobox(root, values=ports, width=20); port_combo.grid(row=2,column=1,sticky="w",padx=padx,pady=pady)
if ports: port_combo.set(ports[0])
tk.Label(root, text="Vitesse").grid(row=3,column=0,sticky="w",padx=padx,pady=pady)
speed_combo = ttk.Combobox(root, values=["1200","4800","9600","19200"], width=20); speed_combo.set("9600"); speed_combo.grid(row=3,column=1,sticky="w",padx=padx,pady=pady)
tk.Label(root, text="Parité").grid(row=4,column=0,sticky="w",padx=padx,pady=pady)
parity_map = {"Even (pair)": serial.PARITY_EVEN, "Odd (impair)": serial.PARITY_ODD, "None": serial.PARITY_NONE, "Mark": serial.PARITY_MARK, "Space": serial.PARITY_SPACE}
parity_combo = ttk.Combobox(root, values=list(parity_map.keys()), width=20); parity_combo.set("None"); parity_combo.grid(row=4,column=1,sticky="w",padx=padx,pady=pady)
tk.Label(root, text="Bits").grid(row=5,column=0,sticky="w",padx=padx,pady=pady)
databits_combo = ttk.Combobox(root, values=["7","8"], width=20); databits_combo.set("8"); databits_combo.grid(row=5,column=1,sticky="w",padx=padx,pady=pady)
tk.Label(root, text="Stopbits").grid(row=6,column=0,sticky="w",padx=padx,pady=pady)
stopbits_combo = ttk.Combobox(root, values=["1","1.5","2"], width=20); stopbits_combo.set("1"); stopbits_combo.grid(row=6,column=1,sticky="w",padx=padx,pady=pady)
status_label = tk.Label(root, text="En attente…"); status_label.grid(row=7,column=0,columnspan=2,sticky="w",padx=padx,pady=pady)
log_widget = scrolledtext.ScrolledText(root, width=80, height=12, state="disabled"); log_widget.grid(row=8,column=0,columnspan=2,padx=padx,pady=pady)
tk.Button(root, text="Connecter", command=lambda: start_async(url_entry.get(), port_combo.get(), speed_combo.get(), parity_map[parity_combo.get()], databits_combo.get(), stopbits_combo.get(), status_label, log_widget)).grid(row=9,column=0,sticky="ew",padx=padx,pady=pady)
tk.Button(root, text="Déconnecter", command=lambda: stop_connection(status_label, log_widget)).grid(row=9,column=1,sticky="ew",padx=padx,pady=pady)
# Slideshow / Conversion
frame = tk.LabelFrame(root, text="Slideshow / Conversion"); frame.grid(row=10,column=0,columnspan=2,sticky="we",padx=padx,pady=pady)
tk.Label(frame, text="Input Images Folder").grid(row=0,column=0,sticky="w",padx=padx,pady=pady)
input_entry = tk.Entry(frame, width=50); input_entry.grid(row=0,column=1,padx=padx,pady=pady)
tk.Button(frame, text="Select", command=lambda: select_folder(input_entry)).grid(row=0,column=2,padx=padx,pady=pady)
tk.Label(frame, text="Output VDT Folder").grid(row=1,column=0,sticky="w",padx=padx,pady=pady)
output_entry = tk.Entry(frame, width=50); output_entry.grid(row=1,column=1,padx=padx,pady=pady)
tk.Button(frame, text="Select", command=lambda: select_folder(output_entry)).grid(row=1,column=2,padx=padx,pady=pady)
tk.Button(frame, text="Convert Images", command=lambda: threading.Thread(target=convert_images, args=(input_entry.get(), output_entry.get(), log_widget, True), daemon=True).start()).grid(row=2,column=0,columnspan=3,sticky="we",padx=padx,pady=pady)
tk.Label(frame, text="Temps par image (s)").grid(row=3,column=0,sticky="w",padx=padx,pady=pady)
delay_entry = tk.Entry(frame, width=10); delay_entry.insert(0,"3"); delay_entry.grid(row=3,column=1,sticky="w",padx=padx,pady=pady)
tk.Button(frame, text="Start Slideshow Server", command=lambda: launch_slideshow_server("0.0.0.0", 8765, output_entry.get(), max(0.1, float(delay_entry.get() or 3.0)), log_widget)).grid(row=4,column=0,columnspan=3,sticky="we",padx=padx,pady=pady)
tk.Button(frame, text="Stop Slideshow Server", command=lambda: stop_slideshow_server(log_widget)).grid(row=5,column=0,columnspan=3,sticky="we",padx=padx,pady=pady)
# refresh ports
def refresh_ports():
update_ports(port_combo); root.after(1000, refresh_ports)
refresh_ports()
root.mainloop()
# ------------------------
# simple helpers
# ------------------------
def select_folder(entry):
d = fd.askdirectory()
if d:
entry.delete(0,tk.END); entry.insert(0,d)
# ------------------------
# ENTRY POINT
# ------------------------
if __name__ == "__main__":
build_gui()