|
19 | 19 | # Set up logger |
20 | 20 | logger = logging.getLogger(__name__) |
21 | 21 |
|
# Custom "ALWAYS" severity: twice CRITICAL, so it ranks above every built-in
# logging level and passes any standard level threshold.
ALWAYS = 2 * logging.CRITICAL
logging.addLevelName(ALWAYS, "ALWAYS")
22 | 25 |
|
23 | | -def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False): |
| 26 | + |
def always(self, message, *args, **kwargs):
    """
    Emit ``message`` at the custom ALWAYS level.

    Intended to be used as a ``logging.Logger`` method (it is attached to the
    class right below). Nothing is logged when the logger is not enabled for
    the ALWAYS level.

    Args:
        message (str): Log message, optionally with %-style placeholders.
        *args: Values substituted into ``message``.
        **kwargs: Extra keyword arguments forwarded to ``Logger._log``.
    """
    if not self.isEnabledFor(ALWAYS):
        return
    self._log(ALWAYS, message, args, **kwargs)  # pylint: disable=protected-access


# Expose the helper as ``logger.always(...)`` on every Logger instance.
logging.Logger.always = always
| 35 | + |
| 36 | + |
def find_owned_files_scandir(directory, user_uid):
    """
    Recursively find all regular files owned by a specific user.

    The tree is walked with os.scandir(). Entry type checks normally need no
    extra system call, and the result of entry.stat() is cached on the entry
    (on POSIX the first stat() call still costs one system call per file).

    Each directory listing is read completely, and the directory handle is
    closed, before anything is yielded. Callers may therefore rename, delete
    or create entries in the tree while consuming this generator without
    affecting which entries of the current directory are visited, and only
    one directory handle is open at a time regardless of tree depth.

    Symlinks are never followed: symlinked files and directories are skipped.
    Entries or directories that cannot be accessed are logged at debug level
    and skipped.

    Args:
        directory (str): The root directory to search.
        user_uid (int): The UID of the user whose files to find.

    Yields:
        str: Paths to files owned by the user, built by joining ``directory``
            with the entry names (absolute if ``directory`` is absolute).
    """
    # Snapshot the listing first; iterating a directory stream while the
    # consumer mutates that directory has unspecified results.
    entries = []
    try:
        with os.scandir(directory) as scan_it:
            # Explicit loop so entries read before a mid-listing error are
            # still processed (best effort, like the per-entry handling below).
            for entry in scan_it:
                entries.append(entry)
    except OSError as e:  # PermissionError is a subclass of OSError
        logger.debug("Error accessing %s: %s. Skipping.", directory, e)

    for entry in entries:
        try:
            # Regular file (a symlink to a file does not count)
            if entry.is_file(follow_symlinks=False):
                if entry.stat(follow_symlinks=False).st_uid == user_uid:
                    yield entry.path

            # Real directory (not a symlink to one): descend into it
            elif entry.is_dir(follow_symlinks=False):
                yield from find_owned_files_scandir(entry.path, user_uid)

            # Symlinks are reported and skipped; other entry types
            # (sockets, FIFOs, devices) are skipped silently.
            elif entry.is_symlink():
                logger.info("Skipping symlink: %s", entry.path)

        except OSError as e:
            logger.debug("Error accessing %s: %s. Skipping.", entry.path, e)
