# MuMu emulator management tool: a Tkinter GUI that polls a task server and drives MuMu emulators through MuMuManager and adb.
import io
import json
import os
import random
import re
import subprocess
import threading
import time
import tkinter as tk
from datetime import datetime
from tkinter import ttk, scrolledtext, messagebox, filedialog

import requests
import win32api
import win32con
import win32gui
from PIL import Image, ImageTk
from tkcalendar import DateEntry  # requires: pip install tkcalendar
def sendTask(payload, delay_minutes=0, timeout=10):
    """Publish a follow-up task for a plate to the task server.

    Args:
        payload: Value sent as the ``plate`` field of the task body.
        delay_minutes: Minutes added to the current time when computing the
            ``effectiveTime`` field.  Defaults to 0 (effective immediately),
            which is what the original hard-coded value did.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the calling thread forever.

    Returns:
        The ``requests.Response`` returned by the server.

    Raises:
        requests.RequestException: On connection errors or timeout.
    """
    # effectiveTime is an epoch timestamp in milliseconds.
    delay_ms = delay_minutes * 60 * 1000
    effective_time = int((time.time() * 1000) + delay_ms)

    task_payload = {
        "plate": payload,
        "effectiveTime": effective_time,
    }

    # Per the original author's note this is the de-duplicating publish
    # endpoint; that behaviour is server-side and not visible here.
    url = "https://task.port.run/publish/carNext/600"

    print(f"发送车牌号: {payload} 到服务器...")
    print(f"延迟 {delay_minutes} 分钟后执行(生效时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(effective_time/1000))})")

    headers = {
        "Content-Type": "application/json",
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br",
        "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0",
        "Connection": "keep-alive",
    }
    response = requests.post(url, json=task_payload, headers=headers, timeout=timeout)
    print(response.text)
    return response
