"""MuMu emulator automation tool.

A small tkinter front end that starts MuMu emulator instances through
MuMuManager.exe, drives an Android app inside them with adb (taps, text input
and screenshot pixel probes) and shuts the instances down again.
"""
import configparser
import io
import json
import os
import queue
import random
import re
import shlex
import subprocess
import threading
import time
import tkinter as tk
from datetime import datetime
from tkinter import filedialog, messagebox, scrolledtext, ttk


class MuMuEmulatorManager:
    """Wrapper around MuMuManager.exe and adb for one MuMu installation.

    Every method takes the emulator ``index`` understood by MuMuManager and an
    optional ``log_callback(str)`` used for progress messages.
    """

    # Class-level clipboard lock shared by all instances. Nothing in this file
    # acquires it; it is kept because it is part of the class's attributes.
    _clipboard_lock = threading.Lock()

    # Resolution reported when the output of `wm size` cannot be parsed.
    _FALLBACK_RESOLUTION = (720, 1280)

    def __init__(self, manager_path=r"D:\MuMuPlayer\nx_main\MuMuManager.exe"):
        """Remember the MuMuManager.exe path.

        Raises:
            FileNotFoundError: if ``manager_path`` does not exist.
        """
        self.manager_path = manager_path
        if not os.path.exists(manager_path):
            raise FileNotFoundError(f"找不到 MuMuManager.exe: {manager_path}")

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def _run_manager(self, *args):
        """Run MuMuManager.exe with ``args`` and return the CompletedProcess."""
        return subprocess.run(
            [self.manager_path, *args],
            capture_output=True, text=True, encoding='utf-8',
        )

    @staticmethod
    def _adb(*args, device=None, text=True):
        """Run ``adb [-s device] args...`` without a local shell.

        Passing an argument list (instead of a shell string) keeps paths with
        spaces and characters such as ``&`` from being interpreted by cmd.exe.
        """
        cmd = ["adb"]
        if device is not None:
            cmd += ["-s", device]
        cmd += [str(arg) for arg in args]
        if text:
            return subprocess.run(cmd, capture_output=True, text=True,
                                  errors="replace")
        return subprocess.run(cmd, capture_output=True)

    def _connect(self, index, settle=0.0):
        """Resolve the adb port of ``index``, run ``adb connect`` and return
        ``(device_serial, adb_port)``. ``settle`` is slept after connecting."""
        adb_port = self.get_adb_port(index)
        device = f"127.0.0.1:{adb_port}"
        self._adb("connect", device)
        if settle:
            time.sleep(settle)
        return device, adb_port

    @staticmethod
    def _extract_field(stdout, key):
        """Return ``key`` from a JSON object in ``stdout``; None when the text
        is not JSON, is not an object, or lacks the key."""
        try:
            data = json.loads(stdout)
        except (json.JSONDecodeError, TypeError):
            return None
        if not isinstance(data, dict):
            return None
        return data.get(key)

    @staticmethod
    def _parse_emulator_list(stdout):
        """Turn the JSON printed by ``info -v all`` into a list of dicts.

        The expected shape is ``{"<index>": {...}, ...}``; each entry gets its
        key stored under ``'index'``. Invalid JSON yields an empty list.
        """
        try:
            data = json.loads(stdout)
        except (json.JSONDecodeError, TypeError):
            return []
        if not isinstance(data, dict):
            return []
        # NOTE(review): a flat object that already carries an 'index' field is
        # treated as a single emulator; this assumes MuMuManager prints that
        # shape when only one instance exists -- confirm against the tool.
        if 'index' in data and not isinstance(data['index'], dict):
            return [data]
        emulators = []
        for key, value in data.items():
            if isinstance(value, dict):
                value['index'] = key
                emulators.append(value)
        return emulators

    @staticmethod
    def _encode_input_text(text):
        """Encode ``text`` as one argument for ``adb shell input text``.

        ``input text`` expects ``%s`` in place of a space. adb joins its
        arguments and hands them to the device shell, so the result is quoted
        for that shell with ``shlex.quote``.
        """
        return shlex.quote(text.replace(' ', '%s'))

    @staticmethod
    def _rgb_to_hex(r, g, b):
        """Format an RGB triple as an upper-case ``#RRGGBB`` string."""
        return f"#{r:02X}{g:02X}{b:02X}"

    def _query_resolution(self, device):
        """Return ``(width, height)`` parsed from ``wm size``, or the fallback."""
        output = self._adb("shell", "wm", "size", device=device).stdout or ""
        match = re.search(r'(\d+)x(\d+)', output)
        if match:
            return int(match.group(1)), int(match.group(2))
        return self._FALLBACK_RESOLUTION

    # ------------------------------------------------------------------
    # MuMuManager operations
    # ------------------------------------------------------------------
    def get_adb_port(self, index, log_callback=None):
        """Return the adb port of emulator ``index``.

        Polls MuMuManager every 3 seconds and does not return until a port is
        reported (there is deliberately no timeout).
        """
        while True:
            result = self._run_manager("info", "-v", str(index))
            if result.returncode == 0:
                adb_port = self._extract_field(result.stdout, 'adb_port')
                if adb_port is not None:
                    if log_callback:
                        log_callback(f"✅ 模拟器 {index} ADB端口: {adb_port}")
                    return adb_port
            if log_callback:
                log_callback(f"⏳ 模拟器 {index} 等待ADB端口...")
            time.sleep(3)  # retry after 3 seconds

    def get_emulator_list(self):
        """Return all emulators known to MuMuManager ([] on any failure)."""
        result = self._run_manager("info", "-v", "all")
        if result.returncode != 0:
            return []
        return self._parse_emulator_list(result.stdout)

    def start_emulator(self, index):
        """Launch emulator ``index``; True when MuMuManager exits with 0."""
        return self._run_manager("control", "-v", str(index), "launch").returncode == 0

    def wait_for_emulator_ready(self, index, timeout=120, check_interval=3, log_callback=None):
        """Poll until MuMuManager reports ``is_android_started`` as true.

        Returns True once Android is up, False after ``timeout`` seconds.
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            result = self._run_manager("info", "-v", str(index))
            if result.returncode == 0:
                if self._extract_field(result.stdout, 'is_android_started') is True:
                    return True
            if log_callback:
                log_callback(f"等待模拟器 {index} 启动... ({int(time.time() - start_time)}秒)")
            time.sleep(check_interval)
        return False

    def stop_emulator(self, index):
        """Shut emulator ``index`` down; True when MuMuManager exits with 0."""
        return self._run_manager("control", "-v", str(index), "shutdown").returncode == 0

    # ------------------------------------------------------------------
    # adb operations
    # ------------------------------------------------------------------
    def install_apk(self, index, apk_path, log_callback=None, package_name="com.dragon.read"):
        """Install ``apk_path`` unless ``package_name`` is already installed.

        Returns False when the APK file is missing or adb does not print
        "Success"; True when the package was already present or installed.
        """
        if not os.path.exists(apk_path):
            if log_callback:
                log_callback(f"❌ APK文件不存在: {apk_path}")
            return False

        device, adb_port = self._connect(index, settle=2)

        # Skip the install when the package is already listed on the device.
        packages = self._adb("shell", "pm", "list", "packages", device=device).stdout or ""
        if package_name and package_name in packages:
            if log_callback:
                log_callback(f"✅ {package_name} 已安装,跳过安装步骤")
            return True

        if log_callback:
            log_callback(f"正在安装APK: {os.path.basename(apk_path)} 端口: {adb_port}...")
        result = self._adb("install", "-r", apk_path, device=device)
        return "Success" in (result.stdout or "")

    def open_app(self, index, package_name, log_callback=None):
        """Start ``package_name`` through ``monkey``; True on apparent success."""
        device, _ = self._connect(index, settle=1)
        result = self._adb(
            "shell", "monkey", "-p", shlex.quote(package_name),
            "-c", "android.intent.category.LAUNCHER", "1",
            device=device,
        )
        if "Events injected" in (result.stdout or "") or result.returncode == 0:
            if log_callback:
                log_callback(f"✅ 已打开应用: {package_name}")
            return True
        return False

    def paste_text(self, index, text, log_callback=None):
        """Type ``text`` into the emulator with ``adb shell input text``.

        Alternative to the clipboard: the input box at (390, 90) is tapped to
        focus it, then the text is sent as key input. Always returns True.

        NOTE(review): `input text` is generally limited to characters the
        device key map can produce; whether non-ASCII text (such as the default
        search content) arrives intact depends on the image/IME -- verify.
        """
        device, _ = self._connect(index, settle=0.3)

        # Make sure the input box has focus before typing.
        self._adb("shell", "input", "tap", 390, 90, device=device)
        time.sleep(0.5)

        self._adb("shell", "input", "text", self._encode_input_text(text), device=device)

        if log_callback:
            log_callback(f"✅ 已输入: {text[:50]}{'...' if len(text) > 50 else ''}")
        return True

    def tap(self, index, x, y, log_callback=None):
        """Tap screen coordinate (x, y); True when adb exits with 0."""
        device, _ = self._connect(index)
        result = self._adb("shell", "input", "tap", x, y, device=device)
        if result.returncode == 0:
            if log_callback:
                log_callback(f"✅ 点击坐标 ({x}, {y})")
            return True
        return False

    def get_pixel_color(self, index, x, y, log_callback=None):
        """Return the colour at (x, y) of a fresh screenshot as ``#RRGGBB``.

        Returns None when the screenshot cannot be taken or decoded, when
        Pillow is not installed, or when the coordinate is outside the image.
        """
        device, _ = self._connect(index, settle=0.5)

        # Verify the device answers; reconnect once if it does not.
        if self._adb("shell", "echo", "1", device=device).returncode != 0:
            if log_callback:
                log_callback(f"⚠️ 模拟器 {index} ADB 连接失败,重新连接...")
            self._adb("disconnect", device)
            time.sleep(1)
            self._adb("connect", device)
            time.sleep(1)

        try:
            screen_width, screen_height = self._query_resolution(device)
            if log_callback:
                log_callback(f"📱 屏幕分辨率: {screen_width}x{screen_height}")

            # Capture the PNG straight from adb's stdout; no temporary file is
            # needed, so there is nothing to clean up on the error paths.
            shot = self._adb("exec-out", "screencap", "-p", device=device, text=False).stdout
            if not shot:
                if log_callback:
                    log_callback("❌ 截图失败")
                return None

            try:
                from PIL import Image
            except ImportError:
                if log_callback:
                    log_callback("❌ 请先安装PIL库: pip install Pillow")
                return None

            try:
                with Image.open(io.BytesIO(shot)) as img:
                    width, height = img.size
                    if x < 0 or x >= width or y < 0 or y >= height:
                        if log_callback:
                            log_callback(f"❌ 坐标({x},{y})超出屏幕范围 {width}x{height}")
                        return None
                    # Converting to RGB gives a 3-tuple for every image mode.
                    r, g, b = img.convert("RGB").getpixel((x, y))
            except Exception as e:
                if log_callback:
                    log_callback(f"❌ 解析图片失败: {e}")
                return None

            color = self._rgb_to_hex(r, g, b)
            if log_callback:
                log_callback(f"🎨 坐标({x},{y}) 颜色: {color}")
            return color

        except Exception as e:
            if log_callback:
                log_callback(f"❌ 获取颜色失败: {e}")
            return None


class _TaskStopped(Exception):
    """Raised inside a worker thread once the user has requested a stop."""


class MuMuAutoGUI:
    """Tk window with a settings tab, an emulator/task tab and a log tab."""

    # Values shown in the UI when a key is missing from the config file.
    _UI_FALLBACKS = {
        'mumu_path': '',
        'apk_path': '',
        'package_name': '',
        'search_content': '盗墓笔记',
        'page_count_var': '10-30',
        'page_interval_var': '5',
        'max_threads_var': '1',
    }

    def __init__(self):
        self.root = tk.Tk()
        self.root.title("MuMu模拟器自动化工具")
        self.root.geometry("700x360")

        # Configuration file. Interpolation is disabled so a '%' typed into
        # any field can be stored and read back verbatim.
        self.config_file = "mumu_config.ini"
        self.config = configparser.ConfigParser(interpolation=None)
        self.load_config()

        # Run state
        self.is_running = False
        self.is_paused = False
        self.should_stop = False
        self.current_thread = None
        self.selected_emulators = []
        self.thread_semaphore = None  # limits the number of concurrent workers

        self.load_btn = None
        self.start_read_btn = None
        self.start_comment_btn = None

        # Settings captured on the Tk thread when a task starts; worker
        # threads read this instead of touching Tk variables.
        self._task_settings = None
        # Callables queued by worker threads, executed on the Tk thread.
        self._ui_queue = queue.Queue()

        self.create_widgets()
        self.load_settings()
        self._drain_ui_queue()

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def load_config(self):
        """Read the config file, or install defaults when it does not exist."""
        if os.path.exists(self.config_file):
            self.config.read(self.config_file, encoding='utf-8')
            if not self.config.has_section('Settings'):
                self.config.add_section('Settings')
        else:
            self.config['Settings'] = {
                'mumu_path': r'D:\MuMuPlayer\nx_main\MuMuManager.exe',
                'apk_path': 'fanqie.apk',
                'package_name': 'com.dragon.read',
                'search_content': '玄幻战神:开局就得到大佬的守护'
            }

    def save_config(self):
        """Write the in-memory configuration to the config file."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            self.config.write(f)

    def _setting_vars(self):
        """Map each config key to the Tk variable that edits it."""
        return {
            'mumu_path': self.mumu_path_var,
            'apk_path': self.apk_path_var,
            'package_name': self.package_name_var,
            'search_content': self.search_content_var,
            'page_count_var': self.page_count_var,
            'page_interval_var': self.page_interval_var,
            'max_threads_var': self.max_threads_var,
        }

    def load_settings(self):
        """Copy configuration values (or their fallbacks) into the UI."""
        for key, var in self._setting_vars().items():
            var.set(self.config.get('Settings', key, fallback=self._UI_FALLBACKS[key]))

    def save_settings(self):
        """Copy the UI values into the configuration and save it to disk."""
        if not self.config.has_section('Settings'):
            self.config.add_section('Settings')
        for key, var in self._setting_vars().items():
            self.config.set('Settings', key, var.get())
        try:
            self.save_config()
        except OSError as e:
            self.log_message(f"❌ 配置保存失败: {e}")
            return
        self.log_message("✅ 配置已保存")

    def _snapshot_settings(self):
        """Return the current UI values as a plain dict (Tk thread only)."""
        return {key: var.get() for key, var in self._setting_vars().items()}

    def _setting(self, key):
        """Return a setting from the task snapshot, or from the UI when no
        task snapshot exists."""
        snapshot = self._task_settings
        if snapshot is not None and key in snapshot:
            return snapshot[key]
        return self._setting_vars()[key].get()

    @staticmethod
    def _pick_from_range(text, default):
        """Interpret ``text`` as "N" or "A-B".

        "N" yields N, "A-B" yields a random integer between the two bounds
        (inclusive, in either order); anything else yields ``default``.
        """
        text = text.strip()
        if text.isdecimal():
            return int(text)
        low, sep, high = text.partition('-')
        low, high = low.strip(), high.strip()
        if sep and low.isdecimal() and high.isdecimal():
            lower, upper = sorted((int(low), int(high)))
            return random.randint(lower, upper)
        return default

    # ------------------------------------------------------------------
    # Widgets
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Create the notebook and its three tabs."""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill='both', expand=True, padx=5, pady=5)

        self.create_config_tab()
        self.create_task_tab()
        self.create_log_tab()

    def create_config_tab(self):
        """Create the settings tab: one labelled entry per setting."""
        config_frame = ttk.Frame(self.notebook)
        self.notebook.add(config_frame, text="配置")

        # Scrollable container
        canvas = tk.Canvas(config_frame)
        scrollbar = ttk.Scrollbar(config_frame, orient="vertical", command=canvas.yview)
        scrollable_frame = ttk.Frame(canvas)

        # Keep the scroll region in sync with the frame's size. The event name
        # was an empty string in the original, which Tk rejects.
        scrollable_frame.bind(
            "<Configure>",
            lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
        )

        canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
        canvas.configure(yscrollcommand=scrollbar.set)

        self.mumu_path_var = tk.StringVar()
        self.apk_path_var = tk.StringVar()
        self.package_name_var = tk.StringVar()
        self.search_content_var = tk.StringVar()
        self.page_interval_var = tk.StringVar()
        self.page_count_var = tk.StringVar()
        self.max_threads_var = tk.StringVar()

        # (label, variable, entry width, entry sticky, browse command)
        rows = [
            ("MuMuManager路径:", self.mumu_path_var, 60, '', self.browse_mumu_path),
            ("APK文件路径:", self.apk_path_var, 60, '', self.browse_apk_path),
            ("应用包名:", self.package_name_var, 40, 'w', None),
            ("搜索内容:", self.search_content_var, 60, '', None),
            ("翻页间隔(秒):", self.page_interval_var, 60, '', None),
            ("阅读页数:", self.page_count_var, 60, '', None),
            ("最大线程:", self.max_threads_var, 60, '', None),
        ]
        for row, (label, var, width, sticky, browse) in enumerate(rows):
            ttk.Label(scrollable_frame, text=label).grid(
                row=row, column=0, sticky='w', padx=10, pady=5)
            ttk.Entry(scrollable_frame, textvariable=var, width=width).grid(
                row=row, column=1, sticky=sticky, padx=10, pady=5)
            if browse is not None:
                ttk.Button(scrollable_frame, text="浏览", command=browse).grid(
                    row=row, column=2, padx=5, pady=5)

        # Save button
        ttk.Button(scrollable_frame, text="保存配置", command=self.save_settings).grid(
            row=len(rows), column=0, columnspan=3, pady=20)

        canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

    def create_task_tab(self):
        """Create the task tab: control buttons and the emulator list."""
        task_frame = ttk.Frame(self.notebook)
        self.notebook.add(task_frame, text="任务")

        list_frame = ttk.LabelFrame(task_frame, text="任务控制")
        list_frame.pack(fill='both', expand=True, padx=5, pady=5)

        # Button bar
        button_frame = ttk.Frame(list_frame)
        button_frame.pack(fill='x', padx=5, pady=5)

        self.load_btn = ttk.Button(button_frame, text="读取模拟器", command=self.load_emulators)
        self.load_btn.pack(side='left', padx=5)

        self.select_all_btn = ttk.Button(button_frame, text="全选",
                                         command=self.select_all_emulators, width=6)
        self.select_all_btn.pack(side='left', padx=5)

        self.start_read_btn = ttk.Button(button_frame, text="开始阅读",
                                         command=self.start_task, width=10)
        self.start_read_btn.pack(side='left', padx=10)

        self.start_comment_btn = ttk.Button(button_frame, text="开始评价",
                                            command=self.start_task2, width=10)
        self.start_comment_btn.pack(side='left', padx=10)

        self.pause_btn = ttk.Button(button_frame, text="暂停", command=self.pause_task,
                                    width=10, state='disabled')
        self.pause_btn.pack(side='left', padx=10)

        self.stop_btn = ttk.Button(button_frame, text="停止", command=self.stop_task,
                                   width=10, state='disabled')
        self.stop_btn.pack(side='left', padx=10)

        # Emulator list; the first column acts as a check box.
        tree_frame = ttk.Frame(list_frame)
        tree_frame.pack(fill='both', expand=True, padx=5, pady=5)

        columns = ("选择", "索引", "名称", "状态")
        self.emulator_tree = ttk.Treeview(tree_frame, columns=columns, show='headings', height=8)

        for name, width in zip(columns, (50, 50, 150, 100)):
            self.emulator_tree.heading(name, text=name)
            self.emulator_tree.column(name, width=width)

        vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.emulator_tree.yview)
        self.emulator_tree.configure(yscrollcommand=vsb.set)

        self.emulator_tree.pack(side='left', fill='both', expand=True)
        vsb.pack(side='right', fill='y')

        # Toggle the selection on double-click. The event name was an empty
        # string in the original; "<Double-1>" follows its "double-click" note.
        self.emulator_tree.bind('<Double-1>', self.on_tree_click)

    def create_log_tab(self):
        """Create the log tab: a scrolled text box and a clear button."""
        log_frame = ttk.Frame(self.notebook)
        self.notebook.add(log_frame, text="日志")

        self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=20)
        self.log_text.pack(fill='both', expand=True, padx=5, pady=5)

        btn_frame = ttk.Frame(log_frame)
        btn_frame.pack(fill='x', padx=5, pady=5)
        ttk.Button(btn_frame, text="清空日志", command=self.clear_log).pack(side='right')

    # ------------------------------------------------------------------
    # Simple UI callbacks
    # ------------------------------------------------------------------
    def browse_mumu_path(self):
        """Pick MuMuManager.exe with a file dialog."""
        path = filedialog.askopenfilename(title="选择MuMuManager.exe",
                                          filetypes=[("Executable", "*.exe")])
        if path:
            self.mumu_path_var.set(path)

    def browse_apk_path(self):
        """Pick the APK file with a file dialog."""
        path = filedialog.askopenfilename(title="选择APK文件", filetypes=[("APK", "*.apk")])
        if path:
            self.apk_path_var.set(path)

    def select_all_emulators(self):
        """Tick every emulator row that is not ticked yet."""
        for item in self.emulator_tree.get_children():
            values = self.emulator_tree.item(item, 'values')
            if values[0] == "□":
                # Rows have exactly four columns; keep the last three as-is.
                self.emulator_tree.item(item, values=("☑", *values[1:]))
        self.log_message("已全选所有模拟器")

    def load_emulators(self):
        """Refill the emulator list from MuMuManager."""
        try:
            manager = MuMuEmulatorManager(self.mumu_path_var.get())
            emulators = manager.get_emulator_list()

            for item in self.emulator_tree.get_children():
                self.emulator_tree.delete(item)

            for emu in emulators:
                status = "运行中" if emu.get('is_process_started') else "未运行"
                self.emulator_tree.insert(
                    '', 'end', values=("□", emu.get('index'), emu.get('name'), status))

            self.log_message(f"已加载 {len(emulators)} 个模拟器")
        except Exception as e:
            self.log_message(f"加载模拟器失败: {e}")

    def on_tree_click(self, event):
        """Toggle the check mark when the first column of a row is clicked."""
        if self.emulator_tree.identify_region(event.x, event.y) != "cell":
            return
        if self.emulator_tree.identify_column(event.x) != "#1":  # selection column
            return
        item = self.emulator_tree.identify_row(event.y)
        if not item:
            return
        values = self.emulator_tree.item(item, 'values')
        new_value = "☑" if values[0] == "□" else "□"
        self.emulator_tree.item(item, values=(new_value, *values[1:]))

    def get_selected_emulators(self):
        """Return ``[{'index': ..., 'name': ...}]`` for every ticked row."""
        selected = []
        for item in self.emulator_tree.get_children():
            values = self.emulator_tree.item(item, 'values')
            if values[0] == "☑":
                selected.append({'index': values[1], 'name': values[2]})
        return selected

    # ------------------------------------------------------------------
    # Logging (safe to call from any thread)
    # ------------------------------------------------------------------
    def _append_log(self, line):
        """Append ``line`` to the log widget (Tk thread only)."""
        self.log_text.insert(tk.END, line)
        self.log_text.see(tk.END)

    def _drain_ui_queue(self):
        """Run callables queued by worker threads, then reschedule itself."""
        try:
            while True:
                try:
                    action = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                action()
        finally:
            self.root.after(100, self._drain_ui_queue)

    def log_message(self, message):
        """Append a timestamped line to the log.

        On the main thread the widget is updated directly; from any other
        thread the update is queued for the Tk thread, because Tk widgets must
        not be touched from worker threads.
        """
        timestamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{timestamp}] {message}\n"
        if threading.current_thread() is threading.main_thread():
            self._append_log(line)
            self.root.update_idletasks()
        else:
            self._ui_queue.put(lambda: self._append_log(line))

    # ------------------------------------------------------------------
    # Task control
    # ------------------------------------------------------------------
    def _launch(self, runner):
        """Common start logic: validate the selection, reset the run state,
        snapshot the settings, lock the buttons and start ``runner`` in a
        daemon thread."""
        selected = self.get_selected_emulators()
        if not selected:
            messagebox.showwarning("警告", "请至少选择一个模拟器")
            return

        self.selected_emulators = selected
        self._task_settings = self._snapshot_settings()
        self.is_running = True
        self.is_paused = False
        self.should_stop = False

        self.load_btn.config(state='disabled')
        self.start_read_btn.config(state='disabled')
        self.start_comment_btn.config(state='disabled')
        self.pause_btn.config(state='normal')
        self.stop_btn.config(state='normal')

        self.current_thread = threading.Thread(target=runner, daemon=True)
        self.current_thread.start()

    def start_task(self):
        """Start the reading task for the ticked emulators."""
        self._launch(self.run_task)

    def start_task2(self):
        """Start the comment task for the ticked emulators."""
        self._launch(self.run_task2)

    def pause_task(self):
        """Toggle between paused and running."""
        if self.is_running and not self.is_paused:
            self.is_paused = True
            self.pause_btn.config(text="继续")
            self.log_message("⏸ 任务已暂停")
        elif self.is_running and self.is_paused:
            self.is_paused = False
            self.pause_btn.config(text="暂停")
            self.log_message("▶️ 任务已继续")

    def stop_task(self):
        """Ask the running task to stop."""
        if self.is_running:
            self.should_stop = True
            self.is_running = False
            self.is_paused = False
            self.log_message("⏹ 正在停止任务...")

    def _wait_while_paused(self):
        """Block the calling worker while the task is paused."""
        while self.is_paused and not self.should_stop:
            time.sleep(0.5)

    def _checkpoint(self):
        """Honour pause, then raise ``_TaskStopped`` if a stop was requested.

        Called inside polling loops so they cannot spin forever after the user
        presses stop.
        """
        self._wait_while_paused()
        if self.should_stop:
            raise _TaskStopped()

    # ------------------------------------------------------------------
    # Automation steps
    # ------------------------------------------------------------------
    def openBook(self, manager, index):
        """Search for the configured title and open the first result.

        Screen states are detected by probing fixed pixels for fixed colours.
        May raise ``_TaskStopped`` when a stop is requested while waiting.
        """
        log = self.log_message
        search_text = self._setting('search_content')

        manager.tap(index, 390, 90, log)
        while manager.get_pixel_color(index, 560, 65, log_callback=log) != "#F7F7F7":
            self._checkpoint()
            log(f"模拟器 {index} 等待搜索页面准备就绪...")
            time.sleep(3)
        time.sleep(3)

        # Tap the input box and type the search text.
        manager.tap(index, 340, 90, log)
        time.sleep(3)
        manager.paste_text(index, search_text, log)
        time.sleep(3)

        while manager.get_pixel_color(index, 435, 880, log_callback=log) != "#FFFFFF":
            self._checkpoint()
            log(f"模拟器 {index} 没有输入搜索内容...")
            manager.tap(index, 556, 87, log)
            time.sleep(1)
            manager.tap(index, 340, 90, log)
            time.sleep(2)
            manager.paste_text(index, search_text, log)
            time.sleep(3)

        manager.tap(index, 655, 92, log)
        time.sleep(6)
        while manager.get_pixel_color(index, 630, 235, log_callback=log) != "#FFFFFF":
            self._checkpoint()
            log(f"模拟器 {index} 等待搜索结果...")
            time.sleep(3)

        # Open the book entry; retry while the result page is still shown.
        time.sleep(6)
        manager.tap(index, 355, 333, log)
        time.sleep(5)
        while manager.get_pixel_color(index, 630, 235, log_callback=log) == "#FFFFFF":
            self._checkpoint()
            log(f"模拟器 {index} 还在搜索结果页面,重新点击...")
            manager.tap(index, 355, 333, log)
            time.sleep(5)

    def _prepare_emulator(self, manager, index):
        """Start the emulator, wait for Android, install and open the app.

        Returns False when the emulator fails to start or times out. A failed
        APK install is logged but does not abort (as in the original flow).
        """
        log = self.log_message

        log(f"正在启动模拟器 {index}...")
        if not manager.start_emulator(index):
            log(f"❌ 模拟器 {index} 启动失败")
            return False

        if not manager.wait_for_emulator_ready(index, timeout=180, log_callback=log):
            log(f"❌ 模拟器 {index} 启动超时")
            return False

        time.sleep(5)

        package_name = self._setting('package_name')
        if not manager.install_apk(index, self._setting('apk_path'), log,
                                   package_name=package_name):
            log("❌ APK安装失败")

        manager.open_app(index, package_name, log)
        return True

    def _read_job(self, manager, index):
        """Reading flow for one emulator, run after the app has been opened."""
        log = self.log_message

        # Decide whether the first-run agreement dialog is showing.
        time.sleep(20)
        button_color = manager.get_pixel_color(index, 394, 846, log_callback=log)
        log(button_color)
        if button_color and button_color.upper() == "#FC7838":
            manager.tap(index, 394, 846, log)
            time.sleep(90)
            manager.tap(index, 640, 260, log)
            time.sleep(10)
            manager.tap(index, 57, 193, log)
            time.sleep(5)
        else:
            time.sleep(10)

        while manager.get_pixel_color(index, 560, 130, log_callback=log) != "#EEF8EE":
            self._checkpoint()
            log(f"模拟器 {index} 等待进入主界面中...")
            # NOTE(review): the result of this probe is discarded; it may have
            # been meant as a tap on (640, 260) -- confirm with the author.
            manager.get_pixel_color(index, 640, 260, log_callback=log)
            time.sleep(5)

        self.openBook(manager, index)
        time.sleep(8)

        # Page turning
        page_total = self._pick_from_range(self._setting('page_count_var'), 5)
        for _ in range(page_total):
            # Wait until the reading page is showing.
            while manager.get_pixel_color(index, 712, 147, log_callback=log) not in ("#E8E3CE", "#E0DBC6"):
                self._checkpoint()
                log(f"模拟器 {index} 不在看书目录界面,等待中...")
                time.sleep(2)
                # get_pixel_color returns upper-case hex, so compare against
                # upper case (the lower-case literal could never match).
                if manager.get_pixel_color(index, 700, 1200, log_callback=log) == "#E8E3CE":
                    log(f"模拟器 {index} 点击跳过广告!")
                    manager.tap(index, 700, 1200, log)
                time.sleep(3)

            self._wait_while_paused()
            if self.should_stop:
                break

            time.sleep(self._pick_from_range(self._setting('page_interval_var'), 30))
            manager.tap(index, 700, 1055, log)
            time.sleep(2)

        # Add the book to the shelf.
        log(f"正在加入书架 {index}...")
        manager.tap(index, 360, 600, log)
        time.sleep(2)
        manager.tap(index, 255, 97, log)
        time.sleep(3)

    def _comment_job(self, manager, index):
        """Comment flow for one emulator, run after the app has been opened."""
        log = self.log_message

        while (manager.get_pixel_color(index, 560, 130, log_callback=log) != "#EEF8EE"
               or manager.get_pixel_color(index, 533, 60, log_callback=log) != "#FFFFFF"):
            self._checkpoint()
            log(f"模拟器 {index} 等待进入主界面中...")
            time.sleep(5)

        self.openBook(manager, index)

        # Rate, then publish.
        time.sleep(8)
        manager.tap(index, 680, 95, log)
        time.sleep(2)
        manager.tap(index, 633, 930, log)
        time.sleep(5)
        manager.tap(index, 640, 95, log)
        time.sleep(4)

    def _run_workers(self, job, stagger_seconds):
        """Run ``job(manager, index)`` for every selected emulator.

        One thread is started per emulator, ``stagger_seconds`` apart; a
        semaphore sized by the "max threads" setting limits how many run at
        once. When everything has finished the buttons are reset on the Tk
        thread.
        """
        try:
            manager = MuMuEmulatorManager(self._setting('mumu_path'))

            raw_threads = self._setting('max_threads_var')
            # At least one slot: Semaphore(0) would block every worker forever.
            max_threads = max(1, int(raw_threads)) if raw_threads.isdigit() else 1
            self.thread_semaphore = threading.Semaphore(max_threads)
            self.log_message(f"📌 最大并发数: {max_threads}")

            threads = []
            results = {}  # emulator index -> True/False

            def process_emulator(emu):
                """Handle a single emulator inside one semaphore slot."""
                with self.thread_semaphore:
                    if self.should_stop:
                        return

                    index = emu['index']
                    self.log_message(f"\n{'='*50}")
                    self.log_message(f"开始处理模拟器 {index} ({emu['name']})")
                    self.log_message(f"{'='*50}")

                    try:
                        if not self._prepare_emulator(manager, index):
                            results[index] = False
                            return

                        job(manager, index)

                        self.log_message(f"正在关闭模拟器 {index}...")
                        manager.stop_emulator(index)
                        self.log_message(f"✅ 模拟器 {index} 任务完成")
                        results[index] = True
                    except _TaskStopped:
                        self.log_message(f"⏹ 模拟器 {index} 任务已停止")
                        results[index] = False
                    except Exception as e:
                        self.log_message(f"❌ 模拟器 {index} 执行出错: {e}")
                        results[index] = False

            for emu in self.selected_emulators:
                if self.should_stop:
                    break
                thread = threading.Thread(target=process_emulator, args=(emu,), daemon=True)
                thread.start()
                threads.append(thread)
                # Stagger the starts so the emulators do not all boot at once.
                time.sleep(stagger_seconds)

            for thread in threads:
                thread.join()

            success_count = sum(1 for ok in results.values() if ok)
            self.log_message(
                f"\n🎉 任务执行完毕!成功: {success_count}/{len(self.selected_emulators)}")

        except Exception as e:
            self.log_message(f"❌ 任务执行出错: {e}")
        finally:
            self.is_running = False
            self.is_paused = False
            self._task_settings = None
            # Button state is restored on the Tk thread.
            self._ui_queue.put(self.reset_buttons)

    def run_task(self):
        """Run the reading task (concurrent; meant to run in its own thread)."""
        self._run_workers(self._read_job, 10)

    def run_task2(self):
        """Run the comment task (concurrent; meant to run in its own thread)."""
        self._run_workers(self._comment_job, 2)

    def reset_buttons(self):
        """Return every button to its idle state."""
        self.load_btn.config(state='normal')
        self.start_read_btn.config(state='normal')
        self.start_comment_btn.config(state='normal')
        self.pause_btn.config(state='disabled', text="暂停")
        self.stop_btn.config(state='disabled')

    def clear_log(self):
        """Remove all text from the log widget."""
        self.log_text.delete(1.0, tk.END)

    def run(self):
        """Enter the Tk main loop."""
        self.root.mainloop()


if __name__ == "__main__":
    # The window is only created before this hard-coded date; after it the
    # script exits without doing anything.
    if datetime.now() < datetime(2026, 5, 25):
        app = MuMuAutoGUI()
        app.run()