| 77 | + |
| 78 | + |
| 79 | +def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False): |
24 | 80 | """ |
25 | 81 | Finds files owned by a specific user in a source directory tree, |
26 | 82 | deletes them, and replaces them with symbolic links to the same |
@@ -52,70 +108,52 @@ def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False |
52 | 108 | source_dir, |
53 | 109 | ) |
54 | 110 |
|
55 | | - for dirpath, _, filenames in os.walk(source_dir): |
56 | | - for filename in filenames: |
57 | | - file_path = os.path.join(dirpath, filename) |
58 | | - |
59 | | - # Use os.stat().st_uid to get the file's owner UID |
60 | | - try: |
61 | | - if os.path.islink(file_path): |
62 | | - logger.info("Skipping symlink: %s", file_path) |
63 | | - continue |
64 | | - |
65 | | - file_uid = os.stat(file_path).st_uid |
66 | | - except FileNotFoundError: |
67 | | - continue # Skip if file was deleted during traversal |
68 | | - |
69 | | - if file_uid == user_uid: |
70 | | - logger.info("Found owned file: %s", file_path) |
71 | | - |
72 | | - # Determine the relative path and the new link's destination |
73 | | - relative_path = os.path.relpath(file_path, source_dir) |
74 | | - link_target = os.path.join(target_dir, relative_path) |
75 | | - |
76 | | - # Check if the target file actually exists |
77 | | - if not os.path.exists(link_target): |
78 | | - logger.warning( |
79 | | - "Warning: Corresponding file not found in '%s' " |
80 | | - "for '%s'. Skipping.", |
81 | | - target_dir, |
82 | | - file_path, |
83 | | - ) |
84 | | - continue |
85 | | - |
86 | | - # Get the link name |
87 | | - link_name = file_path |
88 | | - |
89 | | - if dry_run: |
90 | | - logger.info( |
91 | | - "[DRY RUN] Would create symbolic link: %s -> %s", |
92 | | - link_name, |
93 | | - link_target, |
94 | | - ) |
95 | | - continue |
96 | | - |
97 | | - # Remove the original file |
98 | | - try: |
99 | | - os.rename(link_name, link_name + ".tmp") |
100 | | - logger.info("Deleted original file: %s", link_name) |
101 | | - except OSError as e: |
102 | | - logger.error("Error deleting file %s: %s. Skipping.", link_name, e) |
103 | | - continue |
104 | | - |
105 | | - # Create the symbolic link, handling necessary parent directories |
106 | | - try: |
107 | | - # Create parent directories for the link if they don't exist |
108 | | - os.makedirs(os.path.dirname(link_name), exist_ok=True) |
109 | | - os.symlink(link_target, link_name) |
110 | | - os.remove(link_name + ".tmp") |
111 | | - logger.info( |
112 | | - "Created symbolic link: %s -> %s", link_name, link_target |
113 | | - ) |
114 | | - except OSError as e: |
115 | | - os.rename(link_name + ".tmp", link_name) |
116 | | - logger.error( |
117 | | - "Error creating symlink for %s: %s. Skipping.", link_name, e |
118 | | - ) |
| 111 | + # Use efficient scandir-based search |
| 112 | + for file_path in find_owned_files_scandir(source_dir, user_uid): |
| 113 | + logger.info("Found owned file: %s", file_path) |
| 114 | + |
| 115 | + # Determine the relative path and the new link's destination |
| 116 | + relative_path = os.path.relpath(file_path, source_dir) |
| 117 | + link_target = os.path.join(target_dir, relative_path) |
| 118 | + |
| 119 | + # Check if the target file actually exists |
| 120 | + if not os.path.exists(link_target): |
| 121 | + logger.warning( |
| 122 | + "Warning: Corresponding file not found in '%s' for '%s'. Skipping.", |
| 123 | + target_dir, |
| 124 | + file_path, |
| 125 | + ) |
| 126 | + continue |
| 127 | + |
| 128 | + # Get the link name |
| 129 | + link_name = file_path |
| 130 | + |
| 131 | + if dry_run: |
| 132 | + logger.info( |
| 133 | + "[DRY RUN] Would create symbolic link: %s -> %s", |
| 134 | + link_name, |
| 135 | + link_target, |
| 136 | + ) |
| 137 | + continue |
| 138 | + |
| 139 | + # Remove the original file |
| 140 | + try: |
| 141 | + os.rename(link_name, link_name + ".tmp") |
| 142 | + logger.info("Deleted original file: %s", link_name) |
| 143 | + except OSError as e: |
| 144 | + logger.error("Error deleting file %s: %s. Skipping.", link_name, e) |
| 145 | + continue |
| 146 | + |
| 147 | + # Create the symbolic link, handling necessary parent directories |
| 148 | + try: |
| 149 | + # Create parent directories for the link if they don't exist |
| 150 | + os.makedirs(os.path.dirname(link_name), exist_ok=True) |
| 151 | + os.symlink(link_target, link_name) |
| 152 | + os.remove(link_name + ".tmp") |
| 153 | + logger.info("Created symbolic link: %s -> %s", link_name, link_target) |
| 154 | + except OSError as e: |
| 155 | + os.rename(link_name + ".tmp", link_name) |
| 156 | + logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e) |
119 | 157 |
|
120 | 158 |
|
121 | 159 | def validate_directory(path): |
@@ -228,13 +266,13 @@ def main(): |
228 | 266 | start_time = time.time() |
229 | 267 |
|
230 | 268 | # --- Execution --- |
231 | | - find_and_replace_owned_files( |
| 269 | + replace_files_with_symlinks( |
232 | 270 | args.source_root, args.target_root, my_username, dry_run=args.dry_run |
233 | 271 | ) |
234 | 272 |
|
235 | 273 | if args.timing: |
236 | 274 | elapsed_time = time.time() - start_time |
237 | | - logger.info("Execution time: %.2f seconds", elapsed_time) |
| 275 | + logger.always("Execution time: %.2f seconds", elapsed_time) |
238 | 276 |
|
239 | 277 |
|
240 | 278 | if __name__ == "__main__": |
|
0 commit comments