import tkinter as tk from tkinter import ttk, scrolledtext, messagebox import subprocess import threading import time import json import os from datetime import datetime class MumuManager: def __init__(self, root): self.root = root self.root.title("Mumu模拟器批量启动工具") self.root.geometry("800x600") # 控制变量 self.is_running = False self.is_paused = False self.should_stop = False self.current_thread = None # 创建界面 self.create_widgets() def create_widgets(self): # 主框架 main_frame = ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # Mumu安装路径 ttk.Label(main_frame, text="MuMu安装路径:").grid(row=0, column=0, sticky=tk.W, pady=5) self.mumu_install_path = tk.StringVar(value=r"D:\MuMuPlayer") ttk.Entry(main_frame, textvariable=self.mumu_install_path, width=70).grid(row=0, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5) # MuMuManager路径(自动查找) self.manager_path = tk.StringVar() # 同时启动模拟器数量 ttk.Label(main_frame, text="同时启动数量:").grid(row=1, column=0, sticky=tk.W, pady=5) self.batch_size = tk.IntVar(value=5) ttk.Spinbox(main_frame, from_=1, to=20, textvariable=self.batch_size, width=10).grid(row=1, column=1, sticky=tk.W, pady=5) # 启动后等待时间 ttk.Label(main_frame, text="启动后等待(秒):").grid(row=2, column=0, sticky=tk.W, pady=5) self.wait_time = tk.IntVar(value=60) ttk.Spinbox(main_frame, from_=10, to=180, textvariable=self.wait_time, width=10).grid(row=2, column=1, sticky=tk.W, pady=5) # 模拟器列表显示 ttk.Label(main_frame, text="模拟器列表:").grid(row=3, column=0, sticky=tk.W, pady=5) # 模拟器列表框架 list_frame = ttk.Frame(main_frame) list_frame.grid(row=4, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5) # 创建Treeview columns = ("选择", "索引", "名称", "状态") self.emulator_tree = ttk.Treeview(list_frame, columns=columns, show='headings', height=10) self.emulator_tree.heading("选择", text="选择") self.emulator_tree.heading("索引", text="索引") self.emulator_tree.heading("名称", text="名称") self.emulator_tree.heading("状态", text="状态") self.emulator_tree.column("选择", width=50) self.emulator_tree.column("索引", width=80) self.emulator_tree.column("名称", width=200) self.emulator_tree.column("状态", width=100) # 滚动条 vsb = ttk.Scrollbar(list_frame, orient="vertical", command=self.emulator_tree.yview) self.emulator_tree.configure(yscrollcommand=vsb.set) self.emulator_tree.pack(side='left', fill='both', expand=True) vsb.pack(side='right', fill='y') # 绑定点击事件 self.emulator_tree.bind('', self.on_tree_click) # 按钮框架 button_frame = ttk.Frame(main_frame) button_frame.grid(row=5, column=0, columnspan=3, pady=10) ttk.Button(button_frame, text="刷新列表", command=self.load_emulators).pack(side=tk.LEFT, padx=5) ttk.Button(button_frame, text="全选", command=self.select_all).pack(side=tk.LEFT, padx=5) ttk.Button(button_frame, text="全不选", command=self.select_none).pack(side=tk.LEFT, padx=5) self.start_btn = ttk.Button(button_frame, text="启动", command=self.start_process) self.start_btn.pack(side=tk.LEFT, padx=5) self.pause_btn = ttk.Button(button_frame, text="暂停", command=self.pause_process, state=tk.DISABLED) self.pause_btn.pack(side=tk.LEFT, padx=5) self.stop_btn = ttk.Button(button_frame, text="停止", command=self.stop_process, state=tk.DISABLED) self.stop_btn.pack(side=tk.LEFT, padx=5) # 进度条 self.progress = ttk.Progressbar(main_frame, mode='determinate') self.progress.grid(row=6, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5) # 日志框 ttk.Label(main_frame, text="日志信息:").grid(row=7, column=0, sticky=tk.W, pady=5) self.log_text = scrolledtext.ScrolledText(main_frame, height=18, width=90) self.log_text.grid(row=8, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5) # 配置grid权重 self.root.columnconfigure(0, weight=1) self.root.rowconfigure(0, weight=1) main_frame.columnconfigure(1, weight=1) main_frame.rowconfigure(8, weight=1) # 启动时自动加载模拟器列表 self.root.after(100, self.load_emulators) def find_manager_path(self): """查找 MuMuManager.exe 的完整路径""" install_path = self.mumu_install_path.get() # 可能的路径 possible_paths = [ os.path.join(install_path, "nx_main", "MuMuManager.exe"), os.path.join(install_path, "MuMuPlayer", "nx_main", "MuMuManager.exe"), os.path.join(install_path, "shell", "MuMuManager.exe"), r"D:\MuMuPlayer\nx_main\MuMuManager.exe", r"D:\Program Files\MuMuPlayer\nx_main\MuMuManager.exe", r"C:\Program Files\MuMuPlayer\nx_main\MuMuManager.exe" ] for path in possible_paths: if os.path.exists(path): self.log(f"找到 MuMuManager: {path}") return path return None def run_mumu_command(self, args): """运行 MuMuManager 命令,自动处理 Qt 环境变量""" manager_path = self.find_manager_path() if not manager_path: self.log("❌ 找不到 MuMuManager.exe") return None try: # 获取 MuMuManager.exe 所在目录 manager_dir = os.path.dirname(manager_path) # 设置环境变量 env = os.environ.copy() env['QT_PLUGIN_PATH'] = manager_dir env['PATH'] = manager_dir + os.pathsep + env.get('PATH', '') # 运行命令 cmd = [manager_path] + args result = subprocess.run( cmd, capture_output=True, text=True, encoding='utf-8', env=env, cwd=manager_dir ) return result except Exception as e: self.log(f"运行命令出错: {e}") return None def log(self, message): """添加日志""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_message = f"[{timestamp}] {message}\n" self.root.after(0, lambda: self._update_log(log_message)) def _update_log(self, message): """更新日志框""" self.log_text.insert(tk.END, message) self.log_text.see(tk.END) def on_tree_click(self, event): """处理列表点击选择""" region = self.emulator_tree.identify_region(event.x, event.y) if region == "cell": column = self.emulator_tree.identify_column(event.x) if column == "#1": # 选择列 item = self.emulator_tree.identify_row(event.y) if item: values = self.emulator_tree.item(item, 'values') current = values[0] new_value = "☑" if current == "□" else "□" self.emulator_tree.item(item, values=(new_value, values[1], values[2], values[3])) def select_all(self): """全选""" for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') self.emulator_tree.item(item, values=("☑", values[1], values[2], values[3])) self.log("已全选所有模拟器") def select_none(self): """全不选""" for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') self.emulator_tree.item(item, values=("□", values[1], values[2], values[3])) self.log("已取消全选") def get_selected_emulators(self): """获取选中的模拟器列表""" selected = [] for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[0] == "☑": selected.append({ 'index': values[1], 'name': values[2] }) return selected def load_emulators(self): """加载模拟器列表""" if not self.find_manager_path(): self.log(f"❌ 找不到 MuMuManager.exe,请检查 MuMu安装路径") return result = self.run_mumu_command(["info", "-v", "all"]) if not result or result.returncode != 0: self.log(f"获取模拟器列表失败") if result and result.stderr: self.log(f"错误信息: {result.stderr}") return # 清空现有列表 for item in self.emulator_tree.get_children(): self.emulator_tree.delete(item) # 解析JSON try: # 尝试清理输出(可能包含非JSON内容) output = result.stdout.strip() # 查找JSON开始的位置 json_start = output.find('{') if json_start != -1: output = output[json_start:] data = json.loads(output) for key, value in data.items(): if isinstance(value, dict): index = value.get('index', key) name = value.get('name', f"模拟器_{index}") status = "运行中" if value.get('is_process_started') else "未运行" self.emulator_tree.insert('', 'end', values=("□", index, name, status)) count = len(self.emulator_tree.get_children()) self.log(f"✅ 已加载 {count} 个模拟器") if count == 0: self.log("提示: 没有找到模拟器,请检查 MuMu 模拟器是否已安装") except json.JSONDecodeError as e: self.log(f"解析模拟器列表失败: {e}") self.log(f"原始输出: {result.stdout[:200]}...") def start_emulator(self, index): """启动单个模拟器""" result = self.run_mumu_command(["control", "-v", str(index), "launch"]) if result and result.returncode == 0: self.log(f"✅ 模拟器 {index} 启动命令已发送") return True else: self.log(f"❌ 模拟器 {index} 启动失败") if result and result.stderr: self.log(f"错误: {result.stderr}") return False def wait_for_emulator_ready(self, index, timeout=120): """等待模拟器启动完成""" start_time = time.time() while time.time() - start_time < timeout: if self.should_stop or self.is_paused: return False result = self.run_mumu_command(["info", "-v", str(index)]) if result and result.returncode == 0: try: # 清理输出 output = result.stdout.strip() json_start = output.find('{') if json_start != -1: output = output[json_start:] data = json.loads(output) if data.get('is_android_started') == True: self.log(f"✅ 模拟器 {index} 已就绪") return True except: pass time.sleep(3) self.log(f"❌ 模拟器 {index} 启动超时") return False def perform_click_actions(self): """执行点击操作""" try: import pyautogui import win32gui import win32con # 查找MuMu模拟器窗口 def find_mumu_window(): def enum_callback(hwnd, windows): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) if "MuMu模拟器" in title or (title and "MuMu" in title and "模拟器" in title): windows.append((hwnd, title)) return True windows = [] win32gui.EnumWindows(enum_callback, windows) return windows[0] if windows else None # 查找窗口 self.log("正在查找MuMu模拟器窗口...") for i in range(15): # 最多尝试15次 window_info = find_mumu_window() if window_info: hwnd, title = window_info self.log(f"找到窗口: {title}") break self.log(f"等待窗口出现... ({i+1}/15)") time.sleep(3) else: self.log("❌ 未找到MuMu模拟器窗口") return False # 激活窗口 win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) win32gui.SetForegroundWindow(hwnd) time.sleep(1) # 获取窗口位置 rect = win32gui.GetWindowRect(hwnd) x, y = rect[0], rect[1] self.log(f"窗口位置: ({x}, {y})") self.log(f"窗口大小: {rect[2]-rect[0]} x {rect[3]-rect[1]}") # 执行点击操作 clicks = [ (272, 86, "第一个点"), (111, 80, "第二个点"), (447, 82, "第三个点"), (476, 164, "第四个点"), (352, 339, "第五个点"), (778, 88, "第六个点") ] for dx, dy, description in clicks: if self.should_stop or self.is_paused: self.log("操作被中断") return False click_x = x + dx click_y = y + dy self.log(f"点击{description}: ({click_x}, {click_y})") pyautogui.click(click_x, click_y) time.sleep(1) self.log("✅ 点击操作完成") return True except ImportError as e: self.log(f"缺少必要的库: {e}") self.log("请安装: pip install pyautogui pywin32") return False except Exception as e: self.log(f"执行点击操作出错: {e}") import traceback self.log(traceback.format_exc()) return False def process_batch(self, emulators_batch): """处理一批模拟器""" # 启动本批模拟器 for emu in emulators_batch: if self.should_stop: return False self.start_emulator(emu['index']) time.sleep(3) # 间隔启动 # 等待模拟器就绪 wait_seconds = self.wait_time.get() self.log(f"等待 {wait_seconds} 秒,让模拟器完全启动...") for i in range(wait_seconds): if self.should_stop: return False # 处理暂停 while self.is_paused and not self.should_stop: time.sleep(1) if self.should_stop: return False if (i + 1) % 10 == 0: self.log(f"已等待 {i+1}/{wait_seconds} 秒...") time.sleep(1) # 执行点击操作 self.log("开始执行点击操作...") return self.perform_click_actions() def run_automation(self): """运行自动化流程""" try: selected = self.get_selected_emulators() if not selected: self.log("请至少选择一个模拟器") return batch_size = self.batch_size.get() total = len(selected) batches = (total + batch_size - 1) // batch_size self.log(f"开始处理 {total} 个模拟器,分 {batches} 批,每批 {batch_size} 个") # 设置进度条 self.progress['maximum'] = batches self.progress['value'] = 0 # 分批处理 for i in range(0, total, batch_size): if self.should_stop: self.log("流程已停止") break # 处理暂停 while self.is_paused and not self.should_stop: time.sleep(1) if self.should_stop: break batch = selected[i:i+batch_size] batch_num = i // batch_size + 1 self.log(f"\n{'='*50}") self.log(f"开始处理第 {batch_num}/{batches} 批") self.log(f"本批模拟器: {[e['index'] for e in batch]}") self.log(f"{'='*50}") success = self.process_batch(batch) # 更新进度 self.progress['value'] = batch_num if not success and not self.is_paused: self.log("流程中断") break # 批次间等待(除了最后一批) if i + batch_size < total and not self.should_stop: self.log("等待30秒后处理下一批...") for _ in range(30): if self.should_stop: break time.sleep(1) if not self.should_stop: self.log("\n🎉 所有任务执行完毕!") except Exception as e: self.log(f"运行出错: {e}") import traceback self.log(traceback.format_exc()) finally: self.stop_process() def start_process(self): """启动流程""" if self.is_running: self.log("流程已在运行中") return # 检查MuMuManager.exe if not self.find_manager_path(): messagebox.showerror("错误", f"找不到 MuMuManager.exe\n请检查 MuMu安装路径: {self.mumu_install_path.get()}") return # 检查选中的模拟器 selected = self.get_selected_emulators() if not selected: messagebox.showwarning("警告", "请至少选择一个模拟器") return self.is_running = True self.is_paused = False self.should_stop = False # 更新按钮状态 self.start_btn.config(state=tk.DISABLED) self.pause_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.NORMAL) # 清空进度条 self.progress['value'] = 0 # 在新线程中运行 self.current_thread = threading.Thread(target=self.run_automation) self.current_thread.daemon = True self.current_thread.start() self.log("🚀 自动化流程已启动") def pause_process(self): """暂停/继续流程""" if not self.is_running: return if self.is_paused: self.is_paused = False self.pause_btn.config(text="暂停") self.log("▶️ 流程已继续") else: self.is_paused = True self.pause_btn.config(text="继续") self.log("⏸ 流程已暂停") def stop_process(self): """停止流程""" self.should_stop = True self.is_running = False self.is_paused = False # 更新按钮状态 self.start_btn.config(state=tk.NORMAL) self.pause_btn.config(state=tk.DISABLED, text="暂停") self.stop_btn.config(state=tk.DISABLED) self.log("⏹ 流程已停止") def main(): root = tk.Tk() app = MumuManager(root) root.mainloop() if __name__ == "__main__": main()