| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- import tkinter as tk
- from tkinter import ttk, messagebox
- import json
- import requests
- import threading
- from datetime import datetime
- import uuid
- import time
- import os
- class PhoneQueryApp:
- def __init__(self, root):
- self.root = root
- self.root.title("手机数据查询系统")
- self.root.geometry("1200x800")
-
- # 商品数据
- self.current_products = []
- self.is_loading_cycles = False
- self.collected_items = []
-
- # 状况标签映射(tagId -> 名称)
- self.condition_tags = {
- 10: "屏幕完美",
- 11: "机身无痕",
- 12: "功能完好",
- 17: "电池效率95%+",
- 20: "原厂在保",
- 19: "无维修",
- 15: "官方在保",
- 13: "原厂电池",
- 14: "原厂屏幕",
- 21: "原厂部件"
- }
-
- # 从文件读取cookie
- self.cookie = self.load_cookie()
-
- # 创建界面
- self.create_widgets()
-
- def load_cookie(self):
- """从cookie.txt文件读取cookie"""
- cookie_file = "cookie.txt"
- if os.path.exists(cookie_file):
- try:
- with open(cookie_file, 'r', encoding='utf-8') as f:
- cookie = f.read().strip()
- print(f"已加载cookie: {cookie[:50]}...")
- return cookie
- except Exception as e:
- print(f"读取cookie文件失败: {e}")
- return ""
- else:
- print("cookie.txt文件不存在,请创建并放入cookie")
- return ""
-
- def get_headers(self):
- """获取请求头"""
- return {
- "Connection": "keep-alive",
- "content-type": "application/json;charset=UTF-8",
- "ahs-app-version": "7.49.3",
- "Ahs-Timestamp": str(int(datetime.now().timestamp())),
- "Ahs-Token": "dccf18b052ef46d78d4feb796182d5df",
- "Ahs-Session-Id": str(uuid.uuid4()),
- "Ahs-App-Id": "10007",
- "Ahs-Device-Id": "3166e003-4ba7-4b57-8158-30859d81c7d5",
- "Ahs-Sign": "9ea25fcfcda37585065244ed271f1b0d",
- "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 26_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.74(0x18004a2c) NetType/WIFI Language/zh_CN",
- "Referer": "https://servicewechat.com/wx7e490492b4c23e98/1055/page-frame.html",
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- # "Cookie": self.cookie
- }
-
- def create_widgets(self):
- """创建界面组件"""
- main_frame = ttk.Frame(self.root, padding="10")
- main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
-
- self.root.columnconfigure(0, weight=1)
- self.root.rowconfigure(0, weight=1)
- main_frame.columnconfigure(0, weight=1)
- main_frame.rowconfigure(2, weight=1)
-
- # 查询区域
- query_frame = ttk.LabelFrame(main_frame, text="查询条件", padding="10")
- query_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
- query_frame.columnconfigure(1, weight=1)
-
- ttk.Label(query_frame, text="机型:", font=("Arial", 11)).grid(row=0, column=0, sticky=tk.W, padx=(0, 10))
- self.model_entry = ttk.Entry(query_frame, width=50, font=("Arial", 11))
- self.model_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
- self.model_entry.insert(0, "iPhone 15 Pro")
-
- self.search_btn = ttk.Button(query_frame, text="搜索", command=self.search_products, width=15)
- self.search_btn.grid(row=0, column=2)
-
- # 筛选区域
- filter_frame = ttk.LabelFrame(main_frame, text="筛选条件", padding="10")
- filter_frame.grid(row=1, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
-
- # 第一行:价格区间 和 成色
- # 价格区间
- price_frame = ttk.LabelFrame(filter_frame, text="价格区间", padding="5")
- price_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=5, pady=5)
-
- self.price_vars = {}
- price_options = ["0-1499", "1500-1999", "2000-2999", "3000-3999", "4000以上"]
- for i, option in enumerate(price_options):
- var = tk.BooleanVar()
- self.price_vars[option] = var
- cb = ttk.Checkbutton(price_frame, text=option, variable=var)
- cb.grid(row=0, column=i, sticky=tk.W, padx=10, pady=2)
-
- # 成色
- fineness_frame = ttk.LabelFrame(filter_frame, text="成色", padding="5")
- fineness_frame.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5, pady=5)
-
- self.fineness_vars = {}
- fineness_options = ["全新仅拆封", "准新机", "99新", "95新", "9成新", "8成新", "7成新"]
- for i, option in enumerate(fineness_options):
- var = tk.BooleanVar()
- self.fineness_vars[option] = var
- cb = ttk.Checkbutton(fineness_frame, text=option, variable=var)
- cb.grid(row=0, column=i, sticky=tk.W, padx=10, pady=2)
-
- # 第二行:状况标签(占满整行)
- condition_frame = ttk.LabelFrame(filter_frame, text="状况标签", padding="5")
- condition_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E), padx=5, pady=5)
-
- self.condition_vars = {}
- # 将状况标签分成两行显示,每行5个
- condition_items = list(self.condition_tags.items())
- for i, (tag_id, tag_name) in enumerate(condition_items):
- var = tk.BooleanVar()
- self.condition_vars[tag_id] = var
- row = i // 5 # 每行5个
- col = i % 5
- cb = ttk.Checkbutton(condition_frame, text=tag_name, variable=var)
- cb.grid(row=row, column=col, sticky=tk.W, padx=10, pady=2)
-
- filter_frame.columnconfigure(0, weight=1)
- filter_frame.columnconfigure(1, weight=1)
-
- # 结果显示区域
- result_frame = ttk.LabelFrame(main_frame, text="查询结果", padding="10")
- result_frame.grid(row=2, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
- result_frame.columnconfigure(0, weight=1)
- result_frame.rowconfigure(0, weight=1)
-
- columns = ("名称", "价格", "循环次数", "成色", "内存", "颜色", "网络制式")
- self.result_tree = ttk.Treeview(result_frame, columns=columns, show="headings", height=15)
-
- self.result_tree.heading("名称", text="名称")
- self.result_tree.heading("价格", text="价格")
- self.result_tree.heading("循环次数", text="循环次数")
- self.result_tree.heading("成色", text="成色")
- self.result_tree.heading("内存", text="内存")
- self.result_tree.heading("颜色", text="颜色")
- self.result_tree.heading("网络制式", text="网络制式")
-
- self.result_tree.column("名称", width=400)
- self.result_tree.column("价格", width=100)
- self.result_tree.column("循环次数", width=100)
- self.result_tree.column("成色", width=100)
- self.result_tree.column("内存", width=80)
- self.result_tree.column("颜色", width=100)
- self.result_tree.column("网络制式", width=100)
-
- scrollbar = ttk.Scrollbar(result_frame, orient=tk.VERTICAL, command=self.result_tree.yview)
- self.result_tree.configure(yscrollcommand=scrollbar.set)
- self.result_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
- scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
-
- # 底部收藏栏
- bottom_frame = ttk.Frame(main_frame)
- bottom_frame.grid(row=3, column=0, sticky=(tk.W, tk.E), pady=(10, 0))
-
- ttk.Label(bottom_frame, text="循环次数小于:", font=("Arial", 10)).pack(side=tk.LEFT, padx=(0, 5))
- self.cycle_threshold = ttk.Entry(bottom_frame, width=10, font=("Arial", 10))
- self.cycle_threshold.pack(side=tk.LEFT, padx=(0, 5))
- self.cycle_threshold.insert(0, "600")
- ttk.Label(bottom_frame, text="次的机型自动收藏", font=("Arial", 10)).pack(side=tk.LEFT, padx=(0, 20))
-
- self.collect_btn = ttk.Button(bottom_frame, text="启动", command=self.start_collect, width=10)
- self.collect_btn.pack(side=tk.LEFT)
-
- self.progress_var = tk.StringVar()
- self.progress_var.set("")
- ttk.Label(bottom_frame, textvariable=self.progress_var, font=("Arial", 9)).pack(side=tk.LEFT, padx=(20, 0))
-
- self.collect_count_var = tk.StringVar()
- self.collect_count_var.set("")
- ttk.Label(bottom_frame, textvariable=self.collect_count_var, font=("Arial", 9), foreground="green").pack(side=tk.LEFT, padx=(10, 0))
-
- # 状态栏
- self.status_var = tk.StringVar()
- self.status_var.set("就绪")
- status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
- status_bar.grid(row=4, column=0, sticky=(tk.W, tk.E), pady=(10, 0))
-
- def get_selected_prices(self):
- price_map = {
- "0-1499": (0, 1499),
- "1500-1999": (1500, 1999),
- "2000-2999": (2000, 2999),
- "3000-3999": (3000, 3999),
- "4000以上": (4000, 20000)
- }
- for option, var in self.price_vars.items():
- if var.get():
- return price_map[option]
- return (0, 20000)
-
- def get_selected_fineness(self):
- fineness_map = {
- "全新仅拆封": 111,
- "准新机": 100,
- "99新": 99,
- "95新": 95,
- "9成新": 90,
- "8成新": 80,
- "7成新": 70
- }
- selected = []
- for option, var in self.fineness_vars.items():
- if var.get():
- selected.append(fineness_map[option])
- return selected if selected else [99, 100, 111, 95, 90, 80, 70]
-
- def get_selected_condition_tags(self):
- """获取选中的状况标签ID列表"""
- selected = []
- for tag_id, var in self.condition_vars.items():
- if var.get():
- selected.append(tag_id)
- return selected
-
- def search_products(self):
- keyword = self.model_entry.get().strip()
- if not keyword:
- messagebox.showwarning("警告", "请输入机型")
- return
-
- for item in self.result_tree.get_children():
- self.result_tree.delete(item)
- self.current_products.clear()
- self.collected_items.clear()
- self.progress_var.set("")
- self.collect_count_var.set("")
- self.collect_btn.config(state="normal")
-
- self.status_var.set("正在搜索...")
- self.search_btn.config(state="disabled")
-
- thread = threading.Thread(target=self.do_search, args=(keyword,))
- thread.daemon = True
- thread.start()
-
- def do_search(self, keyword):
- try:
- min_price, max_price = self.get_selected_prices()
- fineness_ids = self.get_selected_fineness()
- condition_tags = self.get_selected_condition_tags()
-
- request_data = {
- "minPrice": min_price,
- "maxPrice": max_price,
- "sortValue": "sort_composite",
- "gaeaPricePpvIds": {},
- "gaeaFinenessIds": fineness_ids,
- "conditionTagList": condition_tags, # 使用选中的状况标签
- "scene": "search",
- "keyword": keyword,
- "sirReq": False,
- "cityId": 324,
- "pageIndex": 0,
- "pageSize": 50
- }
-
- url = "https://dubai-mp.aihuishou.com/ahs-yanxuan-service/products/search-goods-v2"
- response = requests.post(url, json=request_data, headers=self.get_headers(), timeout=30)
- result = response.json()
-
- self.root.after(0, self.display_results, result)
-
- except Exception as e:
- self.root.after(0, self.show_error, str(e))
-
- def parse_product_info(self, product):
- name = product.get('name', '')
- sale_goods_no = product.get('saleGoodsNo', '')
- product_no = product.get('productNo', '')
-
- color = ""
- memory = product.get('memoryDesc', '')
-
- if '黑色' in name:
- color = "黑色"
- elif '白色' in name:
- color = "白色"
- elif '蓝色' in name:
- color = "蓝色"
- elif '原色' in name:
- color = "原色"
-
- return {
- 'name': name[:50],
- 'price': product.get('price', 0),
- 'cycle_count': '',
- 'fineness': product.get('gaeaFinenessName', ''),
- 'memory': memory,
- 'color': color,
- 'network': "全网通",
- 'saleGoodsNo': sale_goods_no,
- 'productNo': product_no
- }
-
- def display_results(self, result):
- self.search_btn.config(state="normal")
-
- if result.get('code') != 0:
- self.status_var.set(f"查询失败: {result.get('resultMessage', '未知错误')}")
- messagebox.showerror("错误", result.get('resultMessage', '查询失败'))
- return
-
- data = result.get('data', [])
- if not data:
- self.status_var.set("未找到相关产品")
- messagebox.showinfo("提示", "未找到相关产品")
- return
-
- for product in data:
- info = self.parse_product_info(product)
- item_id = self.result_tree.insert("", tk.END, values=(
- info['name'], f"¥{info['price']}", info['cycle_count'],
- info['fineness'], info['memory'], info['color'], info['network']
- ))
- self.current_products.append({
- 'item_id': item_id,
- 'saleGoodsNo': info['saleGoodsNo'],
- 'productNo': info['productNo'],
- 'info': info
- })
-
- self.status_var.set(f"共找到 {len(data)} 条结果")
-
- def get_cycle_count(self, sale_goods_no):
- """获取商品的电池循环次数"""
- try:
- url = f"https://dubai-mp.aihuishou.com/ahs-yanxuan-service/products/goods-tag-param?saleGoodsNo={sale_goods_no}"
- response = requests.get(url, headers=self.get_headers(), timeout=10)
- result = response.json()
-
- if result.get('code') == 0 and 'data' in result:
- data = result['data']
- if 'machineConditionList' in data:
- for item in data['machineConditionList']:
- if item.get('name') == '充电次数':
- return item.get('value', '')
- if 'aggConditionTagParamList' in data:
- for group in data['aggConditionTagParamList']:
- if group.get('groupName') == '核心机况':
- for item in group.get('valueList', []):
- if item.get('name') == '充电次数':
- return item.get('value', '')
- return ''
- except Exception as e:
- print(f"获取循环次数失败: {e}")
- return ''
-
- def add_to_collection(self, item_no):
- """调用收藏接口添加收藏"""
- try:
- url = "https://dubai-mp.aihuishou.com/ahs-yanxuan-service/collect/save"
- payload = json.dumps({"itemNo": item_no, "type": 1})
- headers = self.get_headers()
-
- response = requests.post(url, data=payload, headers=headers, timeout=10)
- result = response.json()
-
- # 根据实际返回判断:code为200且data为True表示成功
- if result.get('code') == 200 and result.get('data') == True:
- return True, "收藏成功"
- else:
- return False, result.get('resultMessage', '收藏失败')
- except Exception as e:
- return False, str(e)
-
- def start_collect(self):
- if not self.current_products:
- messagebox.showwarning("警告", "请先搜索商品")
- return
- if self.is_loading_cycles:
- messagebox.showinfo("提示", "正在处理中,请稍候...")
- return
-
- if not self.cookie:
- result = messagebox.askyesno("提示", "未找到cookie.txt文件或cookie为空,是否继续?")
- if not result:
- return
-
- try:
- threshold = int(self.cycle_threshold.get().strip())
- except ValueError:
- messagebox.showerror("错误", "请输入有效的循环次数")
- return
-
- thread = threading.Thread(target=self.process_collect, args=(threshold,))
- thread.daemon = True
- thread.start()
-
- def process_collect(self, threshold):
- """处理收藏:获取循环次数并收藏符合条件的商品"""
- self.is_loading_cycles = True
- self.collect_btn.config(state="disabled")
- self.collected_items.clear()
-
- total = len(self.current_products)
- collected_count = 0
-
- for i, product in enumerate(self.current_products):
- self.root.after(0, lambda txt=f"处理: {i+1}/{total}": self.progress_var.set(txt))
- self.root.after(0, lambda i=i: self.status_var.set(f"正在获取循环次数 ({i+1}/{total})..."))
-
- cycle_count = self.get_cycle_count(product['saleGoodsNo'])
-
- if cycle_count:
- values = list(self.result_tree.item(product['item_id'], 'values'))
- values[2] = cycle_count
- self.root.after(0, lambda pid=product['item_id'], vals=values: self.result_tree.item(pid, values=vals))
-
- try:
- cycle_num = int(cycle_count)
-
- if cycle_num < threshold:
- item_no = product['productNo'] or product['saleGoodsNo']
- success, msg = self.add_to_collection(item_no)
-
- if success:
- collected_count += 1
- self.collected_items.append(f"{product['info']['name']} ({cycle_num}次)")
- self.root.after(0, lambda: self.status_var.set(f"✓ 已收藏: {product['info']['name'][:30]}"))
- except ValueError:
- pass
-
- time.sleep(0.3)
-
- self.root.after(0, lambda: self.finish_collect(collected_count, threshold))
-
- def finish_collect(self, collected_count, threshold):
- self.is_loading_cycles = False
- self.collect_btn.config(state="normal")
- self.progress_var.set("")
- self.collect_count_var.set(f"已收藏: {collected_count} 件")
- self.status_var.set(f"完成!处理 {len(self.current_products)} 条,收藏 {collected_count} 件")
-
- if collected_count > 0:
- detail = "\n".join(self.collected_items[:5])
- if len(self.collected_items) > 5:
- detail += f"\n... 等共{collected_count}件"
- messagebox.showinfo("收藏完成", f"已成功收藏 {collected_count} 件商品\n\n{detail}")
- else:
- messagebox.showinfo("完成", f"没有找到循环次数小于 {threshold} 的商品")
-
- def show_error(self, error_msg):
- self.search_btn.config(state="normal")
- self.status_var.set(f"查询失败: {error_msg}")
- messagebox.showerror("错误", f"查询失败: {error_msg}")
- def main():
- root = tk.Tk()
- app = PhoneQueryApp(root)
- root.mainloop()
- if __name__ == "__main__":
- main()
|