import os import re import sys import shutil import threading import time import pyautogui import pyperclip import json from datetime import datetime from pathlib import Path from tkinter import * from tkinter import ttk, filedialog, messagebox, scrolledtext objTemp = {} # 配置文件路径 CONFIG_FILE = Path.home() / '.file_search_copy_config.json' def load_config(): """加载保存的配置""" if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: return {} return {} def save_config(search_dir, output_dir): """保存配置""" try: config = { 'search_dir': search_dir, 'output_dir': output_dir, 'last_update': time.strftime("%Y-%m-%d %H:%M:%S") } with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=2) return True except Exception as e: print(f"保存配置失败: {e}") return False def parse_jit_to_dict(text: str) -> dict: """ 将 JIT 格式的物流文本解析为 {订单号: 车次号} 的字典 参数: text: 原始字符串内容 返回: dict: 订单号到车次号的映射 """ result = {} # 按行分割,每一行是一个完整的 JIT 记录 lines = text.strip().split('\n') for line in lines: if not line.strip(): continue # 用制表符分割字段(JIT 格式用 \t 分隔) fields = line.split('\t') if len(fields) < 3: continue # 第一个字段:可能包含一个或多个订单号(逗号分隔) order_field = fields[0].strip() # 第三个字段:车次号 SC202606030063 这种 shipment_no = fields[2].strip() if len(fields) > 2 else '' if not shipment_no: continue # 处理订单号可能包含逗号分隔的情况 # 注意:有些订单号后面会有 ; 结尾,先去掉 # 订单号的格式通常是 POCY... 可能带 S01 后缀 # 方式1:按逗号拆分订单号 # 每个订单号格式类似 POCY2606030080586S01 # 可能有最后一个订单号不完整的情况,这里取完整的标准订单号 # 提取标准格式的订单号(POCY开头,后面数字+S或数字) # 先用正则提取所有 POCY 开头的订单号 order_ids = re.findall(r'(POCY\w+)', order_field) # 如果没找到标准 POCY 订单号,尝试按逗号直接拆分 if not order_ids: # 按逗号拆分,并清理可能的分号和空白 parts = [p.strip().rstrip(';') for p in order_field.split(',') if p.strip()] for p in parts: # 如果部分看起来像是订单号(长度大于10) if len(p) > 10 and not p.isdigit(): order_ids.append(p) # 映射到车次号 for oid in order_ids: if oid: # 非空 result[oid] = shipment_no return result class FileSearchCopyApp: def __init__(self, root): self.root = root self.root.title("文件搜索复制工具") self.root.geometry("950x800") # 加载保存的配置 self.saved_config = load_config() # 设置样式 style = ttk.Style() style.theme_use('clam') # 主框架 main_frame = ttk.Frame(root, padding="10") main_frame.grid(row=0, column=0, sticky=(W, E, N, S)) # 搜索目录选择 ttk.Label(main_frame, text="搜索目录:").grid(row=0, column=0, sticky=W, pady=5) self.search_dir = StringVar() # 自动填充保存的路径 if self.saved_config.get('search_dir'): self.search_dir.set(self.saved_config['search_dir']) ttk.Entry(main_frame, textvariable=self.search_dir, width=80).grid(row=0, column=1, padx=5, pady=5) ttk.Button(main_frame, text="浏览", command=self.select_search_dir).grid(row=0, column=2, padx=5, pady=5) # 输出目录选择 ttk.Label(main_frame, text="输出目录:").grid(row=1, column=0, sticky=W, pady=5) self.output_dir = StringVar() # 自动填充保存的路径 if self.saved_config.get('output_dir'): self.output_dir.set(self.saved_config['output_dir']) ttk.Entry(main_frame, textvariable=self.output_dir, width=80).grid(row=1, column=1, padx=5, pady=5) ttk.Button(main_frame, text="浏览", command=self.select_output_dir).grid(row=1, column=2, padx=5, pady=5) # 自动获取数据按钮 auto_frame = ttk.Frame(main_frame) auto_frame.grid(row=2, column=0, columnspan=3, pady=10) self.auto_button = ttk.Button(auto_frame, text="🤖 自动获取数据", command=self.auto_get_data, width=20) self.auto_button.pack() # 数据输入区域 ttk.Label(main_frame, text="数据输入(制表符分隔,每行一条记录,提取第3列):").grid(row=3, column=0, columnspan=3, sticky=W, pady=5) self.text_area = scrolledtext.ScrolledText(main_frame, height=12, width=110, font=('Consolas', 9)) self.text_area.grid(row=4, column=0, columnspan=3, pady=5) # 按钮框架 button_frame = ttk.Frame(main_frame) button_frame.grid(row=5, column=0, columnspan=3, pady=10) ttk.Button(button_frame, text="加载示例数据", command=self.load_example).pack(side=LEFT, padx=5) ttk.Button(button_frame, text="清空数据", command=self.clear_data).pack(side=LEFT, padx=5) ttk.Button(button_frame, text="清空日志", command=self.clear_log).pack(side=LEFT, padx=5) # 启动按钮 self.start_button = ttk.Button(button_frame, text="启动搜索复制", command=self.start_process, width=20) self.start_button.pack(side=LEFT, padx=20) # 进度条 self.progress = ttk.Progressbar(main_frame, mode='indeterminate') self.progress.grid(row=6, column=0, columnspan=3, sticky=(W, E), pady=5) # 统计信息框架 stats_frame = ttk.LabelFrame(main_frame, text="统计信息", padding="5") stats_frame.grid(row=7, column=0, columnspan=3, sticky=(W, E), pady=5) self.stats_label = ttk.Label(stats_frame, text="等待处理...") self.stats_label.pack() # 日志输出区域 ttk.Label(main_frame, text="运行日志:").grid(row=8, column=0, columnspan=3, sticky=W, pady=5) self.log_area = scrolledtext.ScrolledText(main_frame, height=15, width=110, fg='blue') self.log_area.grid(row=9, column=0, columnspan=3, pady=5) # 配置网格权重 root.columnconfigure(0, weight=1) root.rowconfigure(0, weight=1) main_frame.columnconfigure(1, weight=1) main_frame.rowconfigure(4, weight=1) main_frame.rowconfigure(9, weight=1) def select_search_dir(self): directory = filedialog.askdirectory() if directory: self.search_dir.set(directory) # 保存配置 save_config(self.search_dir.get(), self.output_dir.get()) def select_output_dir(self): directory = filedialog.askdirectory() if directory: self.output_dir.set(directory) # 保存配置 save_config(self.search_dir.get(), self.output_dir.get()) def clear_data(self): self.text_area.delete(1.0, END) def clear_log(self): self.log_area.delete(1.0, END) def log_message(self, message, is_error=False): """在日志区域显示消息""" timestamp = time.strftime("%H:%M:%S") self.log_area.insert(END, f"[{timestamp}] {message}\n") self.log_area.see(END) if is_error: # 可以添加错误计数或其他处理 pass def auto_get_data(self): """自动获取数据:点击指定位置,然后获取剪贴板内容""" # 在新线程中执行,避免界面卡顿 thread = threading.Thread(target=self._auto_get_data_thread) thread.daemon = True thread.start() def _auto_get_data_thread(self): global objTemp """自动获取数据的线程函数""" try: # 禁用按钮,防止重复点击 self.auto_button.config(state=DISABLED) self.log_message("开始自动获取数据...") self.log_message("请在5秒内切换到目标窗口...") # 等待5秒,让用户切换到目标窗口 for i in range(5, 0, -1): self.log_message(f"倒计时: {i} 秒") time.sleep(1) # pyautogui.moveTo(389, 290, duration=0.3) # pyautogui.click(button="right") # time.sleep(0.5) # pyautogui.moveTo(482, 472, duration=0.3) # time.sleep(1) # pyautogui.moveTo(606, 472, duration=0.3) # pyautogui.click(button="left") # time.sleep(1) # 获取剪贴板内容 # clipboard_content2 = pyperclip.paste() # objTemp = parse_jit_to_dict(clipboard_content2) # 定义点击位置 click_positions = [ (55, 663, "第一个位置", "left"), (115, 719, "第一个位置", "right"), (213, 897, "第二个位置", "left"), (334, 897, "第三个位置", "left") ] # 执行点击操作 for x, y, description, button in click_positions: self.log_message(f"准备点击{description}: ({x}, {y})") time.sleep(0.5) # 间隔0.5秒 # 移动并点击 pyautogui.moveTo(x, y, duration=0.3) pyautogui.click(button=button) self.log_message(f"✓ 已点击{description}: ({x}, {y})") # 等待一下确保复制操作完成 time.sleep(0.5) # 获取剪贴板内容 clipboard_content = pyperclip.paste() if clipboard_content: self.log_message(f"成功获取剪贴板内容,长度: {len(clipboard_content)} 字符") self.log_message("内容预览(前200字符):") preview = clipboard_content[:200] + "..." if len(clipboard_content) > 200 else clipboard_content self.log_message(preview) # 设置到输入框 self.text_area.delete(1.0, END) self.text_area.insert(1.0, clipboard_content) self.log_message("✓ 数据已设置到输入框") # ========== 新增:等待1秒后自动启动搜索复制 ========== self.log_message("等待1秒后自动启动搜索复制...") time.sleep(1) # 在主线程中启动搜索复制(因为涉及UI更新) self.root.after(0, self.auto_start_process) # ============================================== else: self.log_message("警告:剪贴板内容为空!", True) except pyautogui.FailSafeException: self.log_message("操作被用户中断(触发了紧急停止)", True) except Exception as e: self.log_message(f"自动获取数据失败: {str(e)}", True) finally: self.auto_button.config(state=NORMAL) self.log_message("自动获取数据流程结束") def auto_start_process(self): """自动启动搜索复制(由自动获取数据完成后调用)""" self.log_message("=" * 60) self.log_message("【自动启动】开始执行搜索复制任务") self.log_message("=" * 60) # 直接调用启动处理(会在新线程中运行) self.start_process() def load_example(self): example_data = """SCSQ202605260234 POCY2605260119298S01 S25092303841500101 S250923038415 8条装深浅条纹套装(4方巾+2毛巾+2浴巾) 8条装灰色 1 https://p16-she-sg.ibyteimg.com/tos-alisg-i-f7lenjegsh-sg/7e711c20b2ff4205910b659d75fd2d4e~tplv-f7lenjegsh-origin-webp.webp?dr=16384&from=3626923735&idc=sg1&ps=933b5bde&shcp=3c3d9ffb&shp=d82e76d7&t=16 0 灰+2*深浅条纹(1毛巾+2方巾+1浴巾) 25042214242424 0 灰+2*深浅条纹(1毛巾+2方巾+1浴巾) 2504221424 Gray 8pcs-2bath towel+2towel+4hand towel         SCSQ202605260219 POCY2605260123778S01 S25122500884400102 S251225008844 80*210cm拼接浴裙+公主帽套装 粉色 1 https://p16-she-sg.ibyteimg.com/tos-alisg-i-f7lenjegsh-sg/85be0d0d66ab46c6bb5ccb056fc02eb3~tplv-f7lenjegsh-origin-webp.webp?dr=16384&from=3626923735&idc=sg1&ps=933b5bde&shcp=3c3d9ffb&shp=d82e76d7&t=16 0 Pink And White-XXXL-XXXXL 26010529164844 0 Pink And White-XXXL-XXXXL 2601052916 Pink And White XXXL-XXXXL.       """ self.text_area.delete(1.0, END) self.text_area.insert(1.0, example_data) def update_stats(self, total_orders, processed_orders, total_files): """更新统计信息""" self.stats_label.config(text=f"总订单数: {total_orders} | 已处理: {processed_orders} | 复制文件数: {total_files}") def search_files(self, search_dir, order_number): """在搜索目录中搜索包含订单号的文件""" found_files = [] search_path = Path(search_dir) try: # 遍历所有文件 for file_path in search_path.rglob('*'): if file_path.is_file() and order_number in file_path.name: found_files.append(file_path) except Exception as e: self.log_message(f"搜索 {order_number} 时出错: {str(e)}", True) return found_files def copy_files(self, files, output_dir, order_number): """复制文件到目标目录""" # 创建订单号目录 target_dir = Path(output_dir) / order_number target_dir.mkdir(parents=True, exist_ok=True) copied_count = 0 for file_path in files: try: # 构建目标文件路径 target_file = target_dir / file_path.name # 如果文件已存在,添加序号 if target_file.exists(): base_name = file_path.stem extension = file_path.suffix counter = 1 while target_file.exists(): new_name = f"{base_name}_{counter}{extension}" target_file = target_dir / new_name counter += 1 # 复制文件 shutil.copy2(file_path, target_file) copied_count += 1 self.log_message(f" 已复制: {file_path.name} -> {target_file}") except Exception as e: self.log_message(f" 复制失败 {file_path.name}: {str(e)}", True) return copied_count def process_data(self): """处理数据的主函数""" search_dir = self.search_dir.get() output_dir = self.output_dir.get() data_text = self.text_area.get(1.0, END).strip() # 验证输入 if not search_dir: self.log_message("错误:请选择搜索目录!", True) return False if not output_dir: self.log_message("错误:请选择输出目录!", True) return False if not data_text: self.log_message("错误:请在数据输入框中输入数据!", True) return False if not os.path.exists(search_dir): self.log_message(f"错误:搜索目录不存在! {search_dir}", True) return False if not os.path.exists(output_dir): self.log_message(f"错误:输出目录不存在! {output_dir}", True) return False # 解析数据,提取订单号(每行的第3列) lines = data_text.split('\n') order_numbers = [] # 保留顺序 order_numbers_set = set() # 用于去重 送仓申请单号 = {} # 复制文件 timestamp = int(time.time()) print(f"当前时间戳: {timestamp}") date_str = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S") for line_num, line in enumerate(lines, 1): if not line.strip(): continue # 按制表符分割 parts = line.split('\t') if len(parts) >= 3: order_number = parts[2].strip() # 第3列(索引2) 送仓申请单号[order_number] = parts[0].strip() if order_number: if order_number not in order_numbers_set: order_numbers.append(order_number) order_numbers_set.add(order_number) self.log_message(f"提取订单号 [{len(order_numbers)}]: {order_number}") else: self.log_message(f"跳过重复订单号: {order_number}") else: self.log_message(f"警告:第{line_num}行第3列为空") else: self.log_message(f"警告:第{line_num}行格式不正确,字段数不足3个(当前{len(parts)}个字段)") if not order_numbers: self.log_message("错误:未能提取到有效的订单号!", True) return False self.log_message(f"\n共提取到 {len(order_numbers)} 个唯一的订单号") self.log_message("=" * 80) # 处理每个订单号 total_files_copied = 0 processed_orders = 0 for i, order_number in enumerate(order_numbers, 1): self.log_message(f"\n[{i}/{len(order_numbers)}] 处理订单号: {order_number}") self.log_message(f"搜索目录: {search_dir}") # 搜索文件 found_files = self.search_files(search_dir, order_number) if found_files: self.log_message(f"找到 {len(found_files)} 个相关文件:") for file_path in found_files: try: rel_path = file_path.relative_to(search_dir) self.log_message(f" - {rel_path}") except: self.log_message(f" - {file_path}") # 转换为字符串 copied_count = self.copy_files(found_files, output_dir + "/" + date_str, 送仓申请单号[order_number]) total_files_copied += copied_count self.log_message(f"✓ 成功复制 {copied_count} 个文件到 {output_dir}/{date_str}/{送仓申请单号[order_number]}") processed_orders += 1 else: self.log_message(f"✗ 警告:未找到包含 {order_number} 的文件", True) # 更新统计信息 self.update_stats(len(order_numbers), processed_orders, total_files_copied) self.log_message("\n" + "=" * 80) self.log_message("处理完成!") self.log_message(f"总订单数: {len(order_numbers)}") self.log_message(f"成功处理: {processed_orders}") self.log_message(f"失败/未找到: {len(order_numbers) - processed_orders}") self.log_message(f"共复制文件: {total_files_copied}") return True def run_in_thread(self): """在新线程中运行处理程序""" self.start_button.config(state=DISABLED) self.progress.start() try: success = self.process_data() if success: messagebox.showinfo("完成", "文件搜索复制完成!") else: messagebox.showerror("错误", "处理过程中出现错误,请查看日志。") except Exception as e: self.log_message(f"程序运行错误: {str(e)}", True) messagebox.showerror("错误", f"程序运行错误: {str(e)}") finally: self.progress.stop() self.start_button.config(state=NORMAL) def start_process(self): """启动处理程序""" # 在新线程中运行,避免界面卡顿 thread = threading.Thread(target=self.run_in_thread) thread.daemon = True thread.start() def main(): root = Tk() app = FileSearchCopyApp(root) root.mainloop() if __name__ == "__main__": main()