import random import subprocess import json import time import os import tkinter as tk from tkinter import ttk, scrolledtext, messagebox, filedialog import threading import configparser from datetime import datetime # 用于存储线程的列表 threads = [] class MuMuEmulatorManager: # 类级别的剪贴板锁,所有实例共享 _clipboard_lock = threading.Lock() def __init__(self, manager_path=r"D:\MuMuPlayer\nx_main\MuMuManager.exe"): self.manager_path = manager_path if not os.path.exists(manager_path): raise FileNotFoundError(f"找不到 MuMuManager.exe: {manager_path}") def get_adb_port(self, index, log_callback=None): """实时获取指定模拟器的 ADB 端口,获取不到就一直等待直到成功""" while True: cmd = [self.manager_path, "info", "-v", str(index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode == 0: try: data = json.loads(result.stdout) adb_port = data.get('adb_port') if adb_port is not None: if log_callback: log_callback(f"✅ 模拟器 {index} ADB端口: {adb_port}") return adb_port except: pass if log_callback: log_callback(f"⏳ 模拟器 {index} 等待ADB端口...") time.sleep(3) # 等待3秒后重试 def get_emulator_list(self): """获取所有模拟器列表""" cmd = [self.manager_path, "info", "-v", "all"] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: return [] try: data = json.loads(result.stdout) emulators = [] for key, value in data.items(): if isinstance(value, dict): value['index'] = key emulators.append(value) return emulators except json.JSONDecodeError: return [] def start_emulator(self, index): """启动指定索引的模拟器""" cmd = [self.manager_path, "control", "-v", str(index), "launch"] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') return result.returncode == 0 def wait_for_emulator_ready(self, index, timeout=120, check_interval=3, log_callback=None): """等待模拟器启动完成""" start_time = time.time() while time.time() - start_time < timeout: cmd = [self.manager_path, "info", "-v", str(index)] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') if result.returncode == 0: try: data = json.loads(result.stdout) if data.get('is_android_started') == True: return True except: pass if log_callback: log_callback(f"等待模拟器 {index} 启动... ({int(time.time() - start_time)}秒)") time.sleep(check_interval) return False def stop_emulator(self, index): """关闭模拟器""" cmd = [self.manager_path, "control", "-v", str(index), "shutdown"] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') return result.returncode == 0 def clear_app_data(self, index, manager, log_callback): """清除指定应用的数据""" manager.tap(index, 59, 90, log_callback) time.sleep(2) manager.tap(index, 59, 90, log_callback) time.sleep(2) manager.tap(index, 59, 90, log_callback) time.sleep(2) manager.tap(index, 59, 90, log_callback) time.sleep(2) color5990 = manager.get_pixel_color(index, 59, 90, log_callback=log_callback) log_callback(color5990) if color5990 == "#F3F3F3": manager.tap(index, 645, 1240, log_callback) time.sleep(2) manager.tap(index, 663, 91, log_callback) time.sleep(2) manager.tap(index, 663, 91, log_callback) time.sleep(2) manager.tap(index, 144, 239, log_callback) time.sleep(2) manager.tap(index, 592, 618, log_callback) time.sleep(2) def install_apk(self, index, apk_path, log_callback=None): """安装APK""" if not os.path.exists(apk_path): if log_callback: log_callback(f"❌ APK文件不存在: {apk_path}") return False adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" # 连接ADB subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(2) # 检查是否已安装 check_cmd = f"adb -s {target_device} shell pm list packages | findstr \"com.dragon.read\"" check_result = subprocess.run(check_cmd, shell=True, capture_output=True, text=True) if "com.dragon.read" in check_result.stdout: if log_callback: log_callback(f"✅ com.dragon.read 已安装,跳过安装步骤") return True # 安装APK if log_callback: log_callback(f"正在安装APK: {os.path.basename(apk_path)} 端口: {adb_port}...") install_cmd = f"adb -s {target_device} install -r \"{apk_path}\"" result = subprocess.run(install_cmd, shell=True, capture_output=True, text=True) return "Success" in result.stdout def open_app(self, index, package_name, log_callback=None): """打开应用""" adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(1) cmd = f"adb -s {target_device} shell monkey -p {package_name} -c android.intent.category.LAUNCHER 1" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if "Events injected" in result.stdout or result.returncode == 0: if log_callback: log_callback(f"✅ 已打开应用: {package_name}") return True return False # 替代方案:直接发送文本字符,不用剪贴板 def paste_text(self, index, text, log_callback=None): adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.3) # 直接通过 ADB 输入文本(逐字符) # 先确保输入框获得焦点(点击一下) subprocess.run(f"adb -s {target_device} shell input tap 390 90", shell=True) time.sleep(0.5) # 使用 adb shell input text 输入(会自动处理空格和特殊字符) # 注意:需要用 %s 转义空格 safe_text = text.replace(' ', '%s').replace('&', '\\&') subprocess.run(f"adb -s {target_device} shell input text '{safe_text}'", shell=True) if log_callback: log_callback(f"✅ 已输入: {text[:50]}{'...' if len(text) > 50 else ''}") return True def tap(self, index, x, y, log_callback=None): """点击坐标""" adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) cmd = f"adb -s {target_device} shell input tap {x} {y}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: if log_callback: log_callback(f"✅ 点击坐标 ({x}, {y})") return True return False def swipe(self, index, x1, y1, x2, y2, duration_ms=300, log_callback=None): """从坐标 (x1, y1) 滑动到 (x2, y2)""" adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) cmd = f"adb -s {target_device} shell input swipe {x1} {y1} {x2} {y2} {duration_ms}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: if log_callback: log_callback(f"✅ 滑动从 ({x1}, {y1}) 到 ({x2}, {y2}),耗时 {duration_ms}ms") return True else: if log_callback: log_callback(f"❌ 滑动失败: {result.stderr}") return False def get_screen_size(self, index, log_callback=None): """获取模拟器屏幕分辨率,返回 (width, height)""" adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.5) get_resolution_cmd = f"adb -s {target_device} shell wm size" result = subprocess.run(get_resolution_cmd, shell=True, capture_output=True, text=True) if result.stdout: import re match = re.search(r'(\d+)x(\d+)', result.stdout) if match: width = int(match.group(1)) height = int(match.group(2)) if log_callback: log_callback(f"📱 模拟器 {index} 分辨率: {width}x{height}") return width, height return None, None def check_resolution(self, index, expected_width=720, log_callback=None): """检查分辨率宽度是否符合要求,返回 True/False""" width, height = self.get_screen_size(index, log_callback) if width is None: if log_callback: log_callback(f"⚠️ 模拟器 {index} 无法获取分辨率") return False return width == expected_width def get_pixel_color(self, index, x, y, log_callback=None): """获取模拟器内指定坐标点的颜色""" adb_port = self.get_adb_port(index) target_device = f"127.0.0.1:{adb_port}" # 连接ADB并验证连接 subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(0.5) # 验证设备是否在线 verify_cmd = f"adb -s {target_device} shell echo 1" verify_result = subprocess.run(verify_cmd, shell=True, capture_output=True, text=True) if verify_result.returncode != 0: if log_callback: log_callback(f"⚠️ 模拟器 {index} ADB 连接失败,重新连接...") subprocess.run(f"adb disconnect {target_device}", shell=True, capture_output=True) time.sleep(1) subprocess.run(f"adb connect {target_device}", shell=True, capture_output=True) time.sleep(1) try: # 获取屏幕分辨率 get_resolution_cmd = f"adb -s {target_device} shell wm size" resolution_result = subprocess.run(get_resolution_cmd, shell=True, capture_output=True, text=True) if resolution_result.stdout: import re match = re.search(r'(\d+)x(\d+)', resolution_result.stdout) if match: screen_width = int(match.group(1)) screen_height = int(match.group(2)) else: screen_width = 720 screen_height = 1280 else: screen_width = 720 screen_height = 1280 if log_callback: log_callback(f"📱 屏幕分辨率: {screen_width}x{screen_height}") # 使用唯一的临时文件名(包含线程ID和时间戳) import threading thread_id = threading.current_thread().ident temp_local_file = f"temp_screenshot_{thread_id}_{int(time.time()*1000)}.png" # 截图并保存到本地 screenshot_cmd = f"adb -s {target_device} exec-out screencap -p > {temp_local_file}" subprocess.run(screenshot_cmd, shell=True, capture_output=True, text=True) time.sleep(0.3) # 检查文件是否存在且不为空 if os.path.exists(temp_local_file) and os.path.getsize(temp_local_file) > 0: try: from PIL import Image # 打开图片 img = Image.open(temp_local_file) # 确保坐标在范围内 width, height = img.size if x < 0 or x >= width or y < 0 or y >= height: if log_callback: log_callback(f"❌ 坐标({x},{y})超出屏幕范围 {width}x{height}") os.remove(temp_local_file) return None # 获取像素颜色 pixel = img.getpixel((x, y)) # 转换为十六进制颜色值 if isinstance(pixel, tuple): if len(pixel) >= 3: r, g, b = pixel[0], pixel[1], pixel[2] else: r, g, b = pixel, pixel, pixel else: r = g = b = pixel color = f"#{r:02X}{g:02X}{b:02X}" # 关闭图片并清理临时文件 img.close() if os.path.exists(temp_local_file): os.remove(temp_local_file) if log_callback: log_callback(f"🎨 坐标({x},{y}) 颜色: {color}") return color except ImportError: if log_callback: log_callback("❌ 请先安装PIL库: pip install Pillow") if os.path.exists(temp_local_file): os.remove(temp_local_file) return None except Exception as e: if log_callback: log_callback(f"❌ 解析图片失败: {e}") if os.path.exists(temp_local_file): try: os.remove(temp_local_file) except: pass return None else: if log_callback: log_callback("❌ 截图失败") if os.path.exists(temp_local_file): try: os.remove(temp_local_file) except: pass return None except Exception as e: if log_callback: log_callback(f"❌ 获取颜色失败: {e}") return None class MuMuAutoGUI: def __init__(self): self.root = tk.Tk() self.root.title("MuMu模拟器自动化工具") self.root.geometry("700x360") # 配置文件 self.config_file = "mumu_config.ini" self.config = configparser.ConfigParser() self.load_config() # 运行状态 self.is_running = False self.is_paused = False self.should_stop = False self.current_thread = None self.selected_emulators = [] self.selected_emulators = [] self.thread_semaphore = None # 添加信号量控制并发数 self.active_threads = 0 # 记录当前活跃线程数 self.threads_lock = threading.Lock() # 线程锁 self.load_btn = None self.start_read_btn = None self.start_comment_btn = None self.results = {} # 创建界面 self.create_widgets() # 加载保存的配置 self.load_settings() def load_config(self): """加载配置文件""" if os.path.exists(self.config_file): self.config.read(self.config_file, encoding='utf-8') else: self.config['Settings'] = { 'mumu_path': r'D:\MuMuPlayer\nx_main\MuMuManager.exe', 'apk_path': 'fanqie.apk', 'package_name': 'com.dragon.read', 'search_content': '玄幻战神:开局就得到大佬的守护' } def save_config(self): """保存配置文件""" with open(self.config_file, 'w', encoding='utf-8') as f: self.config.write(f) def load_settings(self): """加载设置到界面""" self.mumu_path_var.set(self.config['Settings']['mumu_path'] if 'mumu_path' in self.config['Settings'] else '') self.apk_path_var.set(self.config['Settings']['apk_path'] if 'apk_path' in self.config['Settings'] else '') self.package_name_var.set(self.config['Settings']['package_name'] if 'package_name' in self.config['Settings'] else '') self.search_content_var.set(self.config['Settings']['search_content'] if 'search_content' in self.config['Settings'] else '盗墓笔记') self.page_count_var.set(self.config['Settings']['page_count_var'] if 'page_count_var' in self.config['Settings'] else '10-30') self.page_interval_var.set(self.config['Settings']['page_interval_var'] if 'page_interval_var' in self.config['Settings'] else '5') self.max_threads_var.set(self.config['Settings']['max_threads_var'] if 'max_threads_var' in self.config['Settings'] else '1') def save_settings(self): """保存界面设置到文件""" self.config['Settings']['mumu_path'] = self.mumu_path_var.get() self.config['Settings']['apk_path'] = self.apk_path_var.get() self.config['Settings']['package_name'] = self.package_name_var.get() self.config['Settings']['search_content'] = self.search_content_var.get() self.config['Settings']['page_count_var'] = self.page_count_var.get() self.config['Settings']['page_interval_var'] = self.page_interval_var.get() self.config['Settings']['max_threads_var'] = self.max_threads_var.get() self.save_config() self.log_message("✅ 配置已保存") def create_widgets(self): """创建界面组件""" # 创建选项卡 self.notebook = ttk.Notebook(self.root) self.notebook.pack(fill='both', expand=True, padx=5, pady=5) # 配置选项卡 self.create_config_tab() # 任务选项卡 self.create_task_tab() # 日志选项卡 self.create_log_tab() def create_config_tab(self): """创建配置选项卡""" config_frame = ttk.Frame(self.notebook) self.notebook.add(config_frame, text="配置") # 创建滚动框架 canvas = tk.Canvas(config_frame) scrollbar = ttk.Scrollbar(config_frame, orient="vertical", command=canvas.yview) scrollable_frame = ttk.Frame(canvas) scrollable_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=scrollable_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) # 配置项 row = 0 # MuMu路径 ttk.Label(scrollable_frame, text="MuMuManager路径:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.mumu_path_var = tk.StringVar() mumu_entry = ttk.Entry(scrollable_frame, textvariable=self.mumu_path_var, width=60) mumu_entry.grid(row=row, column=1, padx=10, pady=5) ttk.Button(scrollable_frame, text="浏览", command=self.browse_mumu_path).grid(row=row, column=2, padx=5, pady=5) row += 1 # APK路径 ttk.Label(scrollable_frame, text="APK文件路径:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.apk_path_var = tk.StringVar() apk_entry = ttk.Entry(scrollable_frame, textvariable=self.apk_path_var, width=60) apk_entry.grid(row=row, column=1, padx=10, pady=5) ttk.Button(scrollable_frame, text="浏览", command=self.browse_apk_path).grid(row=row, column=2, padx=5, pady=5) row += 1 # 包名 ttk.Label(scrollable_frame, text="应用包名:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.package_name_var = tk.StringVar() ttk.Entry(scrollable_frame, textvariable=self.package_name_var, width=40).grid(row=row, column=1, sticky='w', padx=10, pady=5) row += 1 # 搜索内容 ttk.Label(scrollable_frame, text="搜索内容:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.search_content_var = tk.StringVar() ttk.Entry(scrollable_frame, textvariable=self.search_content_var, width=60).grid(row=row, column=1, padx=10, pady=5) row += 1 # 翻页间隔 ttk.Label(scrollable_frame, text="翻页间隔(秒):").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.page_interval_var = tk.StringVar() ttk.Entry(scrollable_frame, textvariable=self.page_interval_var, width=60).grid(row=row, column=1, padx=10, pady=5) row += 1 # 阅读页数 ttk.Label(scrollable_frame, text="阅读页数:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.page_count_var = tk.StringVar() ttk.Entry(scrollable_frame, textvariable=self.page_count_var, width=60).grid(row=row, column=1, padx=10, pady=5) row += 1 # 最大线程 ttk.Label(scrollable_frame, text="最大线程:").grid(row=row, column=0, sticky='w', padx=10, pady=5) self.max_threads_var = tk.StringVar() ttk.Entry(scrollable_frame, textvariable=self.max_threads_var, width=60).grid(row=row, column=1, padx=10, pady=5) row += 1 # 保存按钮 ttk.Button(scrollable_frame, text="保存配置", command=self.save_settings).grid(row=row, column=0, columnspan=3, pady=20) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") def select_failed_emulators(self): """选择所有执行失败的模拟器""" if not self.results: self.log_message("⚠️ 没有任务执行记录,请先运行任务") return failed_indices = [idx for idx, success in self.results.items() if not success] if not failed_indices: self.log_message("✅ 没有失败的模拟器") return # 清空当前选择 for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[0] == "☑": self.emulator_tree.item(item, values=("□", values[1], values[2], values[3])) # 选中失败的模拟器 selected_count = 0 for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[1] in failed_indices: self.emulator_tree.item(item, values=("☑", values[1], values[2], values[3])) selected_count += 1 self.log_message(f"✅ 已选中 {selected_count} 个失败模拟器: {', '.join(failed_indices)}") def create_task_tab(self): """创建任务选项卡""" task_frame = ttk.Frame(self.notebook) self.notebook.add(task_frame, text="任务") # 上部:模拟器列表 list_frame = ttk.LabelFrame(task_frame, text="任务控制") list_frame.pack(fill='both', expand=True, padx=5, pady=5) # 按钮栏 button_frame = ttk.Frame(list_frame) button_frame.pack(fill='x', padx=5, pady=5) self.load_btn = ttk.Button(button_frame, text="读取模拟器", command=self.load_emulators) self.load_btn.pack(side='left', padx=5) # 添加全选按钮 self.select_all_btn = ttk.Button(button_frame, text="全选", command=self.select_all_emulators, width=6) self.select_all_btn.pack(side='left', padx=5) # 添加选择失败项按钮 self.select_failed_btn = ttk.Button(button_frame, text="选择失败项", command=self.select_failed_emulators, width=10) self.select_failed_btn.pack(side='left', padx=5) self.start_read_btn = ttk.Button(button_frame, text="开始阅读", command=self.start_task, width=10) self.start_read_btn.pack(side='left', padx=10) self.start_comment_btn = ttk.Button(button_frame, text="开始评价", command=self.start_task2, width=10) self.start_comment_btn.pack(side='left', padx=10) self.pause_btn = ttk.Button(button_frame, text="暂停", command=self.pause_task, width=10, state='disabled') self.pause_btn.pack(side='left', padx=10) self.stop_btn = ttk.Button(button_frame, text="停止", command=self.stop_task, width=10, state='disabled') self.stop_btn.pack(side='left', padx=10) # 模拟器列表(带复选框) tree_frame = ttk.Frame(list_frame) tree_frame.pack(fill='both', expand=True, padx=5, pady=5) # 创建Treeview columns = ("选择", "索引", "名称", "状态") self.emulator_tree = ttk.Treeview(tree_frame, columns=columns, show='headings', height=8) # 设置列标题 self.emulator_tree.heading("选择", text="选择") self.emulator_tree.heading("索引", text="索引") self.emulator_tree.heading("名称", text="名称") self.emulator_tree.heading("状态", text="状态") # 设置列宽 self.emulator_tree.column("选择", width=50) self.emulator_tree.column("索引", width=50) self.emulator_tree.column("名称", width=150) self.emulator_tree.column("状态", width=100) # 添加滚动条 vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.emulator_tree.yview) self.emulator_tree.configure(yscrollcommand=vsb.set) self.emulator_tree.pack(side='left', fill='both', expand=True) vsb.pack(side='right', fill='y') # 绑定双击选择 self.emulator_tree.bind('', self.on_tree_click) def create_log_tab(self): """创建日志选项卡""" log_frame = ttk.Frame(self.notebook) self.notebook.add(log_frame, text="日志") # 日志文本框 self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=20) self.log_text.pack(fill='both', expand=True, padx=5, pady=5) # 清空按钮 btn_frame = ttk.Frame(log_frame) btn_frame.pack(fill='x', padx=5, pady=5) ttk.Button(btn_frame, text="清空日志", command=self.clear_log).pack(side='right') def browse_mumu_path(self): """浏览MuMuManager路径""" path = filedialog.askopenfilename(title="选择MuMuManager.exe", filetypes=[("Executable", "*.exe")]) if path: self.mumu_path_var.set(path) def browse_apk_path(self): """浏览APK文件""" path = filedialog.askopenfilename(title="选择APK文件", filetypes=[("APK", "*.apk")]) if path: self.apk_path_var.set(path) def select_all_emulators(self): """全选所有模拟器""" for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[0] == "□": self.emulator_tree.item(item, values=("☑", values[1], values[2], values[3])) self.log_message("已全选所有模拟器") def update_emulator_status(self, index, status): """更新指定索引的模拟器状态显示 Args: index: 模拟器索引(字符串) status: 状态文本,如 "运行中" 或 "未运行" """ for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[1] == str(index): self.emulator_tree.item(item, values=(values[0], values[1], values[2], status)) break def load_emulators(self): """加载模拟器列表""" try: manager = MuMuEmulatorManager(self.mumu_path_var.get()) emulators = manager.get_emulator_list() # 清空现有列表 for item in self.emulator_tree.get_children(): self.emulator_tree.delete(item) # 添加模拟器 for emu in emulators: status = "运行中" if emu.get('is_process_started') else "未运行" self.emulator_tree.insert('', 'end', values=("□", emu.get('index'), emu.get('name'), status)) self.log_message(f"已加载 {len(emulators)} 个模拟器") except Exception as e: self.log_message(f"加载模拟器失败: {e}") def on_tree_click(self, event): """处理列表点击选择""" region = self.emulator_tree.identify_region(event.x, event.y) if region == "cell": column = self.emulator_tree.identify_column(event.x) if column == "#1": # 选择列 item = self.emulator_tree.identify_row(event.y) if item: values = self.emulator_tree.item(item, 'values') current = values[0] new_value = "☑" if current == "□" else "□" self.emulator_tree.item(item, values=(new_value, values[1], values[2], values[3])) def get_selected_emulators(self): """获取选中的模拟器""" selected = [] for item in self.emulator_tree.get_children(): values = self.emulator_tree.item(item, 'values') if values[0] == "☑": selected.append({ 'index': values[1], 'name': values[2] }) return selected def log_message(self, message): """添加日志""" timestamp = datetime.now().strftime("%H:%M:%S") self.log_text.insert(tk.END, f"[{timestamp}] {message}\n") self.log_text.see(tk.END) self.root.update() def start_task(self): """开始阅读任务""" selected = self.get_selected_emulators() if not selected: messagebox.showwarning("警告", "请至少选择一个模拟器") return self.selected_emulators = selected self.is_running = True self.is_paused = False self.should_stop = False # 禁用相关按钮 self.load_btn.config(state='disabled') self.start_read_btn.config(state='disabled') self.start_comment_btn.config(state='disabled') self.pause_btn.config(state='normal') self.stop_btn.config(state='normal') # 在新线程中运行任务 self.current_thread = threading.Thread(target=self.run_task, daemon=True) self.current_thread.start() def start_task2(self): """开始评价任务""" selected = self.get_selected_emulators() if not selected: messagebox.showwarning("警告", "请至少选择一个模拟器") return self.selected_emulators = selected self.is_running = True self.is_paused = False self.should_stop = False # 禁用相关按钮 self.load_btn.config(state='disabled') self.start_read_btn.config(state='disabled') self.start_comment_btn.config(state='disabled') self.pause_btn.config(state='normal') self.stop_btn.config(state='normal') # 在新线程中运行评价任务 self.current_thread = threading.Thread(target=self.run_task2, daemon=True) self.current_thread.start() def pause_task(self): """暂停任务""" if self.is_running and not self.is_paused: self.is_paused = True self.pause_btn.config(text="继续") self.log_message("⏸ 任务已暂停") elif self.is_running and self.is_paused: self.is_paused = False self.pause_btn.config(text="暂停") self.log_message("▶️ 任务已继续") def stop_task(self): """停止任务""" if self.is_running: self.should_stop = True self.is_running = False self.is_paused = False self.log_message("⏹ 正在停止任务...") def openBook(self, manager, index): """打开书籍,遇到错误返回False""" # 执行操作 manager.tap(index, 390, 90, self.log_message) errNumber = 0 while True: color56065 = manager.get_pixel_color(index, 560, 65, log_callback=self.log_message) if color56065 == "#F7F7F7": break elif color56065 == "#EBF8EC": manager.tap(index, 390, 90, self.log_message) self.log_message(f"模拟器 {index} 尝试重新点击搜索...") elif color56065 == "#5E635E": self.log_message(f"模拟器 {index} 需要关闭红包弹窗...") # manager.tap(index, 640, 260, self.log_message) manager.tap(index, 360, 1035, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) elif color56065 in ["#EBE8E4", "#CDD0D1"]: self.log_message(f"模拟器 {index} 进入错误页面,返回...") manager.tap(index, 44, 92, self.log_message) else: errNumber = errNumber + 1 if errNumber > 30: self.log_message(f"模拟器 {index} 连续多次未检测到搜索页面,设置错误并退出...") manager.stop_emulator(index) return False self.log_message(f"模拟器 {index} 等待搜索页面准备就绪...") time.sleep(3) time.sleep(3) # 点输入框 manager.tap(index, 340, 90, self.log_message) time.sleep(3) # 黏贴 manager.paste_text(index, self.search_content_var.get(), self.log_message) time.sleep(3) errNumber = 0 while True: if manager.get_pixel_color(index, 435, 880, log_callback=self.log_message) == "#FFFFFF": break errNumber = errNumber + 1 if errNumber > 30: self.log_message(f"模拟器 {index} 连续多次未检测到搜索页面,设置错误并退出...") manager.stop_emulator(index) return False self.log_message(f"模拟器 {index} 没有输入搜索内容...") # 点输入框 manager.tap(index, 556, 87, self.log_message) time.sleep(1) manager.tap(index, 340, 90, self.log_message) time.sleep(2) # 黏贴 manager.paste_text(index, self.search_content_var.get(), self.log_message) time.sleep(3) manager.tap(index, 655, 92, self.log_message) time.sleep(6) errNumber = 0 while True: color630235 = manager.get_pixel_color(index, 630, 235, log_callback=self.log_message) if color630235 in ["#FFFFFF"]: self.log_message(f"模拟器 {index} 已经在搜索结果页面,继续...") break elif color630235 in ["#E8E3CE", "#E0DBC6", "#CCCBCB", "#DFDAC5", "#141000"]: self.log_message(f"模拟器 {index} 已经在看书目录界面,继续...") break elif color630235 in ["#F9F9FC"]: self.log_message(f"模拟器 {index} 关闭广告弹窗,继续...") manager.tap(index, 634, 123, self.log_message) else: errNumber = errNumber + 1 if errNumber > 30: self.log_message(f"模拟器 {index} 连续多次未检测到搜索页面,设置错误并退出...") manager.stop_emulator(index) return False self.log_message(f"模拟器 {index} 等待搜索结果...") time.sleep(3) # 进入书目 time.sleep(6) manager.tap(index, 355, 333, self.log_message) time.sleep(5) errNumber = 0 while True: color630235 = manager.get_pixel_color(index, 630, 235, log_callback=self.log_message) if color630235 in ["#E8E3CE", "#E0DBC6", "#CCCBCB", "#DFDAC5", "#141000", "#E3DEC9"]: self.log_message(f"模拟器 {index} 已经在看书目录界面,继续...") return True else: errNumber = errNumber + 1 if errNumber > 20: self.log_message(f"模拟器 {index} 连续多次未检测到搜索页面,设置错误并退出...") manager.stop_emulator(index) return False self.log_message(f"模拟器 {index} 还在搜索结果页面,重新点击...") manager.tap(index, 355, 333, self.log_message) time.sleep(5) def run_task(self): """执行任务(支持并发,出错直接退出不重试)""" global threads try: # 获取最大线程数 max_threads = int(self.max_threads_var.get()) if self.max_threads_var.get().isdigit() else 1 self.thread_semaphore = threading.Semaphore(max_threads) self.active_threads = 0 self.log_message(f"📌 最大并发数: {max_threads}") threads = [] def process_emulator(emu): """处理单个模拟器的函数,出错直接退出""" # 获取信号量,控制并发数 self.thread_semaphore.acquire() # 增加活跃线程计数 with self.threads_lock: self.active_threads += 1 current_active = self.active_threads self.log_message(f"📊 当前活跃线程数: {current_active}/{max_threads}") index = emu['index'] try: if self.should_stop: return self.log_message(f"\n{'='*50}") self.log_message(f"开始处理模拟器 {index} ({emu['name']})") self.log_message(f"{'='*50}") # 更新状态为运行中 self.root.after(0, lambda: self.update_emulator_status(index, "运行中")) manager = MuMuEmulatorManager(self.mumu_path_var.get()) # 启动模拟器 self.log_message(f"正在启动模拟器 {index}...") if not manager.start_emulator(index): self.log_message(f"❌ 模拟器 {index} 启动失败") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "启动失败")) return # 等待就绪 if not manager.wait_for_emulator_ready(index, timeout=180, log_callback=self.log_message): self.log_message(f"❌ 模拟器 {index} 启动超时") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "启动超时")) return time.sleep(5) if not manager.check_resolution(index, 720, self.log_message): self.log_message(f"❌ 模拟器 {index} 分辨率不是720,停止任务并关闭模拟器") manager.stop_emulator(index) self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "分辨率错误")) return # 安装APK if not manager.install_apk(index, self.apk_path_var.get(), self.log_message): self.log_message(f"⚠️ APK安装失败,但继续执行...") # 打开应用 manager.open_app(index, self.package_name_var.get(), self.log_message) # 判断是否需要同意 time.sleep(20) button_color = manager.get_pixel_color(index, 394, 846, log_callback=self.log_message) # 是否是第一次进入 if button_color and button_color.upper() == "#FC7838": manager.tap(index, 394, 846, self.log_message) time.sleep(120) manager.tap(index, 640, 260, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) time.sleep(5) else: time.sleep(10) # 等待进入主界面 errNumber = 0 while True: if self.should_stop or self.is_paused: # 处理暂停 while self.is_paused and not self.should_stop: time.sleep(1) if self.should_stop: return color560130 = manager.get_pixel_color(index, 560, 130, log_callback=self.log_message) if color560130 in ["#EEF8EE", "#F7F7F7"]: self.log_message(f"模拟器 {index} 已经进入主界面,继续...") break elif color560130 in ["#5F635F"]: self.log_message(f"模拟器 {index} 需要关闭红包弹窗...") # manager.tap(index, 640, 260, self.log_message) manager.tap(index, 360, 1035, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) elif color560130 in ["#CED0D2"]: self.log_message(f"模拟器 {index} 需要关闭广告弹窗...") manager.tap(index, 200, 80, self.log_message) elif color560130 in ["#F3CEA9"]: self.log_message(f"模拟器 {index} 进入错误页面,返回...") manager.tap(index, 44, 92, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) else: errNumber = errNumber + 1 if errNumber > 30: self.log_message(f"模拟器 {index} 连续多次未进入主界面,退出...") manager.stop_emulator(index) self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "进入主界面失败")) return self.log_message(f"模拟器 {index} 等待进入主界面中...") time.sleep(5) # 执行打开书籍 if not self.openBook(manager, index): self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "打开书籍失败")) return time.sleep(8) # 翻页 page_range = self.page_count_var.get() if '-' in page_range: parts = page_range.split('-') if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): page_count = random.randint(int(parts[0]), int(parts[1])) else: page_count = 10 else: page_count = int(page_range) if page_range.isdigit() else 10 for page in range(page_count): if self.should_stop or self.is_paused: # 处理暂停 while self.is_paused and not self.should_stop: time.sleep(1) if self.should_stop: return # 判断是否在看书目录界面 errNumber = 0 while True: color7001000 = manager.get_pixel_color(index, 700, 1000, log_callback=self.log_message) if color7001000 in ["#E8E3CE", "#E0DBC6", "#CCCBCB"]: self.log_message(f"模拟器 {index} 在看书目录界面,继续翻页...") break elif color7001000 in ["#F9F9FC"]: self.log_message(f"模拟器 {index} 遇到广告,点击跳过...") manager.tap(index, 700, 300, self.log_message) else: errNumber = errNumber + 1 if errNumber > 5: self.log_message(f"尝试滑动是否能跳过!") manager.swipe(index, 700, 700, 200, 700, 500, self.log_message) if errNumber > 10: self.log_message(f"模拟器 {index} 连续多次未检测到目录界面,退出...") manager.stop_emulator(index) self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "翻页失败")) return # 判断是否有广告 if manager.get_pixel_color(index, 700, 1200, log_callback=self.log_message) in ["#E8E3CE"]: self.log_message(f"模拟器 {index} 点击跳过广告!") manager.tap(index, 700, 1200, self.log_message) time.sleep(2) # 随机翻页间隔 interval_str = self.page_interval_var.get() if '-' in interval_str: parts = interval_str.split('-') if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): intervalNum = random.randint(int(parts[0]), int(parts[1])) else: intervalNum = 30 else: intervalNum = int(interval_str) if interval_str.isdigit() else 30 time.sleep(intervalNum) manager.swipe(index, 700, 700, 200, 700, 500, self.log_message) time.sleep(2) # 加入书签 self.log_message(f"正在加入书架 {index}...") manager.tap(index, 360, 600, self.log_message) time.sleep(2) manager.tap(index, 255, 84, self.log_message) time.sleep(3) # 清除应用数据 self.log_message(f"正在清除模拟器 {index} 应用数据...") manager.clear_app_data(index, manager, self.log_message) time.sleep(2) # 关闭模拟器 self.log_message(f"正在关闭模拟器 {index}...") manager.stop_emulator(index) self.log_message(f"✅ 模拟器 {index} 任务完成") self.results[index] = True self.root.after(0, lambda: self.update_emulator_status(index, "任务完成")) except Exception as e: self.log_message(f"❌ 模拟器 {index} 执行出错: {e}") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "执行出错")) finally: # 减少活跃线程计数 with self.threads_lock: self.active_threads -= 1 current_active = self.active_threads self.log_message(f"📊 当前活跃线程数: {current_active}/{max_threads}") # 释放信号量,让下一个等待的线程开始 self.thread_semaphore.release() # 启动所有模拟器任务(信号量会自动控制并发数) for emu in self.selected_emulators: if self.should_stop: break thread = threading.Thread(target=process_emulator, args=(emu,)) thread.start() threads.append(thread) # 稍微延迟一下,避免同时启动太多 time.sleep(2) # 等待所有线程完成 for thread in threads: thread.join() success_count = sum(1 for v in self.results.values() if v) self.log_message(f"\n🎉 任务执行完毕!成功: {success_count}/{len(self.selected_emulators)}") self.select_failed_emulators() self.start_task() except Exception as e: self.log_message(f"❌ 任务执行出错: {e}") finally: self.is_running = False self.is_paused = False self.pause_btn.config(text="暂停") # 恢复按钮状态 self.root.after(0, self.reset_buttons) def run_task2(self): """执行评价任务(支持并发,出错直接退出不重试)""" try: # 获取最大线程数 max_threads = int(self.max_threads_var.get()) if self.max_threads_var.get().isdigit() else 1 self.thread_semaphore = threading.Semaphore(max_threads) self.active_threads = 0 self.log_message(f"📌 最大并发数: {max_threads}") threads = [] def process_emulator(emu): """处理单个模拟器的函数,出错直接退出""" # 获取信号量,控制并发数 self.thread_semaphore.acquire() # 增加活跃线程计数 with self.threads_lock: self.active_threads += 1 current_active = self.active_threads self.log_message(f"📊 当前活跃线程数: {current_active}/{max_threads}") index = emu['index'] try: if self.should_stop: return self.log_message(f"\n{'='*50}") self.log_message(f"开始处理模拟器 {index} ({emu['name']})") self.log_message(f"{'='*50}") # 更新状态为运行中 self.root.after(0, lambda: self.update_emulator_status(index, "运行中")) manager = MuMuEmulatorManager(self.mumu_path_var.get()) # 启动模拟器 self.log_message(f"正在启动模拟器 {index}...") if not manager.start_emulator(index): self.log_message(f"❌ 模拟器 {index} 启动失败") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "启动失败")) return # 等待就绪 if not manager.wait_for_emulator_ready(index, timeout=180, log_callback=self.log_message): self.log_message(f"❌ 模拟器 {index} 启动超时") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "启动超时")) return time.sleep(5) if not manager.check_resolution(index, 720, self.log_message): self.log_message(f"❌ 模拟器 {index} 分辨率不是720,停止任务并关闭模拟器") manager.stop_emulator(index) self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "分辨率错误")) return # 安装APK if not manager.install_apk(index, self.apk_path_var.get(), self.log_message): self.log_message(f"⚠️ APK安装失败,但继续执行...") # 打开应用 manager.open_app(index, self.package_name_var.get(), self.log_message) # 等待进入主界面 errNumber = 0 while True: if self.should_stop or self.is_paused: while self.is_paused and not self.should_stop: time.sleep(1) if self.should_stop: return color560130 = manager.get_pixel_color(index, 560, 130, log_callback=self.log_message) if color560130 in ["#EEF8EE", "#F7F7F7"]: self.log_message(f"模拟器 {index} 已经进入主界面,继续...") break elif color560130 in ["#5F635F"]: self.log_message(f"模拟器 {index} 需要关闭红包弹窗...") # manager.tap(index, 640, 260, self.log_message) manager.tap(index, 360, 1035, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) elif color560130 in ["#CED0D2"]: self.log_message(f"模拟器 {index} 需要关闭广告弹窗...") manager.tap(index, 200, 80, self.log_message) elif color560130 in ["#F3CEA9"]: self.log_message(f"模拟器 {index} 进入错误页面,返回...") manager.tap(index, 44, 92, self.log_message) time.sleep(20) manager.tap(index, 57, 193, self.log_message) else: errNumber = errNumber + 1 if errNumber > 30: self.log_message(f"模拟器 {index} 连续多次未进入主界面,退出...") manager.stop_emulator(index) self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "进入主界面失败")) return self.log_message(f"模拟器 {index} 等待进入主界面中...") time.sleep(5) # 执行打开书籍 if not self.openBook(manager, index): self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "打开书籍失败")) return # 评价 time.sleep(8) manager.tap(index, 680, 95, self.log_message) time.sleep(2) manager.tap(index, 633, 930, self.log_message) # 发表 time.sleep(5) manager.tap(index, 640, 95, self.log_message) time.sleep(4) # 清除应用数据 self.log_message(f"正在清除模拟器 {index} 应用数据...") manager.clear_app_data(index, manager, self.log_message) time.sleep(2) # 关闭模拟器 self.log_message(f"正在关闭模拟器 {index}...") manager.stop_emulator(index) self.log_message(f"✅ 模拟器 {index} 任务完成") self.results[index] = True self.root.after(0, lambda: self.update_emulator_status(index, "任务完成")) except Exception as e: self.log_message(f"❌ 模拟器 {index} 执行出错: {e}") self.results[index] = False self.root.after(0, lambda: self.update_emulator_status(index, "执行出错")) finally: # 减少活跃线程计数 with self.threads_lock: self.active_threads -= 1 current_active = self.active_threads self.log_message(f"📊 当前活跃线程数: {current_active}/{max_threads}") # 释放信号量 self.thread_semaphore.release() # 启动所有模拟器任务 for emu in self.selected_emulators: if self.should_stop: break thread = threading.Thread(target=process_emulator, args=(emu,)) thread.start() threads.append(thread) time.sleep(2) # 等待所有线程完成 for thread in threads: thread.join() success_count = sum(1 for v in self.results.values() if v) self.log_message(f"\n🎉 任务执行完毕!成功: {success_count}/{len(self.selected_emulators)}") except Exception as e: self.log_message(f"❌ 任务执行出错: {e}") finally: self.is_running = False self.is_paused = False self.pause_btn.config(text="暂停") # 恢复按钮状态 self.root.after(0, self.reset_buttons) def reset_buttons(self): """重置按钮状态""" self.load_btn.config(state='normal') self.start_read_btn.config(state='normal') self.start_comment_btn.config(state='normal') self.pause_btn.config(state='disabled', text="暂停") self.stop_btn.config(state='disabled') def clear_log(self): """清空日志""" self.log_text.delete(1.0, tk.END) def run(self): """运行GUI""" self.root.mainloop() if __name__ == "__main__": app = MuMuAutoGUI() app.run()