| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- import os
- import re
- import sys
- import shutil
- import threading
- import time
- import pyautogui
- import pyperclip
- from pathlib import Path
- from tkinter import *
- from tkinter import ttk, filedialog, messagebox, scrolledtext
- objTemp = {}
- def parse_jit_to_dict(text: str) -> dict:
- """
- 将 JIT 格式的物流文本解析为 {订单号: 车次号} 的字典
-
- 参数:
- text: 原始字符串内容
-
- 返回:
- dict: 订单号到车次号的映射
- """
- result = {}
-
- # 按行分割,每一行是一个完整的 JIT 记录
- lines = text.strip().split('\n')
-
- for line in lines:
- if not line.strip():
- continue
-
- # 用制表符分割字段(JIT 格式用 \t 分隔)
- fields = line.split('\t')
- if len(fields) < 3:
- continue
-
- # 第一个字段:可能包含一个或多个订单号(逗号分隔)
- order_field = fields[0].strip()
- # 第三个字段:车次号 SC202606030063 这种
- shipment_no = fields[2].strip() if len(fields) > 2 else ''
-
- if not shipment_no:
- continue
-
- # 处理订单号可能包含逗号分隔的情况
- # 注意:有些订单号后面会有 ; 结尾,先去掉
- # 订单号的格式通常是 POCY... 可能带 S01 后缀
-
- # 方式1:按逗号拆分订单号
- # 每个订单号格式类似 POCY2606030080586S01
- # 可能有最后一个订单号不完整的情况,这里取完整的标准订单号
-
- # 提取标准格式的订单号(POCY开头,后面数字+S或数字)
- # 先用正则提取所有 POCY 开头的订单号
- order_ids = re.findall(r'(POCY\w+)', order_field)
-
- # 如果没找到标准 POCY 订单号,尝试按逗号直接拆分
- if not order_ids:
- # 按逗号拆分,并清理可能的分号和空白
- parts = [p.strip().rstrip(';') for p in order_field.split(',') if p.strip()]
- for p in parts:
- # 如果部分看起来像是订单号(长度大于10)
- if len(p) > 10 and not p.isdigit():
- order_ids.append(p)
-
- # 映射到车次号
- for oid in order_ids:
- if oid: # 非空
- result[oid] = shipment_no
-
- return result
- class FileSearchCopyApp:
- def __init__(self, root):
- self.root = root
- self.root.title("文件搜索复制工具")
- self.root.geometry("950x800")
-
- # 设置样式
- style = ttk.Style()
- style.theme_use('clam')
-
- # 主框架
- main_frame = ttk.Frame(root, padding="10")
- main_frame.grid(row=0, column=0, sticky=(W, E, N, S))
-
- # 搜索目录选择
- ttk.Label(main_frame, text="搜索目录:").grid(row=0, column=0, sticky=W, pady=5)
- self.search_dir = StringVar()
- ttk.Entry(main_frame, textvariable=self.search_dir, width=80).grid(row=0, column=1, padx=5, pady=5)
- ttk.Button(main_frame, text="浏览", command=self.select_search_dir).grid(row=0, column=2, padx=5, pady=5)
-
- # 输出目录选择
- ttk.Label(main_frame, text="输出目录:").grid(row=1, column=0, sticky=W, pady=5)
- self.output_dir = StringVar()
- ttk.Entry(main_frame, textvariable=self.output_dir, width=80).grid(row=1, column=1, padx=5, pady=5)
- ttk.Button(main_frame, text="浏览", command=self.select_output_dir).grid(row=1, column=2, padx=5, pady=5)
-
- # 自动获取数据按钮
- auto_frame = ttk.Frame(main_frame)
- auto_frame.grid(row=2, column=0, columnspan=3, pady=10)
- self.auto_button = ttk.Button(auto_frame, text="🤖 自动获取数据", command=self.auto_get_data, width=20)
- self.auto_button.pack()
-
- # 数据输入区域
- ttk.Label(main_frame, text="数据输入(制表符分隔,每行一条记录,提取第3列):").grid(row=3, column=0, columnspan=3, sticky=W, pady=5)
- self.text_area = scrolledtext.ScrolledText(main_frame, height=12, width=110, font=('Consolas', 9))
- self.text_area.grid(row=4, column=0, columnspan=3, pady=5)
-
- # 按钮框架
- button_frame = ttk.Frame(main_frame)
- button_frame.grid(row=5, column=0, columnspan=3, pady=10)
-
- ttk.Button(button_frame, text="加载示例数据", command=self.load_example).pack(side=LEFT, padx=5)
- ttk.Button(button_frame, text="清空数据", command=self.clear_data).pack(side=LEFT, padx=5)
- ttk.Button(button_frame, text="清空日志", command=self.clear_log).pack(side=LEFT, padx=5)
-
- # 启动按钮
- self.start_button = ttk.Button(button_frame, text="启动搜索复制", command=self.start_process, width=20)
- self.start_button.pack(side=LEFT, padx=20)
-
- # 进度条
- self.progress = ttk.Progressbar(main_frame, mode='indeterminate')
- self.progress.grid(row=6, column=0, columnspan=3, sticky=(W, E), pady=5)
-
- # 统计信息框架
- stats_frame = ttk.LabelFrame(main_frame, text="统计信息", padding="5")
- stats_frame.grid(row=7, column=0, columnspan=3, sticky=(W, E), pady=5)
-
- self.stats_label = ttk.Label(stats_frame, text="等待处理...")
- self.stats_label.pack()
-
- # 日志输出区域
- ttk.Label(main_frame, text="运行日志:").grid(row=8, column=0, columnspan=3, sticky=W, pady=5)
- self.log_area = scrolledtext.ScrolledText(main_frame, height=15, width=110, fg='blue')
- self.log_area.grid(row=9, column=0, columnspan=3, pady=5)
-
- # 配置网格权重
- root.columnconfigure(0, weight=1)
- root.rowconfigure(0, weight=1)
- main_frame.columnconfigure(1, weight=1)
- main_frame.rowconfigure(4, weight=1)
- main_frame.rowconfigure(9, weight=1)
-
- def select_search_dir(self):
- directory = filedialog.askdirectory()
- if directory:
- self.search_dir.set(directory)
-
- def select_output_dir(self):
- directory = filedialog.askdirectory()
- if directory:
- self.output_dir.set(directory)
-
- def clear_data(self):
- self.text_area.delete(1.0, END)
-
- def clear_log(self):
- self.log_area.delete(1.0, END)
-
- def log_message(self, message, is_error=False):
- """在日志区域显示消息"""
- timestamp = time.strftime("%H:%M:%S")
- self.log_area.insert(END, f"[{timestamp}] {message}\n")
- self.log_area.see(END)
- if is_error:
- # 可以添加错误计数或其他处理
- pass
-
- def auto_get_data(self):
- """自动获取数据:点击指定位置,然后获取剪贴板内容"""
- # 在新线程中执行,避免界面卡顿
- thread = threading.Thread(target=self._auto_get_data_thread)
- thread.daemon = True
- thread.start()
-
- def _auto_get_data_thread(self):
- global objTemp
- """自动获取数据的线程函数"""
- try:
- # 禁用按钮,防止重复点击
- self.auto_button.config(state=DISABLED)
- self.log_message("开始自动获取数据...")
- self.log_message("请在5秒内切换到目标窗口...")
-
- # 等待5秒,让用户切换到目标窗口
- for i in range(5, 0, -1):
- self.log_message(f"倒计时: {i} 秒")
- time.sleep(1)
-
- pyautogui.moveTo(389, 290, duration=0.3)
- pyautogui.click(button="right")
- time.sleep(0.5)
- pyautogui.moveTo(482, 472, duration=0.3)
- time.sleep(1)
- pyautogui.moveTo(606, 472, duration=0.3)
- pyautogui.click(button="left")
- time.sleep(1)
- # 获取剪贴板内容
- clipboard_content2 = pyperclip.paste()
- objTemp = parse_jit_to_dict(clipboard_content2)
- # 定义点击位置
- click_positions = [
- (55, 663, "第一个位置", "left"),
- (115, 719, "第一个位置", "right"),
- (213, 897, "第二个位置", "left"),
- (334, 897, "第三个位置", "left")
- ]
-
- # 执行点击操作
- for x, y, description, button in click_positions:
- self.log_message(f"准备点击{description}: ({x}, {y})")
- time.sleep(0.5) # 间隔0.5秒
-
- # 移动并点击
- pyautogui.moveTo(x, y, duration=0.3)
- pyautogui.click(button=button)
- self.log_message(f"✓ 已点击{description}: ({x}, {y})")
-
- # 等待一下确保复制操作完成
- time.sleep(0.5)
-
- # 获取剪贴板内容
- clipboard_content = pyperclip.paste()
-
- if clipboard_content:
- self.log_message(f"成功获取剪贴板内容,长度: {len(clipboard_content)} 字符")
- self.log_message("内容预览(前200字符):")
- preview = clipboard_content[:200] + "..." if len(clipboard_content) > 200 else clipboard_content
- self.log_message(preview)
-
- # 设置到输入框
- self.text_area.delete(1.0, END)
- self.text_area.insert(1.0, clipboard_content)
- self.log_message("✓ 数据已设置到输入框")
- else:
- self.log_message("警告:剪贴板内容为空!", True)
-
- except pyautogui.FailSafeException:
- self.log_message("操作被用户中断(触发了紧急停止)", True)
- except Exception as e:
- self.log_message(f"自动获取数据失败: {str(e)}", True)
- finally:
- self.auto_button.config(state=NORMAL)
- self.log_message("自动获取数据流程结束")
-
- def load_example(self):
- example_data = """SCSQ202605260234 POCY2605260119298S01 S25092303841500101 S250923038415 8条装深浅条纹套装(4方巾+2毛巾+2浴巾) 8条装灰色 1 https://p16-she-sg.ibyteimg.com/tos-alisg-i-f7lenjegsh-sg/7e711c20b2ff4205910b659d75fd2d4e~tplv-f7lenjegsh-origin-webp.webp?dr=16384&from=3626923735&idc=sg1&ps=933b5bde&shcp=3c3d9ffb&shp=d82e76d7&t=16 0 灰+2*深浅条纹(1毛巾+2方巾+1浴巾) 25042214242424 0 灰+2*深浅条纹(1毛巾+2方巾+1浴巾) 2504221424 Gray 8pcs-2bath towel+2towel+4hand towel
- SCSQ202605260219 POCY2605260123778S01 S25122500884400102 S251225008844 80*210cm拼接浴裙+公主帽套装 粉色 1 https://p16-she-sg.ibyteimg.com/tos-alisg-i-f7lenjegsh-sg/85be0d0d66ab46c6bb5ccb056fc02eb3~tplv-f7lenjegsh-origin-webp.webp?dr=16384&from=3626923735&idc=sg1&ps=933b5bde&shcp=3c3d9ffb&shp=d82e76d7&t=16 0 Pink And White-XXXL-XXXXL 26010529164844 0 Pink And White-XXXL-XXXXL 2601052916 Pink And White XXXL-XXXXL. """
- self.text_area.delete(1.0, END)
- self.text_area.insert(1.0, example_data)
-
- def update_stats(self, total_orders, processed_orders, total_files):
- """更新统计信息"""
- self.stats_label.config(text=f"总订单数: {total_orders} | 已处理: {processed_orders} | 复制文件数: {total_files}")
-
- def search_files(self, search_dir, order_number):
- """在搜索目录中搜索包含订单号的文件"""
- found_files = []
- search_path = Path(search_dir)
-
- try:
- # 遍历所有文件
- for file_path in search_path.rglob('*'):
- if file_path.is_file() and order_number in file_path.name:
- found_files.append(file_path)
- except Exception as e:
- self.log_message(f"搜索 {order_number} 时出错: {str(e)}", True)
-
- return found_files
-
- def copy_files(self, files, output_dir, order_number):
- """复制文件到目标目录"""
- # 创建订单号目录
- target_dir = Path(output_dir) / order_number
- target_dir.mkdir(parents=True, exist_ok=True)
-
- copied_count = 0
- for file_path in files:
- try:
- # 构建目标文件路径
- target_file = target_dir / file_path.name
-
- # 如果文件已存在,添加序号
- if target_file.exists():
- base_name = file_path.stem
- extension = file_path.suffix
- counter = 1
- while target_file.exists():
- new_name = f"{base_name}_{counter}{extension}"
- target_file = target_dir / new_name
- counter += 1
-
- # 复制文件
- shutil.copy2(file_path, target_file)
- copied_count += 1
- self.log_message(f" 已复制: {file_path.name} -> {target_file}")
-
- except Exception as e:
- self.log_message(f" 复制失败 {file_path.name}: {str(e)}", True)
-
- return copied_count
-
- def process_data(self):
- """处理数据的主函数"""
- search_dir = self.search_dir.get()
- output_dir = self.output_dir.get()
- data_text = self.text_area.get(1.0, END).strip()
-
- # 验证输入
- if not search_dir:
- self.log_message("错误:请选择搜索目录!", True)
- return False
-
- if not output_dir:
- self.log_message("错误:请选择输出目录!", True)
- return False
-
- if not data_text:
- self.log_message("错误:请在数据输入框中输入数据!", True)
- return False
-
- if not os.path.exists(search_dir):
- self.log_message(f"错误:搜索目录不存在! {search_dir}", True)
- return False
-
- if not os.path.exists(output_dir):
- self.log_message(f"错误:输出目录不存在! {output_dir}", True)
- return False
-
- # 解析数据,提取订单号(每行的第3列)
- lines = data_text.split('\n')
- order_numbers = [] # 保留顺序
- order_numbers_set = set() # 用于去重
- 平台单号列表 = {}
- for line_num, line in enumerate(lines, 1):
- if not line.strip():
- continue
-
- # 按制表符分割
- parts = line.split('\t')
- if len(parts) >= 3:
- order_number = parts[2].strip() # 第3列(索引2)
- 平台单号 = parts[1].strip() # 第2列(索引1)
- 平台单号列表[order_number] = 平台单号
- if order_number:
- if order_number not in order_numbers_set:
- order_numbers.append(order_number)
- order_numbers_set.add(order_number)
- self.log_message(f"提取订单号 [{len(order_numbers)}]: {order_number}")
- else:
- self.log_message(f"跳过重复订单号: {order_number}")
- else:
- self.log_message(f"警告:第{line_num}行第3列为空")
- else:
- self.log_message(f"警告:第{line_num}行格式不正确,字段数不足3个(当前{len(parts)}个字段)")
-
- if not order_numbers:
- self.log_message("错误:未能提取到有效的订单号!", True)
- return False
-
- self.log_message(f"\n共提取到 {len(order_numbers)} 个唯一的订单号")
- self.log_message("=" * 80)
-
- # 处理每个订单号
- total_files_copied = 0
- processed_orders = 0
-
- for i, order_number in enumerate(order_numbers, 1):
- self.log_message(f"\n[{i}/{len(order_numbers)}] 处理订单号: {order_number}")
- self.log_message(f"搜索目录: {search_dir}")
-
- # 搜索文件
- found_files = self.search_files(search_dir, order_number)
-
- if found_files:
- self.log_message(f"找到 {len(found_files)} 个相关文件:")
- for file_path in found_files:
- try:
- rel_path = file_path.relative_to(search_dir)
- self.log_message(f" - {rel_path}")
- except:
- self.log_message(f" - {file_path}")
-
- # 复制文件
- copied_count = self.copy_files(found_files, output_dir, objTemp[平台单号列表[order_number]])
- total_files_copied += copied_count
- self.log_message(f"✓ 成功复制 {copied_count} 个文件到 {output_dir}/{objTemp[平台单号列表[order_number]]}")
- processed_orders += 1
- else:
- self.log_message(f"✗ 警告:未找到包含 {order_number} 的文件", True)
-
- # 更新统计信息
- self.update_stats(len(order_numbers), processed_orders, total_files_copied)
-
- self.log_message("\n" + "=" * 80)
- self.log_message("处理完成!")
- self.log_message(f"总订单数: {len(order_numbers)}")
- self.log_message(f"成功处理: {processed_orders}")
- self.log_message(f"失败/未找到: {len(order_numbers) - processed_orders}")
- self.log_message(f"共复制文件: {total_files_copied}")
-
- return True
-
- def run_in_thread(self):
- """在新线程中运行处理程序"""
- self.start_button.config(state=DISABLED)
- self.progress.start()
-
- try:
- success = self.process_data()
- if success:
- messagebox.showinfo("完成", "文件搜索复制完成!")
- else:
- messagebox.showerror("错误", "处理过程中出现错误,请查看日志。")
- except Exception as e:
- self.log_message(f"程序运行错误: {str(e)}", True)
- messagebox.showerror("错误", f"程序运行错误: {str(e)}")
- finally:
- self.progress.stop()
- self.start_button.config(state=NORMAL)
-
- def start_process(self):
- """启动处理程序"""
- # 在新线程中运行,避免界面卡顿
- thread = threading.Thread(target=self.run_in_thread)
- thread.daemon = True
- thread.start()
- def main():
- root = Tk()
- app = FileSearchCopyApp(root)
- root.mainloop()
- if __name__ == "__main__":
- main()
|