import tkinter as tk from tkinter import ttk, scrolledtext, messagebox, filedialog import subprocess import threading import json import os import time import random import requests from datetime import datetime import win32gui import win32con import win32api from PIL import Image, ImageTk from tkcalendar import DateEntry # 需要安装: pip install tkcalendar def sendTask(payload): # 生成8-12分钟的随机延迟(转换为毫秒) # delay_minutes = random.randint(1, 2) delay_minutes = 0 delay_seconds = delay_minutes * 60 delay_ms = delay_seconds * 1000 # 计算生效时间(当前时间 + 延迟毫秒数) effective_time = int((time.time() * 1000) + delay_ms) # 构建包含延迟信息的任务内容 task_payload = { "plate": payload, "effectiveTime": effective_time # 添加生效时间字段 } # 使用去重发布接口(避免重复任务) url = f"https://task.port.run/publish/carNext/600" print(f"发送车牌号: {payload} 到服务器...") print(f"延迟 {delay_minutes} 分钟后执行(生效时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(effective_time/1000))})") headers = { "Content-Type": "application/json", "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0", "Connection": "keep-alive" } response = requests.request("POST", url, json=task_payload, headers=headers) print(response.text) return response def drag_in_emulator(manager_path, emu_index, x, y1, y2, duration=400): """ 在模拟器中从 (x, y1) 拖拽到 (x, y2) """ # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return False try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return False except: return False # 连接adb target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.3) # 执行滑动 cmd = f"adb -s {target_device} shell input swipe {x} {y1} {x} {y2} {duration}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) return result.returncode == 0 def input_text_to_emulator(manager_path, emu_index, text): """向模拟器输入文本""" # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return False try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return False except: return False target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) # 先删除 subprocess.run(f"adb -s {target_device} shell input keyevent KEYCODE_DEL", shell=True) # 输入文本 subprocess.run(f"adb -s {target_device} shell input text \"{text}\"", shell=True) return True def get_emulator_pixel_color(manager_path, emu_index, x, y): """ 获取模拟器指定坐标的颜色 Args: manager_path: MuMuManager.exe路径 emu_index: 模拟器索引 x, y: 坐标 Returns: 颜色代码,如 "#FFFFFF",失败返回 None """ # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return None try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return None except: return None # 连接adb target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.3) # 截图并获取颜色 temp_file = f"temp_screenshot_{emu_index}.png" subprocess.run(f"adb -s {target_device} exec-out screencap -p > {temp_file}", shell=True) time.sleep(0.3) try: from PIL import Image img = Image.open(temp_file) pixel = img.getpixel((x, y)) img.close() # 转换为十六进制颜色 if isinstance(pixel, tuple): r, g, b = pixel[0], pixel[1], pixel[2] else: r = g = b = pixel color = f"#{r:02X}{g:02X}{b:02X}" return color except Exception as e: return None finally: import os if os.path.exists(temp_file): os.remove(temp_file) def split_and_random_select(input_string): """ 将字符串按@分割,然后随机返回其中一个元素 参数: input_string: 类似 "32@33@41" 的字符串 返回: 随机选中的元素 """ # 按@分割字符串 elements = input_string.split('@') # 从分割后的数组中随机选择一个元素 if elements: # 确保数组不为空 random_element = random.choice(elements) return random_element else: return None def show_image_at_position(image_path, x, y, width=250, height=250): """ 在指定位置创建窗口显示图片并激活 """ # 创建窗口 window = tk.Toplevel() window.title("图片显示") # 设置窗口位置和大小 window.geometry(f"{width}x{height}+{x}+{y}") # 加载图片 image = Image.open(image_path) image = image.resize((width, height), Image.Resampling.LANCZOS) photo = ImageTk.PhotoImage(image) # 显示图片 label = tk.Label(window, image=photo) label.image = photo label.pack(fill=tk.BOTH, expand=True) # 刷新窗口 window.update() # 获取窗口句柄并激活 hwnd = win32gui.FindWindow(None, "图片显示") if hwnd: win32gui.SetForegroundWindow(hwnd) return window def click_screen(x, y): print(f"点击屏幕坐标: ({x}, {y})") """ 在屏幕坐标(x, y)处点击鼠标左键 """ # 移动鼠标到指定位置 win32api.SetCursorPos((x, y)) # 按下左键 win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, x, y, 0, 0) # 抬起左键 win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0) def activate_window_and_get_position(window_title): """ 激活窗口并返回左上角坐标 """ # 查找窗口 hwnd = win32gui.FindWindow(None, window_title) if hwnd == 0: print(f"未找到窗口: {window_title}") return None, None # 如果窗口最小化,恢复它 if win32gui.IsIconic(hwnd): win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) # 激活窗口 win32gui.SetForegroundWindow(hwnd) # 获取窗口位置 rect = win32gui.GetWindowRect(hwnd) x = rect[0] y = rect[1] return x, y class MuMuEmulatorManager: def __init__(self, manager_path): self.manager_path = manager_path def get_adb_port(self, index, log_callback=None): """获取指定模拟器的 ADB 端口""" cmd = [self.manager_path, "info", "-v", str(index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode == 0: try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if adb_port is not None: if log_callback: log_callback(f"✅ 模拟器 {index} ADB端口: {adb_port}") return adb_port except: pass return None def tap(self, index, x, y, log_callback=None): """点击坐标""" adb_port = self.get_adb_port(index, log_callback) if not adb_port: if log_callback: log_callback(f"❌ 无法获取模拟器 {index} 的ADB端口") return False target_device = f"127.0.0.1:{adb_port}" # 连接ADB subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.5) # 执行点击 cmd = f"adb -s {target_device} shell input tap {x} {y}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: if log_callback: log_callback(f"✅ 点击坐标 ({x}, {y}) 成功") return True else: if log_callback: log_callback(f"❌ 点击失败: {result.stderr}") return False def get_emulator_list(self): """获取所有模拟器列表""" cmd = [self.manager_path, "info", "-v", "all"] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return [] try: # 清理输出(可能包含非JSON内容) output = result.stdout.strip() json_start = output.find('{') if json_start != -1: output = output[json_start:] data = json.loads(output) emulators = [] for key, value in data.items(): if isinstance(value, dict): value['index'] = key # 只返回运行中的模拟器 if value.get('is_process_started', False): emulators.append(value) return emulators except json.JSONDecodeError: return [] class SimpleMuMuManager: def __init__(self, root): self.root = root self.root.title("MuMu模拟器管理工具") self.root.geometry("700x600") # 增加高度以容纳新控件 # 配置文件 self.config_file = "mumu_config.json" self.mumu_manager_path = None self.emulators = [] # 任务调度控制 self.is_running = False self.task_thread = None self.stop_event = threading.Event() # 最后发货时间设置 self.last_delivery_time = None # 存储最后发货时间 self.delivery_time_enabled = False # 是否启用最后发货时间限制 # 创建界面 self.create_widgets() # 加载配置 self.load_config() # 如果路径存在,自动刷新 if self.mumu_manager_path and os.path.exists(self.mumu_manager_path): self.refresh_emulators() def create_widgets(self): # 主框架 main_frame = ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # 配置区域 config_frame = ttk.LabelFrame(main_frame, text="MuMuManager配置", padding="10") config_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=5) self.path_var = tk.StringVar() self.path_entry = ttk.Entry(config_frame, textvariable=self.path_var, width=55) self.path_entry.grid(row=0, column=1, padx=5) self.path_entry.bind('', self.on_path_changed) browse_btn = ttk.Button(config_frame, text="浏览", command=self.browse_mumu_manager) browse_btn.grid(row=0, column=2, padx=5) refresh_btn = ttk.Button(config_frame, text="刷新列表", command=self.refresh_emulators) refresh_btn.grid(row=0, column=3, padx=5) # 模拟器选择区域 - 一行两个 selector_frame = ttk.Frame(main_frame) selector_frame.grid(row=1, column=0, pady=20) # 发货手机 send_frame = ttk.LabelFrame(selector_frame, text="发货手机", padding="10") send_frame.grid(row=0, column=0, padx=10) self.send_var = tk.StringVar() self.send_combo = ttk.Combobox(send_frame, textvariable=self.send_var, width=35, state="readonly") self.send_combo.grid(row=0, column=0) # 收货手机 receive_frame = ttk.LabelFrame(selector_frame, text="收货手机", padding="10") receive_frame.grid(row=0, column=1, padx=10) self.receive_var = tk.StringVar() self.receive_combo = ttk.Combobox(receive_frame, textvariable=self.receive_var, width=35, state="readonly") self.receive_combo.grid(row=0, column=0) # 最后发货时间设置区域 time_frame = ttk.LabelFrame(main_frame, text="最后发货时间设置", padding="10") time_frame.grid(row=2, column=0, sticky=(tk.W, tk.E), pady=10) # 启用/禁用复选框 self.time_enabled_var = tk.BooleanVar(value=False) enable_check = ttk.Checkbutton( time_frame, text="启用最后发货时间限制", variable=self.time_enabled_var, command=self.on_time_enable_changed ) enable_check.grid(row=0, column=0, columnspan=4, sticky=tk.W, pady=5) # 日期选择 ttk.Label(time_frame, text="截止日期:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) self.date_picker = DateEntry( time_frame, width=12, background='darkblue', foreground='white', borderwidth=2, date_pattern='yyyy-mm-dd' ) self.date_picker.grid(row=1, column=1, padx=5, pady=5) # 时间选择(时:分) ttk.Label(time_frame, text="截止时间:").grid(row=1, column=2, padx=5, pady=5, sticky=tk.W) time_select_frame = ttk.Frame(time_frame) time_select_frame.grid(row=1, column=3, padx=5, pady=5, sticky=tk.W) self.hour_var = tk.StringVar(value="23") self.minute_var = tk.StringVar(value="59") hour_combo = ttk.Combobox(time_select_frame, textvariable=self.hour_var, width=5, values=[f"{i:02d}" for i in range(24)], state="readonly") hour_combo.grid(row=0, column=0) ttk.Label(time_select_frame, text=":").grid(row=0, column=1) minute_combo = ttk.Combobox(time_select_frame, textvariable=self.minute_var, width=5, values=[f"{i:02d}" for i in range(60)], state="readonly") minute_combo.grid(row=0, column=2) # 显示当前设置的状态标签 self.time_status_label = ttk.Label(time_frame, text="未启用时间限制", foreground="gray") self.time_status_label.grid(row=2, column=0, columnspan=4, sticky=tk.W, pady=5) # 控制按钮区域 control_frame = ttk.Frame(main_frame) control_frame.grid(row=3, column=0, pady=10) self.start_btn = ttk.Button(control_frame, text="开始运行", command=self.start_task_scheduler, width=15) self.start_btn.grid(row=0, column=0, padx=10) self.stop_btn = ttk.Button(control_frame, text="结束运行", command=self.stop_task_scheduler, width=15, state="disabled") self.stop_btn.grid(row=0, column=1, padx=10) # 状态显示 status_frame = ttk.Frame(main_frame) status_frame.grid(row=4, column=0, pady=5) self.status_label = ttk.Label(status_frame, text="状态: ● 已停止", foreground="red") self.status_label.grid(row=0, column=0) # 日志区域 log_frame = ttk.LabelFrame(main_frame, text="日志", padding="10") log_frame.grid(row=5, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=10) self.log_text = scrolledtext.ScrolledText(log_frame, width=75, height=15, wrap=tk.WORD) self.log_text.grid(row=0, column=0) # 配置权重 self.root.columnconfigure(0, weight=1) self.root.rowconfigure(0, weight=1) main_frame.columnconfigure(0, weight=1) main_frame.rowconfigure(5, weight=1) log_frame.columnconfigure(0, weight=1) log_frame.rowconfigure(0, weight=1) def on_time_enable_changed(self): """时间限制启用/禁用状态改变时的处理""" self.delivery_time_enabled = self.time_enabled_var.get() # 启用/禁用日期时间选择器 state = "normal" if self.delivery_time_enabled else "disabled" self.date_picker.config(state=state) self.hour_var.set("23" if self.delivery_time_enabled else "") self.minute_var.set("59" if self.delivery_time_enabled else "") if self.delivery_time_enabled: # 更新最后发货时间 self.update_last_delivery_time() self.time_status_label.config( text=f"最后发货时间: {self.last_delivery_time.strftime('%Y-%m-%d %H:%M:%S') if self.last_delivery_time else '未设置'}", foreground="green" ) self.log_message(f"已启用最后发货时间限制: {self.last_delivery_time}", "INFO") else: self.last_delivery_time = None self.time_status_label.config(text="未启用时间限制", foreground="gray") self.log_message("已禁用最后发货时间限制", "INFO") def update_last_delivery_time(self): """根据选择的日期时间更新最后发货时间""" if not self.delivery_time_enabled: return try: date_str = self.date_picker.get() hour = int(self.hour_var.get()) minute = int(self.minute_var.get()) # 组合日期时间 self.last_delivery_time = datetime.strptime(f"{date_str} {hour:02d}:{minute:02d}:00", "%Y-%m-%d %H:%M:%S") # 如果设置的时间已经过了,给出警告 if self.last_delivery_time <= datetime.now(): self.log_message(f"警告: 设置的最后发货时间 {self.last_delivery_time} 已经过去,将立即停止发货任务", "WARNING") self.time_status_label.config(foreground="orange") else: self.time_status_label.config(foreground="green") except Exception as e: self.log_message(f"解析时间失败: {e}", "ERROR") self.last_delivery_time = None def is_delivery_allowed(self): """检查是否允许执行发货任务""" if not self.delivery_time_enabled: return True if self.last_delivery_time is None: self.update_last_delivery_time() current_time = datetime.now() allowed = current_time <= self.last_delivery_time if not allowed: self.log_message(f"当前时间 {current_time.strftime('%Y-%m-%d %H:%M:%S')} 已超过最后发货时间 {self.last_delivery_time.strftime('%Y-%m-%d %H:%M:%S')},跳过发货任务", "WARNING") return allowed def start_task_scheduler(self): """启动任务调度器""" # 检查是否选择了模拟器 if not self.send_var.get(): self.log_message("请先选择发货手机模拟器", "ERROR") messagebox.showwarning("警告", "请先选择发货手机模拟器") return if not self.receive_var.get(): self.log_message("请先选择收货手机模拟器", "ERROR") messagebox.showwarning("警告", "请先选择收货手机模拟器") return if self.is_running: self.log_message("任务调度器已在运行中", "WARNING") return # 检查MuMuManager路径 if not self.mumu_manager_path or not os.path.exists(self.mumu_manager_path): self.log_message("请先设置正确的 MuMuManager.exe 路径", "ERROR") messagebox.showwarning("警告", "请先设置 MuMuManager.exe 路径") return # 如果启用了时间限制,更新时间 if self.delivery_time_enabled: self.update_last_delivery_time() if self.last_delivery_time and self.last_delivery_time <= datetime.now(): result = messagebox.askyesno( "确认", f"最后发货时间 {self.last_delivery_time} 已经过去,将不会执行任何发货任务。是否继续?" ) if not result: return self.is_running = True self.stop_event.clear() # 启动任务线程 self.task_thread = threading.Thread(target=self.task_scheduler_loop, daemon=True) self.task_thread.start() # 更新UI self.start_btn.config(state="disabled") self.stop_btn.config(state="normal") self.status_label.config(text="状态: ● 运行中", foreground="green") time_info = f"最后发货时间: {self.last_delivery_time}" if self.delivery_time_enabled else "无时间限制" self.log_message(f"任务调度器已启动,每10秒检测一次任务。{time_info}", "INFO") def stop_task_scheduler(self): """停止任务调度器""" if not self.is_running: return self.is_running = False self.stop_event.set() # 等待线程结束 if self.task_thread and self.task_thread.is_alive(): self.task_thread.join(timeout=3) # 更新UI self.start_btn.config(state="normal") self.stop_btn.config(state="disabled") self.status_label.config(text="状态: ● 已停止", foreground="red") self.log_message("任务调度器已停止", "INFO") def task_scheduler_loop(self): """任务调度循环""" while self.is_running and not self.stop_event.is_set(): try: self.log_message("开始检测任务...", "INFO") # 检查发货任务(根据时间限制决定) delivery_allowed = self.is_delivery_allowed() if delivery_allowed: # 如果允许发货,先检查发货任务 has_task = self.check_and_execute_task("https://task.port.run/acquire/bdxkjzc", "发货") # 如果没有发货任务,检查收货任务 if not has_task: has_task = self.check_and_execute_task("https://task.port.run/acquire/bdxkjxc", "收货") else: # 不允许发货,只检查收货任务 self.log_message("已超过最后发货时间,仅处理收货任务", "INFO") has_task = self.check_and_execute_task("https://task.port.run/acquire/bdxkjxc", "收货") if not has_task: self.log_message("暂无可用任务,10秒后继续检测", "INFO") # 等待10秒后继续检测 time.sleep(3) except Exception as e: self.log_message(f"任务调度循环出错: {e}", "ERROR") time.sleep(5) def check_and_execute_task(self, url, task_type): """ 检查并执行任务 Returns: bool: 是否执行了任务 """ try: # 发送请求获取任务 response = requests.get(url, timeout=10) response.raise_for_status() repData = response.json() # 检查是否有任务 if "task" in repData and "content" in repData['task']: content = repData['task']["content"] self.log_message(f"获取到{task_type}任务: {content.get('tableData', ['N/A'])[2] if 'tableData' in content else '未知'}", "INFO") # 执行任务 if task_type == "发货": send_selection = self.send_var.get() emu_index = self.get_emulator_index(send_selection) if emu_index: self.perform_click(emu_index, send_selection.split(" (")[0], content) return True else: self.log_message(f"无法获取发货模拟器索引", "ERROR") return False else: # 收货任务 receive_selection = self.receive_var.get() emu_index = self.get_emulator_index(receive_selection) if emu_index: self.perform_click2(emu_index, receive_selection.split(" (")[0], content) return True else: self.log_message(f"无法获取收货模拟器索引", "ERROR") return False else: self.log_message(f"暂无{task_type}任务", "INFO") return False except requests.RequestException as e: self.log_message(f"请求{task_type}任务失败: {e}", "ERROR") return False except Exception as e: self.log_message(f"处理{task_type}任务时出错: {e}", "ERROR") return False def browse_mumu_manager(self): """浏览选择MuMuManager.exe""" file_path = filedialog.askopenfilename( title="选择MuMuManager.exe", filetypes=[("Executable files", "*.exe"), ("All files", "*.*")] ) if file_path: self.path_var.set(file_path) self.on_path_changed() def on_path_changed(self, event=None): """路径改变时自动保存并刷新""" new_path = self.path_var.get() if new_path and os.path.exists(new_path): self.mumu_manager_path = new_path self.save_config() self.log_message("配置已自动保存") self.refresh_emulators() elif new_path: self.log_message(f"路径不存在: {new_path}", "WARNING") def save_config(self): """保存配置""" config = { 'mumu_manager_path': self.mumu_manager_path, 'delivery_time_enabled': self.delivery_time_enabled, 'last_delivery_time': self.last_delivery_time.strftime("%Y-%m-%d %H:%M:%S") if self.last_delivery_time else None } try: with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=4) except Exception as e: self.log_message(f"保存配置失败: {e}", "ERROR") def load_config(self): """加载配置""" if os.path.exists(self.config_file): try: with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) path = config.get('mumu_manager_path') if path and os.path.exists(path): self.mumu_manager_path = path self.path_var.set(path) self.log_message(f"已加载配置: {path}") elif path: self.log_message(f"配置路径不存在: {path}", "WARNING") # 加载时间限制配置 enabled = config.get('delivery_time_enabled', False) self.time_enabled_var.set(enabled) self.delivery_time_enabled = enabled last_time_str = config.get('last_delivery_time') if last_time_str and enabled: try: self.last_delivery_time = datetime.strptime(last_time_str, "%Y-%m-%d %H:%M:%S") # 设置日期选择器 date_str = self.last_delivery_time.strftime("%Y-%m-%d") self.date_picker.set_date(date_str) self.hour_var.set(f"{self.last_delivery_time.hour:02d}") self.minute_var.set(f"{self.last_delivery_time.minute:02d}") self.on_time_enable_changed() self.log_message(f"已加载最后发货时间设置: {last_time_str}", "INFO") except Exception as e: self.log_message(f"加载最后发货时间失败: {e}", "WARNING") # 根据配置更新UI状态 self.on_time_enable_changed() except Exception as e: self.log_message(f"加载配置失败: {e}", "ERROR") def log_message(self, message, level="INFO"): """添加日志""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] [{level}] {message}\n" # 在主线程中更新UI self.root.after(0, lambda: self._append_log(log_entry)) def _append_log(self, log_entry): """实际添加日志到文本框""" self.log_text.insert(tk.END, log_entry) self.log_text.see(tk.END) def refresh_emulators(self): """刷新模拟器列表""" if not self.mumu_manager_path or not os.path.exists(self.mumu_manager_path): self.log_message("请先设置正确的 MuMuManager.exe 路径", "WARNING") return self.log_message("正在获取模拟器列表...") try: manager = MuMuEmulatorManager(self.mumu_manager_path) self.emulators = manager.get_emulator_list() if not self.emulators: self.log_message("未发现运行中的模拟器,请确保 MuMu 模拟器已启动", "WARNING") self.send_combo['values'] = [] self.receive_combo['values'] = [] return # 构建显示名称列表 emulator_names = [] for emu in self.emulators: name = emu.get('name', f"模拟器_{emu.get('index')}") display_name = f"{name} (索引:{emu.get('index')})" emulator_names.append(display_name) self.log_message(f"发现模拟器: {display_name}") # 更新下拉框 self.send_combo['values'] = emulator_names self.receive_combo['values'] = emulator_names self.log_message(f"✅ 已加载 {len(self.emulators)} 个运行中的模拟器") except Exception as e: self.log_message(f"获取模拟器列表失败: {e}", "ERROR") def get_emulator_index(self, display_name): """从显示名称获取模拟器索引""" if not display_name: return None # 提取索引,格式如 "名称 (索引:0)" import re match = re.search(r'索引:(\d+)', display_name) if match: return match.group(1) return None def perform_click(self, emu_index, window_title, content): """执行点击操作""" x, y = 508, 252 self.log_message(f"开始在模拟器 {emu_index} 上执行发货任务") try: manager = MuMuEmulatorManager(self.mumu_manager_path) # 先获取adb端口 adb_port = manager.get_adb_port(emu_index, self.log_message) if not adb_port: self.log_message(f"❌ 无法获取模拟器 {emu_index} 的ADB端口", "ERROR") return # 执行点击 if manager.tap(emu_index, x, y, self.log_message): self.log_message(f"✅ 点击成功!模拟器 {emu_index} 位置 ({x}, {y})") rectX, rectY = activate_window_and_get_position(window_title) time.sleep(2) # 短暂等待窗口激活 click_screen(rectX + 241, rectY + 411) time.sleep(1) showImageBox = show_image_at_position(".\\chepai\\" + content["tableData"][2] + ".png", rectX + 144, rectY + 355) # 开始检测是否扫描 color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 523, 1114) while color1 != "#007AFF" and self.is_running: self.log_message(f"当前颜色: {color1}, 等待扫描完成...") time.sleep(1) color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 523, 1114) # 关闭窗口 showImageBox.destroy() self.log_message(f"关闭二维码窗口!") manager.tap(emu_index, 323, 718, self.log_message) time.sleep(1) drag_in_emulator(self.mumu_manager_path, emu_index, 430, 1046, 980, 400) time.sleep(0.5) manager.tap(emu_index, 650, 743, self.log_message) time.sleep(1) manager.tap(emu_index, 358, 1112, self.log_message) time.sleep(1) manager.tap(emu_index, 419, 765, self.log_message) self.log_message(f"✅ 发货任务执行完成") sendTask(content["tableData"][2]) else: self.log_message(f"❌ 点击失败", "ERROR") except Exception as e: self.log_message(f"❌ 执行出错: {e}", "ERROR") def perform_click2(self, emu_index, window_title, content): """执行点击操作""" x, y = 508, 252 self.log_message(f"开始在模拟器 {emu_index} 上执行收货任务") try: manager = MuMuEmulatorManager(self.mumu_manager_path) # 先获取adb端口 adb_port = manager.get_adb_port(emu_index, self.log_message) if not adb_port: self.log_message(f"❌ 无法获取模拟器 {emu_index} 的ADB端口", "ERROR") return # 执行点击 if manager.tap(emu_index, x, y, self.log_message): self.log_message(f"✅ 点击成功!模拟器 {emu_index} 位置 ({x}, {y})") rectX, rectY = activate_window_and_get_position(window_title) time.sleep(2) # 短暂等待窗口激活 click_screen(rectX + 241, rectY + 411) time.sleep(1) showImageBox = show_image_at_position(".\\chepai\\" + content["tableData"][2] + ".png", rectX + 144, rectY + 355) # 开始检测是否扫描 color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 530, 1200) while color1 != "#007AFF" and self.is_running: self.log_message(f"当前颜色: {color1}, 等待扫描完成...") time.sleep(1) color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 530, 1200) # 关闭窗口 showImageBox.destroy() self.log_message(f"关闭二维码窗口!") manager.tap(emu_index, 323, 718, self.log_message) time.sleep(2) manager.tap(emu_index, 652, 748, self.log_message) time.sleep(1) manager.tap(emu_index, 347, 918, self.log_message) time.sleep(1) input_text_to_emulator(self.mumu_manager_path, emu_index, split_and_random_select(content["tdds"])) time.sleep(1) manager.tap(emu_index, 530, 1200, self.log_message) time.sleep(1) manager.tap(emu_index, 419, 765, self.log_message) self.log_message(f"✅ 收货任务执行完成") sendTask(content["tableData"][2]) else: self.log_message(f"❌ 点击失败", "ERROR") except Exception as e: self.log_message(f"❌ 执行出错: {e}", "ERROR") def main(): root = tk.Tk() app = SimpleMuMuManager(root) root.mainloop() if __name__ == "__main__": main()