| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- import json
- import os
- import re
- import requests
- from pathlib import Path
- from urllib.parse import urlparse
- from tkinter import *
- from tkinter import ttk, filedialog, messagebox, scrolledtext
- import threading
- from datetime import datetime
class ProductDownloaderGUI:
    """Tkinter front end that downloads a product's images and videos.

    The user pastes a product-detail JSON document, chooses an output
    directory and starts the download. Resource URLs are extracted from the
    JSON, classified as main images, detail images or videos, and saved into
    per-category sub-folders of a folder named after the product. The
    network work runs on a background thread; widget updates are scheduled
    onto the Tk event loop with ``root.after``.
    """

    # Fallback used when the JSON carries no usable product name; it doubles
    # as the fallback folder name in ``sanitize_filename``.
    DEFAULT_PRODUCT_NAME = '未命名商品'

    # File extensions that are kept as-is when naming downloaded files.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
    VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi', '.flv', '.webm')

    # Brands recognised by substring match on the product name, in priority
    # order.
    KNOWN_BRANDS = ('农夫山泉', '可口可乐', '百事')

    # Fallback URL scanners applied to the serialised JSON document.
    _IMAGE_URL_RE = re.compile(
        r'https?://[^\s"\']+\.(?:jpg|jpeg|png|gif|webp)(?:\?[^\s"\']*)?',
        re.IGNORECASE,
    )
    _VIDEO_URL_RE = re.compile(
        r'https?://[^\s"\']+\.(?:mp4|mov|avi|flv|webm)(?:\?[^\s"\']*)?',
        re.IGNORECASE,
    )

    def __init__(self, root):
        """Build the widget tree on *root* and create the HTTP session.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("商品资源下载器")
        self.root.geometry("900x700")
        self.root.resizable(True, True)

        # Widget theme.
        style = ttk.Style()
        style.theme_use('clam')

        # Top-level container.
        main_frame = ttk.Frame(root, padding="10")
        main_frame.pack(fill=BOTH, expand=True)

        # Input area for the pasted product JSON.
        input_frame = ttk.LabelFrame(main_frame, text="商品数据输入", padding="10")
        input_frame.pack(fill=BOTH, expand=True, pady=(0, 10))

        self.input_text = scrolledtext.ScrolledText(
            input_frame, height=15, font=("Consolas", 10))
        self.input_text.pack(fill=BOTH, expand=True)

        # Output directory chooser.
        output_frame = ttk.LabelFrame(main_frame, text="输出设置", padding="10")
        output_frame.pack(fill=X, pady=(0, 10))

        dir_select_frame = ttk.Frame(output_frame)
        dir_select_frame.pack(fill=X)

        self.output_dir = StringVar()
        self.output_entry = ttk.Entry(dir_select_frame, textvariable=self.output_dir)
        self.output_entry.pack(side=LEFT, fill=X, expand=True, padx=(0, 5))

        browse_btn = ttk.Button(
            dir_select_frame, text="浏览...", command=self.browse_output_dir)
        browse_btn.pack(side=RIGHT)

        # Read-out of the parsed product information.
        info_frame = ttk.LabelFrame(main_frame, text="商品信息", padding="10")
        info_frame.pack(fill=BOTH, expand=True, pady=(0, 10))

        self.info_text = scrolledtext.ScrolledText(
            info_frame, height=8, font=("微软雅黑", 10))
        self.info_text.pack(fill=BOTH, expand=True)

        # Progress bar driven by a 0-100 value.
        self.progress_var = DoubleVar()
        self.progress_bar = ttk.Progressbar(
            main_frame, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(fill=X, pady=(0, 10))

        # One-line status text.
        self.status_label = ttk.Label(main_frame, text="就绪")
        self.status_label.pack(pady=(0, 10))

        # Button row.
        button_frame = ttk.Frame(main_frame)
        button_frame.pack(fill=X)

        self.start_btn = ttk.Button(
            button_frame, text="▶ 启动下载", command=self.start_download, width=15)
        self.start_btn.pack(side=LEFT, padx=5)

        clear_btn = ttk.Button(button_frame, text="清空输入", command=self.clear_input)
        clear_btn.pack(side=LEFT, padx=5)

        exit_btn = ttk.Button(button_frame, text="退出", command=self.root.quit)
        exit_btn.pack(side=RIGHT, padx=5)

        # Shared HTTP session with a browser-like User-Agent header.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def browse_output_dir(self):
        """Ask the user for a directory and store it as the output directory."""
        directory = filedialog.askdirectory()
        if directory:
            self.output_dir.set(directory)

    def clear_input(self):
        """Remove all text from the JSON input box."""
        self.input_text.delete("1.0", END)

    def update_info_display(self, product_info):
        """Replace the info panel content with a summary of *product_info*.

        Args:
            product_info: Mapping as produced by ``extract_product_info``;
                missing keys are shown with a placeholder.
        """
        lines = [
            f"【商品名称】{product_info.get('name', '未知')}",
            f"【商品描述】{product_info.get('description', '无')}",
            f"【价格】¥{product_info.get('min_price', '未知')}",
            f"【规格】{product_info.get('spec', '无')}",
            f"【品牌】{product_info.get('brand', '未知')}",
            f"【月销量】{product_info.get('month_saled', '未知')}",
        ]
        self.info_text.delete("1.0", END)
        self.info_text.insert("1.0", "\n".join(lines))

    def extract_urls(self, data):
        """Collect image and video URLs from the parsed product JSON.

        Known structured locations under ``data['data']['ext']`` are read
        first. The whole document is then serialised and scanned with
        regular expressions to pick up any remaining URLs. A malformed
        structured section is reported and skipped without cancelling the
        regex scan.

        Args:
            data: The decoded JSON document.

        Returns:
            dict with the keys ``main_images`` (at most 10 URLs),
            ``detail_images`` (at most 30) and ``videos`` (at most 5). URLs
            are de-duplicated and kept in discovery order.
        """
        urls = {
            'main_images': [],
            'detail_images': [],
            'videos': [],
        }
        bucket_for = {
            'main': 'main_images',
            'detail': 'detail_images',
            'video': 'videos',
        }
        seen_urls = set()

        def add_url(url, url_type):
            """Record *url* under *url_type* unless empty, non-text or seen."""
            if not isinstance(url, str) or not url or url in seen_urls:
                return
            seen_urls.add(url)
            bucket = bucket_for.get(url_type)
            if bucket is not None:
                urls[bucket].append(url)

        # Structured pass over the known sections.
        try:
            ext = data['data']['ext']
        except (KeyError, TypeError):
            ext = None
        if isinstance(ext, dict):
            try:
                self._collect_structured_urls(ext, add_url)
            except (AttributeError, KeyError, TypeError) as e:
                print(f"提取URL时出错: {e}")

        # Regex pass over the serialised document as a supplement.
        try:
            data_str = json.dumps(data, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            print(f"提取URL时出错: {e}")
            data_str = ''

        for img_url in self._IMAGE_URL_RE.findall(data_str):
            if img_url in seen_urls:
                continue
            seen_urls.add(img_url)
            lowered = img_url.lower()
            # Classify by URL features; anything else is a detail image.
            if 'head_photo' in img_url or 'main' in lowered or 'sku' in lowered:
                urls['main_images'].append(img_url)
            else:
                urls['detail_images'].append(img_url)

        for video_url in self._VIDEO_URL_RE.findall(data_str):
            add_url(video_url, 'video')

        # Cap the number of resources per category.
        urls['main_images'] = urls['main_images'][:10]
        urls['detail_images'] = urls['detail_images'][:30]
        urls['videos'] = urls['videos'][:5]

        return urls

    def _collect_structured_urls(self, ext, add_url):
        """Feed URLs from the known sections of *ext* into *add_url*."""
        # SKU pictures count as main images.
        common_info = ext.get('common_info') or {}
        for sku in common_info.get('skus') or []:
            add_url(sku.get('picture'), 'main')

        blocks = ext.get('blocks') or {}

        for block in blocks.get('banner') or []:
            s_type = block.get('s_type', '')
            block_data = block.get('data') or {}

            if s_type == 'sm_type_detail_pop_head_photo':
                # Head photo module: main images.
                head_info = block_data.get('head_info') or {}
                for pic in head_info.get('pictures') or []:
                    add_url(pic, 'main')

            elif s_type == 'sm_type_goods_detail_describe_non_food':
                # Product description module: detail images.
                json_data = block_data.get('json_data') or {}
                pic_content = json_data.get('pic_content') or {}
                for content in pic_content.get('contents') or []:
                    if (isinstance(content, str)
                            and content.startswith('http')
                            and any(tag in content for tag in ('jpg', 'png', 'jpeg'))):
                        add_url(content, 'detail')

            elif s_type == 'sm_type_goods_detail_match_product':
                # Recommended-combination module: pictures of the listed items.
                json_data = block_data.get('json_data') or {}
                collocate = json_data.get('collocate_recommend') or {}
                for spu in collocate.get('spus') or []:
                    add_url(spu.get('picture'), 'detail')

        # Shopping-cart icon from the floating blocks.
        for block in blocks.get('float') or []:
            if block.get('s_type') == 'sm_type_cart_info':
                shopping_cart = (block.get('data') or {}).get('shopping_cart') or {}
                add_url(shopping_cart.get('shopping_cart_icon'), 'detail')

    def extract_product_info(self, data):
        """Extract display information about the product.

        Args:
            data: The decoded JSON document.

        Returns:
            dict with the keys ``name``, ``description``, ``min_price``,
            ``spec``, ``brand`` and ``month_saled``. Fields that are missing,
            ``null`` or empty in the document keep their placeholder value,
            and ``name`` is always a non-empty string.
        """
        product_info = {
            'name': self.DEFAULT_PRODUCT_NAME,
            'description': '无',
            'min_price': '未知',
            'spec': '无',
            'brand': '未知',
            'month_saled': '未知',
        }

        try:
            common_info = data['data']['ext'].get('common_info')
        except (AttributeError, KeyError, TypeError):
            common_info = None
        if not isinstance(common_info, dict):
            return product_info

        def pick(key, default):
            """Return ``common_info[key]`` unless it is missing, null or ''."""
            value = common_info.get(key)
            return default if value is None or value == '' else value

        # The name is later used for substring tests and as a folder name,
        # so it must be text.
        product_info['name'] = str(pick('name', self.DEFAULT_PRODUCT_NAME))
        product_info['description'] = pick('description', '无')
        product_info['min_price'] = pick('min_price', '未知')
        product_info['spec'] = pick('sku_label', '无')

        # Monthly sales are parsed from the first SKU's promotion text.
        skus = common_info.get('skus')
        if isinstance(skus, list) and skus and isinstance(skus[0], dict):
            promotion = skus[0].get('promotion_info')
            if isinstance(promotion, str):
                match = re.search(r'月售(\d+)', promotion)
                if match:
                    product_info['month_saled'] = match.group(1)

        # Brand is inferred from the product name.
        for brand in self.KNOWN_BRANDS:
            if brand in product_info['name']:
                product_info['brand'] = brand
                break

        return product_info

    def download_file(self, url, filepath, timeout=30):
        """Stream *url* into *filepath*.

        The response is always closed, including for non-200 replies, so the
        pooled connection is released. A file that was opened but could not
        be written completely is removed again.

        Args:
            url: Resource URL.
            filepath: Destination path.
            timeout: Timeout in seconds passed to the HTTP request.

        Returns:
            True when the server answered 200 and the body was written
            completely, otherwise False. Errors are reported on stdout and
            never raised, so one bad resource does not abort a batch.
        """
        file_opened = False
        try:
            with self.session.get(url, timeout=timeout, stream=True) as response:
                if response.status_code != 200:
                    return False
                with open(filepath, 'wb') as f:
                    file_opened = True
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return True
        except Exception as e:  # best-effort boundary for a single resource
            print(f"下载失败 {url}: {e}")
            if file_opened:
                # Do not leave a truncated file behind.
                try:
                    os.remove(filepath)
                except OSError:
                    pass
            return False

    def sanitize_filename(self, name):
        """Turn *name* into a string that is safe to use as a folder name.

        Characters that are illegal in file names and control characters are
        replaced by ``_``. The result is limited to 100 characters, and
        surrounding whitespace and trailing dots are removed.

        Args:
            name: Proposed name; ``None`` and non-string values are accepted.

        Returns:
            The cleaned name, or ``DEFAULT_PRODUCT_NAME`` when nothing usable
            remains (for example for ``''`` or ``'..'``).
        """
        text = '' if name is None else str(name)
        text = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', text)
        # Trailing dots and spaces are not allowed on Windows; stripping them
        # also keeps '.' and '..' from being used as a path component.
        text = text[:100].strip().rstrip('. ')
        return text or self.DEFAULT_PRODUCT_NAME

    def download_resources(self, product_name, output_path, urls, progress_callback):
        """Download every URL in *urls* into category folders of *output_path*.

        Args:
            product_name: Name of the product (currently unused, kept for
                interface compatibility).
            output_path: Product folder; it and the sub-folders are created
                when missing.
            urls: Mapping as returned by ``extract_urls``.
            progress_callback: Called as ``progress_callback(message,
                percent)`` before each download starts.

        Returns:
            dict with ``downloaded`` and ``total`` counts, the ``errors``
            list (one entry per failed URL) and ``success``, which is True
            when no download failed.
        """
        results = {
            'success': True,
            'downloaded': 0,
            'total': 0,
            'errors': [],
        }
        output_path = Path(output_path)

        # (urls key, folder name / label, file name prefix, kept extensions,
        #  fallback extension)
        categories = (
            ('main_images', '主图', 'zhutu', self.IMAGE_EXTENSIONS, '.jpg'),
            ('detail_images', '详情图', 'xiangqing', self.IMAGE_EXTENSIONS, '.jpg'),
            ('videos', '视频', 'shipin', self.VIDEO_EXTENSIONS, '.mp4'),
        )

        # Create all category folders up front, even for empty categories.
        for _, label, _, _, _ in categories:
            (output_path / label).mkdir(parents=True, exist_ok=True)

        total_items = sum(len(urls.get(key) or []) for key, *_ in categories)

        for key, label, prefix, allowed_exts, default_ext in categories:
            items = urls.get(key) or []
            target_dir = output_path / label

            for i, url in enumerate(items, 1):
                results['total'] += 1

                # Keep the URL's own extension when it is a known one.
                ext = os.path.splitext(urlparse(url).path)[1].lower()
                if ext not in allowed_exts:
                    ext = default_ext
                filepath = target_dir / f"{prefix}_{i}{ext}"

                progress_callback(f"下载{label} {i}/{len(items)}...",
                                  int(results['total'] / total_items * 100))

                if self.download_file(url, filepath):
                    results['downloaded'] += 1
                else:
                    results['errors'].append(f"{label}{i}: {url}")

        results['success'] = not results['errors']
        return results

    def start_download(self):
        """Validate the form and start the download on a background thread."""
        input_data = self.input_text.get("1.0", END).strip()
        if not input_data:
            messagebox.showerror("错误", "请输入商品数据")
            return

        output_dir = self.output_dir.get()
        if not output_dir:
            messagebox.showerror("错误", "请选择输出目录")
            return

        # Block repeated clicks while a download is running.
        self.start_btn.config(state=DISABLED, text="⏳ 下载中...")
        self.progress_var.set(0)

        # Daemon thread, so a pending download does not keep the process
        # alive after the window is closed.
        thread = threading.Thread(
            target=self.download_task, args=(input_data, output_dir))
        thread.daemon = True
        thread.start()

    def download_task(self, input_data, output_dir):
        """Parse the JSON, download all resources and report the outcome.

        Runs on the worker thread started by ``start_download``. Every widget
        update is scheduled with ``root.after``. The start button is
        re-enabled exactly once, in the ``finally`` clause, on every exit
        path.

        Args:
            input_data: Raw JSON text from the input box.
            output_dir: Directory in which the product folder is created.
        """
        try:
            data = json.loads(input_data)

            product_info = self.extract_product_info(data)
            self.root.after(0, self.update_info_display, product_info)

            # The cleaned product name is used as the folder name.
            safe_name = self.sanitize_filename(product_info['name'])
            product_path = Path(output_dir) / safe_name
            # parents=True: a typed-in output directory may not exist yet.
            product_path.mkdir(parents=True, exist_ok=True)

            self.update_status_thread("正在分析数据,提取资源链接...")
            urls = self.extract_urls(data)

            total_urls = (len(urls['main_images'])
                          + len(urls['detail_images'])
                          + len(urls['videos']))

            if total_urls == 0:
                self.root.after(0, messagebox.showwarning, "警告", "未找到任何图片或视频资源")
                self.update_status_thread("未找到资源")
                return

            self.update_status_thread(f"发现 {len(urls['main_images'])} 张主图, {len(urls['detail_images'])} 张详情图, {len(urls['videos'])} 个视频")

            def update_progress(msg, progress):
                """Forward progress from the worker thread to the widgets."""
                self.update_status_thread(msg)
                self.root.after(0, self.progress_var.set, progress)

            results = self.download_resources(safe_name, product_path, urls, update_progress)

            self.root.after(0, self.progress_var.set, 100)

            if results['downloaded'] > 0:
                msg = f"✅ 下载完成!\n\n成功下载 {results['downloaded']}/{results['total']} 个文件\n保存路径: {product_path}"
                if results['errors']:
                    msg += f"\n\n⚠️ 失败 {len(results['errors'])} 个文件"
                self.root.after(0, messagebox.showinfo, "完成", msg)
                self.update_status_thread(f"下载完成 - 成功 {results['downloaded']}/{results['total']}")
            else:
                self.root.after(0, messagebox.showerror, "错误", "下载失败,没有成功下载任何文件")
                self.update_status_thread("下载失败")

        except json.JSONDecodeError as e:
            self.root.after(0, messagebox.showerror, "JSON解析错误", f"输入的不是有效的JSON格式:\n{str(e)}")
            self.update_status_thread("JSON解析失败")
        except Exception as e:  # top-level boundary of the worker thread
            self.root.after(0, messagebox.showerror, "错误", f"处理过程中出现错误:\n{str(e)}")
            self.update_status_thread("处理失败")
        finally:
            self.enable_button_thread()

    def update_status_thread(self, message):
        """Schedule a status label update from the worker thread."""
        self.root.after(0, lambda: self.status_label.config(text=message))

    def enable_button_thread(self):
        """Schedule re-enabling of the start button from the worker thread."""
        self.root.after(0, lambda: self.start_btn.config(state=NORMAL, text="▶ 启动下载"))
def main():
    """Create the Tk root window, attach the downloader GUI and run the event loop."""
    window = Tk()
    # Keep a reference to the GUI object for the lifetime of the event loop.
    gui = ProductDownloaderGUI(window)
    window.mainloop()


# Start the application only when the file is executed as a script.
if __name__ == "__main__":
    main()
|