def drag_in_emulator(manager_path, emu_index, x, y1, y2, duration=400):
    """Swipe inside an emulator from (x, y1) to (x, y2) through adb.

    Args:
        manager_path: Path to the MuMuManager executable.
        emu_index: Emulator index passed to ``MuMuManager info -v``.
        x: Horizontal coordinate used for both ends of the swipe.
        y1: Vertical start coordinate.
        y2: Vertical end coordinate.
        duration: Value passed as the last argument of ``input swipe``.

    Returns:
        True when the swipe command exited with status 0; False when the
        adb port could not be determined or any command could not be run.
    """
    # Ask MuMuManager for the emulator's adb port.
    try:
        info = subprocess.run(
            [manager_path, "info", "-v", str(emu_index)],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError:
        # Executable missing or not runnable.
        return False
    if info.returncode != 0:
        return False

    try:
        adb_port = json.loads(info.stdout).get("adb_port")
    except (ValueError, AttributeError):
        # Output was not JSON, or was JSON but not an object.
        return False
    if not adb_port:
        return False

    target_device = f"127.0.0.1:{adb_port}"
    try:
        # Argument lists (no shell) so values are never re-parsed by a shell.
        subprocess.run(["adb", "connect", target_device], capture_output=True)
        time.sleep(0.3)
        swipe = subprocess.run(
            [
                "adb", "-s", target_device, "shell", "input", "swipe",
                str(x), str(y1), str(x), str(y2), str(duration),
            ],
            capture_output=True,
        )
    except OSError:
        # adb is not available on PATH.
        return False

    return swipe.returncode == 0
def input_text_to_emulator(manager_path, emu_index, text):
    """Type text into an emulator through ``adb shell input text``.

    One ``KEYCODE_DEL`` key event is sent before the text, as in the
    original flow.

    Args:
        manager_path: Path to the MuMuManager executable.
        emu_index: Emulator index passed to ``MuMuManager info -v``.
        text: Text to type.  ``None`` is rejected without running anything.

    Returns:
        True when the ``input text`` command exited with status 0; False when
        there was nothing to type, the adb port could not be determined, or a
        command could not be run.
    """
    if text is None:
        return False

    # Ask MuMuManager for the emulator's adb port.
    try:
        info = subprocess.run(
            [manager_path, "info", "-v", str(emu_index)],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError:
        return False
    if info.returncode != 0:
        return False

    try:
        adb_port = json.loads(info.stdout).get("adb_port")
    except (ValueError, AttributeError):
        return False
    if not adb_port:
        return False

    target_device = f"127.0.0.1:{adb_port}"
    adb = ["adb", "-s", target_device, "shell", "input"]
    try:
        subprocess.run(["adb", "connect", target_device], capture_output=True)
        # Delete first, then type.
        subprocess.run(adb + ["keyevent", "KEYCODE_DEL"])
        # The text arrives from the task server, so it is passed as a single
        # argv element instead of being spliced into a host shell command.
        # NOTE(review): adb still hands the text to the device-side shell;
        # confirm the expected values contain no shell metacharacters.
        typed = subprocess.run(adb + ["text", str(text)])
    except OSError:
        return False

    return typed.returncode == 0
def get_emulator_pixel_color(manager_path, emu_index, x, y):
    """Return the colour of one pixel of an emulator's screen.

    A screenshot is taken with ``adb exec-out screencap -p`` and decoded in
    memory, so no temporary file is written.

    Args:
        manager_path: Path to the MuMuManager executable.
        emu_index: Emulator index passed to ``MuMuManager info -v``.
        x: Pixel column in the screenshot.
        y: Pixel row in the screenshot.

    Returns:
        The colour as an upper-case hex string such as ``"#FFFFFF"``, or
        None on any failure (port lookup, adb, decoding, coordinates outside
        the image).
    """
    # Ask MuMuManager for the emulator's adb port.
    try:
        info = subprocess.run(
            [manager_path, "info", "-v", str(emu_index)],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError:
        return None
    if info.returncode != 0:
        return None

    try:
        adb_port = json.loads(info.stdout).get("adb_port")
    except (ValueError, AttributeError):
        return None
    if not adb_port:
        return None

    target_device = f"127.0.0.1:{adb_port}"
    try:
        subprocess.run(["adb", "connect", target_device], capture_output=True)
        time.sleep(0.3)
        shot = subprocess.run(
            ["adb", "-s", target_device, "exec-out", "screencap", "-p"],
            capture_output=True,
        )
    except OSError:
        return None
    if shot.returncode != 0 or not shot.stdout:
        return None

    try:
        with Image.open(io.BytesIO(shot.stdout)) as img:
            # Converting to RGB gives a 3-tuple for every source mode
            # (RGBA, greyscale, palette), so no per-mode branching is needed.
            r, g, b = img.convert("RGB").getpixel((x, y))
    except Exception:
        # The contract is "None on failure": undecodable data and
        # out-of-range coordinates are both reported that way.
        return None

    return f"#{r:02X}{g:02X}{b:02X}"
def split_and_random_select(input_string):
    """Split a string on ``@`` and return one of the parts at random.

    Args:
        input_string: A string such as ``"32@33@41"``.

    Returns:
        One randomly chosen non-empty part, or None when the input is
        ``None``/empty or contains no non-empty part.
    """
    if not input_string:
        return None

    # str.split never returns an empty list, so empty parts are dropped here
    # to make the "nothing to choose from" case actually reachable.
    candidates = [part for part in input_string.split("@") if part]
    if not candidates:
        return None
    return random.choice(candidates)
def show_image_at_position(image_path, x, y, width=250, height=250):
    """Show an image in a new top-level window at a screen position.

    Args:
        image_path: Path of the image file to display.
        x: Horizontal screen position of the window.
        y: Vertical screen position of the window.
        width: Window and image width in pixels.
        height: Window and image height in pixels.

    Returns:
        The ``tk.Toplevel`` window; the caller is responsible for destroying
        it.

    Raises:
        Whatever PIL or Tk raise when the image cannot be loaded or shown.
        The window is destroyed before the exception propagates.
    """
    window = tk.Toplevel()
    try:
        window.title("图片显示")
        window.geometry(f"{width}x{height}+{x}+{y}")

        # Close the file as soon as the resized copy exists.
        with Image.open(image_path) as source:
            resized = source.resize((width, height), Image.Resampling.LANCZOS)
        photo = ImageTk.PhotoImage(resized)

        label = tk.Label(window, image=photo)
        # Keep a reference on the widget so the image is not garbage
        # collected while the window is displayed.
        label.image = photo
        label.pack(fill=tk.BOTH, expand=True)

        # Make sure the window exists on screen before looking it up.
        window.update()
    except Exception:
        # Do not leave a half-built window that the caller cannot reach.
        window.destroy()
        raise

    # Bring the window to the foreground.  The lookup is by title, so any
    # other window with the same title could match.
    hwnd = win32gui.FindWindow(None, "图片显示")
    if hwnd:
        try:
            win32gui.SetForegroundWindow(hwnd)
        except Exception:
            # Activation is best effort; the window must still be returned
            # so that the caller is able to destroy it.
            pass

    return window
def click_screen(x, y):
    """Click the left mouse button at screen coordinates (x, y).

    The cursor is moved to the position first, then a left-button press and
    release are sent.
    """
    print(f"点击屏幕坐标: ({x}, {y})")

    # Move the cursor to the target position.
    win32api.SetCursorPos((x, y))

    # Press, then release, the left button.
    win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, x, y, 0, 0)
    win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0)
def activate_window_and_get_position(window_title):
    """Activate a window found by title and return its top-left corner.

    Args:
        window_title: Exact window title to look up.

    Returns:
        ``(x, y)`` screen coordinates of the window's top-left corner, or
        ``(None, None)`` when no window with that title exists.
    """
    hwnd = win32gui.FindWindow(None, window_title)
    if not hwnd:
        print(f"未找到窗口: {window_title}")
        return None, None

    # Restore the window first if it is minimised.
    if win32gui.IsIconic(hwnd):
        win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)

    win32gui.SetForegroundWindow(hwnd)

    # GetWindowRect returns (left, top, right, bottom).
    left, top, _right, _bottom = win32gui.GetWindowRect(hwnd)
    return left, top
