"""Copy products from the tiki-int catalogue API into the as1688 shop backend.

For every product on the configured listing pages the script fetches the
product detail, re-uploads each product image to the as1688 file endpoint
(converting WebP images to JPEG first) and finally creates the product through
the batch-creation endpoint.
"""

import io
import json
import os
from typing import Any, Dict, Optional
from urllib.parse import urlparse

import requests
from PIL import Image

# NOTE(review): a bearer token is committed in source. It is kept as the
# fallback so existing runs keep working, but it should be rotated and supplied
# through the AS1688_TOKEN environment variable instead.
token = os.environ.get(
    "AS1688_TOKEN",
    "Bearer eyJhbGciOiJIUzI1NiJ9.eyJyb2xlIjoiQURNSU4iLCJ1c2VySWQiOjEsImlhdCI6MTc3MjYxOTA2NiwiZXhwIjoxNzczMjIzODY2fQ.IatRhEStT5IUXHsN8_tvxuijNFO0tNkIADYIV79dWMo",
)

# File extension -> MIME type used when an image is uploaded unchanged.
_MIME_TYPES = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.ico': 'image/x-icon',
    '.svg': 'image/svg+xml',
}


def get_product_detail(mer_id: str, product_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the detail record of one product.

    Args:
        mer_id: Merchant ID.
        product_id: Product ID.

    Returns:
        The decoded JSON response, or None when the request fails or the
        response body is not valid JSON.
    """
    base_url = "https://api.tiki-int.site/api/product/detail"
    params = {
        "mer_id": mer_id,
        "product_id": product_id,
    }

    # Browser-like headers copied from a captured request.
    headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
        "cache-control": "no-cache",
        "lang": "en",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "cross-site",
        "Referer": "https://tiki.fashion11.top/",
        "Referrer-Policy": "strict-origin-when-cross-origin",
    }

    try:
        response = requests.get(
            url=base_url,
            params=params,
            headers=headers,
            timeout=10,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"JSON解析失败: {e}")
        return None


def _webp_to_jpeg_bytes(raw):
    """Decode image bytes with Pillow and return them re-encoded as JPEG."""
    image = Image.open(io.BytesIO(raw))
    if image.mode == 'RGBA':
        # JPEG has no alpha channel: flatten onto a white background, using
        # the alpha band as the paste mask.
        flattened = Image.new('RGB', image.size, (255, 255, 255))
        flattened.paste(image, mask=image.split()[3])
        image = flattened
    elif image.mode != 'RGB':
        image = image.convert('RGB')

    buffer = io.BytesIO()
    image.save(buffer, 'JPEG', quality=85)
    return buffer.getvalue()


def download_and_upload_image(image_url):
    """Download an image and upload it to the as1688 file endpoint.

    Images whose URL path ends in ``.webp`` are converted to JPEG before the
    upload; every other image is uploaded byte-for-byte. The image is kept in
    memory, so no temporary file is written to the working directory.

    Args:
        image_url: URL of the image to transfer.

    Returns:
        The decoded JSON response of the upload endpoint, or a dict of the
        form ``{"error": <message>}`` when any step fails.
    """
    upload_url = "https://www.as1688.vip/api/v1/file/upload"

    # Derive the upload file name from the URL path; fall back to a generic
    # name when the path has no usable "name.ext" component.
    filename = os.path.basename(urlparse(image_url).path)
    if not filename or '.' not in filename:
        filename = "image.jpg"
    ext = os.path.splitext(filename)[1].lower()

    try:
        # 1. Download the image.
        img_response = requests.get(image_url, timeout=30)
        img_response.raise_for_status()

        # 2. Convert WebP to JPEG; pass other formats through untouched.
        if ext == '.webp':
            payload = _webp_to_jpeg_bytes(img_response.content)
            filename = f"{os.path.splitext(filename)[0]}.jpg"
            mime_type = 'image/jpeg'
            print(f"已转换为: {filename}")
        else:
            payload = img_response.content
            mime_type = _MIME_TYPES.get(ext, 'image/jpeg')

        # 3. Upload as multipart/form-data.
        headers = {
            "accept": "application/json",
            "accept-language": "zh-CN,zh;q=0.9",
            "authorization": token,
            "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }
        files = {'file': (filename, payload, mime_type)}
        upload_response = requests.post(
            upload_url,
            headers=headers,
            files=files,
            timeout=30,
        )
        upload_response.raise_for_status()
        result = upload_response.json()
        print("上传成功!")
        return result

    except requests.exceptions.RequestException as e:
        print(f"网络请求错误: {e}")
        return {"error": str(e)}
    except ImportError:
        print("错误: 请安装PIL库: pip install Pillow")
        return {"error": "缺少PIL库,无法转换webp格式"}
    except Exception as e:
        # Best-effort boundary: one bad image (e.g. undecodable data) must not
        # abort the whole batch run, so the failure is reported to the caller.
        print(f"其他错误: {e}")
        return {"error": str(e)}


def batch_create_products(body_data):
    """Create products through the batch-creation endpoint.

    Args:
        body_data: The request body, either as a dict or as a JSON string.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request fails or the response is not valid JSON.

    Raises:
        ValueError: If ``body_data`` is neither a dict nor a str, or is a str
            that is not valid JSON (``json.JSONDecodeError`` is a subclass of
            ``ValueError``).
    """
    url = "https://www.as1688.vip/api/v1/product-management/products/batch"

    headers = {
        "accept": "application/json",
        "accept-language": "zh-CN,zh;q=0.9",
        "authorization": token,
        "content-type": "application/json",
        "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
    }

    if isinstance(body_data, dict):
        json_body = body_data
    elif isinstance(body_data, str):
        json_body = json.loads(body_data)
    else:
        raise ValueError("body_data必须是字典或JSON字符串")

    try:
        response = requests.post(
            url,
            headers=headers,
            json=json_body,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
        # Compare against None explicitly: a Response is falsy for 4xx/5xx
        # statuses, which are exactly the cases where the body is useful.
        if getattr(e, 'response', None) is not None:
            print(f"响应内容: {e.response.text}")
        return {"error": str(e)}
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return {"error": str(e)}


def get_product_list(category_id, page=1, limit=15, timeout=30):
    """Fetch one page of the product search listing for a category.

    Args:
        category_id: Category to list.
        page: 1-based page number.
        limit: Page size sent to the API.
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON response, or an empty list when the request fails or
        the body is not valid JSON.
    """
    url = "https://api.tiki-int.site/api/product/search"
    params = {
        "category_id": category_id,
        "page": page,
        "limit": limit,
        "type": 1,
    }
    try:
        return requests.get(url, params=params, timeout=timeout).json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Only network/decoding failures are absorbed; KeyboardInterrupt and
        # programming errors propagate.
        print(f"请求失败: {e}")
        return []


def _dig(obj, *keys):
    """Follow ``keys`` through nested dicts; return None if a level is missing."""
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def _to_float(value):
    """Return ``value`` as a float, or None when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _sync_product(index, item):
    """Fetch one listed product, re-upload its images and create it remotely."""
    print(index, item['product_id'], item['mer_id'])

    result = get_product_detail(item['mer_id'], item['product_id'])
    goods = _dig(result, 'data', 'goods')
    if not isinstance(goods, dict):
        # get_product_detail returns None on failure, and the payload shape is
        # not guaranteed; skip this product instead of crashing the run.
        print(f"跳过商品 {item['product_id']}: 未获取到商品详情")
        return
    print(goods)

    content = goods.get('content')
    title = goods.get('title')
    stock = goods.get('stock')
    market_price = goods.get('sales_price')
    # Assumes 'images' is a comma-separated string of URLs, as the original
    # split(",") implies; blank entries are dropped.
    images = [u.strip() for u in (goods.get('images') or "").split(",") if u.strip()]

    # Re-upload every image and collect the URLs returned by the file endpoint.
    uploaded_image_urls = []
    for image_url in images:
        print(f"处理图片: {image_url}")
        resultFile = download_and_upload_image(image_url)
        if isinstance(resultFile, dict) and 'data' in resultFile:
            uploaded_image_urls.append(resultFile['data'])

    print(title, stock, market_price)

    # The price may arrive as a number or a numeric string; compare numerically
    # but send the original value so its formatting is untouched.
    price_value = _to_float(market_price)
    if uploaded_image_urls and price_value is not None and price_value > 0:
        uploadInfo = {
            "products": [
                {
                    "name": title,
                    "category": "全球购",
                    "price": str(market_price),
                    "stock": stock,
                    "description": content,
                    "images": uploaded_image_urls,
                }
            ]
        }
        print(uploadInfo["products"][0]["category"])
        uploadInfoReturn = batch_create_products(uploadInfo)
        print(uploadInfoReturn)


def main(category_id=24, first_page=1, last_page=213):
    """Walk the listing pages of a category and sync every product found."""
    for index in range(first_page, last_page + 1):
        listData = get_product_list(category_id, index)
        # get_product_list returns [] on failure, so never index it blindly.
        for item in _dig(listData, 'data', 'list') or []:
            _sync_product(index, item)


# Usage example
if __name__ == "__main__":
    main()