Find out all the different files from two different paths efficiently in Windows (with Python)

32 Views Asked by At

Well, recently I'm working on a program which is able to sync files between different folders.

However, as a fresh man, the algorithm I worte looks really disgusting (bruh~)

The following lines are the related things I wrote,

I really hope there would be experts who can help me improve my code

def scan_items(folder_path):  # 扫描路径下所有文件夹
    def scan_folders_in(f_path):  # 扫描目录下所有的文件夹,并返回路径列表
        surf_items = os.scandir(f_path)
        folders = [f_path]
        for item_data in surf_items:
            if item_data.is_dir():
                folders.extend(scan_folders_in(item_data.path))  # 继续遍历文件夹内文件夹,直到记下全部文件夹路径
        folders = sorted(set(folders))  # 排序 + 排除重复项
        surf_items.close()
        return folders

    file_store = []
    folder_store = scan_folders_in(folder_path)
    for folder in folder_store:  # 遍历所有文件夹
        files = [folder + '\\' + dI for dI in os.listdir(folder) if os.path.isfile(os.path.join(folder, dI))]
        # 如上只生成本文件夹内 文件的路径
        file_store.extend(files)  # 存储上面文件路径
    for i in range(len(file_store)):
        file_store[i] = file_store[i][len(folder_path)::]  # 返回相对位置
    result = [folder_store, file_store]
    return result

And here is the main part, and the 'get_task()' part is the most important one

def sf_sync_dir(path1, path2, single_sync, language_number, area_name=None, pass_item_rpath='', pass_folder_paths=''):
    from LT_Dic import sf_label_text_dic  # Label information from another file

    def sf_show_notice(path_1, path_2, sf_errorname):  # Win10toast used

        toaster.show_toast('Sync Successfully',
                           'The Files in "' + path_1 + '" and "' + path_2 + '" are Synchronized',
                           icon_path=mf_data_path + r'Movefile.ico',
                           duration=10,
                           threaded=False)
        if len(sf_errorname) > 0:
            toaster.show_toast("Couldn't sync files",
                               sf_errorname + sf_label_text_dic['can_not_move_notice'][language_number],
                               icon_path=mf_data_path + r'Movefile.ico',
                               duration=10,
                               threaded=False)

    def get_task(barroot):  # shitty block here, help me!  (baroot is a root showing progress)
        all_files_1 = scan_items(path1)[1]
        all_files_2 = scan_items(path2)[1]
        sync_tasks = []
        pass_folder_rpaths = []
        task_number = 0

        for pass_folder in pass_folder_paths.split(','):
            if pass_folder.startswith(path1):
                pass_folder_rpaths.append(pass_folder.replace(path1, path1.split('\\')[-1]))
            elif pass_folder:
                pass_folder_rpaths.append(pass_folder.replace(path2, path2.split('\\')[-1]))

        file_info_1 = {}  # 存储文件1的信息:(哈希值, 大小, 修改时间)

        for file1 in all_files_1:
            file1_path = path1 + file1
            file_info_1[file1] = (filehash(file1_path), os.path.getsize(file1_path), os.path.getmtime(file1_path))

        for file2 in all_files_2:
            file2_path = path2 + file2

            if file2 in all_files_1:
                file1 = file2
                file1_path = path1 + file1
                file_info = file_info_1[file1]
                file2_info = (filehash(file2_path), os.path.getsize(file2_path), os.path.getmtime(file2_path))

                if file_info == file2_info:
                    continue

                if single_sync and file_info[0] == file2_info[0]:
                    continue

                new_file, old_file = file1, file2
                new_file_path, old_file_path = file1_path, file2_path
                new_file_rpath = path1.split('\\')[-1] + file1
                old_file_rpath = path2.split('\\')[-1] + file2

                if int(os.stat(new_file_path).st_mtime) < int(os.stat(old_file_path).st_mtime):
                    if single_sync:
                        continue
                    old_file, new_file = new_file, old_file
                    new_file_path, old_file_path = old_file_path, new_file_path
                    new_file_rpath, old_file_rpath = old_file_path, new_file_rpath

                if any(pfolder.startswith(old_file_rpath) for pfolder in pass_folder_rpaths) or any(
                        old_file.endswith(pfile) for pfile in pass_item_rpath.split(',')):
                    continue

                task_number += 1
                barroot.set_label1(sf_label_text_dic['main_progress_label'][language_number] + file1.split('\\')[-1])
                sync_tasks.append([new_file_path, old_file_path, False])
            else:
                new_file_rpath = path2.split('\\')[-1] + file2

                if any(pfolder.startswith(new_file_rpath) for pfolder in pass_folder_rpaths):
                    continue

                task_number += 1
                barroot.set_label1(sf_label_text_dic['main_progress_label'][language_number] + file2.split('\\')[-1])
                barroot.progress_root.update_idletasks()
                sync_tasks.append([file2_path, path1 + file2, True])

        if not single_sync:
            for file1 in all_files_1:
                if file1 not in all_files_2:
                    file1_path = path1 + file1
                    newfile1_rpath = path2.split('\\')[-1] + file1

                    if any(pfolder.startswith(newfile1_rpath) for pfolder in pass_folder_rpaths):
                        continue

                    task_number += 1
                    barroot.set_label1(
                        sf_label_text_dic['main_progress_label'][language_number] + file1.split('\\')[-1])
                    barroot.progress_root.update_idletasks()
                    sync_tasks.append([file1_path, path2 + file1, True])

        return sync_tasks

    def synchronize_files(baroot, task):
        baroot.set_label2(sf_label_text_dic["current_file_label1"][language_number] + task[0].split('\\')[-1])
        new_file_path, old_file_path, create_folder = task
        if create_folder:
            try:
                sf_creat_folder(old_file_path)
            except:
                pass
        try:
            shutil.copy2(new_file_path, old_file_path)
        except:
            return new_file_path
        return None

    def run_sync_tasks(baroot):
        sf_errorname = ''
        baroot.main_progress_bar['value'] = 0
        baroot.progress_root.update_idletasks()
        tasks = get_task(baroot)
        baroot.main_progress_bar['maximum'] = len(tasks)
        baroot.set_label1(
            f'{sf_label_text_dic["main_progress_label1"][language_number][0]}{str(baroot.main_progress_bar["value"])}/{str(len(tasks))}  {sf_label_text_dic["main_progress_label1"][language_number][1]}')

        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(synchronize_files, baroot, task) for task in tasks]

            for future in as_completed(futures):
                result = future.result()
                if result:
                    sf_errorname += result + ' , '

                baroot.main_progress_bar['value'] += 1
                baroot.set_label1(
                    f'{sf_label_text_dic["main_progress_label1"][language_number][0]}{str(baroot.main_progress_bar["value"])}/{str(len(tasks))}  {sf_label_text_dic["main_progress_label1"][language_number][1]}')
                baroot.progress_root.update_idletasks()

        baroot.progress_root.withdraw()
        path_name_1 = path1.split('\\')[-1]
        if area_name:
            path_name_1 = area_name
        try:
            sf_show_notice(path_name_1, path2.split('\\')[-1], sf_errorname)
        except:
            pass
        finally:
            baroot.progress_root.withdraw()

    global sync_bar_root, sync_bar_root_task
    sync_bar_root = ProgressBar('Movefile  -Syncfile Progress',
                                sf_label_text_dic["main_progress_label2"][language_number],
                                sf_label_text_dic["current_file_label"][language_number],
                                language_number)
    sync_bar_root_task = threading.Thread(target=lambda: sync_bar_root.launch(), daemon=True)
    sync_bar_root_task.start()
    while not sync_bar_root.initialization_done:
        time.sleep(0.01)
    run_tasks = threading.Thread(target=lambda: run_sync_tasks(sync_bar_root), daemon=True)
    run_tasks.start()

