"""Tkinter GUI that downloads the images and videos referenced by a product JSON payload."""

import json
import os
import re
import threading
from datetime import datetime
from pathlib import Path
from tkinter import *
from tkinter import ttk, filedialog, messagebox, scrolledtext
from urllib.parse import urlparse

import requests

# Fallback product name, also used as the folder name when nothing usable is left.
_DEFAULT_PRODUCT_NAME = '未命名商品'

# File extensions that are kept as-is when naming downloaded files.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
_VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi', '.flv', '.webm')

# Upper bounds on how many URLs of each kind extract_urls() returns.
_MAX_MAIN_IMAGES = 10
_MAX_DETAIL_IMAGES = 30
_MAX_VIDEOS = 5

# Brand names looked up (by substring) in the product name.
_KNOWN_BRANDS = ('农夫山泉', '可口可乐', '百事')

# Compiled once: URL patterns used for the regex fallback scan of the raw JSON.
_IMAGE_URL_RE = re.compile(
    r'https?://[^\s"\']+\.(?:jpg|jpeg|png|gif|webp)(?:\?[^\s"\']*)?',
    re.IGNORECASE,
)
_VIDEO_URL_RE = re.compile(
    r'https?://[^\s"\']+\.(?:mp4|mov|avi|flv|webm)(?:\?[^\s"\']*)?',
    re.IGNORECASE,
)

# Maps the short type tag used by the extractor to the result-dict key.
_BUCKET_BY_TYPE = {
    'main': 'main_images',
    'detail': 'detail_images',
    'video': 'videos',
}


