From f1c2495c2242cf7ae150bf2864d12426946dffbf Mon Sep 17 00:00:00 2001 From: "DESKTOP-064TTA1\\Fai LUK" Date: Wed, 1 Apr 2026 22:28:25 +0800 Subject: [PATCH] no message --- python/Bag3.py | 261 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 225 insertions(+), 36 deletions(-) diff --git a/python/Bag3.py b/python/Bag3.py index 364fc36..ac9c2ce 100644 --- a/python/Bag3.py +++ b/python/Bag3.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Bag3 v3.1 – FPSMS job orders by plan date (this file is the maintained version). +Bag3 v3.2 – FPSMS job orders by plan date (this file is the maintained version). Uses the public API GET /py/job-orders (no login required). UI tuned for aged users: larger font, Traditional Chinese labels, prev/next date. @@ -21,7 +21,7 @@ import time import tkinter as tk from datetime import date, datetime, timedelta from tkinter import messagebox, ttk -from typing import Callable, Optional +from typing import Callable, Optional, Tuple import requests @@ -58,7 +58,7 @@ except ImportError: qrcode = None # type: ignore[assignment] _HAS_PIL_QR = False -APP_VERSION = "3.1" +APP_VERSION = "3.2" DEFAULT_BASE_URL = os.environ.get("FPSMS_BASE_URL", "http://localhost:8090/api") # When run as PyInstaller exe, save settings next to the exe; otherwise next to script @@ -213,11 +213,15 @@ def generate_zpl_dataflex( lot_no: Optional[str] = None, font_regular: str = "E:STXihei.ttf", font_bold: str = "E:STXihei.ttf", + reset_counter_first: bool = False, ) -> str: """ Row 1 (from zero): QR code, then item name (rotated 90°). Row 2: Batch/lot (left), item code (right). Label and QR use lotNo from API when present, else batch_no (Bxxxxx). + + When reset_counter_first is True, Zebra counter reset commands (~RO1, ~RO2) are inserted after ^XA + in the *same* label job. Many printers ignore a separate reset-only TCP job. """ desc = _zpl_escape((item_name or "—").strip()) code = _zpl_escape((item_code or "—").strip()) @@ -229,8 +233,9 @@ def generate_zpl_dataflex( else: qr_payload = label_line if label_line else batch_no.strip() qr_value = _zpl_escape(qr_payload) + counter_lines = _zpl_dataflex_counter_reset_lines() if reset_counter_first else "" return f"""^XA -^CI28 +{counter_lines}^CI28 ^PW700 ^LL500 ^PO N @@ -245,6 +250,15 @@ def generate_zpl_dataflex( ^XZ""" +def _zpl_dataflex_counter_reset_lines() -> str: + """ + Zebra ZPL counter reset (tilde commands), embedded after ^XA on the first label of each job: + ~RO1 = reset counter 1, ~RO2 = reset counter 2. + See Zebra ZPL / printer manual; add ~RO3 etc. if your model exposes more counters. + """ + return "~RO1\n~RO2\n" + + def generate_zpl_label_small( batch_no: str, item_code: str, @@ -713,7 +727,7 @@ def send_job_to_laser( if item_id is not None and stock_in_line_id is not None: # Use compact JSON so device-side parser doesn't get spaces. json_part = json.dumps( - {"itemID": item_id, "stockInLineId": stock_in_line_id}, + {"itemId": item_id, "stockInLineId": stock_in_line_id}, separators=(",", ":"), ) reply = f"{json_part};{code_str};{name_str};;" @@ -1011,18 +1025,18 @@ def ask_label_count(parent: tk.Tk) -> Optional[int]: win.wait_window() return result[0] -def ask_bag_count(parent: tk.Tk) -> Optional[int]: +def ask_bag_count(parent: tk.Tk) -> Optional[Tuple[int, bool]]: """ - When printer is 打袋機 DataFlex, ask how many bags: - optional direct qty in text field, +50/+10/+5/+1, 重置, then 確認送出. - Returns count (>= 1), or None if cancelled. + When printer is 打袋機 DataFlex: qty with +按鈕 then 確認送出, or big bottom「C」for continuous. + Returns (count, continuous_print). If continuous_print is True, count is ignored (use 0). + None if cancelled. """ - result: list[Optional[int]] = [None] + result: list[Optional[Tuple[int, bool]]] = [None] qty_var = tk.StringVar(value="0") win = tk.Toplevel(parent) win.title("打袋列印數量") - win.geometry("580x280") + win.geometry("580x420") win.transient(parent) win.grab_set() win.configure(bg=BG_TOP) @@ -1063,7 +1077,12 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]: if q < 1: messagebox.showwarning("打袋機", "請輸入需求數量或按 +50、+10、+5、+1。", parent=win) return - result[0] = q + result[0] = (q, False) + win.destroy() + + def start_continuous() -> None: + """Big C: continuous print until 停止; counter reset at job start.""" + result[0] = (0, True) win.destroy() btn_row1 = tk.Frame(win, bg=BG_TOP) @@ -1075,11 +1094,90 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]: ttk.Button(win, text="確認送出", command=confirm, width=14).pack(pady=12) qty_entry.bind("", lambda e: confirm()) + + sep = ttk.Separator(win, orient=tk.HORIZONTAL) + sep.pack(fill=tk.X, padx=16, pady=(4, 8)) + + bottom = tk.Frame(win, bg=BG_TOP) + bottom.pack(fill=tk.X, padx=12, pady=(0, 12)) + tk.Label( + bottom, + text="連續出袋 · 每單開始重設計數 · 另開視窗按「停止列印」結束", + font=get_font(FONT_SIZE_META), + bg=BG_TOP, + fg="#333333", + wraplength=540, + justify=tk.CENTER, + ).pack(fill=tk.X, pady=(0, 6)) + + tk.Button( + bottom, + text="C(連續印)", + command=start_continuous, + font=(FONT_FAMILY, 38, "bold"), + bg="#2E7D32", + fg="white", + activebackground="#1B5E20", + activeforeground="white", + relief=tk.RAISED, + bd=4, + cursor="hand2", + padx=24, + pady=18, + ).pack(fill=tk.X) win.protocol("WM_DELETE_WINDOW", win.destroy) win.wait_window() return result[0] +def _sleep_interruptible(stop_event: threading.Event, total_sec: float) -> None: + """Sleep up to total_sec but return early if stop_event is set.""" + end = time.perf_counter() + total_sec + while time.perf_counter() < end: + if stop_event.is_set(): + return + remaining = end - time.perf_counter() + if remaining <= 0: + break + time.sleep(min(0.05, remaining)) + + +def open_dataflex_stop_window(parent: tk.Tk, stop_event: threading.Event) -> tk.Toplevel: + """Small window with 停止列印 for DataFlex continuous mode (non-modal so stop stays usable).""" + win = tk.Toplevel(parent) + win.title("打袋機連續列印") + win.geometry("420x170") + win.transient(parent) + win.configure(bg=BG_TOP) + tk.Label( + win, + text="連續列印進行中,可隨時按下方停止。", + font=get_font(FONT_SIZE), + bg=BG_TOP, + wraplength=400, + ).pack(pady=(16, 8)) + + def stop() -> None: + stop_event.set() + try: + win.destroy() + except tk.TclError: + pass + + tk.Button( + win, + text="停止列印", + command=stop, + font=get_font(FONT_SIZE_BUTTONS), + bg=BG_STATUS_ERROR, + fg=FG_STATUS_ERROR, + padx=20, + pady=10, + ).pack(pady=12) + win.protocol("WM_DELETE_WINDOW", stop) + return win + + def main() -> None: settings = load_settings() base_url_ref = [build_base_url(settings["api_ip"], settings["api_port"])] @@ -1515,8 +1613,9 @@ def main() -> None: if not ip: messagebox.showerror("打袋機", "請在設定中填寫打袋機 DataFlex 的 IP。") else: - count = ask_bag_count(root) - if count is not None: + bag_ans = ask_bag_count(root) + if bag_ans is not None: + n, continuous = bag_ans item_code = j.get("itemCode") or "—" item_name = j.get("itemName") or "—" item_id = j.get("itemId") @@ -1529,33 +1628,123 @@ def main() -> None: item_id=item_id, stock_in_line_id=stock_in_line_id, lot_no=lot_no, + reset_counter_first=False, + ) + zpl_first_job = generate_zpl_dataflex( + b, + item_code, + item_name, + item_id=item_id, + stock_in_line_id=stock_in_line_id, + lot_no=lot_no, + reset_counter_first=True, ) label_text = (lot_no or b).strip() - n = count - try: - for i in range(n): - send_zpl_to_dataflex(ip, port, zpl) - if i < n - 1: - time.sleep(2) - set_status_message(f"已送出列印:批次 {label_text} x {n} 張", is_error=False) - jo_id = j.get("id") - if jo_id is not None: + if continuous: + stop_ev = threading.Event() + stop_win = open_dataflex_stop_window(root, stop_ev) + + def dflex_worker() -> None: + printed = 0 + error_shown = False try: - submit_job_order_print_submit( - base_url_ref[0], int(jo_id), n, "DATAFLEX" + while not stop_ev.is_set(): + payload = zpl_first_job if printed == 0 else zpl + send_zpl_to_dataflex(ip, port, payload) + printed += 1 + _sleep_interruptible(stop_ev, 2.0) + except ConnectionRefusedError: + error_shown = True + root.after( + 0, + lambda: set_status_message( + f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", + is_error=True, + ), ) - load_job_orders(from_user_date_change=False) - except requests.RequestException as ex: - messagebox.showwarning( - "打袋機", - f"已送出 {n} 張,但伺服器記錄失敗:{ex}", + except socket.timeout: + error_shown = True + root.after( + 0, + lambda: set_status_message( + f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", + is_error=True, + ), + ) + except OSError as err: + error_shown = True + root.after( + 0, + lambda e=err: set_status_message( + f"列印失敗:{e}", + is_error=True, + ), + ) + finally: + + def _done() -> None: + try: + stop_win.destroy() + except tk.TclError: + pass + if printed > 0: + set_status_message( + f"連續列印結束:批次 {label_text},已印 {printed} 張", + is_error=False, + ) + jo_id = j.get("id") + if jo_id is not None: + try: + submit_job_order_print_submit( + base_url_ref[0], + int(jo_id), + printed, + "DATAFLEX", + ) + load_job_orders(from_user_date_change=False) + except requests.RequestException as ex: + messagebox.showwarning( + "打袋機", + f"已印 {printed} 張,但伺服器記錄失敗:{ex}", + ) + elif not error_shown: + set_status_message( + "連續列印未印出或已取消", + is_error=True, + ) + + root.after(0, _done) + + threading.Thread(target=dflex_worker, daemon=True).start() + else: + try: + for i in range(n): + send_zpl_to_dataflex( + ip, + port, + zpl_first_job if i == 0 else zpl, ) - except ConnectionRefusedError: - set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True) - except socket.timeout: - set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True) - except OSError as err: - set_status_message(f"列印失敗:{err}", is_error=True) + if i < n - 1: + time.sleep(2) + set_status_message(f"已送出列印:批次 {label_text} x {n} 張", is_error=False) + jo_id = j.get("id") + if jo_id is not None: + try: + submit_job_order_print_submit( + base_url_ref[0], int(jo_id), n, "DATAFLEX" + ) + load_job_orders(from_user_date_change=False) + except requests.RequestException as ex: + messagebox.showwarning( + "打袋機", + f"已送出 {n} 張,但伺服器記錄失敗:{ex}", + ) + except ConnectionRefusedError: + set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True) + except socket.timeout: + set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True) + except OSError as err: + set_status_message(f"列印失敗:{err}", is_error=True) elif printer_var.get() == "標簽機": com = (settings.get("label_com") or "").strip() if not com: