PUGE пре 3 месеци
комит
029ccacade
4 измењених фајлова са 626 додато и 0 уклоњено
  1. 304 0
      app.py
  2. 145 0
      main.py
  3. 145 0
      发邮件.py
  4. 32 0
      随机sql.py

+ 304 - 0
app.py

@@ -0,0 +1,304 @@
+import requests
+import json
+import os
+from urllib.parse import urlparse
+from typing import Optional, Dict, Any
+from PIL import Image
+import io
+
+token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJyb2xlIjoiQURNSU4iLCJ1c2VySWQiOjEsImlhdCI6MTc3MjYxOTA2NiwiZXhwIjoxNzczMjIzODY2fQ.IatRhEStT5IUXHsN8_tvxuijNFO0tNkIADYIV79dWMo"
+
+def get_product_detail(mer_id: str, product_id: str) -> Optional[Dict[str, Any]]:
+    global token
+    """
+    获取产品详情
+    
+    Args:
+        mer_id: 商户ID
+        product_id: 产品ID
+        
+    Returns:
+        返回JSON格式的产品详情数据,如果请求失败则返回None
+    """
+    # 构建URL
+    base_url = "https://api.tiki-int.site/api/product/detail"
+    params = {
+        "mer_id": mer_id,
+        "product_id": product_id
+    }
+    
+    # 设置请求头
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
+        "cache-control": "no-cache",
+        "lang": "en",
+        "pragma": "no-cache",
+        "priority": "u=1, i",
+        "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
+        "sec-ch-ua-mobile": "?0",
+        "sec-ch-ua-platform": '"Windows"',
+        "sec-fetch-dest": "empty",
+        "sec-fetch-mode": "cors",
+        "sec-fetch-site": "cross-site",
+        "Referer": "https://tiki.fashion11.top/",
+        "Referrer-Policy": "strict-origin-when-cross-origin"
+    }
+    
+    try:
+        # 发送GET请求
+        response = requests.get(
+            url=base_url,
+            params=params,
+            headers=headers,
+            timeout=10  # 设置10秒超时
+        )
+        
+        # 检查响应状态
+        response.raise_for_status()
+        
+        # 返回JSON数据
+        return response.json()
+        
+    except requests.exceptions.RequestException as e:
+        print(f"请求失败: {e}")
+        return None
+    except json.JSONDecodeError as e:
+        print(f"JSON解析失败: {e}")
+        return None
+
+
+import os
+from urllib.parse import urlparse
+import requests
+from PIL import Image
+import io
+
+def download_and_upload_image(image_url):
+    """
+    下载图片并上传到指定地址,webp格式自动转换为jpg
+    
+    Args:
+        image_url: 图片的URL地址
+    
+    Returns:
+        上传结果的JSON响应
+    """
+    # 默认上传地址
+    upload_url = "https://www.as1688.vip/api/v1/file/upload"
+    
+    # 从URL中提取文件名和后缀
+    parsed_url = urlparse(image_url)
+    filename = os.path.basename(parsed_url.path)
+    if not filename or '.' not in filename:
+        filename = "image.jpg"
+    
+    # 获取文件后缀
+    ext = os.path.splitext(filename)[1].lower()
+    
+    # 临时文件路径
+    temp_file = f"temp_{filename}"
+    
+    try:
+        # 1. 下载图片
+        # print(f"正在下载图片: {image_url}")
+        img_response = requests.get(image_url, timeout=30)
+        img_response.raise_for_status()
+        
+        # 2. 处理webp格式
+        need_convert = False
+        if ext == '.webp':
+            need_convert = True
+            # print("检测到webp格式,正在转换为jpg...")
+            
+            # 使用PIL转换webp为jpg
+            image = Image.open(io.BytesIO(img_response.content))
+            
+            # 如果是RGBA模式,转换为RGB(去除alpha通道)
+            if image.mode == 'RGBA':
+                # 创建白色背景
+                rgb_image = Image.new('RGB', image.size, (255, 255, 255))
+                # 粘贴webp图像,使用alpha通道作为掩码
+                rgb_image.paste(image, mask=image.split()[3])
+                image = rgb_image
+            elif image.mode != 'RGB':
+                image = image.convert('RGB')
+            
+            # 保存为jpg到临时文件
+            image.save(temp_file, 'JPEG', quality=85)
+            
+            # 更新文件名和MIME类型
+            base_name = os.path.splitext(filename)[0]
+            filename = f"{base_name}.jpg"
+            mime_type = 'image/jpeg'
+            
+            print(f"已转换为: {filename}")
+        else:
+            # 普通图片,直接保存
+            with open(temp_file, 'wb') as f:
+                f.write(img_response.content)
+            
+            # 设置MIME类型
+            mime_types = {
+                '.jpg': 'image/jpeg',
+                '.jpeg': 'image/jpeg',
+                '.png': 'image/png',
+                '.gif': 'image/gif',
+                '.bmp': 'image/bmp',
+                '.ico': 'image/x-icon',
+                '.svg': 'image/svg+xml'
+            }
+            mime_type = mime_types.get(ext, 'image/jpeg')
+        
+        # print(f"图片已保存到: {temp_file}")
+        
+        # 3. 准备上传
+        headers = {
+            "accept": "application/json",
+            "accept-language": "zh-CN,zh;q=0.9",
+            "authorization": token,
+            "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
+            "sec-ch-ua-mobile": "?0",
+            "sec-ch-ua-platform": '"Windows"'
+        }
+        
+        # 准备文件上传
+        with open(temp_file, 'rb') as f:
+            files = {
+                'file': (filename, f, mime_type)
+            }
+            
+            # print(f"正在上传到: {upload_url}")
+            # print(f"文件名: {filename}, 类型: {mime_type}")
+            
+            upload_response = requests.post(
+                upload_url,
+                headers=headers,
+                files=files,
+                timeout=30
+            )
+            
+            upload_response.raise_for_status()
+            result = upload_response.json()
+            print("上传成功!")
+            return result
+            
+    except requests.exceptions.RequestException as e:
+        print(f"网络请求错误: {e}")
+        return {"error": str(e)}
+    except ImportError:
+        print("错误: 请安装PIL库: pip install Pillow")
+        return {"error": "缺少PIL库,无法转换webp格式"}
+    except Exception as e:
+        print(f"其他错误: {e}")
+        return {"error": str(e)}
+    finally:
+        # 清理临时文件
+        if os.path.exists(temp_file):
+            os.remove(temp_file)
+            # print(f"临时文件已删除: {temp_file}")
+
+
+def batch_create_products(body_data):
+    """
+    批量创建商品
+    
+    Args:
+        body_data: 可以是字典或JSON字符串
+        token: 授权token,如果不提供则使用默认token
+    
+    Returns:
+        接口返回的JSON结果
+    """
+    # 上传地址
+    url = "https://www.as1688.vip/api/v1/product-management/products/batch"
+    
+
+    # 请求头
+    headers = {
+        "accept": "application/json",
+        "accept-language": "zh-CN,zh;q=0.9",
+        "authorization": token,
+        "content-type": "application/json",
+        "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
+        "sec-ch-ua-mobile": "?0",
+        "sec-ch-ua-platform": '"Windows"',
+        "sec-fetch-dest": "empty",
+        "sec-fetch-mode": "cors",
+        "sec-fetch-site": "same-origin"
+    }
+    
+    # 处理body数据
+    if isinstance(body_data, dict):
+        json_body = body_data
+    elif isinstance(body_data, str):
+        json_body = json.loads(body_data)
+    else:
+        raise ValueError("body_data必须是字典或JSON字符串")
+    
+    try:
+        # 发送POST请求
+        response = requests.post(
+            url,
+            headers=headers,
+            json=json_body,
+            timeout=30
+        )
+        
+        # 检查响应
+        response.raise_for_status()
+        
+        # 返回JSON结果
+        return response.json()
+        
+    except requests.exceptions.RequestException as e:
+        print(f"请求错误: {e}")
+        if hasattr(e, 'response') and e.response:
+            print(f"响应内容: {e.response.text}")
+        return {"error": str(e)}
+    except json.JSONDecodeError as e:
+        print(f"JSON解析错误: {e}")
+        return {"error": str(e)}
+
+
+def get_product_list(category_id, page=1):
+    """最简单的版本,只返回商品列表"""
+    url = f"https://api.tiki-int.site/api/product/search?category_id={category_id}&page={page}&limit=15&type=1"
+    
+    try:
+        return requests.get(url).json()
+    except:
+        return []
+# 使用示例
+if __name__ == "__main__":
+    for index in range(1, 214):
+        listData = get_product_list(24, index)
+        for item in listData['data']['list']:
+            print(index,item['product_id'], item['mer_id'])
+            # 测试函数
+            result = get_product_detail(item['mer_id'],item['product_id'])
+
+            print(result['data']['goods'])
+            
+            content = result['data']['goods']['content']
+            title = result['data']['goods']['title']
+            stock = result['data']['goods']['stock']
+            images = result['data']['goods']['images'].split(",")
+            market_price = result['data']['goods']['sales_price']
+            # 上传图片获取URL
+            uploaded_image_urls = []
+            for image_url in images:
+                print(f"处理图片: {image_url}")
+                resultFile = download_and_upload_image(image_url)
+
+                if ('data' in resultFile):
+                    uploadFileURL = resultFile['data']
+                    uploaded_image_urls.append(uploadFileURL)
+            print(title, stock, market_price)
+            if (len(uploaded_image_urls) > 0 and market_price > 0):
+                #      
+                uploadInfo = {"products":[{"name":title,"category":"全球购","price":str(market_price),"stock":stock,"description":content,"images":uploaded_image_urls}]}
+                print(uploadInfo["products"][0]["category"])
+                uploadInfoReturn = batch_create_products(uploadInfo)
+                print(uploadInfoReturn)
+                

+ 145 - 0
main.py

@@ -0,0 +1,145 @@
+import subprocess
+import os
+import time
+
+def screenshot_basic(save_path="screenshot.png"):
+    """
+    基础截图功能:先保存到手机,再拉取到电脑
+    
+    Args:
+        save_path: 保存到电脑的路径
+    """
+    try:
+        # 1. 在手机上进行截图并保存到/sdcard/目录
+        print("正在截图...")
+        subprocess.run(
+            ["adb", "shell", "screencap", "-p", "/sdcard/temp_screen.png"],
+            check=True,  # check=True会在命令失败时抛出异常[citation:2]
+            timeout=10
+        )
+        
+        # 2. 将截图从手机拉取到电脑
+        print("正在传输截图...")
+        subprocess.run(
+            ["adb", "pull", "/sdcard/temp_screen.png", save_path],
+            check=True,
+            timeout=10
+        )
+        
+        # 3. 清理手机上的临时文件
+        subprocess.run(
+            ["adb", "shell", "rm", "/sdcard/temp_screen.png"],
+            check=True,
+            timeout=5
+        )
+        
+        print(f"截图已保存到: {save_path}")
+        return True
+        
+    except subprocess.CalledProcessError as e:
+        print(f"ADB命令执行失败: {e}")
+        return False
+    except subprocess.TimeoutExpired as e:
+        print(f"命令执行超时: {e}")
+        return False
+    except FileNotFoundError:
+        print("未找到ADB命令,请确认ADB已安装并配置环境变量")
+        return False
+
+def tap(x, y, device_serial=None):
+    """
+    在指定坐标位置执行点击操作
+    
+    Args:
+        x: X坐标
+        y: Y坐标
+        device_serial: 设备序列号(可选,多设备时需要指定)
+    
+    Returns:
+        bool: 是否成功
+    """
+    try:
+        cmd = ["adb"]
+        if device_serial:
+            cmd.extend(["-s", device_serial])
+        cmd.extend(["shell", "input", "tap", str(x), str(y)])
+        
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
+        return result.returncode == 0
+    except Exception as e:
+        print(f"点击失败: {e}")
+        return False
+
+def swipe_vertical(x, y1, y2, duration=300, device_serial=None):
+    """
+    在指定X坐标位置,从y1垂直滑动到y2
+    
+    Args:
+        x: X轴坐标
+        y1: 起始Y坐标
+        y2: 结束Y坐标
+        duration: 滑动持续时间(毫秒),默认300ms
+        device_serial: 设备序列号(可选,多设备时需要指定)
+    
+    Returns:
+        bool: 是否成功
+    """
+    try:
+        # 构建ADB命令
+        cmd = ["adb"]
+        if device_serial:
+            cmd.extend(["-s", device_serial])
+        
+        cmd.extend([
+            "shell", "input", "swipe",
+            str(x), str(y1),
+            str(x), str(y2),
+            str(duration)
+        ])
+        
+        print(f"滑动: ({x}, {y1}) -> ({x}, {y2}) 持续时间: {duration}ms")
+        
+        # 执行滑动
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
+        
+        if result.returncode == 0:
+            print("滑动成功")
+            return True
+        else:
+            print(f"滑动失败: {result.stderr}")
+            return False
+            
+    except Exception as e:
+        print(f"执行滑动时出错: {e}")
+        return False
+
+# 使用示例
+if __name__ == "__main__":
+    # 先确认设备已连接
+    result = subprocess.run(["adb", "devices"], capture_output=True, text=True)
+    print("当前连接的设备:")
+    print(result.stdout)
+    
+    while True:
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        swipe_vertical(100*2, 600*2, 300*2)
+        time.sleep(0.5)
+        tap(416 * 2, 790 * 2)
+        time.sleep(1)
+        tap(269 * 2, 875 * 2)
+        time.sleep(2)
+        # tap(103 * 4, 262 * 4)
+        # time.sleep(num1)
+        # swipe_vertical(138*4, 446*4, 578*4)
+        # time.sleep(5)
+    # time.sleep(2)
+    screenshot_basic("my_screenshot.png")

+ 145 - 0
发邮件.py

@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+固定配置的邮件发送脚本
+"""
+
+import smtplib
+import ssl
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+简化版 - 监控MySQL配置值变化
+"""
+
+import time
+import pymysql
+import datetime
+
+# 数据库配置
+DB_CONFIG = {
+    'host': '127.0.0.1',
+    'port': 3306,
+    'user': 'root',
+    'password': 'Blossom2$',
+    'database': 'usdt_mall',
+    'charset': 'utf8mb4'
+}
+
+# 监控配置
+CHECK_INTERVAL = 600  # 10分钟
+CONFIG_KEY = 'receiving_address'
+
+def get_value():
+    """获取当前值"""
+    try:
+        conn = pymysql.connect(**DB_CONFIG)
+        with conn.cursor() as cursor:
+            cursor.execute(
+                "SELECT config_value FROM system_config WHERE config_key = %s",
+                (CONFIG_KEY,)
+            )
+            result = cursor.fetchone()
+            return result[0] if result else None
+    except Exception as e:
+        print(f"查询出错: {e}")
+        return None
+    finally:
+        if 'conn' in locals():
+            conn.close()
+
+
+
+def send_email(subject, body):
+    """发送固定内容的邮件"""
+    
+    # ===== 配置信息 =====
+    # SMTP服务器配置
+    smtp_server = "smtppro.zoho.com"
+    port = 465  # SSL端口
+    
+    # 发件人信息(请修改为你的密码)
+    sender_email = "info@as1688.vip"
+    sender_password = "jexSRAGgpmy0" 
+    
+    # 收件人列表
+    receivers = [
+        "Williamjohson9699@gmail.com",
+        "info@as1688.vip"
+    ]
+    
+
+    # ===================
+    
+    # 创建邮件
+    message = MIMEMultipart()
+    message["From"] = sender_email
+    message["To"] = ", ".join(receivers)  # 多个收件人用逗号分隔
+    message["Subject"] = subject
+    
+    # 添加邮件正文
+    message.attach(MIMEText(body, "plain", "utf-8"))
+    
+    try:
+        # 创建SSL连接
+        context = ssl.create_default_context()
+        
+        print(f"正在连接到 {smtp_server}...")
+        print(f"发件人: {sender_email}")
+        print(f"收件人: {', '.join(receivers)}")
+        print(f"主题: {subject}")
+        print(f"内容: {body}")
+        
+        # 连接到SMTP服务器
+        with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
+            # 登录
+            print("正在登录...")
+            server.login(sender_email, sender_password)
+            
+            # 发送邮件
+            print("正在发送邮件...")
+            server.send_message(message)
+            
+        print("✅ 邮件发送成功!")
+        
+    except Exception as e:
+        print(f"❌ 发送邮件时出错: {e}")
+        print("\n可能的原因:")
+        print("1. 密码错误 - 请检查 sender_password 是否正确")
+        print("2. 需要应用专用密码 - 请在Zoho邮箱生成应用专用密码")
+        print("3. 网络连接问题 - 请检查网络连接")
+        print("4. SMTP设置问题 - 确认Zoho邮箱已开启SMTP访问")
+
+if __name__ == "__main__":
+    print(f"开始监控 {CONFIG_KEY} 的变化...")
+    print(f"检查间隔: {CHECK_INTERVAL/60} 分钟")
+    print("-" * 50)
+    
+    previous = get_value()
+
+    
+    print(f"初始值: {previous}")
+    
+    count = 0
+    while True:
+        time.sleep(CHECK_INTERVAL)
+        count += 1
+        
+        current = get_value()
+        if current is None:
+            continue
+        
+        if current != previous:
+            print(f"[{datetime.datetime.now()}] 值发生变化!")
+            print(f"  旧值: {previous}")
+            print(f"  新值: {current}")
+            previous = current
+            send_email(f"[{datetime.datetime.now()}] 值发生变化!", f"  旧值: {previous}   新值: {current}")
+        else:
+            print(f"[{datetime.datetime.now()}] 第{count}次检查: 值未变化")
+                

+ 32 - 0
随机sql.py

@@ -0,0 +1,32 @@
+import random
+import datetime
+
+# 生成200条记录
+for i in range(5):
+    # user_id 8-110随机
+    # user_id = random.randint(8, 110)
+    user_id = 8
+    
+    # 生成随机日期 (2025年)
+    start_date = datetime.datetime(2025, 1, 1)
+    end_date = datetime.datetime(2025, 6, 28)
+    random_date = start_date + datetime.timedelta(days=random.randint(0, 364))
+    random_time = datetime.time(random.randint(0, 23), random.randint(0, 59), random.randint(0, 59))
+    datetime_str = datetime.datetime.combine(random_date, random_time)
+    
+    # 订单号
+    order_time = datetime_str.strftime("%Y%m%d%H%M%S")
+    random_suffix = str(random.randint(100000, 999999))
+    order_no = order_time + random_suffix
+    
+    # 相关订单号
+    related_order_no = order_no[:-1] + str((int(order_no[-1]) + 1) % 10)
+    
+    # 随机金额
+    amount = round(random.uniform(6000, 12000), 2)
+    
+    # 日期时间字符串
+    date_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
+    
+    # 打印SQL语句
+    print(f"INSERT INTO `usdt_mall`.`recharge_withdraw_record` (`user_id`, `order_no`, `type`, `payment_method`, `amount`, `status`, `proof_image`, `auditor_id`, `audit_time`, `remark`, `create_time`, `update_time`, `receiving_address`, `is_read`, `related_order_no`) VALUES ({user_id}, '{order_no}', 3, 'BALANCE', {amount}, 1, NULL, NULL, '{date_str}', NULL, '{date_str}', '{date_str}', NULL, 1, '{related_order_no}');")