"""Tkinter desktop client for searching phones and auto-collecting some.

The window lets the user search products by keyword with price, fineness
and condition-tag filters, then fetches each result's battery charge-cycle
count and adds every product below a user-supplied threshold to the
account's collection (favourites) through the remote HTTP API.
"""

import json
import os
import threading
import time
import tkinter as tk
import uuid
from datetime import datetime
from tkinter import messagebox, ttk

import requests

API_BASE = "https://dubai-mp.aihuishou.com/ahs-yanxuan-service"

# Frequently used grid "sticky" combinations.
STICKY_EW = (tk.W, tk.E)
STICKY_NS = (tk.N, tk.S)
STICKY_ALL = (tk.W, tk.E, tk.N, tk.S)


class PhoneQueryApp:
    """Main application window: search form, result table and collector."""

    # Price checkbox label -> (minPrice, maxPrice) sent to the search API.
    PRICE_RANGES = {
        "0-1499": (0, 1499),
        "1500-1999": (1500, 1999),
        "2000-2999": (2000, 2999),
        "3000-3999": (3000, 3999),
        "4000以上": (4000, 20000),
    }
    # Range used when no price checkbox is ticked.
    DEFAULT_PRICE_RANGE = (0, 20000)

    # Fineness checkbox label -> id sent as "gaeaFinenessIds".
    FINENESS_IDS = {
        "全新仅拆封": 111,
        "准新机": 100,
        "99新": 99,
        "95新": 95,
        "9成新": 90,
        "8成新": 80,
        "7成新": 70,
    }
    # Ids used when no fineness checkbox is ticked (i.e. all of them).
    DEFAULT_FINENESS_IDS = (99, 100, 111, 95, 90, 80, 70)

    # Colours recognised inside a product name, in priority order.
    KNOWN_COLORS = ("黑色", "白色", "蓝色", "原色")

    # Result-table columns as (identifier/heading, pixel width).
    RESULT_COLUMNS = (
        ("名称", 400),
        ("价格", 100),
        ("循环次数", 100),
        ("成色", 100),
        ("内存", 80),
        ("颜色", 100),
        ("网络制式", 100),
    )
    CYCLE_COLUMN = "循环次数"

    # Name of the charge-cycle entry inside the goods-tag-param response.
    CYCLE_PARAM_NAME = "充电次数"

    def __init__(self, root):
        """Initialise state, load the cookie file and build the widgets.

        Args:
            root: The ``tk.Tk`` root window hosting the application.
        """
        self.root = root
        self.root.title("手机数据查询系统")
        self.root.geometry("1200x800")

        # Product data: rows currently displayed, and collection state.
        self.current_products = []
        self.is_loading_cycles = False
        self.collected_items = []

        # Condition tag mapping (tagId -> display name).
        self.condition_tags = {
            10: "屏幕完美",
            11: "机身无痕",
            12: "功能完好",
            17: "电池效率95%+",
            20: "原厂在保",
            19: "无维修",
            15: "官方在保",
            13: "原厂电池",
            14: "原厂屏幕",
            21: "原厂部件",
        }

        # Cookie read from cookie.txt ("" when unavailable).
        self.cookie = self.load_cookie()

        self.create_widgets()

    # ------------------------------------------------------------------
    # Configuration / HTTP helpers
    # ------------------------------------------------------------------
    def load_cookie(self):
        """Read the cookie string from ``cookie.txt``.

        Returns:
            The stripped file content, or ``""`` when the file does not
            exist or cannot be read (a message is printed in both cases).
        """
        cookie_file = "cookie.txt"
        if not os.path.exists(cookie_file):
            print("cookie.txt文件不存在,请创建并放入cookie")
            return ""
        try:
            with open(cookie_file, 'r', encoding='utf-8') as f:
                cookie = f.read().strip()
        except Exception as e:  # best effort: the app still starts
            print(f"读取cookie文件失败: {e}")
            return ""
        print(f"已加载cookie: {cookie[:50]}...")
        return cookie

    def get_headers(self):
        """Build the HTTP headers sent with every API request.

        A fresh timestamp and session id are generated on each call; all
        other values are fixed.

        Returns:
            A dict of header name -> value.
        """
        # NOTE(review): self.cookie is loaded and checked by start_collect
        # but is not attached here (the "Cookie" header was disabled in the
        # original code) - confirm whether that is intentional.
        # NOTE(review): token, device id and sign are hard-coded constants.
        return {
            "Connection": "keep-alive",
            "content-type": "application/json;charset=UTF-8",
            "ahs-app-version": "7.49.3",
            "Ahs-Timestamp": str(int(datetime.now().timestamp())),
            "Ahs-Token": "dccf18b052ef46d78d4feb796182d5df",
            "Ahs-Session-Id": str(uuid.uuid4()),
            "Ahs-App-Id": "10007",
            "Ahs-Device-Id": "3166e003-4ba7-4b57-8158-30859d81c7d5",
            "Ahs-Sign": "9ea25fcfcda37585065244ed271f1b0d",
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 26_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.74(0x18004a2c) NetType/WIFI Language/zh_CN",
            "Referer": "https://servicewechat.com/wx7e490492b4c23e98/1055/page-frame.html",
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
        }

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Create and lay out every widget of the main window."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=STICKY_ALL)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        # Row 2 (the result table) absorbs extra vertical space.
        main_frame.rowconfigure(2, weight=1)

        self._build_query_area(main_frame)
        self._build_filter_area(main_frame)
        self._build_result_area(main_frame)
        self._build_collect_bar(main_frame)
        self._build_status_bar(main_frame)

    def _build_query_area(self, parent):
        """Build the keyword entry and the search button (grid row 0)."""
        query_frame = ttk.LabelFrame(parent, text="查询条件", padding="10")
        query_frame.grid(row=0, column=0, sticky=STICKY_EW, pady=(0, 10))
        query_frame.columnconfigure(1, weight=1)

        ttk.Label(query_frame, text="机型:", font=("Arial", 11)).grid(
            row=0, column=0, sticky=tk.W, padx=(0, 10))

        self.model_entry = ttk.Entry(query_frame, width=50, font=("Arial", 11))
        self.model_entry.grid(row=0, column=1, sticky=STICKY_EW, padx=(0, 10))
        self.model_entry.insert(0, "iPhone 15 Pro")

        self.search_btn = ttk.Button(
            query_frame, text="搜索", command=self.search_products, width=15)
        self.search_btn.grid(row=0, column=2)

    def _build_filter_area(self, parent):
        """Build the price, fineness and condition-tag filters (grid row 1)."""
        filter_frame = ttk.LabelFrame(parent, text="筛选条件", padding="10")
        filter_frame.grid(row=1, column=0, sticky=STICKY_EW, pady=(0, 10))

        # First row: price ranges and fineness, side by side.
        price_frame = ttk.LabelFrame(filter_frame, text="价格区间", padding="5")
        price_frame.grid(row=0, column=0, sticky=STICKY_EW, padx=5, pady=5)
        self.price_vars = self._add_checkbuttons(
            price_frame, [(label, label) for label in self.PRICE_RANGES])

        fineness_frame = ttk.LabelFrame(filter_frame, text="成色", padding="5")
        fineness_frame.grid(row=0, column=1, sticky=STICKY_EW, padx=5, pady=5)
        self.fineness_vars = self._add_checkbuttons(
            fineness_frame, [(label, label) for label in self.FINENESS_IDS])

        # Second row: condition tags across the full width, five per row.
        condition_frame = ttk.LabelFrame(filter_frame, text="状况标签", padding="5")
        condition_frame.grid(
            row=1, column=0, columnspan=2, sticky=STICKY_EW, padx=5, pady=5)
        self.condition_vars = self._add_checkbuttons(
            condition_frame, list(self.condition_tags.items()), per_row=5)

        filter_frame.columnconfigure(0, weight=1)
        filter_frame.columnconfigure(1, weight=1)

    @staticmethod
    def _add_checkbuttons(parent, options, per_row=None):
        """Grid one checkbutton per option and return their variables.

        Args:
            parent: Container widget for the checkbuttons.
            options: Sequence of ``(key, label)`` pairs.
            per_row: Wrap to a new grid row after this many checkbuttons;
                ``None`` keeps everything on a single row.

        Returns:
            A dict mapping each key to its ``tk.BooleanVar``.
        """
        variables = {}
        for index, (key, label) in enumerate(options):
            var = tk.BooleanVar()
            variables[key] = var
            row, column = divmod(index, per_row) if per_row else (0, index)
            ttk.Checkbutton(parent, text=label, variable=var).grid(
                row=row, column=column, sticky=tk.W, padx=10, pady=2)
        return variables

    def _build_result_area(self, parent):
        """Build the result table and its vertical scrollbar (grid row 2)."""
        result_frame = ttk.LabelFrame(parent, text="查询结果", padding="10")
        result_frame.grid(row=2, column=0, sticky=STICKY_ALL)
        result_frame.columnconfigure(0, weight=1)
        result_frame.rowconfigure(0, weight=1)

        columns = tuple(name for name, _ in self.RESULT_COLUMNS)
        self.result_tree = ttk.Treeview(
            result_frame, columns=columns, show="headings", height=15)
        for name, _ in self.RESULT_COLUMNS:
            self.result_tree.heading(name, text=name)
        for name, width in self.RESULT_COLUMNS:
            self.result_tree.column(name, width=width)

        scrollbar = ttk.Scrollbar(
            result_frame, orient=tk.VERTICAL, command=self.result_tree.yview)
        self.result_tree.configure(yscrollcommand=scrollbar.set)

        self.result_tree.grid(row=0, column=0, sticky=STICKY_ALL)
        scrollbar.grid(row=0, column=1, sticky=STICKY_NS)

    def _build_collect_bar(self, parent):
        """Build the threshold entry, start button and progress labels."""
        bottom_frame = ttk.Frame(parent)
        bottom_frame.grid(row=3, column=0, sticky=STICKY_EW, pady=(10, 0))

        ttk.Label(bottom_frame, text="循环次数小于:", font=("Arial", 10)).pack(
            side=tk.LEFT, padx=(0, 5))

        self.cycle_threshold = ttk.Entry(bottom_frame, width=10, font=("Arial", 10))
        self.cycle_threshold.pack(side=tk.LEFT, padx=(0, 5))
        self.cycle_threshold.insert(0, "600")

        ttk.Label(bottom_frame, text="次的机型自动收藏", font=("Arial", 10)).pack(
            side=tk.LEFT, padx=(0, 20))

        self.collect_btn = ttk.Button(
            bottom_frame, text="启动", command=self.start_collect, width=10)
        self.collect_btn.pack(side=tk.LEFT)

        self.progress_var = tk.StringVar()
        self.progress_var.set("")
        ttk.Label(bottom_frame, textvariable=self.progress_var,
                  font=("Arial", 9)).pack(side=tk.LEFT, padx=(20, 0))

        self.collect_count_var = tk.StringVar()
        self.collect_count_var.set("")
        ttk.Label(bottom_frame, textvariable=self.collect_count_var,
                  font=("Arial", 9), foreground="green").pack(
                      side=tk.LEFT, padx=(10, 0))

    def _build_status_bar(self, parent):
        """Build the sunken status label at the bottom (grid row 4)."""
        self.status_var = tk.StringVar()
        self.status_var.set("就绪")
        status_bar = ttk.Label(
            parent, textvariable=self.status_var, relief=tk.SUNKEN)
        status_bar.grid(row=4, column=0, sticky=STICKY_EW, pady=(10, 0))

    # ------------------------------------------------------------------
    # Filter accessors
    # ------------------------------------------------------------------
    def get_selected_prices(self):
        """Return the ``(min_price, max_price)`` implied by the checkboxes.

        The search request carries a single min/max pair, so when several
        ranges are ticked the smallest range containing all of them is
        returned (previously every ticked range after the first was
        silently ignored). With nothing ticked the full default range is
        returned.
        """
        chosen = [
            self.PRICE_RANGES[label]
            for label, var in self.price_vars.items()
            if var.get()
        ]
        if not chosen:
            return self.DEFAULT_PRICE_RANGE
        lowest = min(low for low, _ in chosen)
        highest = max(high for _, high in chosen)
        return (lowest, highest)

    def get_selected_fineness(self):
        """Return the ticked fineness ids, or every id when none is ticked."""
        chosen = [
            self.FINENESS_IDS[label]
            for label, var in self.fineness_vars.items()
            if var.get()
        ]
        return chosen if chosen else list(self.DEFAULT_FINENESS_IDS)

    def get_selected_condition_tags(self):
        """Return the list of ticked condition tag ids (possibly empty)."""
        return [tag_id for tag_id, var in self.condition_vars.items() if var.get()]

    def _snapshot_filters(self):
        """Read all filter widgets into plain Python values.

        Returns:
            ``(min_price, max_price, fineness_ids, condition_tags)``.
        """
        min_price, max_price = self.get_selected_prices()
        return (
            min_price,
            max_price,
            self.get_selected_fineness(),
            self.get_selected_condition_tags(),
        )

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------
    def search_products(self):
        """Validate the keyword, reset the view and start a search thread."""
        keyword = self.model_entry.get().strip()
        if not keyword:
            messagebox.showwarning("警告", "请输入机型")
            return

        # A running collection iterates current_products and updates tree
        # rows; clearing them underneath it would break that run.
        if self.is_loading_cycles:
            messagebox.showinfo("提示", "正在处理中,请稍候...")
            return

        self.result_tree.delete(*self.result_tree.get_children())
        self.current_products.clear()
        self.collected_items.clear()
        self.progress_var.set("")
        self.collect_count_var.set("")
        self.collect_btn.config(state="normal")

        self.status_var.set("正在搜索...")
        self.search_btn.config(state="disabled")

        # Tk variables are read here, on the GUI thread, and handed to the
        # worker as plain values.
        filters = self._snapshot_filters()
        thread = threading.Thread(
            target=self.do_search, args=(keyword, filters), daemon=True)
        thread.start()

    def do_search(self, keyword, filters=None):
        """Run the search request; intended to execute on a worker thread.

        The outcome is handed back to the GUI thread through ``root.after``:
        ``display_results`` on success, ``show_error`` on any exception.

        Args:
            keyword: Search keyword.
            filters: Optional ``(min_price, max_price, fineness_ids,
                condition_tags)`` tuple captured on the GUI thread. When
                omitted the filter widgets are read directly, as the
                original implementation did.
        """
        try:
            if filters is None:
                filters = self._snapshot_filters()
            min_price, max_price, fineness_ids, condition_tags = filters

            request_data = {
                "minPrice": min_price,
                "maxPrice": max_price,
                "sortValue": "sort_composite",
                "gaeaPricePpvIds": {},
                "gaeaFinenessIds": fineness_ids,
                "conditionTagList": condition_tags,  # ticked condition tags
                "scene": "search",
                "keyword": keyword,
                "sirReq": False,
                "cityId": 324,
                "pageIndex": 0,
                "pageSize": 50,
            }

            url = f"{API_BASE}/products/search-goods-v2"
            response = requests.post(
                url, json=request_data, headers=self.get_headers(), timeout=30)
            result = response.json()
            self.root.after(0, self.display_results, result)
        except Exception as e:  # thread boundary: report instead of dying
            self.root.after(0, self.show_error, str(e))

    def parse_product_info(self, product):
        """Extract the displayed/needed fields from one search result.

        Args:
            product: One product dict from the search response.

        Returns:
            A dict with keys ``name`` (truncated to 50 characters),
            ``price``, ``cycle_count`` (always empty at this stage),
            ``fineness``, ``memory``, ``color`` (first known colour found
            in the name, else ``""``), ``network``, ``saleGoodsNo`` and
            ``productNo``.
        """
        name = product.get('name', '')
        color = next((c for c in self.KNOWN_COLORS if c in name), "")

        return {
            'name': name[:50],
            'price': product.get('price', 0),
            'cycle_count': '',
            'fineness': product.get('gaeaFinenessName', ''),
            'memory': product.get('memoryDesc', ''),
            'color': color,
            'network': "全网通",
            'saleGoodsNo': product.get('saleGoodsNo', ''),
            'productNo': product.get('productNo', ''),
        }

    def display_results(self, result):
        """Populate the table from a search response (GUI thread).

        Args:
            result: Decoded JSON response. ``code == 0`` means success and
                ``data`` holds the products iterated below.
        """
        self.search_btn.config(state="normal")

        if result.get('code') != 0:
            self.status_var.set(f"查询失败: {result.get('resultMessage', '未知错误')}")
            messagebox.showerror("错误", result.get('resultMessage', '查询失败'))
            return

        data = result.get('data', [])
        if not data:
            self.status_var.set("未找到相关产品")
            messagebox.showinfo("提示", "未找到相关产品")
            return

        for product in data:
            info = self.parse_product_info(product)
            row = (
                info['name'],
                f"¥{info['price']}",
                info['cycle_count'],
                info['fineness'],
                info['memory'],
                info['color'],
                info['network'],
            )
            item_id = self.result_tree.insert("", tk.END, values=row)
            self.current_products.append({
                'item_id': item_id,
                'saleGoodsNo': info['saleGoodsNo'],
                'productNo': info['productNo'],
                'info': info,
            })

        self.status_var.set(f"共找到 {len(data)} 条结果")

    # ------------------------------------------------------------------
    # Remote calls used by the collector
    # ------------------------------------------------------------------
    def get_cycle_count(self, sale_goods_no):
        """Fetch a product's battery charge-cycle count.

        Looks for the ``充电次数`` entry first in ``machineConditionList``
        and then in the ``核心机况`` group of ``aggConditionTagParamList``.

        Args:
            sale_goods_no: The product's ``saleGoodsNo``.

        Returns:
            The raw ``value`` of the entry, or ``''`` when it is absent or
            the request fails (the error is printed).
        """
        try:
            url = f"{API_BASE}/products/goods-tag-param?saleGoodsNo={sale_goods_no}"
            response = requests.get(url, headers=self.get_headers(), timeout=10)
            result = response.json()

            if result.get('code') != 0:
                return ''
            # "or" guards against explicit nulls in the response.
            data = result.get('data') or {}

            for item in data.get('machineConditionList') or []:
                if item.get('name') == self.CYCLE_PARAM_NAME:
                    return item.get('value', '')

            for group in data.get('aggConditionTagParamList') or []:
                if group.get('groupName') != '核心机况':
                    continue
                for item in group.get('valueList') or []:
                    if item.get('name') == self.CYCLE_PARAM_NAME:
                        return item.get('value', '')

            return ''
        except Exception as e:  # best effort: a failed lookup skips the item
            print(f"获取循环次数失败: {e}")
            return ''

    def add_to_collection(self, item_no):
        """Call the collect/save endpoint for one item.

        Args:
            item_no: Item number sent as ``itemNo``.

        Returns:
            ``(True, "收藏成功")`` on success, otherwise ``(False, message)``
            where the message is the API's ``resultMessage`` or the
            exception text.
        """
        try:
            url = f"{API_BASE}/collect/save"
            payload = json.dumps({"itemNo": item_no, "type": 1})
            response = requests.post(
                url, data=payload, headers=self.get_headers(), timeout=10)
            result = response.json()

            # Per the observed response: code 200 with data true is success.
            if result.get('code') == 200 and result.get('data') is True:
                return True, "收藏成功"
            return False, result.get('resultMessage', '收藏失败')
        except Exception as e:  # reported to the caller as a failure
            return False, str(e)

    # ------------------------------------------------------------------
    # Auto-collect
    # ------------------------------------------------------------------
    def start_collect(self):
        """Validate preconditions and start the collection worker thread."""
        if not self.current_products:
            messagebox.showwarning("警告", "请先搜索商品")
            return

        if self.is_loading_cycles:
            messagebox.showinfo("提示", "正在处理中,请稍候...")
            return

        if not self.cookie:
            proceed = messagebox.askyesno(
                "提示", "未找到cookie.txt文件或cookie为空,是否继续?")
            if not proceed:
                return

        try:
            threshold = int(self.cycle_threshold.get().strip())
        except ValueError:
            messagebox.showerror("错误", "请输入有效的循环次数")
            return

        # Mark busy on the GUI thread, before the worker starts, so a
        # second click cannot launch a parallel run.
        self.is_loading_cycles = True
        self.collect_btn.config(state="disabled")

        thread = threading.Thread(
            target=self.process_collect, args=(threshold,), daemon=True)
        thread.start()

    def process_collect(self, threshold):
        """Fetch cycle counts and collect every product below the threshold.

        Intended to run on a worker thread: all widget updates are
        scheduled onto the GUI thread with ``root.after``, and
        ``finish_collect`` is always scheduled at the end, even if an
        unexpected error interrupts the loop.

        Args:
            threshold: Products whose cycle count is strictly below this
                value are added to the collection.
        """
        self.is_loading_cycles = True
        self.root.after(0, lambda: self.collect_btn.config(state="disabled"))
        self.collected_items.clear()

        # Work on a snapshot so the iteration is independent of later
        # changes to the list.
        products = list(self.current_products)
        total = len(products)
        collected_count = 0

        try:
            for position, product in enumerate(products, start=1):
                self.root.after(0, self._show_progress, position, total)

                cycle_count = self.get_cycle_count(product['saleGoodsNo'])
                if cycle_count:
                    self.root.after(
                        0, self._set_cycle_count, product['item_id'], cycle_count)
                    if self._collect_if_below(product, cycle_count, threshold):
                        collected_count += 1

                # Pause between products to space out the requests.
                time.sleep(0.3)
        finally:
            self.root.after(0, self.finish_collect, collected_count, threshold)

    def _show_progress(self, position, total):
        """Show "position/total" in the progress and status labels."""
        self.progress_var.set(f"处理: {position}/{total}")
        self.status_var.set(f"正在获取循环次数 ({position}/{total})...")

    def _set_cycle_count(self, item_id, cycle_count):
        """Write the cycle count into a table row, if the row still exists."""
        if self.result_tree.exists(item_id):
            self.result_tree.set(item_id, self.CYCLE_COLUMN, cycle_count)

    def _collect_if_below(self, product, cycle_count, threshold):
        """Collect one product when its cycle count is below the threshold.

        Returns:
            ``True`` only when the product was successfully collected.
            Non-numeric cycle counts are skipped.
        """
        try:
            cycle_num = int(cycle_count)
        except (TypeError, ValueError):
            return False
        if cycle_num >= threshold:
            return False

        item_no = product['productNo'] or product['saleGoodsNo']
        success, _message = self.add_to_collection(item_no)
        if not success:
            return False

        name = product['info']['name']
        self.collected_items.append(f"{name} ({cycle_num}次)")
        # The text is built now, so the scheduled call shows this product
        # and not whichever one the loop has reached by then.
        self.root.after(0, self.status_var.set, f"✓ 已收藏: {name[:30]}")
        return True

    def finish_collect(self, collected_count, threshold):
        """Restore the UI and report the outcome of a run (GUI thread).

        Args:
            collected_count: Number of products successfully collected.
            threshold: The cycle-count threshold used for the run.
        """
        self.is_loading_cycles = False
        self.collect_btn.config(state="normal")
        self.progress_var.set("")
        self.collect_count_var.set(f"已收藏: {collected_count} 件")
        self.status_var.set(
            f"完成!处理 {len(self.current_products)} 条,收藏 {collected_count} 件")

        if collected_count > 0:
            # List at most five items in the dialog.
            detail = "\n".join(self.collected_items[:5])
            if len(self.collected_items) > 5:
                detail += f"\n... 等共{collected_count}件"
            messagebox.showinfo(
                "收藏完成", f"已成功收藏 {collected_count} 件商品\n\n{detail}")
        else:
            messagebox.showinfo("完成", f"没有找到循环次数小于 {threshold} 的商品")

    def show_error(self, error_msg):
        """Re-enable the search button and display a search failure."""
        self.search_btn.config(state="normal")
        self.status_var.set(f"查询失败: {error_msg}")
        messagebox.showerror("错误", f"查询失败: {error_msg}")


def main():
    """Create the root window, build the app and enter the Tk main loop."""
    root = tk.Tk()
    PhoneQueryApp(root)
    root.mainloop()


if __name__ == "__main__":
    main()