import base64
import json
import queue
import threading
import time
import tkinter as tk
from datetime import datetime
from tkinter import messagebox, scrolledtext

import requests


class OrderMonitorApp:
    """Tkinter window that polls an order API and tries to accept matching orders.

    A background thread periodically fetches the order list, logs every order,
    and attempts to accept the first order whose price is above the configured
    threshold and whose remark contains none of the ignored keywords.

    Threading model: Tk widgets and Tk variables are only touched from the
    main thread.  The worker thread communicates through ``_log_queue`` (log
    lines) and reads plain-Python copies of the settings (``_interval``,
    ``_min_price``, ``_ignore_keywords``) that the main thread refreshes
    whenever the user edits an entry.
    """

    # NOTE(review): account identifiers, session tokens and the captcha-service
    # password are hard-coded in source.  They are kept unchanged here so the
    # program behaves as before, but they should be moved to a config file or
    # environment variables.
    _API_BASE = "https://jiechidj.haiqigame.com"
    _API_HEADERS = {
        "Host": "jiechidj.haiqigame.com",
        "Connection": "keep-alive",
        "content-type": "application/json",
        "uid": "421052",
        "platform": "ios",
        "model": "iPhone 15",
        "sid": "uhir5qYEdlZYwE5SMC8jvpJrfLc5jL9x",
        "token": "63d53066-ac50-4b39-b184-cf63f254afa2",
        "brand": "iPhone",
        "User-Agent": (
            "Mozilla/5.0 (iPhone; CPU iPhone OS 26_4_2 like Mac OS X) "
            "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
            "MicroMessenger/8.0.71(0x1800472b) NetType/WIFI Language/zh_CN"
        ),
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br",
    }
    _CAPTCHA_PARAMS = {"type": "1", "id": "421052"}
    _FORM_TOKEN = "5c8f6680ff6a1732b42ed36ebed7d1d2"
    _OCR_URL = "https://api.ttshitu.com/predict"
    _OCR_USERNAME = "puge"
    _OCR_PASSWORD = "mmit7750"

    REQUEST_TIMEOUT = 10   # seconds, applied to every HTTP request
    DEFAULT_INTERVAL = 5   # seconds, used when the interval entry is invalid
    LOG_POLL_MS = 100      # how often the main thread flushes queued log lines

    def __init__(self, root):
        """Build the window on *root* and install the close handler."""
        self.root = root
        self.root.title("订单监控系统")
        self.root.geometry("900x600")
        self.root.resizable(True, True)

        # Worker-thread state.
        self.running = False
        self.thread = None
        self._stop_event = threading.Event()

        # Log lines produced on any thread, consumed on the main thread.
        self._log_queue = queue.Queue()

        # Plain-Python copies of the settings, safe to read from the worker.
        self._interval = self.DEFAULT_INTERVAL
        self._min_price = None
        self._ignore_keywords = []

        self.create_widgets()

        # Keep the copies current as the user edits the entries.
        self._sync_settings()
        for variable in (self.interval_var, self.buyPrice, self.remarkIgnore):
            variable.trace_add("write", self._sync_settings)

        self._log_job = self.root.after(self.LOG_POLL_MS, self._drain_log_queue)

        # Stop the worker before the window is destroyed.
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    # ------------------------------------------------------------------ UI

    def create_widgets(self):
        """Create the settings row, the button row and the log area."""
        control_frame = tk.Frame(self.root)
        control_frame.pack(pady=10, padx=10, fill=tk.X)

        # Each entry keeps its own attribute so that configuring one of them
        # later (e.g. validation) cannot accidentally hit another.
        self.interval_var = tk.StringVar(value="5")
        self.interval_entry = self._add_labeled_entry(
            control_frame, "请求间隔(秒):", self.interval_var, 8
        )

        self.buyPrice = tk.StringVar(value="180")
        self.price_entry = self._add_labeled_entry(
            control_frame, "抢单价格(大于):", self.buyPrice, 8
        )

        self.remarkIgnore = tk.StringVar(value="双女@单女")
        self.remark_entry = self._add_labeled_entry(
            control_frame, "备注过滤(@隔开多个词):", self.remarkIgnore, 24
        )

        control_frame2 = tk.Frame(self.root)
        control_frame2.pack(pady=10, padx=10, fill=tk.X)

        self.start_button = tk.Button(
            control_frame2, text="启动监控", command=self.start_monitor,
            bg="green", fg="white", font=("Arial", 12), width=12,
        )
        self.start_button.pack(side=tk.LEFT, padx=5)

        self.stop_button = tk.Button(
            control_frame2, text="停止监控", command=self.stop_monitor,
            bg="red", fg="white", font=("Arial", 12), width=12,
            state=tk.DISABLED,
        )
        self.stop_button.pack(side=tk.LEFT, padx=5)

        clear_button = tk.Button(
            control_frame2, text="清空日志", command=self.clear_log,
            bg="gray", fg="white", font=("Arial", 12), width=10,
        )
        clear_button.pack(side=tk.LEFT, padx=5)

        log_frame = tk.Frame(self.root)
        log_frame.pack(pady=10, padx=10, fill=tk.BOTH, expand=True)
        tk.Label(
            log_frame, text="监控日志:", font=("Arial", 12), anchor="w"
        ).pack(anchor="w")
        self.log_text = scrolledtext.ScrolledText(
            log_frame, wrap=tk.WORD, font=("Consolas", 10), height=25
        )
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # Only the polling interval is restricted to integer input.
        vcmd = (self.root.register(self.validate_int), "%P")
        self.interval_entry.config(validate="key", validatecommand=vcmd)

    @staticmethod
    def _add_labeled_entry(parent, label, variable, width):
        """Pack a label followed by an entry bound to *variable*; return the entry."""
        tk.Label(parent, text=label, font=("Arial", 12)).pack(side=tk.LEFT, padx=5)
        entry = tk.Entry(
            parent, textvariable=variable, width=width, font=("Arial", 12)
        )
        entry.pack(side=tk.LEFT, padx=5)
        return entry

    def validate_int(self, new_value):
        """Return True if *new_value* is empty or parses as an integer.

        Used as a Tk ``validatecommand``; the empty string is accepted so the
        user can clear the field while typing.
        """
        if new_value == "":
            return True
        try:
            int(new_value)
        except ValueError:
            return False
        return True

    def _sync_settings(self, *_trace_args):
        """Copy the entry values into plain attributes (main thread only).

        Invalid or non-positive intervals fall back to ``DEFAULT_INTERVAL``;
        an unparsable price is stored as ``None`` (no order will be grabbed);
        empty keywords are dropped because the empty string is a substring of
        every remark and would otherwise reject every order.
        """
        try:
            interval = int(self.interval_var.get())
        except ValueError:
            interval = self.DEFAULT_INTERVAL
        self._interval = interval if interval > 0 else self.DEFAULT_INTERVAL

        try:
            self._min_price = float(self.buyPrice.get())
        except ValueError:
            self._min_price = None

        self._ignore_keywords = [
            word for word in self.remarkIgnore.get().split("@") if word
        ]

    # ------------------------------------------------------------- logging

    def log_message(self, msg, level="INFO"):
        """Queue a timestamped log line; safe to call from any thread.

        The line is written to the log widget by ``_drain_log_queue`` on the
        main thread.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self._log_queue.put(f"[{timestamp}] [{level}] {msg}\n")

    def _drain_log_queue(self):
        """Move queued log lines into the log widget, then reschedule itself."""
        wrote = False
        try:
            while True:
                self.log_text.insert(tk.END, self._log_queue.get_nowait())
                wrote = True
        except queue.Empty:
            pass
        if wrote:
            self.log_text.see(tk.END)  # keep the newest line visible
        self._log_job = self.root.after(self.LOG_POLL_MS, self._drain_log_queue)

    def clear_log(self):
        """Remove all text from the log widget."""
        self.log_text.delete("1.0", tk.END)

    # ------------------------------------------------------------- network

    def get_captcha(self):
        """Download the captcha image and return it as a ``data:`` URI.

        Returns:
            ``"data:<content-type>;base64,<payload>"``, or ``None`` if the
            request failed (the failure is logged).
        """
        params = dict(self._CAPTCHA_PARAMS)
        params["timestamp"] = str(int(time.time() * 1000))
        try:
            response = requests.get(
                self._API_BASE + "/api/common/captcha",
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            self.log_message(f"请求图片失败: {exc}", "ERROR")
            return None

        encoded = base64.b64encode(response.content).decode("utf-8")
        content_type = response.headers.get("content-type", "image/png")
        return f"data:{content_type};base64,{encoded}"

    def ttshitu(self, imgBase64):
        """Send a base64 image to the recognition service and return its answer.

        Args:
            imgBase64: Base64 image payload without the ``data:`` prefix.

        Returns:
            The ``data.result`` field of the JSON response, or ``None`` if the
            request failed or the response lacks that field (logged).
        """
        payload = {
            "username": self._OCR_USERNAME,
            "password": self._OCR_PASSWORD,
            "typeid": 11,
            "remark": "测试",
            "image": imgBase64,
        }
        headers = {
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
            "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
        }
        try:
            response = requests.post(
                self._OCR_URL,
                data=json.dumps(payload),
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            result = response.json()["data"]["result"]
        except requests.exceptions.RequestException as exc:
            self.log_message(f"验证码识别请求失败: {exc}", "ERROR")
            return None
        except (ValueError, KeyError, TypeError) as exc:
            # Not JSON, or JSON without data.result.
            self.log_message(f"验证码识别失败: {exc!r}", "ERROR")
            return None

        self.log_message(f"验证码识别结果:{result}", "INFO")
        return result

    def sendConnect(self, order_sn, captcha):
        """POST an accept request for *order_sn* with the solved *captcha*.

        Returns:
            The decoded JSON response, or ``None`` on a request or JSON
            decoding failure (logged).
        """
        body = {
            "order_sn": order_sn,
            "captcha": captcha,
            "__token__": self._FORM_TOKEN,
        }
        try:
            response = requests.post(
                self._API_BASE + "/addons/shop/api.order/sendConnect",
                headers=self._API_HEADERS,
                data=json.dumps(body),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.RequestException as exc:
            self.log_message(f"请求失败: {exc}", "ERROR")
            return None
        except ValueError as exc:
            self.log_message(f"解析JSON失败: {exc}", "ERROR")
            return None

        self.log_message(f"抢单结果: {result}", "INFO")
        return result

    def fetch_orders(self):
        """GET the first page of the order list.

        Returns:
            The decoded JSON response, or ``None`` on a request or JSON
            decoding failure (logged).
        """
        query = {
            "connect": "true",
            "company_id": "131",
            "page": "1",
            "orderstate": "0",
            "paystate": "1",
            "shippingstate": "0",
        }
        try:
            response = requests.get(
                self._API_BASE + "/addons/shop/api.order/index",
                headers=self._API_HEADERS,
                params=query,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as exc:
            self.log_message(f"请求失败: {exc}", "ERROR")
            return None
        except ValueError as exc:
            self.log_message(f"解析JSON失败: {exc}", "ERROR")
            return None

    # ------------------------------------------------------ order handling

    def process_orders(self, data):
        """Log every order in *data* and try to accept the first match.

        Args:
            data: Decoded response of ``fetch_orders``.  Anything that is not
                a dict with ``code == 1`` is reported as an API error.
        """
        if not isinstance(data, dict) or data.get("code") != 1:
            reason = "未知错误"
            if isinstance(data, dict):
                reason = data.get("msg", "未知错误")
            self.log_message("API返回错误: " + str(reason), "WARNING")
            return

        page = data.get("data")
        orders = page.get("data") if isinstance(page, dict) else None
        if not orders:
            self.log_message("当前无新订单", "INFO")
            return

        self.log_message(f"获取到 {len(orders)} 个订单", "INFO")
        for order in orders:
            order_sn, price, remark = self._log_order(order)
            # NOTE(review): the original indentation was lost; the trailing
            # `break` is read as "stop after one accept attempt" — confirm.
            if self._try_grab_order(order_sn, price, remark):
                break

    def _log_order(self, order):
        """Log one order; return ``(order_sn, price_of_last_goods, memo)``.

        ``price`` is ``None`` when the order lists no goods.
        """
        order_sn = order.get("order_sn", "未知订单号")
        client = order.get("client", "未知")
        self.log_message(f"--- 订单号: {order_sn} (客户端: {client}) ---", "INFO")

        price = None
        for goods in order.get("order_goods") or []:
            title = goods.get("title", "无标题")
            attrdata = goods.get("attrdata", "无规格")
            price = goods.get("price", "0.00")
            self.log_message(f" 商品: {title}", "INFO")
            self.log_message(f" 规格: {attrdata}", "INFO")
            self.log_message(f" 价格: ¥{price}", "INFO")

        game_type = order.get("client", "未指定")
        self.log_message(f" 接单要求(端游/手游): {game_type}", "INFO")

        remark = order.get("memo", "无备注")
        self.log_message(f" 订单备注: {remark}", "INFO")
        self.log_message("-" * 50, "INFO")
        return order_sn, price, remark

    def _try_grab_order(self, order_sn, price, remark):
        """Accept the order if it passes the filters; return True if a request was sent."""
        remark_text = "" if remark is None else str(remark)
        if any(word in remark_text for word in self._ignore_keywords):
            self.log_message("备注不符合!", "INFO")
            return False

        threshold = self._min_price
        if threshold is None:
            self.log_message("抢单价格无效,跳过抢单", "WARNING")
            return False

        try:
            amount = float(price)
        except (TypeError, ValueError):
            # Covers orders without goods (price is None) and malformed prices.
            self.log_message(f"订单 {order_sn} 价格无效: {price!r}", "WARNING")
            return False

        self.log_message(f"订单价格:{price} 设置价格:{threshold:g}", "INFO")
        if not amount > threshold:
            return False
        self.log_message("定单价格符合!", "INFO")

        captcha_uri = self.get_captcha()
        if not captcha_uri:
            self.log_message("获取验证码失败,跳过该订单", "ERROR")
            return False

        # Strip the "data:<type>;base64," prefix before recognition.
        captcha_code = self.ttshitu(captcha_uri.partition(",")[2])
        if not captcha_code:
            self.log_message("验证码识别失败,跳过该订单", "ERROR")
            return False

        self.sendConnect(order_sn, captcha_code)
        return True

    # ------------------------------------------------------------- control

    def monitor_loop(self):
        """Poll the order API until stopped; runs in the worker thread."""
        while self.running:
            self.log_message("正在请求订单数据...", "INFO")
            try:
                data = self.fetch_orders()
                if data:
                    self.process_orders(data)
                else:
                    self.log_message("请求失败,请检查网络或API配置", "ERROR")
            except Exception as exc:
                # Thread boundary: report and keep polling instead of dying
                # silently while the UI still shows "running".
                self.log_message(f"监控循环异常: {exc!r}", "ERROR")

            # Sleeps for the interval but wakes immediately on stop.
            self._stop_event.wait(self._interval)

    def start_monitor(self):
        """Validate the settings and start the worker thread."""
        try:
            if int(self.interval_var.get()) <= 0:
                raise ValueError
        except ValueError:
            messagebox.showerror("错误", "请求间隔必须是正整数!")
            return

        try:
            float(self.buyPrice.get())
        except ValueError:
            messagebox.showerror("错误", "抢单价格必须是数字!")
            return

        # A previous worker may still be finishing an HTTP request after a
        # stop; starting another one would leave two loops running.
        previous_alive = self.thread is not None and self.thread.is_alive()
        if self.running or previous_alive:
            messagebox.showwarning("警告", "监控已在运行中")
            return

        self._sync_settings()
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.thread.start()

        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.log_message("监控已启动", "SUCCESS")

    def stop_monitor(self):
        """Signal the worker to stop and wait briefly for it to finish."""
        if not self.running:
            return

        self.running = False
        self._stop_event.set()
        if self.thread and self.thread.is_alive():
            # Bounded wait so an in-flight HTTP request cannot freeze the UI.
            self.thread.join(timeout=2)

        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.log_message("监控已停止", "SUCCESS")

    def on_closing(self):
        """Stop the worker, cancel the log poller and destroy the window."""
        if self.running:
            self.stop_monitor()
        self.root.after_cancel(self._log_job)
        self.root.destroy()


def main():
    """Create the Tk root, build the application and run the event loop."""
    root = tk.Tk()
    OrderMonitorApp(root)
    root.mainloop()


if __name__ == "__main__":
    main()