class ProductDownloaderGUI:
    """Main window: takes product JSON, shows product info, downloads its media."""

    def __init__(self, root):
        """Build all widgets inside *root* and create the shared HTTP session."""
        self.root = root
        self.root.title("商品资源下载器")
        self.root.geometry("900x700")
        self.root.resizable(True, True)

        # Widget theme
        style = ttk.Style()
        style.theme_use('clam')

        # Main container
        main_frame = ttk.Frame(root, padding="10")
        main_frame.pack(fill=BOTH, expand=True)

        # Input area
        input_frame = ttk.LabelFrame(main_frame, text="商品数据输入", padding="10")
        input_frame.pack(fill=BOTH, expand=True, pady=(0, 10))

        # Text box the user pastes the product JSON into
        self.input_text = scrolledtext.ScrolledText(
            input_frame, height=15, font=("Consolas", 10)
        )
        self.input_text.pack(fill=BOTH, expand=True)

        # Output directory selection
        output_frame = ttk.LabelFrame(main_frame, text="输出设置", padding="10")
        output_frame.pack(fill=X, pady=(0, 10))

        dir_select_frame = ttk.Frame(output_frame)
        dir_select_frame.pack(fill=X)

        self.output_dir = StringVar()
        self.output_entry = ttk.Entry(dir_select_frame, textvariable=self.output_dir)
        self.output_entry.pack(side=LEFT, fill=X, expand=True, padx=(0, 5))

        browse_btn = ttk.Button(
            dir_select_frame, text="浏览...", command=self.browse_output_dir
        )
        browse_btn.pack(side=RIGHT)

        # Product information display
        info_frame = ttk.LabelFrame(main_frame, text="商品信息", padding="10")
        info_frame.pack(fill=BOTH, expand=True, pady=(0, 10))

        self.info_text = scrolledtext.ScrolledText(
            info_frame, height=8, font=("微软雅黑", 10)
        )
        self.info_text.pack(fill=BOTH, expand=True)

        # Progress bar (0-100)
        self.progress_var = DoubleVar()
        self.progress_bar = ttk.Progressbar(
            main_frame, variable=self.progress_var, maximum=100
        )
        self.progress_bar.pack(fill=X, pady=(0, 10))

        # Status label
        self.status_label = ttk.Label(main_frame, text="就绪")
        self.status_label.pack(pady=(0, 10))

        # Button row
        button_frame = ttk.Frame(main_frame)
        button_frame.pack(fill=X)

        # Start button
        self.start_btn = ttk.Button(
            button_frame, text="▶ 启动下载", command=self.start_download, width=15
        )
        self.start_btn.pack(side=LEFT, padx=5)

        clear_btn = ttk.Button(button_frame, text="清空输入", command=self.clear_input)
        clear_btn.pack(side=LEFT, padx=5)

        # Exit button
        exit_btn = ttk.Button(button_frame, text="退出", command=self.root.quit)
        exit_btn.pack(side=RIGHT, padx=5)

        # One HTTP session reused for every download
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def browse_output_dir(self):
        """Let the user pick the output directory and store it in the entry."""
        directory = filedialog.askdirectory()
        if directory:
            self.output_dir.set(directory)

    def clear_input(self):
        """Clear the JSON input box."""
        self.input_text.delete("1.0", END)

    def update_info_display(self, product_info):
        """Replace the info panel content with the fields of *product_info*.

        Args:
            product_info: dict as returned by extract_product_info(); missing
                keys are shown with a placeholder.
        """
        self.info_text.delete("1.0", END)
        lines = [
            f"【商品名称】{product_info.get('name', '未知')}",
            f"【商品描述】{product_info.get('description', '无')}",
            f"【价格】¥{product_info.get('min_price', '未知')}",
            f"【规格】{product_info.get('spec', '无')}",
            f"【品牌】{product_info.get('brand', '未知')}",
            f"【月销量】{product_info.get('month_saled', '未知')}",
        ]
        self.info_text.insert("1.0", "\n".join(lines))

    def extract_urls(self, data):
        """Collect image and video URLs from the parsed product JSON.

        Two passes are made: a structured walk over the known fields, then a
        regex scan of the serialized JSON as a supplement. The passes are
        guarded separately so that an unexpected shape in the structured data
        no longer prevents the regex scan from running.

        Args:
            data: the object produced by json.loads() on the user input.

        Returns:
            dict with keys 'main_images', 'detail_images' and 'videos', each a
            de-duplicated list of URL strings capped at 10 / 30 / 5 entries.
        """
        urls = {'main_images': [], 'detail_images': [], 'videos': []}
        seen_urls = set()

        def add_url(url, url_type):
            """Record *url* under *url_type* unless it is empty, not a string or a duplicate."""
            if not isinstance(url, str) or not url or url in seen_urls:
                return
            seen_urls.add(url)
            bucket = _BUCKET_BY_TYPE.get(url_type)
            if bucket is not None:
                urls[bucket].append(url)

        # Pass 1: known structured locations.
        try:
            self._collect_structured_urls(data, add_url)
        except (AttributeError, KeyError, TypeError) as e:
            print(f"提取URL时出错: {e}")

        # Pass 2: regex scan over the whole payload as a supplement.
        try:
            data_str = json.dumps(data, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            print(f"提取URL时出错: {e}")
        else:
            for img_url in _IMAGE_URL_RE.findall(data_str):
                if img_url in seen_urls:
                    continue
                # Classify by URL features.
                lowered = img_url.lower()
                is_main = (
                    'head_photo' in img_url
                    or 'main' in lowered
                    or 'sku' in lowered
                )
                add_url(img_url, 'main' if is_main else 'detail')

            for video_url in _VIDEO_URL_RE.findall(data_str):
                add_url(video_url, 'video')

        # Cap the number of results per category.
        urls['main_images'] = urls['main_images'][:_MAX_MAIN_IMAGES]
        urls['detail_images'] = urls['detail_images'][:_MAX_DETAIL_IMAGES]
        urls['videos'] = urls['videos'][:_MAX_VIDEOS]
        return urls

    def _collect_structured_urls(self, data, add_url):
        """Walk data['data']['ext'] and feed every known picture field to *add_url*."""
        if not isinstance(data, dict):
            return
        ext = (data.get('data') or {}).get('ext')
        if not ext:
            return

        # SKU pictures count as main images.
        common_info = ext.get('common_info') or {}
        for sku in common_info.get('skus') or []:
            add_url(sku.get('picture'), 'main')

        blocks = ext.get('blocks') or {}

        for block in blocks.get('banner') or []:
            s_type = block.get('s_type', '')
            block_data = block.get('data') or {}

            if s_type == 'sm_type_detail_pop_head_photo':
                # Head photo module -> main images.
                head_info = block_data.get('head_info') or {}
                for pic in head_info.get('pictures') or []:
                    add_url(pic, 'main')

            elif s_type == 'sm_type_goods_detail_describe_non_food':
                # Product description module -> detail images.
                json_data = block_data.get('json_data') or {}
                pic_content = json_data.get('pic_content') or {}
                for content in pic_content.get('contents') or []:
                    # Entries that are not strings are skipped instead of
                    # aborting the whole extraction.
                    if not isinstance(content, str):
                        continue
                    if content.startswith('http') and (
                        'jpg' in content or 'png' in content or 'jpeg' in content
                    ):
                        add_url(content, 'detail')

            elif s_type == 'sm_type_goods_detail_match_product':
                # Pictures of the recommended-combination module.
                json_data = block_data.get('json_data') or {}
                collocate = json_data.get('collocate_recommend') or {}
                for spu in collocate.get('spus') or []:
                    add_url(spu.get('picture'), 'detail')

        # Shopping-cart icon from the float module.
        for block in blocks.get('float') or []:
            if block.get('s_type') == 'sm_type_cart_info':
                shopping_cart = (block.get('data') or {}).get('shopping_cart') or {}
                add_url(shopping_cart.get('shopping_cart_icon'), 'detail')

    def extract_product_info(self, data):
        """Extract display information about the product from the parsed JSON.

        Args:
            data: the object produced by json.loads() on the user input.

        Returns:
            dict with keys 'name', 'description', 'min_price', 'spec', 'brand'
            and 'month_saled'. Every key is always present; fields that are
            missing or null in the payload keep their placeholder value.
        """
        product_info = {
            'name': _DEFAULT_PRODUCT_NAME,
            'description': '无',
            'min_price': '未知',
            'spec': '无',
            'brand': '未知',
            'month_saled': '未知',
        }

        try:
            ext = None
            if isinstance(data, dict):
                ext = (data.get('data') or {}).get('ext')
            if ext:
                common_info = ext.get('common_info') or {}

                # A JSON null must not replace the placeholder: None as the
                # name would later break the folder-name sanitizing.
                for target_key, source_key in (
                    ('name', 'name'),
                    ('description', 'description'),
                    ('min_price', 'min_price'),
                    ('spec', 'sku_label'),
                ):
                    value = common_info.get(source_key)
                    if value is not None and value != '':
                        product_info[target_key] = value

                # Monthly sales are parsed out of the first SKU's promotion text.
                skus = common_info.get('skus') or []
                if skus:
                    promotion = skus[0].get('promotion_info') or ''
                    match = re.search(r'月售(\d+)', str(promotion))
                    if match:
                        product_info['month_saled'] = match.group(1)

                # Brand detection: first known brand contained in the name.
                name_text = str(product_info['name'])
                for brand in _KNOWN_BRANDS:
                    if brand in name_text:
                        product_info['brand'] = brand
                        break
        except (AttributeError, IndexError, KeyError, TypeError) as e:
            print(f"提取商品信息时出错: {e}")

        return product_info

    def download_file(self, url, filepath, timeout=30):
        """Download *url* to *filepath* using the shared session.

        The streamed response is closed in every case, and a partially written
        file is removed when the transfer fails midway.

        Args:
            url: address to fetch.
            filepath: destination path (str or Path).
            timeout: timeout in seconds passed to requests.

        Returns:
            True if the server answered 200 and the body was written, else False.
        """
        try:
            with self.session.get(url, timeout=timeout, stream=True) as response:
                if response.status_code != 200:
                    return False
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return True
        except (requests.RequestException, OSError, ValueError) as e:
            print(f"下载失败 {url}: {e}")
            # Do not leave a truncated file behind.
            try:
                os.remove(filepath)
            except OSError:
                pass
            return False

    def sanitize_filename(self, name):
        """Turn *name* into a string that is safe to use as a file or folder name.

        Illegal and control characters are replaced with '_', the result is
        limited to 100 characters, and surrounding whitespace and trailing dots
        are removed. If nothing is left, the default product name is returned
        so callers never receive an empty path component.
        """
        if name is None:
            name = ''
        # Replace characters that are illegal in file names, plus control characters.
        cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', str(name))
        # Limit the length.
        cleaned = cleaned[:100]
        # Trailing dots and spaces are not allowed in Windows names.
        cleaned = cleaned.strip().rstrip('. ')
        return cleaned or _DEFAULT_PRODUCT_NAME

    def download_resources(self, product_name, output_path, urls, progress_callback):
        """Download every URL in *urls* into per-category subfolders of *output_path*.

        Args:
            product_name: product name; kept for interface compatibility, not
                used by the download itself.
            output_path: folder that receives the "主图", "详情图" and "视频"
                subfolders.
            urls: dict as returned by extract_urls().
            progress_callback: called as progress_callback(message, percent)
                before each file is fetched.

        Returns:
            dict with 'downloaded' and 'total' counters, an 'errors' list with
            one entry per failed file, and 'success', which is True only when
            no file failed.
        """
        results = {
            'success': True,
            'downloaded': 0,
            'total': 0,
            'errors': [],
        }

        output_path = Path(output_path)
        total_items = (
            len(urls['main_images']) + len(urls['detail_images']) + len(urls['videos'])
        )

        # (urls, subfolder, file prefix, label, accepted extensions, fallback extension)
        groups = (
            (urls['main_images'], "主图", "zhutu", "主图", _IMAGE_EXTENSIONS, '.jpg'),
            (urls['detail_images'], "详情图", "xiangqing", "详情图", _IMAGE_EXTENSIONS, '.jpg'),
            (urls['videos'], "视频", "shipin", "视频", _VIDEO_EXTENSIONS, '.mp4'),
        )

        for group_urls, folder, prefix, label, allowed_exts, default_ext in groups:
            target_dir = output_path / folder
            target_dir.mkdir(parents=True, exist_ok=True)
            self._download_group(
                group_urls, target_dir, prefix, label, allowed_exts, default_ext,
                results, total_items, progress_callback,
            )

        results['success'] = not results['errors']
        return results

    def _download_group(self, group_urls, target_dir, prefix, label, allowed_exts,
                        default_ext, results, total_items, progress_callback):
        """Download one category of URLs into *target_dir*, updating *results* in place."""
        count = len(group_urls)
        for i, url in enumerate(group_urls, 1):
            results['total'] += 1

            # Take the extension from the URL path; compare case-insensitively
            # and fall back to the category default for anything unknown.
            ext = os.path.splitext(urlparse(url).path)[1].lower()
            if ext not in allowed_exts:
                ext = default_ext

            filepath = target_dir / f"{prefix}_{i}{ext}"
            progress_callback(
                f"下载{label} {i}/{count}...",
                int(results['total'] / total_items * 100),
            )

            if self.download_file(url, filepath):
                results['downloaded'] += 1
            else:
                results['errors'].append(f"{label}{i}: {url}")

    def start_download(self):
        """Validate the inputs and run the download in a background thread."""
        # Read the pasted product data.
        input_data = self.input_text.get("1.0", END).strip()
        if not input_data:
            messagebox.showerror("错误", "请输入商品数据")
            return

        # Check the output directory.
        output_dir = self.output_dir.get().strip()
        if not output_dir:
            messagebox.showerror("错误", "请选择输出目录")
            return

        # Disable the start button while a download is running.
        self.start_btn.config(state=DISABLED, text="⏳ 下载中...")
        self.progress_var.set(0)

        # Run the download in a separate thread so the UI loop keeps running.
        thread = threading.Thread(
            target=self.download_task, args=(input_data, output_dir)
        )
        thread.daemon = True
        thread.start()

    def download_task(self, input_data, output_dir):
        """Worker-thread body: parse the JSON, extract URLs, download, report.

        All widget updates are scheduled through root.after() rather than made
        directly from this thread.
        NOTE(review): calling root.after() from a non-main thread relies on
        tkinter's cross-thread call handling - confirm for the target build.

        Args:
            input_data: raw JSON text from the input box.
            output_dir: base directory chosen by the user.
        """
        try:
            # Parse the JSON.
            data = json.loads(input_data)

            # Extract and display the product information.
            product_info = self.extract_product_info(data)
            self.root.after(0, self.update_info_display, product_info)

            # The sanitized product name becomes the folder name.
            safe_name = self.sanitize_filename(product_info['name'])

            # Create the product folder, including a missing output directory.
            product_path = Path(output_dir) / safe_name
            product_path.mkdir(parents=True, exist_ok=True)

            # Extract the URLs.
            self.update_status_thread("正在分析数据,提取资源链接...")
            urls = self.extract_urls(data)

            total_urls = (
                len(urls['main_images'])
                + len(urls['detail_images'])
                + len(urls['videos'])
            )
            if total_urls == 0:
                self.root.after(
                    0, messagebox.showwarning, "警告", "未找到任何图片或视频资源"
                )
                self.update_status_thread("未找到资源")
                # The start button is re-enabled by the finally clause below.
                return

            self.update_status_thread(
                f"发现 {len(urls['main_images'])} 张主图, "
                f"{len(urls['detail_images'])} 张详情图, "
                f"{len(urls['videos'])} 个视频"
            )

            # Progress callback handed to the downloader.
            def update_progress(msg, progress):
                self.update_status_thread(msg)
                self.root.after(0, self.progress_var.set, progress)

            # Download everything.
            results = self.download_resources(
                safe_name, product_path, urls, update_progress
            )

            # Move the progress bar to 100%.
            self.root.after(0, self.progress_var.set, 100)

            # Report the outcome.
            if results['downloaded'] > 0:
                msg = (
                    f"✅ 下载完成!\n\n成功下载 "
                    f"{results['downloaded']}/{results['total']} 个文件\n"
                    f"保存路径: {product_path}"
                )
                if results['errors']:
                    msg += f"\n\n⚠️ 失败 {len(results['errors'])} 个文件"
                self.root.after(0, messagebox.showinfo, "完成", msg)
                self.update_status_thread(
                    f"下载完成 - 成功 {results['downloaded']}/{results['total']}"
                )
            else:
                self.root.after(
                    0, messagebox.showerror, "错误", "下载失败,没有成功下载任何文件"
                )
                self.update_status_thread("下载失败")

        except json.JSONDecodeError as e:
            self.root.after(
                0, messagebox.showerror, "JSON解析错误",
                f"输入的不是有效的JSON格式:\n{str(e)}",
            )
            self.update_status_thread("JSON解析失败")
        except Exception as e:
            # Top-level boundary of the worker thread: report instead of dying silently.
            self.root.after(
                0, messagebox.showerror, "错误", f"处理过程中出现错误:\n{str(e)}"
            )
            self.update_status_thread("处理失败")
        finally:
            self.enable_button_thread()

    def update_status_thread(self, message):
        """Schedule a status-label update from the worker thread."""
        self.root.after(0, lambda: self.status_label.config(text=message))

    def enable_button_thread(self):
        """Schedule re-enabling of the start button from the worker thread."""
        self.root.after(
            0, lambda: self.start_btn.config(state=NORMAL, text="▶ 启动下载")
        )


def main():
    """Create the Tk root window, build the GUI and run the event loop."""
    root = Tk()
    app = ProductDownloaderGUI(root)
    root.mainloop()


if __name__ == "__main__":
    main()