class MuMuEmulatorManager:
    """Wrapper around the MuMuManager command-line tool and adb."""

    def __init__(self, manager_path):
        """Remember the path of the MuMuManager executable."""
        self.manager_path = manager_path

    def _query_info(self, target):
        """Run ``<manager> info -v <target>`` and return its stdout, or None on failure."""
        try:
            result = subprocess.run(
                [self.manager_path, "info", "-v", str(target)],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except OSError:
            # Executable missing or not runnable.
            return None
        if result.returncode != 0:
            return None
        return result.stdout

    def get_adb_port(self, index, log_callback=None):
        """Return the adb port of the emulator with the given index.

        Args:
            index: Emulator index.
            log_callback: Optional callable receiving a message when the
                port was found.

        Returns:
            The ``adb_port`` value reported by MuMuManager, or None when it
            is unavailable.
        """
        output = self._query_info(index)
        if output is None:
            return None

        try:
            adb_port = json.loads(output).get("adb_port")
        except (ValueError, AttributeError):
            # Output was not JSON, or was JSON but not an object.
            return None

        if adb_port is not None and log_callback:
            log_callback(f"✅ 模拟器 {index} ADB端口: {adb_port}")
        return adb_port

    def tap(self, index, x, y, log_callback=None):
        """Tap screen coordinates (x, y) in the emulator through adb.

        Args:
            index: Emulator index.
            x: Horizontal tap coordinate.
            y: Vertical tap coordinate.
            log_callback: Optional callable receiving progress messages.

        Returns:
            True when the tap command exited with status 0, else False.
        """
        adb_port = self.get_adb_port(index, log_callback)
        if not adb_port:
            if log_callback:
                log_callback(f"❌ 无法获取模拟器 {index} 的ADB端口")
            return False

        target_device = f"127.0.0.1:{adb_port}"
        try:
            subprocess.run(["adb", "connect", target_device], capture_output=True)
            time.sleep(0.5)
            result = subprocess.run(
                ["adb", "-s", target_device, "shell", "input", "tap", str(x), str(y)],
                capture_output=True,
                text=True,
                errors="replace",
            )
        except OSError as exc:
            # adb is not available on PATH.
            if log_callback:
                log_callback(f"❌ 点击失败: {exc}")
            return False

        if result.returncode == 0:
            if log_callback:
                log_callback(f"✅ 点击坐标 ({x}, {y}) 成功")
            return True

        if log_callback:
            log_callback(f"❌ 点击失败: {result.stderr}")
        return False

    def get_emulator_list(self):
        """Return the info dicts of all running emulators.

        Each returned dict is the object reported by MuMuManager with an
        added ``index`` key holding the key it was listed under.  Only
        entries whose ``is_process_started`` is truthy are returned.

        Returns:
            A list of dicts; empty when the tool fails or its output cannot
            be parsed.
        """
        output = self._query_info("all")
        if output is None:
            return []

        # The output may contain non-JSON text before the object.
        output = output.strip()
        json_start = output.find("{")
        if json_start != -1:
            output = output[json_start:]

        try:
            data = json.loads(output)
        except ValueError:
            return []
        if not isinstance(data, dict):
            return []

        emulators = []
        for key, value in data.items():
            if not isinstance(value, dict):
                continue
            value["index"] = key
            if value.get("is_process_started", False):
                emulators.append(value)
        return emulators
class SimpleMuMuManager:
    """Tkinter front end that polls the task server and drives MuMu emulators.

    One emulator is selected for delivery ("发货") tasks and one for receiving
    ("收货") tasks.  Polling runs in a daemon thread started by
    ``start_task_scheduler`` and stopped by ``stop_task_scheduler``.

    NOTE(review): ``perform_click`` / ``perform_click2`` run on the polling
    thread and create a Tk window through ``show_image_at_position``; confirm
    the Tk build in use tolerates calls from a non-main thread.
    """

    # Seconds between two polls of the task server.
    POLL_INTERVAL_SECONDS = 3
    # Seconds to wait after an unexpected error in the polling loop.
    ERROR_BACKOFF_SECONDS = 5
    # Endpoints polled for delivery and receiving tasks.
    DELIVERY_TASK_URL = "https://task.port.run/acquire/bdxkjzc"
    RECEIVE_TASK_URL = "https://task.port.run/acquire/bdxkjxc"
    # Colour the probe pixel must show before a task flow continues.
    SCANNED_COLOR = "#007AFF"
    # Coordinates of the first tap of both task flows.
    ENTRY_TAP = (508, 252)

    def __init__(self, root):
        """Build the UI on ``root``, load the config and list emulators."""
        self.root = root
        self.root.title("MuMu模拟器管理工具")
        self.root.geometry("700x600")

        # Configuration.
        self.config_file = "mumu_config.json"
        self.mumu_manager_path = None
        self.emulators = []

        # Scheduler state.
        self.is_running = False
        self.task_thread = None
        self.stop_event = threading.Event()

        # Delivery deadline ("last delivery time") settings.
        self.last_delivery_time = None
        self.delivery_time_enabled = False

        self.create_widgets()
        self.load_config()

        # Refresh automatically when a usable path was loaded.
        if self.mumu_manager_path and os.path.exists(self.mumu_manager_path):
            self.refresh_emulators()

    def create_widgets(self):
        """Create and lay out all widgets of the main window."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # MuMuManager path configuration.
        config_frame = ttk.LabelFrame(main_frame, text="MuMuManager配置", padding="10")
        config_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=5)

        self.path_var = tk.StringVar()
        self.path_entry = ttk.Entry(config_frame, textvariable=self.path_var, width=55)
        self.path_entry.grid(row=0, column=1, padx=5)
        self.path_entry.bind('<FocusOut>', self.on_path_changed)

        browse_btn = ttk.Button(config_frame, text="浏览", command=self.browse_mumu_manager)
        browse_btn.grid(row=0, column=2, padx=5)

        refresh_btn = ttk.Button(config_frame, text="刷新列表", command=self.refresh_emulators)
        refresh_btn.grid(row=0, column=3, padx=5)

        # Emulator selectors, two in one row.
        selector_frame = ttk.Frame(main_frame)
        selector_frame.grid(row=1, column=0, pady=20)

        send_frame = ttk.LabelFrame(selector_frame, text="发货手机", padding="10")
        send_frame.grid(row=0, column=0, padx=10)
        self.send_var = tk.StringVar()
        self.send_combo = ttk.Combobox(send_frame, textvariable=self.send_var, width=35, state="readonly")
        self.send_combo.grid(row=0, column=0)

        receive_frame = ttk.LabelFrame(selector_frame, text="收货手机", padding="10")
        receive_frame.grid(row=0, column=1, padx=10)
        self.receive_var = tk.StringVar()
        self.receive_combo = ttk.Combobox(receive_frame, textvariable=self.receive_var, width=35, state="readonly")
        self.receive_combo.grid(row=0, column=0)

        # Delivery deadline settings.
        time_frame = ttk.LabelFrame(main_frame, text="最后发货时间设置", padding="10")
        time_frame.grid(row=2, column=0, sticky=(tk.W, tk.E), pady=10)

        self.time_enabled_var = tk.BooleanVar(value=False)
        enable_check = ttk.Checkbutton(
            time_frame,
            text="启用最后发货时间限制",
            variable=self.time_enabled_var,
            command=self.on_time_enable_changed,
        )
        enable_check.grid(row=0, column=0, columnspan=4, sticky=tk.W, pady=5)

        ttk.Label(time_frame, text="截止日期:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
        self.date_picker = DateEntry(
            time_frame,
            width=12,
            background='darkblue',
            foreground='white',
            borderwidth=2,
            date_pattern='yyyy-mm-dd',
        )
        self.date_picker.grid(row=1, column=1, padx=5, pady=5)

        # Time of day (hour:minute).
        ttk.Label(time_frame, text="截止时间:").grid(row=1, column=2, padx=5, pady=5, sticky=tk.W)
        time_select_frame = ttk.Frame(time_frame)
        time_select_frame.grid(row=1, column=3, padx=5, pady=5, sticky=tk.W)

        self.hour_var = tk.StringVar(value="23")
        self.minute_var = tk.StringVar(value="59")

        hour_combo = ttk.Combobox(time_select_frame, textvariable=self.hour_var, width=5,
                                  values=[f"{i:02d}" for i in range(24)], state="readonly")
        hour_combo.grid(row=0, column=0)
        ttk.Label(time_select_frame, text=":").grid(row=0, column=1)
        minute_combo = ttk.Combobox(time_select_frame, textvariable=self.minute_var, width=5,
                                    values=[f"{i:02d}" for i in range(60)], state="readonly")
        minute_combo.grid(row=0, column=2)

        # Label showing the deadline currently in effect.
        self.time_status_label = ttk.Label(time_frame, text="未启用时间限制", foreground="gray")
        self.time_status_label.grid(row=2, column=0, columnspan=4, sticky=tk.W, pady=5)

        # Start / stop buttons.
        control_frame = ttk.Frame(main_frame)
        control_frame.grid(row=3, column=0, pady=10)

        self.start_btn = ttk.Button(control_frame, text="开始运行", command=self.start_task_scheduler, width=15)
        self.start_btn.grid(row=0, column=0, padx=10)

        self.stop_btn = ttk.Button(control_frame, text="结束运行", command=self.stop_task_scheduler, width=15, state="disabled")
        self.stop_btn.grid(row=0, column=1, padx=10)

        # Status line.
        status_frame = ttk.Frame(main_frame)
        status_frame.grid(row=4, column=0, pady=5)

        self.status_label = ttk.Label(status_frame, text="状态: ● 已停止", foreground="red")
        self.status_label.grid(row=0, column=0)

        # Log area.
        log_frame = ttk.LabelFrame(main_frame, text="日志", padding="10")
        log_frame.grid(row=5, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=10)

        self.log_text = scrolledtext.ScrolledText(log_frame, width=75, height=15, wrap=tk.WORD)
        # sticky lets the text widget use the space the weights below give it.
        self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Let the log area absorb extra space when the window is resized.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(5, weight=1)
        log_frame.columnconfigure(0, weight=1)
        log_frame.rowconfigure(0, weight=1)

    def on_time_enable_changed(self):
        """React to the deadline checkbox (or a loaded config) changing state."""
        self.delivery_time_enabled = self.time_enabled_var.get()
        self.date_picker.config(state="normal" if self.delivery_time_enabled else "disabled")

        if not self.delivery_time_enabled:
            self.hour_var.set("")
            self.minute_var.set("")
            self.last_delivery_time = None
            self.time_status_label.config(text="未启用时间限制", foreground="gray")
            self.log_message("已禁用最后发货时间限制", "INFO")
            return

        # Fall back to 23:59 only when nothing is selected, so a time restored
        # from the config file or already chosen by the user is not discarded.
        if not self.hour_var.get():
            self.hour_var.set("23")
        if not self.minute_var.get():
            self.minute_var.set("59")

        self.update_last_delivery_time()
        deadline = self.last_delivery_time
        shown = deadline.strftime('%Y-%m-%d %H:%M:%S') if deadline else '未设置'
        # Orange marks a deadline that is missing or already in the past.
        still_open = deadline is not None and deadline > datetime.now()
        self.time_status_label.config(
            text=f"最后发货时间: {shown}",
            foreground="green" if still_open else "orange",
        )
        self.log_message(f"已启用最后发货时间限制: {self.last_delivery_time}", "INFO")

    def update_last_delivery_time(self):
        """Recompute ``last_delivery_time`` from the date and time widgets.

        Does nothing when the deadline is disabled.  On a parse failure the
        deadline is set to None and an error is logged.
        """
        if not self.delivery_time_enabled:
            return

        try:
            date_str = self.date_picker.get()
            hour = int(self.hour_var.get())
            minute = int(self.minute_var.get())
            self.last_delivery_time = datetime.strptime(
                f"{date_str} {hour:02d}:{minute:02d}:00", "%Y-%m-%d %H:%M:%S"
            )
        except Exception as e:
            self.log_message(f"解析时间失败: {e}", "ERROR")
            self.last_delivery_time = None
            return

        # Warn when the chosen deadline is already in the past.
        if self.last_delivery_time <= datetime.now():
            self.log_message(f"警告: 设置的最后发货时间 {self.last_delivery_time} 已经过去,将立即停止发货任务", "WARNING")
            self.time_status_label.config(foreground="orange")
        else:
            self.time_status_label.config(foreground="green")

    def is_delivery_allowed(self):
        """Return True when delivery tasks may currently be executed.

        Always True when the deadline is disabled.  When it is enabled but
        cannot be determined, delivery is refused rather than raising.
        """
        if not self.delivery_time_enabled:
            return True

        if self.last_delivery_time is None:
            self.update_last_delivery_time()

        deadline = self.last_delivery_time
        if deadline is None:
            self.log_message("无法确定最后发货时间,跳过发货任务", "ERROR")
            return False

        current_time = datetime.now()
        if current_time <= deadline:
            return True

        self.log_message(f"当前时间 {current_time.strftime('%Y-%m-%d %H:%M:%S')} 已超过最后发货时间 {deadline.strftime('%Y-%m-%d %H:%M:%S')},跳过发货任务", "WARNING")
        return False

    def start_task_scheduler(self):
        """Validate the settings and start the polling thread."""
        if not self.send_var.get():
            self.log_message("请先选择发货手机模拟器", "ERROR")
            messagebox.showwarning("警告", "请先选择发货手机模拟器")
            return

        if not self.receive_var.get():
            self.log_message("请先选择收货手机模拟器", "ERROR")
            messagebox.showwarning("警告", "请先选择收货手机模拟器")
            return

        if self.is_running:
            self.log_message("任务调度器已在运行中", "WARNING")
            return

        if not self.mumu_manager_path or not os.path.exists(self.mumu_manager_path):
            self.log_message("请先设置正确的 MuMuManager.exe 路径", "ERROR")
            messagebox.showwarning("警告", "请先设置 MuMuManager.exe 路径")
            return

        # Re-read the deadline so edits made after ticking the box count.
        if self.delivery_time_enabled:
            self.update_last_delivery_time()
            if self.last_delivery_time and self.last_delivery_time <= datetime.now():
                proceed = messagebox.askyesno(
                    "确认",
                    f"最后发货时间 {self.last_delivery_time} 已经过去,将不会执行任何发货任务。是否继续?"
                )
                if not proceed:
                    return

        # The path is known to be valid here, so persisting cannot overwrite
        # a stored path with an unusable one.
        self.save_config()

        self.is_running = True
        self.stop_event.clear()

        self.task_thread = threading.Thread(target=self.task_scheduler_loop, daemon=True)
        self.task_thread.start()

        self.start_btn.config(state="disabled")
        self.stop_btn.config(state="normal")
        self.status_label.config(text="状态: ● 运行中", foreground="green")

        time_info = f"最后发货时间: {self.last_delivery_time}" if self.delivery_time_enabled else "无时间限制"
        self.log_message(f"任务调度器已启动,每{self.POLL_INTERVAL_SECONDS}秒检测一次任务。{time_info}", "INFO")

    def stop_task_scheduler(self):
        """Ask the polling thread to stop and reset the UI."""
        if not self.is_running:
            return

        self.is_running = False
        self.stop_event.set()

        # Give the worker a moment to finish; it is a daemon thread, so it
        # cannot keep the process alive if it does not.
        if self.task_thread and self.task_thread.is_alive():
            self.task_thread.join(timeout=3)

        self.start_btn.config(state="normal")
        self.stop_btn.config(state="disabled")
        self.status_label.config(text="状态: ● 已停止", foreground="red")
        self.log_message("任务调度器已停止", "INFO")

    def task_scheduler_loop(self):
        """Poll for tasks until stopped (runs on the worker thread).

        Delivery tasks are tried first when allowed; receiving tasks are
        tried when no delivery task was executed or delivery is not allowed.
        """
        while self.is_running and not self.stop_event.is_set():
            pause = self.POLL_INTERVAL_SECONDS
            try:
                self.log_message("开始检测任务...", "INFO")

                if self.is_delivery_allowed():
                    has_task = self.check_and_execute_task(self.DELIVERY_TASK_URL, "发货")
                    if not has_task:
                        has_task = self.check_and_execute_task(self.RECEIVE_TASK_URL, "收货")
                else:
                    self.log_message("已超过最后发货时间,仅处理收货任务", "INFO")
                    has_task = self.check_and_execute_task(self.RECEIVE_TASK_URL, "收货")

                if not has_task:
                    self.log_message(f"暂无可用任务,{self.POLL_INTERVAL_SECONDS}秒后继续检测", "INFO")

            except Exception as e:
                self.log_message(f"任务调度循环出错: {e}", "ERROR")
                pause = self.ERROR_BACKOFF_SECONDS

            # Waiting on the event (not sleeping) lets Stop take effect at once.
            self.stop_event.wait(pause)

    @staticmethod
    def _plate_of(content):
        """Return ``content["tableData"][2]``, or None when it is not available."""
        if not isinstance(content, dict):
            return None
        table = content.get("tableData")
        if isinstance(table, (list, tuple)) and len(table) > 2:
            return table[2]
        return None

    def check_and_execute_task(self, url, task_type):
        """Fetch one task from ``url`` and execute it.

        Args:
            url: Task acquisition endpoint.
            task_type: ``"发货"`` for delivery; anything else is handled as a
                receiving task.

        Returns:
            bool: True when a task was fetched and its flow was started.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            reply = response.json()

            task = reply.get("task") if isinstance(reply, dict) else None
            content = task.get("content") if isinstance(task, dict) else None
            if not isinstance(content, dict):
                self.log_message(f"暂无{task_type}任务", "INFO")
                return False

            plate = self._plate_of(content)
            self.log_message(f"获取到{task_type}任务: {plate if plate is not None else '未知'}", "INFO")

            if task_type == "发货":
                selection = self.send_var.get()
                run_flow = self.perform_click
            else:
                selection = self.receive_var.get()
                run_flow = self.perform_click2

            emu_index = self.get_emulator_index(selection)
            if not emu_index:
                self.log_message(f"无法获取{task_type}模拟器索引", "ERROR")
                return False

            # The display name is "<name> (索引:<index>)"; cut at the last
            # index marker so names containing " (" stay intact.
            window_title = selection.rsplit(" (索引:", 1)[0]
            run_flow(emu_index, window_title, content)
            return True

        except requests.RequestException as e:
            self.log_message(f"请求{task_type}任务失败: {e}", "ERROR")
            return False
        except Exception as e:
            self.log_message(f"处理{task_type}任务时出错: {e}", "ERROR")
            return False

    def browse_mumu_manager(self):
        """Let the user pick MuMuManager.exe with a file dialog."""
        file_path = filedialog.askopenfilename(
            title="选择MuMuManager.exe",
            filetypes=[("Executable files", "*.exe"), ("All files", "*.*")]
        )
        if file_path:
            self.path_var.set(file_path)
            self.on_path_changed()

    def on_path_changed(self, event=None):
        """Save the config and refresh the emulator list when the path is valid."""
        new_path = self.path_var.get()
        if not new_path:
            return
        if not os.path.exists(new_path):
            self.log_message(f"路径不存在: {new_path}", "WARNING")
            return

        self.mumu_manager_path = new_path
        self.save_config()
        self.log_message("配置已自动保存")
        self.refresh_emulators()

    def save_config(self):
        """Write the path and deadline settings to the config file."""
        deadline = self.last_delivery_time
        config = {
            'mumu_manager_path': self.mumu_manager_path,
            'delivery_time_enabled': self.delivery_time_enabled,
            'last_delivery_time': deadline.strftime("%Y-%m-%d %H:%M:%S") if deadline else None,
        }
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
        except Exception as e:
            self.log_message(f"保存配置失败: {e}", "ERROR")

    def load_config(self):
        """Load the path and deadline settings from the config file, if any."""
        if not os.path.exists(self.config_file):
            return

        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

            path = config.get('mumu_manager_path')
            if path and os.path.exists(path):
                self.mumu_manager_path = path
                self.path_var.set(path)
                self.log_message(f"已加载配置: {path}")
            elif path:
                self.log_message(f"配置路径不存在: {path}", "WARNING")

            enabled = bool(config.get('delivery_time_enabled', False))
            self.time_enabled_var.set(enabled)
            self.delivery_time_enabled = enabled

            last_time_str = config.get('last_delivery_time')
            if last_time_str and enabled:
                try:
                    saved = datetime.strptime(last_time_str, "%Y-%m-%d %H:%M:%S")
                    # Fill the widgets; the deadline itself is recomputed
                    # from them by on_time_enable_changed below.
                    self.date_picker.set_date(saved.date())
                    self.hour_var.set(f"{saved.hour:02d}")
                    self.minute_var.set(f"{saved.minute:02d}")
                    self.log_message(f"已加载最后发货时间设置: {last_time_str}", "INFO")
                except Exception as e:
                    self.log_message(f"加载最后发货时间失败: {e}", "WARNING")

            # Apply the loaded state to the UI exactly once.
            self.on_time_enable_changed()

        except Exception as e:
            self.log_message(f"加载配置失败: {e}", "ERROR")

    def log_message(self, message, level="INFO"):
        """Queue a timestamped line for the log widget."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] [{level}] {message}\n"

        # Scheduled through the event loop because this is also called from
        # the polling thread.
        self.root.after(0, self._append_log, log_entry)

    def _append_log(self, log_entry):
        """Append a line to the log widget and scroll to the end."""
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)

    def refresh_emulators(self):
        """Reload the running emulators into both selector boxes."""
        if not self.mumu_manager_path or not os.path.exists(self.mumu_manager_path):
            self.log_message("请先设置正确的 MuMuManager.exe 路径", "WARNING")
            return

        self.log_message("正在获取模拟器列表...")

        try:
            manager = MuMuEmulatorManager(self.mumu_manager_path)
            self.emulators = manager.get_emulator_list()

            if not self.emulators:
                self.log_message("未发现运行中的模拟器,请确保 MuMu 模拟器已启动", "WARNING")
                self.send_combo['values'] = []
                self.receive_combo['values'] = []
                return

            # Display names have the form "<name> (索引:<index>)".
            emulator_names = []
            for emu in self.emulators:
                name = emu.get('name', f"模拟器_{emu.get('index')}")
                display_name = f"{name} (索引:{emu.get('index')})"
                emulator_names.append(display_name)
                self.log_message(f"发现模拟器: {display_name}")

            self.send_combo['values'] = emulator_names
            self.receive_combo['values'] = emulator_names

            self.log_message(f"✅ 已加载 {len(self.emulators)} 个运行中的模拟器")

        except Exception as e:
            self.log_message(f"获取模拟器列表失败: {e}", "ERROR")

    def get_emulator_index(self, display_name):
        """Extract the emulator index from a display name.

        Args:
            display_name: Text of the form ``"<name> (索引:0)"``.

        Returns:
            The index as a string of digits, or None when there is none.
        """
        if not display_name:
            return None
        match = re.search(r'索引:(\d+)', display_name)
        return match.group(1) if match else None

    def _begin_scan(self, emu_index, window_title, plate, probe_xy, task_label):
        """Run the shared first half of both task flows.

        Taps the entry point, shows the plate image over the emulator window
        and waits until the probe pixel shows ``SCANNED_COLOR``.

        Returns:
            The ``MuMuEmulatorManager`` to continue with, or None when the
            flow must not continue (failure, or the scheduler was stopped).
        """
        self.log_message(f"开始在模拟器 {emu_index} 上执行{task_label}任务")
        manager = MuMuEmulatorManager(self.mumu_manager_path)

        if not manager.get_adb_port(emu_index, self.log_message):
            self.log_message(f"❌ 无法获取模拟器 {emu_index} 的ADB端口", "ERROR")
            return None

        x, y = self.ENTRY_TAP
        if not manager.tap(emu_index, x, y, self.log_message):
            self.log_message("❌ 点击失败", "ERROR")
            return None
        self.log_message(f"✅ 点击成功!模拟器 {emu_index} 位置 ({x}, {y})")

        rect_x, rect_y = activate_window_and_get_position(window_title)
        if rect_x is None:
            self.log_message(f"未找到窗口: {window_title}", "ERROR")
            return None
        time.sleep(2)  # give the window time to come to the foreground
        click_screen(rect_x + 241, rect_y + 411)
        time.sleep(1)

        image_path = os.path.join(".", "chepai", f"{plate}.png")
        image_box = show_image_at_position(image_path, rect_x + 144, rect_y + 355)
        try:
            color = get_emulator_pixel_color(self.mumu_manager_path, emu_index, *probe_xy)
            while color != self.SCANNED_COLOR and self.is_running:
                self.log_message(f"当前颜色: {color}, 等待扫描完成...")
                time.sleep(1)
                color = get_emulator_pixel_color(self.mumu_manager_path, emu_index, *probe_xy)
        finally:
            # Always close the image window, also when polling fails.
            image_box.destroy()
            self.log_message("关闭二维码窗口!")

        if color != self.SCANNED_COLOR:
            # The loop ended because the scheduler was stopped; do not go on
            # tapping or report a task that was never completed.
            self.log_message(f"任务调度器已停止,{task_label}任务未完成", "WARNING")
            return None
        return manager

    def perform_click(self, emu_index, window_title, content):
        """Execute a delivery task on the given emulator.

        Args:
            emu_index: Emulator index.
            window_title: Title of the emulator's desktop window.
            content: Task content; ``content["tableData"][2]`` is used as the
                image file name and as the published plate.
        """
        try:
            plate = self._plate_of(content)
            if plate is None:
                self.log_message("❌ 执行出错: 任务缺少 tableData[2]", "ERROR")
                return

            manager = self._begin_scan(emu_index, window_title, plate, (523, 1114), "发货")
            if manager is None:
                return

            manager.tap(emu_index, 323, 718, self.log_message)
            time.sleep(1)
            drag_in_emulator(self.mumu_manager_path, emu_index, 430, 1046, 980, 400)
            time.sleep(0.5)
            manager.tap(emu_index, 650, 743, self.log_message)
            time.sleep(1)
            manager.tap(emu_index, 358, 1112, self.log_message)
            time.sleep(1)
            manager.tap(emu_index, 419, 765, self.log_message)
            self.log_message("✅ 发货任务执行完成")
            sendTask(plate)

        except Exception as e:
            self.log_message(f"❌ 执行出错: {e}", "ERROR")

    def perform_click2(self, emu_index, window_title, content):
        """Execute a receiving task on the given emulator.

        Args:
            emu_index: Emulator index.
            window_title: Title of the emulator's desktop window.
            content: Task content; ``content["tableData"][2]`` is used as the
                image file name and as the published plate, and one
                ``@``-separated part of ``content["tdds"]`` is typed in.
        """
        try:
            plate = self._plate_of(content)
            if plate is None:
                self.log_message("❌ 执行出错: 任务缺少 tableData[2]", "ERROR")
                return

            # Choose the text up front so a task without usable "tdds" is
            # rejected before anything is tapped.
            chosen = split_and_random_select(content.get("tdds"))
            if not chosen:
                self.log_message("❌ 执行出错: 任务缺少 tdds", "ERROR")
                return

            manager = self._begin_scan(emu_index, window_title, plate, (530, 1200), "收货")
            if manager is None:
                return

            manager.tap(emu_index, 323, 718, self.log_message)
            time.sleep(2)
            manager.tap(emu_index, 652, 748, self.log_message)
            time.sleep(1)
            manager.tap(emu_index, 347, 918, self.log_message)
            time.sleep(1)
            input_text_to_emulator(self.mumu_manager_path, emu_index, chosen)
            time.sleep(1)
            manager.tap(emu_index, 530, 1200, self.log_message)
            time.sleep(1)
            manager.tap(emu_index, 419, 765, self.log_message)
            self.log_message("✅ 收货任务执行完成")
            sendTask(plate)

        except Exception as e:
            self.log_message(f"❌ 执行出错: {e}", "ERROR")
def main():
    """Create the Tk root window, build the manager UI and run the event loop."""
    root = tk.Tk()
    app = SimpleMuMuManager(root)  # kept referenced for the lifetime of the loop
    root.mainloop()
# Start the GUI only when the file is run as a script, not when imported.
if __name__ == "__main__":
    main()