PUGE 5 日 前
コミット
b19b6c70c6
1 ファイル変更467 行追加0 行削除
  1. 467 0
      整理美团数据.py

+ 467 - 0
整理美团数据.py

@@ -0,0 +1,467 @@
+import json
+import os
+import re
+import requests
+from pathlib import Path
+from urllib.parse import urlparse
+from tkinter import *
+from tkinter import ttk, filedialog, messagebox, scrolledtext
+import threading
+from datetime import datetime
+
+class ProductDownloaderGUI:
+    def __init__(self, root):
+        self.root = root
+        self.root.title("商品资源下载器")
+        self.root.geometry("900x700")
+        self.root.resizable(True, True)
+        
+        # 设置样式
+        style = ttk.Style()
+        style.theme_use('clam')
+        
+        # 创建主框架
+        main_frame = ttk.Frame(root, padding="10")
+        main_frame.pack(fill=BOTH, expand=True)
+        
+        # 输入区域
+        input_frame = ttk.LabelFrame(main_frame, text="商品数据输入", padding="10")
+        input_frame.pack(fill=BOTH, expand=True, pady=(0, 10))
+        
+        # 输入框
+        self.input_text = scrolledtext.ScrolledText(input_frame, height=15, font=("Consolas", 10))
+        self.input_text.pack(fill=BOTH, expand=True)
+        
+        # 输出目录选择
+        output_frame = ttk.LabelFrame(main_frame, text="输出设置", padding="10")
+        output_frame.pack(fill=X, pady=(0, 10))
+        
+        dir_select_frame = ttk.Frame(output_frame)
+        dir_select_frame.pack(fill=X)
+        
+        self.output_dir = StringVar()
+        self.output_entry = ttk.Entry(dir_select_frame, textvariable=self.output_dir)
+        self.output_entry.pack(side=LEFT, fill=X, expand=True, padx=(0, 5))
+        
+        browse_btn = ttk.Button(dir_select_frame, text="浏览...", command=self.browse_output_dir)
+        browse_btn.pack(side=RIGHT)
+        
+        # 信息显示区域
+        info_frame = ttk.LabelFrame(main_frame, text="商品信息", padding="10")
+        info_frame.pack(fill=BOTH, expand=True, pady=(0, 10))
+        
+        self.info_text = scrolledtext.ScrolledText(info_frame, height=8, font=("微软雅黑", 10))
+        self.info_text.pack(fill=BOTH, expand=True)
+        
+        # 进度条
+        self.progress_var = DoubleVar()
+        self.progress_bar = ttk.Progressbar(main_frame, variable=self.progress_var, maximum=100)
+        self.progress_bar.pack(fill=X, pady=(0, 10))
+        
+        # 状态标签
+        self.status_label = ttk.Label(main_frame, text="就绪")
+        self.status_label.pack(pady=(0, 10))
+        
+        # 按钮区域
+        button_frame = ttk.Frame(main_frame)
+        button_frame.pack(fill=X)
+        
+        # 启动按钮
+        self.start_btn = ttk.Button(button_frame, text="▶ 启动下载", command=self.start_download, width=15)
+        self.start_btn.pack(side=LEFT, padx=5)
+        
+        clear_btn = ttk.Button(button_frame, text="清空输入", command=self.clear_input)
+        clear_btn.pack(side=LEFT, padx=5)
+        
+        # 退出按钮
+        exit_btn = ttk.Button(button_frame, text="退出", command=self.root.quit)
+        exit_btn.pack(side=RIGHT, padx=5)
+        
+        # 设置下载会话
+        self.session = requests.Session()
+        self.session.headers.update({
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
+        })
+        
+    def browse_output_dir(self):
+        """选择输出目录"""
+        directory = filedialog.askdirectory()
+        if directory:
+            self.output_dir.set(directory)
+            
+    def clear_input(self):
+        """清空输入框"""
+        self.input_text.delete(1.0, END)
+        
+    def update_info_display(self, product_info):
+        """更新商品信息显示"""
+        self.info_text.delete(1.0, END)
+        info_str = f"""
+【商品名称】{product_info.get('name', '未知')}
+【商品描述】{product_info.get('description', '无')}
+【价格】¥{product_info.get('min_price', '未知')}
+【规格】{product_info.get('spec', '无')}
+【品牌】{product_info.get('brand', '未知')}
+【月销量】{product_info.get('month_saled', '未知')}
+        """
+        self.info_text.insert(1.0, info_str.strip())
+        
+    def extract_urls(self, data):
+        """从JSON数据中提取图片和视频URL"""
+        urls = {
+            'main_images': [],      # 主图
+            'detail_images': [],    # 详情图
+            'videos': []            # 视频
+        }
+        
+        seen_urls = set()
+        
+        def add_url(url, url_type):
+            """添加URL,避免重复"""
+            if url and url not in seen_urls:
+                seen_urls.add(url)
+                if url_type == 'main':
+                    urls['main_images'].append(url)
+                elif url_type == 'detail':
+                    urls['detail_images'].append(url)
+                elif url_type == 'video':
+                    urls['videos'].append(url)
+        
+        try:
+            # 从common_info中提取SKU主图
+            if 'data' in data and 'ext' in data['data']:
+                common_info = data['data']['ext'].get('common_info', {})
+                
+                # 提取商品名称和描述
+                if 'skus' in common_info:
+                    for sku in common_info['skus']:
+                        if 'picture' in sku and sku['picture']:
+                            add_url(sku['picture'], 'main')
+                
+                # 从blocks中提取
+                if 'blocks' in data['data']['ext']:
+                    blocks = data['data']['ext']['blocks']
+                    
+                    # 提取主图(头图)
+                    for block in blocks.get('banner', []):
+                        s_type = block.get('s_type', '')
+                        
+                        # 头图模块
+                        if s_type == 'sm_type_detail_pop_head_photo':
+                            head_info = block.get('data', {}).get('head_info', {})
+                            pictures = head_info.get('pictures', [])
+                            for pic in pictures:
+                                add_url(pic, 'main')
+                        
+                        # 商品详情图模块
+                        elif s_type == 'sm_type_goods_detail_describe_non_food':
+                            json_data = block.get('data', {}).get('json_data', {})
+                            pic_content = json_data.get('pic_content', {})
+                            contents = pic_content.get('contents', [])
+                            for content in contents:
+                                if content.startswith('http') and ('jpg' in content or 'png' in content or 'jpeg' in content):
+                                    add_url(content, 'detail')
+                        
+                        # 推荐搭配模块中的图片
+                        elif s_type == 'sm_type_goods_detail_match_product':
+                            json_data = block.get('data', {}).get('json_data', {})
+                            collocate = json_data.get('collocate_recommend', {})
+                            spus = collocate.get('spus', [])
+                            for spu in spus:
+                                if 'picture' in spu and spu['picture']:
+                                    add_url(spu['picture'], 'detail')
+                    
+                    # 从float模块中提取购物车相关图片
+                    for block in blocks.get('float', []):
+                        if block.get('s_type') == 'sm_type_cart_info':
+                            data_json = block.get('data', {})
+                            shopping_cart = data_json.get('shopping_cart', {})
+                            if shopping_cart.get('shopping_cart_icon'):
+                                add_url(shopping_cart['shopping_cart_icon'], 'detail')
+            
+            # 使用正则表达式提取所有图片URL作为补充
+            data_str = json.dumps(data, ensure_ascii=False)
+            
+            # 匹配图片URL
+            image_pattern = r'https?://[^\s"\']+\.(?:jpg|jpeg|png|gif|webp)(?:\?[^\s"\']*)?'
+            all_images = re.findall(image_pattern, data_str, re.IGNORECASE)
+            
+            # 视频匹配
+            video_pattern = r'https?://[^\s"\']+\.(?:mp4|mov|avi|flv|webm)(?:\?[^\s"\']*)?'
+            all_videos = re.findall(video_pattern, data_str, re.IGNORECASE)
+            
+            # 分类正则匹配到的图片
+            for img_url in all_images:
+                if img_url in seen_urls:
+                    continue
+                seen_urls.add(img_url)
+                
+                # 根据URL特征分类
+                if 'head_photo' in img_url or 'main' in img_url.lower() or 'sku' in img_url.lower():
+                    urls['main_images'].append(img_url)
+                else:
+                    urls['detail_images'].append(img_url)
+            
+            # 添加视频
+            for video_url in all_videos:
+                if video_url not in seen_urls:
+                    seen_urls.add(video_url)
+                    urls['videos'].append(video_url)
+                    
+        except Exception as e:
+            print(f"提取URL时出错: {e}")
+        
+        # 限制数量
+        urls['main_images'] = urls['main_images'][:10]
+        urls['detail_images'] = urls['detail_images'][:30]
+        urls['videos'] = urls['videos'][:5]
+        
+        return urls
+    
+    def extract_product_info(self, data):
+        """提取商品信息"""
+        product_info = {
+            'name': '未命名商品',
+            'description': '无',
+            'min_price': '未知',
+            'spec': '无',
+            'brand': '未知',
+            'month_saled': '未知'
+        }
+        
+        try:
+            if 'data' in data and 'ext' in data['data']:
+                common_info = data['data']['ext'].get('common_info', {})
+                
+                product_info['name'] = common_info.get('name', '未命名商品')
+                product_info['description'] = common_info.get('description', '无')
+                product_info['min_price'] = common_info.get('min_price', '未知')
+                product_info['spec'] = common_info.get('sku_label', '无')
+                
+                # 提取月销量
+                if 'skus' in common_info and common_info['skus']:
+                    sku = common_info['skus'][0]
+                    if 'promotion_info' in sku:
+                        match = re.search(r'月售(\d+)', sku.get('promotion_info', ''))
+                        if match:
+                            product_info['month_saled'] = match.group(1)
+                
+                # 提取品牌
+                if 'name' in product_info and '农夫山泉' in product_info['name']:
+                    product_info['brand'] = '农夫山泉'
+                elif 'name' in product_info and '可口可乐' in product_info['name']:
+                    product_info['brand'] = '可口可乐'
+                elif 'name' in product_info and '百事' in product_info['name']:
+                    product_info['brand'] = '百事'
+                    
+        except Exception as e:
+            print(f"提取商品信息时出错: {e}")
+        
+        return product_info
+    
+    def download_file(self, url, filepath, timeout=30):
+        """下载文件"""
+        try:
+            response = self.session.get(url, timeout=timeout, stream=True)
+            if response.status_code == 200:
+                with open(filepath, 'wb') as f:
+                    for chunk in response.iter_content(chunk_size=8192):
+                        if chunk:
+                            f.write(chunk)
+                return True
+            else:
+                return False
+        except Exception as e:
+            print(f"下载失败 {url}: {e}")
+            return False
+    
+    def sanitize_filename(self, name):
+        """清理文件名中的非法字符"""
+        # 移除非法字符
+        name = re.sub(r'[<>:"/\\|?*]', '_', name)
+        # 限制长度
+        if len(name) > 100:
+            name = name[:100]
+        return name.strip()
+    
+    def download_resources(self, product_name, output_path, urls, progress_callback):
+        """下载所有资源"""
+        results = {
+            'success': True,
+            'downloaded': 0,
+            'total': 0,
+            'errors': []
+        }
+        
+        # 创建子目录
+        main_img_dir = output_path / "主图"
+        detail_img_dir = output_path / "详情图"
+        video_dir = output_path / "视频"
+        
+        main_img_dir.mkdir(exist_ok=True)
+        detail_img_dir.mkdir(exist_ok=True)
+        video_dir.mkdir(exist_ok=True)
+        
+        total_items = len(urls['main_images']) + len(urls['detail_images']) + len(urls['videos'])
+        
+        # 下载主图
+        for i, url in enumerate(urls['main_images'], 1):
+            results['total'] += 1
+            # 获取文件扩展名
+            path = urlparse(url).path
+            ext = os.path.splitext(path)[1] or '.jpg'
+            if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']:
+                ext = '.jpg'
+            filename = f"zhutu_{i}{ext}"
+            filepath = main_img_dir / filename
+            
+            progress_callback(f"下载主图 {i}/{len(urls['main_images'])}...", 
+                            int(results['total'] / total_items * 100))
+            
+            if self.download_file(url, filepath):
+                results['downloaded'] += 1
+            else:
+                results['errors'].append(f"主图{i}: {url}")
+        
+        # 下载详情图
+        for i, url in enumerate(urls['detail_images'], 1):
+            results['total'] += 1
+            path = urlparse(url).path
+            ext = os.path.splitext(path)[1] or '.jpg'
+            if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']:
+                ext = '.jpg'
+            filename = f"xiangqing_{i}{ext}"
+            filepath = detail_img_dir / filename
+            
+            progress_callback(f"下载详情图 {i}/{len(urls['detail_images'])}...",
+                            int(results['total'] / total_items * 100))
+            
+            if self.download_file(url, filepath):
+                results['downloaded'] += 1
+            else:
+                results['errors'].append(f"详情图{i}: {url}")
+        
+        # 下载视频
+        for i, url in enumerate(urls['videos'], 1):
+            results['total'] += 1
+            path = urlparse(url).path
+            ext = os.path.splitext(path)[1] or '.mp4'
+            if ext not in ['.mp4', '.mov', '.avi', '.webm']:
+                ext = '.mp4'
+            filename = f"shipin_{i}{ext}"
+            filepath = video_dir / filename
+            
+            progress_callback(f"下载视频 {i}/{len(urls['videos'])}...",
+                            int(results['total'] / total_items * 100))
+            
+            if self.download_file(url, filepath):
+                results['downloaded'] += 1
+            else:
+                results['errors'].append(f"视频{i}: {url}")
+        
+        return results
+    
+    def start_download(self):
+        """启动下载任务"""
+        # 获取输入数据
+        input_data = self.input_text.get(1.0, END).strip()
+        if not input_data:
+            messagebox.showerror("错误", "请输入商品数据")
+            return
+        
+        # 检查输出目录
+        output_dir = self.output_dir.get()
+        if not output_dir:
+            messagebox.showerror("错误", "请选择输出目录")
+            return
+        
+        # 禁用启动按钮
+        self.start_btn.config(state=DISABLED, text="⏳ 下载中...")
+        self.progress_var.set(0)
+        
+        # 在新线程中执行下载
+        thread = threading.Thread(target=self.download_task, args=(input_data, output_dir))
+        thread.daemon = True
+        thread.start()
+    
+    def download_task(self, input_data, output_dir):
+        """下载任务"""
+        try:
+            # 解析JSON
+            data = json.loads(input_data)
+            
+            # 提取商品信息
+            product_info = self.extract_product_info(data)
+            
+            # 更新信息显示
+            self.root.after(0, self.update_info_display, product_info)
+            
+            # 清理产品名作为文件夹名
+            safe_name = self.sanitize_filename(product_info['name'])
+            
+            # 创建产品文件夹
+            product_path = Path(output_dir) / safe_name
+            product_path.mkdir(exist_ok=True)
+            
+            # 提取URL
+            self.update_status_thread("正在分析数据,提取资源链接...")
+            urls = self.extract_urls(data)
+            
+            total_urls = len(urls['main_images']) + len(urls['detail_images']) + len(urls['videos'])
+            
+            if total_urls == 0:
+                self.root.after(0, messagebox.showwarning, "警告", "未找到任何图片或视频资源")
+                self.update_status_thread("未找到资源")
+                self.enable_button_thread()
+                return
+            
+            self.update_status_thread(f"发现 {len(urls['main_images'])} 张主图, {len(urls['detail_images'])} 张详情图, {len(urls['videos'])} 个视频")
+            
+            # 定义进度回调
+            def update_progress(msg, progress):
+                self.update_status_thread(msg)
+                self.root.after(0, self.progress_var.set, progress)
+            
+            # 下载资源
+            results = self.download_resources(safe_name, product_path, urls, update_progress)
+            
+            # 更新进度到100%
+            self.root.after(0, self.progress_var.set, 100)
+            
+            # 显示结果
+            if results['downloaded'] > 0:
+                msg = f"✅ 下载完成!\n\n成功下载 {results['downloaded']}/{results['total']} 个文件\n保存路径: {product_path}"
+                if results['errors']:
+                    msg += f"\n\n⚠️ 失败 {len(results['errors'])} 个文件"
+                self.root.after(0, messagebox.showinfo, "完成", msg)
+                self.update_status_thread(f"下载完成 - 成功 {results['downloaded']}/{results['total']}")
+            else:
+                self.root.after(0, messagebox.showerror, "错误", "下载失败,没有成功下载任何文件")
+                self.update_status_thread("下载失败")
+                
+        except json.JSONDecodeError as e:
+            self.root.after(0, messagebox.showerror, "JSON解析错误", f"输入的不是有效的JSON格式:\n{str(e)}")
+            self.update_status_thread("JSON解析失败")
+        except Exception as e:
+            self.root.after(0, messagebox.showerror, "错误", f"处理过程中出现错误:\n{str(e)}")
+            self.update_status_thread("处理失败")
+        finally:
+            self.enable_button_thread()
+    
+    def update_status_thread(self, message):
+        """从线程更新状态"""
+        self.root.after(0, lambda: self.status_label.config(text=message))
+    
+    def enable_button_thread(self):
+        """从线程启用按钮"""
+        self.root.after(0, lambda: self.start_btn.config(state=NORMAL, text="▶ 启动下载"))
+
+
+def main():
+    root = Tk()
+    app = ProductDownloaderGUI(root)
+    root.mainloop()
+
+
+if __name__ == "__main__":
+    main()