import tkinter as tk from tkinter import ttk, scrolledtext, messagebox, filedialog import subprocess import threading import json import os import time import random import requests from datetime import datetime import win32gui import win32con import win32api import tkinter as tk from PIL import Image, ImageTk def drag_in_emulator(manager_path, emu_index, x, y1, y2, duration=400): """ 在模拟器中从 (x, y1) 拖拽到 (x, y2) """ # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return False try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return False except: return False # 连接adb target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.3) # 执行滑动 cmd = f"adb -s {target_device} shell input swipe {x} {y1} {x} {y2} {duration}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) return result.returncode == 0 def input_text_to_emulator(manager_path, emu_index, text): """向模拟器输入文本""" # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return False try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return False except: return False target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) # 先删除 subprocess.run(f"adb -s {target_device} shell input keyevent KEYCODE_DEL", shell=True) # 输入文本 subprocess.run(f"adb -s {target_device} shell input text \"{text}\"", shell=True) return True def get_emulator_pixel_color(manager_path, emu_index, x, y): """ 获取模拟器指定坐标的颜色 Args: manager_path: MuMuManager.exe路径 emu_index: 模拟器索引 x, y: 坐标 Returns: 颜色代码,如 "#FFFFFF",失败返回 None """ # 获取adb端口 cmd = [manager_path, "info", "-v", str(emu_index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return None try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if not adb_port: return None except: return None # 连接adb target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.3) # 截图并获取颜色 temp_file = f"temp_screenshot_{emu_index}.png" subprocess.run(f"adb -s {target_device} exec-out screencap -p > {temp_file}", shell=True) time.sleep(0.3) try: from PIL import Image img = Image.open(temp_file) pixel = img.getpixel((x, y)) img.close() # 转换为十六进制颜色 if isinstance(pixel, tuple): r, g, b = pixel[0], pixel[1], pixel[2] else: r = g = b = pixel color = f"#{r:02X}{g:02X}{b:02X}" return color except Exception as e: return None finally: import os if os.path.exists(temp_file): os.remove(temp_file) def show_image_at_position(image_path, x, y, width=250, height=250): """ 在指定位置创建窗口显示图片并激活 """ # 创建窗口 window = tk.Toplevel() window.title("图片显示") # 设置窗口位置和大小 window.geometry(f"{width}x{height}+{x}+{y}") # 加载图片 image = Image.open(image_path) image = image.resize((width, height), Image.Resampling.LANCZOS) photo = ImageTk.PhotoImage(image) # 显示图片 label = tk.Label(window, image=photo) label.image = photo label.pack(fill=tk.BOTH, expand=True) # 刷新窗口 window.update() # 获取窗口句柄并激活 hwnd = win32gui.FindWindow(None, "图片显示") if hwnd: win32gui.SetForegroundWindow(hwnd) return window def click_screen(x, y): """ 在屏幕坐标(x, y)处点击鼠标左键 """ # 移动鼠标到指定位置 win32api.SetCursorPos((x, y)) # 按下左键 win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, x, y, 0, 0) # 抬起左键 win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0) def activate_window_and_get_position(window_title): """ 激活窗口并返回左上角坐标 """ # 查找窗口 hwnd = win32gui.FindWindow(None, window_title) if hwnd == 0: print(f"未找到窗口: {window_title}") return None, None # 如果窗口最小化,恢复它 if win32gui.IsIconic(hwnd): win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) # 激活窗口 win32gui.SetForegroundWindow(hwnd) # 获取窗口位置 rect = win32gui.GetWindowRect(hwnd) x = rect[0] y = rect[1] return x, y class MuMuEmulatorManager: def __init__(self, manager_path): self.manager_path = manager_path def get_adb_port(self, index, log_callback=None): """获取指定模拟器的 ADB 端口""" cmd = [self.manager_path, "info", "-v", str(index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode == 0: try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if adb_port is not None: if log_callback: log_callback(f"✅ 模拟器 {index} ADB端口: {adb_port}") return adb_port except: pass return None def tap(self, index, x, y, log_callback=None): """点击坐标""" adb_port = self.get_adb_port(index, log_callback) if not adb_port: if log_callback: log_callback(f"❌ 无法获取模拟器 {index} 的ADB端口") return False target_device = f"127.0.0.1:{adb_port}" # 连接ADB subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.5) # 执行点击 cmd = f"adb -s {target_device} shell input tap {x} {y}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: if log_callback: log_callback(f"✅ 点击坐标 ({x}, {y}) 成功") return True else: if log_callback: log_callback(f"❌ 点击失败: {result.stderr}") return False def get_emulator_list(self): """获取所有模拟器列表""" cmd = [self.manager_path, "info", "-v", "all"] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return [] try: # 清理输出(可能包含非JSON内容) output = result.stdout.strip() json_start = output.find('{') if json_start != -1: output = output[json_start:] data = json.loads(output) emulators = [] for key, value in data.items(): if isinstance(value, dict): value['index'] = key # 只返回运行中的模拟器 if value.get('is_process_started', False): emulators.append(value) return emulators except json.JSONDecodeError: return [] class SimpleMuMuManager: def __init__(self, root): self.root = root self.root.title("MuMu模拟器管理工具") self.root.geometry("650x450") # 配置文件 self.config_file = "mumu_config.json" self.mumu_manager_path = None self.emulators = [] # 创建界面 self.create_widgets() # 加载配置 self.load_config() # 如果路径存在,自动刷新 if self.mumu_manager_path and os.path.exists(self.mumu_manager_path): self.refresh_emulators() def create_widgets(self): # 主框架 main_frame = ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # 配置区域 config_frame = ttk.LabelFrame(main_frame, text="MuMuManager配置", padding="10") config_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=5) self.path_var = tk.StringVar() self.path_entry = ttk.Entry(config_frame, textvariable=self.path_var, width=55) self.path_entry.grid(row=0, column=1, padx=5) self.path_entry.bind('', self.on_path_changed) browse_btn = ttk.Button(config_frame, text="浏览", command=self.browse_mumu_manager) browse_btn.grid(row=0, column=2, padx=5) refresh_btn = ttk.Button(config_frame, text="刷新列表", command=self.refresh_emulators) refresh_btn.grid(row=0, column=3, padx=5) # 模拟器选择区域 - 一行两个 selector_frame = ttk.Frame(main_frame) selector_frame.grid(row=1, column=0, pady=20) # 发货手机 send_frame = ttk.LabelFrame(selector_frame, text="发货手机", padding="10") send_frame.grid(row=0, column=0, padx=10) self.send_var = tk.StringVar() self.send_combo = ttk.Combobox(send_frame, textvariable=self.send_var, width=35, state="readonly") self.send_combo.grid(row=0, column=0) # 收货手机 receive_frame = ttk.LabelFrame(selector_frame, text="收货手机", padding="10") receive_frame.grid(row=0, column=1, padx=10) self.receive_var = tk.StringVar() self.receive_combo = ttk.Combobox(receive_frame, textvariable=self.receive_var, width=35, state="readonly") self.receive_combo.grid(row=0, column=0) # 日志区域 log_frame = ttk.LabelFrame(main_frame, text="日志", padding="10") log_frame.grid(row=2, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=10) self.log_text = scrolledtext.ScrolledText(log_frame, width=75, height=15, wrap=tk.WORD) self.log_text.grid(row=0, column=0) # 测试按钮 # test_btn = ttk.Button(main_frame, text="测试发货", command=self.test_click, width=20) # test_btn.grid(row=3, column=0, pady=1) test_btn2 = ttk.Button(main_frame, text="测试收货", command=self.test_click2, width=20) test_btn2.grid(row=3, column=0, pady=1) # 配置权重 self.root.columnconfigure(0, weight=1) self.root.rowconfigure(0, weight=1) main_frame.columnconfigure(0, weight=1) main_frame.rowconfigure(2, weight=1) log_frame.columnconfigure(0, weight=1) log_frame.rowconfigure(0, weight=1) def browse_mumu_manager(self): """浏览选择MuMuManager.exe""" file_path = filedialog.askopenfilename( title="选择MuMuManager.exe", filetypes=[("Executable files", "*.exe"), ("All files", "*.*")] ) if file_path: self.path_var.set(file_path) self.on_path_changed() def on_path_changed(self, event=None): """路径改变时自动保存并刷新""" new_path = self.path_var.get() if new_path and os.path.exists(new_path): self.mumu_manager_path = new_path self.save_config() self.log_message("配置已自动保存") self.refresh_emulators() elif new_path: self.log_message(f"路径不存在: {new_path}", "WARNING") def save_config(self): """保存配置""" config = {'mumu_manager_path': self.mumu_manager_path} try: with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=4) except Exception as e: self.log_message(f"保存配置失败: {e}", "ERROR") def load_config(self): """加载配置""" if os.path.exists(self.config_file): try: with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) path = config.get('mumu_manager_path') if path and os.path.exists(path): self.mumu_manager_path = path self.path_var.set(path) self.log_message(f"已加载配置: {path}") elif path: self.log_message(f"配置路径不存在: {path}", "WARNING") except Exception as e: self.log_message(f"加载配置失败: {e}", "ERROR") def log_message(self, message, level="INFO"): """添加日志""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] [{level}] {message}\n" self.log_text.insert(tk.END, log_entry) self.log_text.see(tk.END) self.root.update() def refresh_emulators(self): """刷新模拟器列表""" if not self.mumu_manager_path or not os.path.exists(self.mumu_manager_path): self.log_message("请先设置正确的 MuMuManager.exe 路径", "WARNING") return self.log_message("正在获取模拟器列表...") try: manager = MuMuEmulatorManager(self.mumu_manager_path) self.emulators = manager.get_emulator_list() if not self.emulators: self.log_message("未发现运行中的模拟器,请确保 MuMu 模拟器已启动", "WARNING") self.send_combo['values'] = [] self.receive_combo['values'] = [] return # 构建显示名称列表 emulator_names = [] for emu in self.emulators: name = emu.get('name', f"模拟器_{emu.get('index')}") display_name = f"{name} (索引:{emu.get('index')})" emulator_names.append(display_name) self.log_message(f"发现模拟器: {display_name}") # 更新下拉框 self.send_combo['values'] = emulator_names self.receive_combo['values'] = emulator_names self.log_message(f"✅ 已加载 {len(self.emulators)} 个运行中的模拟器") except Exception as e: self.log_message(f"获取模拟器列表失败: {e}", "ERROR") def get_emulator_index(self, display_name): """从显示名称获取模拟器索引""" if not display_name: return None # 提取索引,格式如 "名称 (索引:0)" import re match = re.search(r'索引:(\d+)', display_name) if match: return match.group(1) return None def test_click(self): url = "https://task.port.run/acquire/bdxkjzc" payload = "" headers = { "Content-Type": "application/json", "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0", "Connection": "keep-alive" } response = requests.request("GET", url, data=payload, headers=headers) print(response.text) repData = json.loads(response.text) if ("task" in repData and "content" in repData['task']): """测试点击""" send_selection = self.send_var.get() if not send_selection: self.log_message("请选择发货手机模拟器", "ERROR") messagebox.showwarning("警告", "请选择发货手机模拟器") return emu_index = self.get_emulator_index(send_selection) if not emu_index: self.log_message("无法获取模拟器索引", "ERROR") return # 在新线程中执行点击 thread = threading.Thread(target=self.perform_click, args=(emu_index, send_selection.split(" (")[0], repData['task']["content"])) thread.daemon = True thread.start() else: self.log_message("未获取到有效任务数据", "ERROR") messagebox.showerror("错误", "未获取到有效任务数据") def test_click2(self): url = "https://task.port.run/acquire/bdxkjxc" payload = "" headers = { "Content-Type": "application/json", "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0", "Connection": "keep-alive" } response = requests.request("GET", url, data=payload, headers=headers) print(response.text) repData = json.loads(response.text) if ("task" in repData and "content" in repData['task']): """测试点击""" receive_selection = self.receive_var.get() if not receive_selection: self.log_message("请选择收货手机模拟器", "ERROR") messagebox.showwarning("警告", "请选择收货手机模拟器") return emu_index = self.get_emulator_index(receive_selection) if not emu_index: self.log_message("无法获取模拟器索引", "ERROR") return # 在新线程中执行点击 thread = threading.Thread(target=self.perform_click2, args=(emu_index, receive_selection.split(" (")[0], repData['task']["content"])) thread.daemon = True thread.start() else: self.log_message("未获取到有效任务数据", "ERROR") messagebox.showerror("错误", "未获取到有效任务数据") def perform_click(self, emu_index, window_title, content): """执行点击操作""" x, y = 508, 252 self.log_message(f"开始在模拟器 {emu_index} 上点击位置 ({x}, {y})") try: manager = MuMuEmulatorManager(self.mumu_manager_path) # 先获取adb端口 adb_port = manager.get_adb_port(emu_index, self.log_message) if not adb_port: self.log_message(f"❌ 无法获取模拟器 {emu_index} 的ADB端口", "ERROR") return # 执行点击 if manager.tap(emu_index, x, y, self.log_message): self.log_message(f"✅ 点击成功!模拟器 {emu_index} 位置 ({x}, {y})") rectX, rectY = activate_window_and_get_position(window_title) time.sleep(2) # 短暂等待窗口激活 click_screen(rectX + 339, rectY + 576) time.sleep(1) showImageBox = show_image_at_position("C:\\Users\\mail\\Downloads\\chepai\\" + content["tableData"][2] + ".png", rectX + 273, rectY + 549 ) # 开始检测是否扫描 color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 523, 1114) while color1 != "#007AFF": # 假设白色表示未扫描 self.log_message(f"当前颜色: {color1}, 等待扫描完成...") time.sleep(1) color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 523, 1114) # 关闭窗口 showImageBox.destroy() self.log_message(f"关闭二维码窗口!") click_screen(rectX + 323, rectY + 718) time.sleep(1) drag_in_emulator(self.mumu_manager_path, emu_index, 430, 1046, 980, 400) time.sleep(0.5) click_screen(rectX + 650, rectY + 743) time.sleep(1) click_screen(rectX + 358, rectY + 1112) time.sleep(1) click_screen(rectX + 419, rectY + 760) else: self.log_message(f"❌ 点击失败", "ERROR") except Exception as e: self.log_message(f"❌ 执行出错: {e}", "ERROR") def perform_click2(self, emu_index, window_title, content): """执行点击操作""" x, y = 508, 252 self.log_message(f"开始在模拟器 {emu_index} 上点击位置 ({x}, {y})") try: manager = MuMuEmulatorManager(self.mumu_manager_path) # 先获取adb端口 adb_port = manager.get_adb_port(emu_index, self.log_message) if not adb_port: self.log_message(f"❌ 无法获取模拟器 {emu_index} 的ADB端口", "ERROR") return # 执行点击 if manager.tap(emu_index, x, y, self.log_message): self.log_message(f"✅ 点击成功!模拟器 {emu_index} 位置 ({x}, {y})") rectX, rectY = activate_window_and_get_position(window_title) time.sleep(2) # 短暂等待窗口激活 click_screen(rectX + 339, rectY + 576) time.sleep(1) showImageBox = show_image_at_position("C:\\Users\\mail\\Downloads\\chepai\\" + content["tableData"][2] + ".png", rectX + 273, rectY + 549 ) # 开始检测是否扫描 color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 530, 1200) while color1 != "#007AFF": # 假设白色表示未扫描 self.log_message(f"当前颜色: {color1}, 等待扫描完成...") time.sleep(1) color1 = get_emulator_pixel_color(self.mumu_manager_path, emu_index, 530, 1200) # 关闭窗口 showImageBox.destroy() self.log_message(f"关闭二维码窗口!") manager.tap(emu_index, 323, 718, self.log_message) time.sleep(2) manager.tap(emu_index, 652, 748, self.log_message) time.sleep(1) manager.tap(emu_index, 347, 918, self.log_message) time.sleep(1) input_text_to_emulator(self.mumu_manager_path, emu_index, random.randint(35, 36)) time.sleep(1) time.sleep(1) manager.tap(emu_index, 530, 1200, self.log_message) time.sleep(1) manager.tap(emu_index, 419, 765, self.log_message) else: self.log_message(f"❌ 点击失败", "ERROR") except Exception as e: self.log_message(f"❌ 执行出错: {e}", "ERROR") def main(): root = tk.Tk() app = SimpleMuMuManager(root) root.mainloop() if __name__ == "__main__": main()