And the progress bar class:

class ProgressBar:
    def __init__(self, title, label1, label2, lang_num):
        self.initialization_done = False
        from LT_Dic import progress_root_label_dic
        self.title = title
        self.label1 = label1
        self.label2 = label2
        self.label_dic = progress_root_label_dic
        self.lang_num = lang_num
        self.main_progress_label = None
        self.main_progress_bar = None
        self.current_file_label = None
        self.show_running_bar = None
        self.progress_root = None
        self.roll_bar = None

    def set_label1(self, content):
        self.main_progress_label['text'] = content

    def set_label2(self, content):
        self.current_file_label['text'] = content

    def launch(self):
        self.progress_root = tk.Tk()
        self.progress_root.title(self.title)
        self.progress_root.geometry('420x115')
        self.progress_root.iconbitmap(mf_data_path + r'Movefile.ico')
        self.main_progress_label = ttk.Label(self.progress_root, text=self.label1)
        self.main_progress_label.grid(row=0, column=0, padx=10, pady=5, sticky='SW')
        self.main_progress_bar = ttk.Progressbar(self.progress_root)
        self.main_progress_bar.grid(row=1, column=0, padx=10, pady=0, ipadx=150, sticky='W')
        self.current_file_label = ttk.Label(self.progress_root, text=self.label2)
        self.current_file_label.grid(row=2, column=0, padx=10, pady=5, sticky='SW')
        self.show_running_bar = ttk.Progressbar(self.progress_root, mode='indeterminate')
        self.show_running_bar.grid(row=3, column=0, padx=10, pady=0, ipadx=150, sticky='W')
        self.progress_root.protocol('WM_DELETE_WINDOW', lambda: self.sync_bar_on_exit())
        self.roll_bar = threading.Thread(target=self.show_running, daemon=True)
        self.roll_bar.start()
        self.initialization_done = True
        self.progress_root.mainloop()

    def show_running(self):
        self.show_running_bar.start(10)

    def sync_bar_on_exit(self):
        if tkinter.messagebox.askyesno(title='Syncfile', message=self.label_dic['confirm_exit_text'][self.lang_num]):
            self.progress_root.withdraw()
            self.roll_bar.join()
            return True
        else:
            return False

    def progress_root_destruction(self):
        self.progress_root.quit()
        self.progress_root.destroy()

And the label part here, if you want~

sf_label_text_dic = {
    'main_progress_label': ['扫描文件中...  发现文件:', 'Scanning items...  Found item:'],
    'main_progress_label1': [['总进度:', '已完成'], ['Progress:', 'Completed']],
    'main_progress_label2': ['扫描文件中...', 'Scanning items...'],
    'current_file_label': ['等待中...', 'Waiting...'],
    'current_file_label1': ['同步中文件:', 'File in process:'],
    'exit_sync': ['''文件正在同步中,
确定中断同步进程并退出?''', '''Synchronization is in progress,
Are you sure to interrupt the process and exit?'''],
    'can_not_move_notice': ["""
无法被移动,请在关闭文件或移除重名文件后重试""", """
Couldn't be moved, Please try again after closing the file
or removing the duplicate file """]
    }

progress_root_label_dic = {
    'confirm_exit_text': ['''文件正在复制中,
确定中断进程并退出?''', '''The file is currently being copied,
Are you sure to interrupt the process and exit?''']
}

It sucks, as you can see......

So, help me write a more efficient code to implement the functions contained in the original code, please~

0

There are 0 best solutions below