DESKTOP-064TTA1\Fai LUK 6 часов назад
Родитель
Сommit
f1c2495c22
1 измененных файлов: 225 добавлений и 36 удалений
  1. +225
    -36
      python/Bag3.py

+ 225
- 36
python/Bag3.py Просмотреть файл

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Bag3 v3.1 – FPSMS job orders by plan date (this file is the maintained version).
Bag3 v3.2 – FPSMS job orders by plan date (this file is the maintained version).

Uses the public API GET /py/job-orders (no login required).
UI tuned for aged users: larger font, Traditional Chinese labels, prev/next date.
@@ -21,7 +21,7 @@ import time
import tkinter as tk
from datetime import date, datetime, timedelta
from tkinter import messagebox, ttk
from typing import Callable, Optional
from typing import Callable, Optional, Tuple

import requests

@@ -58,7 +58,7 @@ except ImportError:
qrcode = None # type: ignore[assignment]
_HAS_PIL_QR = False

APP_VERSION = "3.1"
APP_VERSION = "3.2"

DEFAULT_BASE_URL = os.environ.get("FPSMS_BASE_URL", "http://localhost:8090/api")
# When run as PyInstaller exe, save settings next to the exe; otherwise next to script
@@ -213,11 +213,15 @@ def generate_zpl_dataflex(
lot_no: Optional[str] = None,
font_regular: str = "E:STXihei.ttf",
font_bold: str = "E:STXihei.ttf",
reset_counter_first: bool = False,
) -> str:
"""
Row 1 (from zero): QR code, then item name (rotated 90°).
Row 2: Batch/lot (left), item code (right).
Label and QR use lotNo from API when present, else batch_no (Bxxxxx).

When reset_counter_first is True, Zebra counter reset commands (~RO1, ~RO2) are inserted after ^XA
in the *same* label job. Many printers ignore a separate reset-only TCP job.
"""
desc = _zpl_escape((item_name or "—").strip())
code = _zpl_escape((item_code or "—").strip())
@@ -229,8 +233,9 @@ def generate_zpl_dataflex(
else:
qr_payload = label_line if label_line else batch_no.strip()
qr_value = _zpl_escape(qr_payload)
counter_lines = _zpl_dataflex_counter_reset_lines() if reset_counter_first else ""
return f"""^XA
^CI28
{counter_lines}^CI28
^PW700
^LL500
^PO N
@@ -245,6 +250,15 @@ def generate_zpl_dataflex(
^XZ"""


def _zpl_dataflex_counter_reset_lines() -> str:
"""
Zebra ZPL counter reset (tilde commands), embedded after ^XA on the first label of each job:
~RO1 = reset counter 1, ~RO2 = reset counter 2.
See Zebra ZPL / printer manual; add ~RO3 etc. if your model exposes more counters.
"""
return "~RO1\n~RO2\n"


def generate_zpl_label_small(
batch_no: str,
item_code: str,
@@ -713,7 +727,7 @@ def send_job_to_laser(
if item_id is not None and stock_in_line_id is not None:
# Use compact JSON so device-side parser doesn't get spaces.
json_part = json.dumps(
{"itemID": item_id, "stockInLineId": stock_in_line_id},
{"itemId": item_id, "stockInLineId": stock_in_line_id},
separators=(",", ":"),
)
reply = f"{json_part};{code_str};{name_str};;"
@@ -1011,18 +1025,18 @@ def ask_label_count(parent: tk.Tk) -> Optional[int]:
win.wait_window()
return result[0]
def ask_bag_count(parent: tk.Tk) -> Optional[int]:
def ask_bag_count(parent: tk.Tk) -> Optional[Tuple[int, bool]]:
"""
When printer is 打袋機 DataFlex, ask how many bags:
optional direct qty in text field, +50/+10/+5/+1, 重置, then 確認送出.
Returns count (>= 1), or None if cancelled.
When printer is 打袋機 DataFlex: qty with +按鈕 then 確認送出, or big bottom「C」for continuous.
Returns (count, continuous_print). If continuous_print is True, count is ignored (use 0).
None if cancelled.
"""
result: list[Optional[int]] = [None]
result: list[Optional[Tuple[int, bool]]] = [None]
qty_var = tk.StringVar(value="0")

win = tk.Toplevel(parent)
win.title("打袋列印數量")
win.geometry("580x280")
win.geometry("580x420")
win.transient(parent)
win.grab_set()
win.configure(bg=BG_TOP)
@@ -1063,7 +1077,12 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]:
if q < 1:
messagebox.showwarning("打袋機", "請輸入需求數量或按 +50、+10、+5、+1。", parent=win)
return
result[0] = q
result[0] = (q, False)
win.destroy()

def start_continuous() -> None:
"""Big C: continuous print until 停止; counter reset at job start."""
result[0] = (0, True)
win.destroy()

btn_row1 = tk.Frame(win, bg=BG_TOP)
@@ -1075,11 +1094,90 @@ def ask_bag_count(parent: tk.Tk) -> Optional[int]:

ttk.Button(win, text="確認送出", command=confirm, width=14).pack(pady=12)
qty_entry.bind("<Return>", lambda e: confirm())

sep = ttk.Separator(win, orient=tk.HORIZONTAL)
sep.pack(fill=tk.X, padx=16, pady=(4, 8))

bottom = tk.Frame(win, bg=BG_TOP)
bottom.pack(fill=tk.X, padx=12, pady=(0, 12))
tk.Label(
bottom,
text="連續出袋 · 每單開始重設計數 · 另開視窗按「停止列印」結束",
font=get_font(FONT_SIZE_META),
bg=BG_TOP,
fg="#333333",
wraplength=540,
justify=tk.CENTER,
).pack(fill=tk.X, pady=(0, 6))

tk.Button(
bottom,
text="C(連續印)",
command=start_continuous,
font=(FONT_FAMILY, 38, "bold"),
bg="#2E7D32",
fg="white",
activebackground="#1B5E20",
activeforeground="white",
relief=tk.RAISED,
bd=4,
cursor="hand2",
padx=24,
pady=18,
).pack(fill=tk.X)
win.protocol("WM_DELETE_WINDOW", win.destroy)
win.wait_window()
return result[0]


def _sleep_interruptible(stop_event: threading.Event, total_sec: float) -> None:
"""Sleep up to total_sec but return early if stop_event is set."""
end = time.perf_counter() + total_sec
while time.perf_counter() < end:
if stop_event.is_set():
return
remaining = end - time.perf_counter()
if remaining <= 0:
break
time.sleep(min(0.05, remaining))


def open_dataflex_stop_window(parent: tk.Tk, stop_event: threading.Event) -> tk.Toplevel:
"""Small window with 停止列印 for DataFlex continuous mode (non-modal so stop stays usable)."""
win = tk.Toplevel(parent)
win.title("打袋機連續列印")
win.geometry("420x170")
win.transient(parent)
win.configure(bg=BG_TOP)
tk.Label(
win,
text="連續列印進行中,可隨時按下方停止。",
font=get_font(FONT_SIZE),
bg=BG_TOP,
wraplength=400,
).pack(pady=(16, 8))

def stop() -> None:
stop_event.set()
try:
win.destroy()
except tk.TclError:
pass

tk.Button(
win,
text="停止列印",
command=stop,
font=get_font(FONT_SIZE_BUTTONS),
bg=BG_STATUS_ERROR,
fg=FG_STATUS_ERROR,
padx=20,
pady=10,
).pack(pady=12)
win.protocol("WM_DELETE_WINDOW", stop)
return win


def main() -> None:
settings = load_settings()
base_url_ref = [build_base_url(settings["api_ip"], settings["api_port"])]
@@ -1515,8 +1613,9 @@ def main() -> None:
if not ip:
messagebox.showerror("打袋機", "請在設定中填寫打袋機 DataFlex 的 IP。")
else:
count = ask_bag_count(root)
if count is not None:
bag_ans = ask_bag_count(root)
if bag_ans is not None:
n, continuous = bag_ans
item_code = j.get("itemCode") or "—"
item_name = j.get("itemName") or "—"
item_id = j.get("itemId")
@@ -1529,33 +1628,123 @@ def main() -> None:
item_id=item_id,
stock_in_line_id=stock_in_line_id,
lot_no=lot_no,
reset_counter_first=False,
)
zpl_first_job = generate_zpl_dataflex(
b,
item_code,
item_name,
item_id=item_id,
stock_in_line_id=stock_in_line_id,
lot_no=lot_no,
reset_counter_first=True,
)
label_text = (lot_no or b).strip()
n = count
try:
for i in range(n):
send_zpl_to_dataflex(ip, port, zpl)
if i < n - 1:
time.sleep(2)
set_status_message(f"已送出列印:批次 {label_text} x {n} 張", is_error=False)
jo_id = j.get("id")
if jo_id is not None:
if continuous:
stop_ev = threading.Event()
stop_win = open_dataflex_stop_window(root, stop_ev)

def dflex_worker() -> None:
printed = 0
error_shown = False
try:
submit_job_order_print_submit(
base_url_ref[0], int(jo_id), n, "DATAFLEX"
while not stop_ev.is_set():
payload = zpl_first_job if printed == 0 else zpl
send_zpl_to_dataflex(ip, port, payload)
printed += 1
_sleep_interruptible(stop_ev, 2.0)
except ConnectionRefusedError:
error_shown = True
root.after(
0,
lambda: set_status_message(
f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。",
is_error=True,
),
)
load_job_orders(from_user_date_change=False)
except requests.RequestException as ex:
messagebox.showwarning(
"打袋機",
f"已送出 {n} 張,但伺服器記錄失敗:{ex}",
except socket.timeout:
error_shown = True
root.after(
0,
lambda: set_status_message(
f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。",
is_error=True,
),
)
except OSError as err:
error_shown = True
root.after(
0,
lambda e=err: set_status_message(
f"列印失敗:{e}",
is_error=True,
),
)
finally:

def _done() -> None:
try:
stop_win.destroy()
except tk.TclError:
pass
if printed > 0:
set_status_message(
f"連續列印結束:批次 {label_text},已印 {printed} 張",
is_error=False,
)
jo_id = j.get("id")
if jo_id is not None:
try:
submit_job_order_print_submit(
base_url_ref[0],
int(jo_id),
printed,
"DATAFLEX",
)
load_job_orders(from_user_date_change=False)
except requests.RequestException as ex:
messagebox.showwarning(
"打袋機",
f"已印 {printed} 張,但伺服器記錄失敗:{ex}",
)
elif not error_shown:
set_status_message(
"連續列印未印出或已取消",
is_error=True,
)

root.after(0, _done)

threading.Thread(target=dflex_worker, daemon=True).start()
else:
try:
for i in range(n):
send_zpl_to_dataflex(
ip,
port,
zpl_first_job if i == 0 else zpl,
)
except ConnectionRefusedError:
set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True)
except socket.timeout:
set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True)
except OSError as err:
set_status_message(f"列印失敗:{err}", is_error=True)
if i < n - 1:
time.sleep(2)
set_status_message(f"已送出列印:批次 {label_text} x {n} 張", is_error=False)
jo_id = j.get("id")
if jo_id is not None:
try:
submit_job_order_print_submit(
base_url_ref[0], int(jo_id), n, "DATAFLEX"
)
load_job_orders(from_user_date_change=False)
except requests.RequestException as ex:
messagebox.showwarning(
"打袋機",
f"已送出 {n} 張,但伺服器記錄失敗:{ex}",
)
except ConnectionRefusedError:
set_status_message(f"無法連線至 {ip}:{port},請確認印表機已開機且 IP 正確。", is_error=True)
except socket.timeout:
set_status_message(f"連線逾時 ({ip}:{port}),請檢查網路與連接埠。", is_error=True)
except OSError as err:
set_status_message(f"列印失敗:{err}", is_error=True)
elif printer_var.get() == "標簽機":
com = (settings.get("label_com") or "").strip()
if not com:


Загрузка…
Отмена
Сохранить