"""Poll the Buff market goods API for CS:GO items and store them in SQLite.

Each cycle fetches one page of goods, upserts one row per item into the
``products`` table and one row per item into ``platform_prices`` (platform
code ``'buff'``), then sleeps before polling again.

Both tables must already exist; this script does not create them. The upsert
statements rely on a uniqueness constraint on ``products.buff_id`` and on
``platform_prices(product_id, platform_code)``.
"""

import os
import sqlite3
import time
from datetime import datetime

import requests

BUFF_GOODS_URL = "https://buff.163.com/api/market/goods"
DB_PATH = "buff_prices.db"
REQUEST_TIMEOUT_SECONDS = 15
POLL_INTERVAL_SECONDS = 30

# SECURITY(review): the fallback below embeds proxy credentials in source
# control. Set BUFF_PROXY_URL in the environment instead, then rotate the
# credential and remove this default.
PROXY_URL = os.environ.get(
    "BUFF_PROXY_URL",
    "socks5://USER660348-zone-custom-region-HK:e8b389@hk.rotgb.711proxy.com:10000",
)

REQUEST_HEADERS = {
    "accept": "application/json, text/javascript, */*; q=0.01",
    "accept-language": "zh-CN,zh;q=0.9",
    "cache-control": "no-cache",
    "dnt": "1",
    "pragma": "no-cache",
    "priority": "u=1, i",
    "referer": "https://buff.163.com/market/csgo",
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
    "x-requested-with": "XMLHttpRequest",
    # No explicit Accept-Encoding: advertising "br" by hand makes the server
    # free to answer with Brotli, which can only be decoded when a Brotli
    # package is installed. Leaving the header out lets the HTTP stack
    # advertise exactly the encodings it is able to decode.
    "Connection": "keep-alive",
}

UPSERT_PRODUCT_SQL = """
    INSERT INTO products
        (buff_id, name, market_hash_name, type, rarity, icon_url, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(buff_id) DO UPDATE SET
        name = excluded.name,
        market_hash_name = excluded.market_hash_name,
        type = excluded.type,
        rarity = excluded.rarity,
        icon_url = excluded.icon_url,
        updated_at = excluded.updated_at
"""

UPSERT_PRICE_SQL = """
    INSERT INTO platform_prices
        (product_id, platform_code, min_price, recorded_at)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(product_id, platform_code) DO UPDATE SET
        min_price = excluded.min_price,
        recorded_at = excluded.recorded_at
"""

SELECT_PRODUCT_ID_SQL = "SELECT id FROM products WHERE buff_id = ?"


def get_buff_data(page_num):
    """Fetch one page of market goods from the Buff API.

    Args:
        page_num: Page to request. It is only sent when it is an integer of
            at least 1; any other value (including the ``0`` that ``main``
            passes) leaves the parameter out, so the server chooses the page.

    Returns:
        The decoded JSON body, or ``None`` if the request or the JSON
        decoding failed (the error is printed, never raised).
    """
    querystring = {"game": "csgo"}
    # NOTE(review): "page_num" is assumed to be the API's paging parameter
    # name -- confirm against the Buff API before relying on paging.
    if isinstance(page_num, int) and page_num >= 1:
        querystring["page_num"] = page_num

    proxies = {"http": PROXY_URL, "https": PROXY_URL}

    try:
        response = requests.get(
            BUFF_GOODS_URL,
            headers=REQUEST_HEADERS,
            params=querystring,
            proxies=proxies,
            # Without a timeout a stalled proxy or server would block the
            # polling loop forever.
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        return response.json()
    except Exception as e:
        # Deliberately broad: callers rely on "None means the fetch failed".
        print(f"请求出错: {e}")
        return None


def _tag_name(item, tag_key):
    """Return ``goods_info.info.tags[tag_key].localized_name`` or ``None``.

    Every level is optional; a missing or null level yields ``None``.
    """
    goods_info = item.get("goods_info") or {}
    info = goods_info.get("info") or {}
    tags = info.get("tags") or {}
    tag = tags.get(tag_key) or {}
    return tag.get("localized_name")


def _upsert_item(cursor, item, recorded_at):
    """Upsert one API item and its Buff price.

    Returns:
        ``True`` if a new ``products`` row was created, ``False`` if an
        existing row was updated.

    Raises:
        KeyError: if the item lacks ``id``, ``name``, ``market_hash_name``
            or ``sell_min_price``.
    """
    buff_id = item["id"]
    icon_url = (item.get("goods_info") or {}).get("icon_url")

    # Look the row up *before* the upsert. SQLite reports one changed row for
    # both the INSERT and the DO UPDATE branch, so cursor.rowcount cannot
    # distinguish a new product from an updated one.
    existing = cursor.execute(SELECT_PRODUCT_ID_SQL, (buff_id,)).fetchone()

    cursor.execute(
        UPSERT_PRODUCT_SQL,
        (
            buff_id,
            item["name"],
            item["market_hash_name"],
            _tag_name(item, "type"),
            _tag_name(item, "rarity"),
            icon_url,
            recorded_at,
        ),
    )

    if existing is None:
        # Fresh row: read back the id the database assigned to it.
        product_id = cursor.execute(
            SELECT_PRODUCT_ID_SQL, (buff_id,)
        ).fetchone()[0]
    else:
        product_id = existing[0]

    cursor.execute(
        UPSERT_PRICE_SQL,
        (product_id, "buff", item["sell_min_price"], recorded_at),
    )
    return existing is None


def save_to_database(data, db_path=DB_PATH):
    """Persist one API response to the SQLite database.

    All items of the response are written in a single transaction: if any
    item fails, the whole batch is rolled back and the error is printed.

    Args:
        data: Decoded API response; items are read from ``data["data"]["items"]``.
            ``None`` is accepted and skipped with a message.
        db_path: SQLite database file to write to.
    """
    if data is None:
        print("数据为空,跳过保存")
        return

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # One timestamp (naive local time) shared by every row of this batch.
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        items = (data.get('data') or {}).get('items') or []

        inserted_count = 0
        updated_count = 0
        for item in items:
            if _upsert_item(cursor, item, current_time):
                inserted_count += 1
            else:
                updated_count += 1

        conn.commit()
        count = inserted_count + updated_count
        print(f"成功保存 {count} 条商品数据")
        print(f"其中: 新插入 {inserted_count} 条, 更新 {updated_count} 条")
    except Exception as e:
        conn.rollback()
        print(f"保存数据出错: {e}")
    finally:
        conn.close()


def main():
    """Poll the API forever, saving every successful response.

    Errors inside a cycle are printed and the loop continues after the usual
    pause; only an interrupt (e.g. Ctrl-C) ends the loop.
    """
    # The page index is never advanced, so every cycle re-fetches the same
    # page; 0 means no page parameter is sent at all.
    page_num = 0
    while True:
        try:
            response_data = get_buff_data(page_num)
            if response_data and response_data.get('code') == 'OK':
                save_to_database(response_data)
            elif response_data:
                print(f"API返回错误: {response_data.get('msg')}")
            else:
                print("获取数据失败")
        except Exception as e:
            print(f"主循环出错: {e}")
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()