Added lock files to export to minimize name collisions (#981)
This commit is contained in:
parent
c4788c0fd2
commit
ce297ced0a
@ -180,6 +180,9 @@ _MAX_IPTC_KEYWORD_LEN = 64
|
|||||||
# If anyone has a keyword matching this, then too bad...
|
# If anyone has a keyword matching this, then too bad...
|
||||||
_OSXPHOTOS_NONE_SENTINEL = "OSXPhotosXYZZY42_Sentinel$"
|
_OSXPHOTOS_NONE_SENTINEL = "OSXPhotosXYZZY42_Sentinel$"
|
||||||
|
|
||||||
|
# Lock file extension for reserving filenames when exporting
|
||||||
|
_OSXPHOTOS_LOCK_EXTENSION = ".osxphotos.lock"
|
||||||
|
|
||||||
|
|
||||||
class SearchCategory:
|
class SearchCategory:
|
||||||
"""SearchInfo categories for Photos 5+; corresponds to categories in database/search/psi.sqlite:groups.category
|
"""SearchInfo categories for Photos 5+; corresponds to categories in database/search/psi.sqlite:groups.category
|
||||||
@ -351,7 +354,7 @@ def search_category_factory(version: int) -> SearchCategory:
|
|||||||
|
|
||||||
|
|
||||||
# Max filename length on MacOS
|
# Max filename length on MacOS
|
||||||
MAX_FILENAME_LEN = 255
|
MAX_FILENAME_LEN = 255 - len(_OSXPHOTOS_LOCK_EXTENSION)
|
||||||
|
|
||||||
# Max directory name length on MacOS
|
# Max directory name length on MacOS
|
||||||
MAX_DIRNAME_LEN = 255
|
MAX_DIRNAME_LEN = 255
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
"""export command for osxphotos CLI"""
|
"""export command for osxphotos CLI"""
|
||||||
|
|
||||||
|
import atexit
|
||||||
import inspect
|
import inspect
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
@ -63,8 +64,10 @@ from osxphotos.utils import (
|
|||||||
get_macos_version,
|
get_macos_version,
|
||||||
normalize_fs_path,
|
normalize_fs_path,
|
||||||
pluralize,
|
pluralize,
|
||||||
|
under_test,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .cli_commands import logger
|
||||||
from .cli_params import (
|
from .cli_params import (
|
||||||
DB_ARGUMENT,
|
DB_ARGUMENT,
|
||||||
DB_OPTION,
|
DB_OPTION,
|
||||||
@ -1406,6 +1409,20 @@ def export(
|
|||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def cleanup_lock_files():
|
||||||
|
"""Cleanup lock files"""
|
||||||
|
if not under_test():
|
||||||
|
verbose("Cleaning up lock files")
|
||||||
|
if dry_run:
|
||||||
|
return
|
||||||
|
for lock_file in pathlib.Path(dest).rglob("*.osxphotos.lock"):
|
||||||
|
try:
|
||||||
|
lock_file.unlink()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error removing lock file {lock_file}: {e}")
|
||||||
|
|
||||||
|
atexit.register(cleanup_lock_files)
|
||||||
|
|
||||||
photo_num = 0
|
photo_num = 0
|
||||||
num_exported = 0
|
num_exported = 0
|
||||||
limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
|
limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
|
||||||
|
|||||||
@ -50,6 +50,8 @@ from .utils import (
|
|||||||
increment_filename_with_count,
|
increment_filename_with_count,
|
||||||
lineno,
|
lineno,
|
||||||
list_directory,
|
list_directory,
|
||||||
|
lock_filename,
|
||||||
|
unlock_filename,
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@ -490,6 +492,7 @@ class PhotoExporter:
|
|||||||
f"Skipping missing {'edited' if options.edited else 'original'} photo {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
|
f"Skipping missing {'edited' if options.edited else 'original'} photo {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
|
||||||
)
|
)
|
||||||
all_results.missing.append(dest)
|
all_results.missing.append(dest)
|
||||||
|
unlock_filename(dest)
|
||||||
|
|
||||||
# copy live photo associated .mov if requested
|
# copy live photo associated .mov if requested
|
||||||
if export_original and options.live_photo and self.photo.live_photo:
|
if export_original and options.live_photo and self.photo.live_photo:
|
||||||
@ -565,7 +568,9 @@ class PhotoExporter:
|
|||||||
preview_name = (
|
preview_name = (
|
||||||
preview_name
|
preview_name
|
||||||
if any([options.overwrite, options.update, options.force_update])
|
if any([options.overwrite, options.update, options.force_update])
|
||||||
else pathlib.Path(increment_filename(preview_name))
|
else pathlib.Path(
|
||||||
|
increment_filename(preview_name, lock=not options.dry_run)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
all_results += self._export_photo(
|
all_results += self._export_photo(
|
||||||
preview_path,
|
preview_path,
|
||||||
@ -625,11 +630,10 @@ class PhotoExporter:
|
|||||||
if self.photo.path_edited:
|
if self.photo.path_edited:
|
||||||
ext = pathlib.Path(self.photo.path_edited).suffix
|
ext = pathlib.Path(self.photo.path_edited).suffix
|
||||||
else:
|
else:
|
||||||
uti = self.photo.uti_edited if self.photo.uti_edited else self.photo.uti
|
uti = self.photo.uti_edited or self.photo.uti
|
||||||
ext = get_preferred_uti_extension(uti)
|
ext = get_preferred_uti_extension(uti)
|
||||||
ext = "." + ext
|
ext = f".{ext}"
|
||||||
edited_filename = original_filename.stem + "_edited" + ext
|
return f"{original_filename.stem}_edited{ext}"
|
||||||
return edited_filename
|
|
||||||
|
|
||||||
def _get_dest_path(
|
def _get_dest_path(
|
||||||
self, dest: pathlib.Path, options: ExportOptions
|
self, dest: pathlib.Path, options: ExportOptions
|
||||||
@ -644,6 +648,14 @@ class PhotoExporter:
|
|||||||
new dest path (pathlib.Path)
|
new dest path (pathlib.Path)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# lock files are used to minimize chance of name collision when in parallel mode
|
||||||
|
# don't create lock files if in dry_run mode
|
||||||
|
lock = not options.dry_run
|
||||||
|
|
||||||
|
def _lock_filename(filename):
|
||||||
|
"""Lock filename if not in dry_run mode"""
|
||||||
|
return lock_filename(filename) if lock else filename
|
||||||
|
|
||||||
# if overwrite==False and #increment==False, export should fail if file exists
|
# if overwrite==False and #increment==False, export should fail if file exists
|
||||||
if dest.exists() and not any(
|
if dest.exists() and not any(
|
||||||
[options.increment, options.update, options.force_update, options.overwrite]
|
[options.increment, options.update, options.force_update, options.overwrite]
|
||||||
@ -653,7 +665,7 @@ class PhotoExporter:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# if overwrite, we don't care if the file exists or not
|
# if overwrite, we don't care if the file exists or not
|
||||||
if options.overwrite:
|
if options.overwrite and _lock_filename(dest):
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
# if not update or overwrite, check to see if file exists and if so, add (1), (2), etc
|
# if not update or overwrite, check to see if file exists and if so, add (1), (2), etc
|
||||||
@ -665,19 +677,21 @@ class PhotoExporter:
|
|||||||
if options.increment and not any(
|
if options.increment and not any(
|
||||||
[options.update, options.force_update, options.overwrite]
|
[options.update, options.force_update, options.overwrite]
|
||||||
):
|
):
|
||||||
return pathlib.Path(increment_filename(dest))
|
return pathlib.Path(increment_filename(dest, lock=lock))
|
||||||
|
|
||||||
# if update and file exists, need to check to see if it's the right file by checking export db
|
# if update and file exists, need to check to see if it's the right file by checking export db
|
||||||
if options.update or options.force_update:
|
if options.update or options.force_update:
|
||||||
export_db = options.export_db
|
export_db = options.export_db
|
||||||
dest_uuid = export_db.get_uuid_for_file(dest)
|
dest_uuid = export_db.get_uuid_for_file(dest)
|
||||||
if dest_uuid is None and not dest.exists():
|
if dest_uuid is None and not dest.exists() and _lock_filename(dest):
|
||||||
# destination doesn't exist in export db and doesn't exist on disk
|
# destination doesn't exist in export db and doesn't exist on disk
|
||||||
# so we can just use it
|
# so we can just use it
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
if dest_uuid == self.photo.uuid:
|
if dest_uuid == self.photo.uuid:
|
||||||
# destination is the right file
|
# destination is the right file
|
||||||
|
# will use it even if locked so don't check return value of _lock_filename
|
||||||
|
_lock_filename(dest)
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
# either dest_uuid is wrong or file exists and there's no associated UUID, so find a name that matches
|
# either dest_uuid is wrong or file exists and there's no associated UUID, so find a name that matches
|
||||||
@ -686,18 +700,20 @@ class PhotoExporter:
|
|||||||
# first, find all matching files in export db and see if there's a match
|
# first, find all matching files in export db and see if there's a match
|
||||||
if dest_target := export_db.get_target_for_file(self.photo.uuid, dest):
|
if dest_target := export_db.get_target_for_file(self.photo.uuid, dest):
|
||||||
# there's a match so use that
|
# there's a match so use that
|
||||||
|
_lock_filename(dest_target)
|
||||||
return pathlib.Path(dest_target)
|
return pathlib.Path(dest_target)
|
||||||
|
|
||||||
# no match so need to create a new name
|
# no match so need to create a new name
|
||||||
# increment the destination file until we find one that doesn't exist and doesn't match another uuid in the database
|
# increment the destination file until we find one that doesn't exist and doesn't match another uuid in the database
|
||||||
count = 0
|
count = 0
|
||||||
dest, count = increment_filename_with_count(dest, count)
|
dest, count = increment_filename_with_count(dest, count, lock=lock)
|
||||||
count += 1
|
count += 1
|
||||||
while export_db.get_uuid_for_file(dest) is not None:
|
while export_db.get_uuid_for_file(dest) is not None:
|
||||||
dest, count = increment_filename_with_count(dest, count)
|
dest, count = increment_filename_with_count(dest, count, lock=lock)
|
||||||
return pathlib.Path(dest)
|
return pathlib.Path(dest)
|
||||||
|
|
||||||
# fail safe...I can't think of a case that gets here
|
# fail safe...I can't think of a case that gets here
|
||||||
|
_lock_filename(dest)
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
def _should_update_photo(
|
def _should_update_photo(
|
||||||
@ -1216,6 +1232,9 @@ class PhotoExporter:
|
|||||||
"exiftool_warning": exif_results.exiftool_warning,
|
"exiftool_warning": exif_results.exiftool_warning,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# clean up lock file
|
||||||
|
unlock_filename(dest_str)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def _export_photo_uuid_applescript(
|
def _export_photo_uuid_applescript(
|
||||||
|
|||||||
@ -39,6 +39,7 @@ __all__ = [
|
|||||||
"list_directory",
|
"list_directory",
|
||||||
"list_photo_libraries",
|
"list_photo_libraries",
|
||||||
"load_function",
|
"load_function",
|
||||||
|
"lock_filename",
|
||||||
"noop",
|
"noop",
|
||||||
"normalize_fs_path",
|
"normalize_fs_path",
|
||||||
"normalize_unicode",
|
"normalize_unicode",
|
||||||
@ -367,7 +368,7 @@ def normalize_unicode(value):
|
|||||||
|
|
||||||
|
|
||||||
def increment_filename_with_count(
|
def increment_filename_with_count(
|
||||||
filepath: Union[str, pathlib.Path], count: int = 0
|
filepath: Union[str, pathlib.Path], count: int = 0, lock: bool = False
|
||||||
) -> Tuple[str, int]:
|
) -> Tuple[str, int]:
|
||||||
"""Return filename (1).ext, etc if filename.ext exists
|
"""Return filename (1).ext, etc if filename.ext exists
|
||||||
|
|
||||||
@ -377,6 +378,7 @@ def increment_filename_with_count(
|
|||||||
Args:
|
Args:
|
||||||
filepath: str or pathlib.Path; full path, including file name
|
filepath: str or pathlib.Path; full path, including file name
|
||||||
count: int; starting increment value
|
count: int; starting increment value
|
||||||
|
lock: bool, if True, creates lock file to reserve filename
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
tuple of new filepath (or same if not incremented), count
|
tuple of new filepath (or same if not incremented), count
|
||||||
@ -393,10 +395,13 @@ def increment_filename_with_count(
|
|||||||
count += 1
|
count += 1
|
||||||
dest_new = normalize_fs_path(f"{dest.stem} ({count})")
|
dest_new = normalize_fs_path(f"{dest.stem} ({count})")
|
||||||
dest = dest.parent / f"{dest_new}{dest.suffix}"
|
dest = dest.parent / f"{dest_new}{dest.suffix}"
|
||||||
|
if lock and not lock_filename(dest):
|
||||||
|
# if lock fails, increment count and try again
|
||||||
|
return increment_filename_with_count(filepath, count + 1, lock=lock)
|
||||||
return normalize_fs_path(str(dest)), count
|
return normalize_fs_path(str(dest)), count
|
||||||
|
|
||||||
|
|
||||||
def increment_filename(filepath: Union[str, pathlib.Path]) -> str:
|
def increment_filename(filepath: Union[str, pathlib.Path], lock: bool = False) -> str:
|
||||||
"""Return filename (1).ext, etc if filename.ext exists
|
"""Return filename (1).ext, etc if filename.ext exists
|
||||||
|
|
||||||
If file exists in filename's parent folder with same stem as filename,
|
If file exists in filename's parent folder with same stem as filename,
|
||||||
@ -405,16 +410,47 @@ def increment_filename(filepath: Union[str, pathlib.Path]) -> str:
|
|||||||
Args:
|
Args:
|
||||||
filepath: str or pathlib.Path; full path, including file name
|
filepath: str or pathlib.Path; full path, including file name
|
||||||
force: force the file count to increment by at least 1 even if filepath doesn't exist
|
force: force the file count to increment by at least 1 even if filepath doesn't exist
|
||||||
|
lock: bool, if True, creates lock file to reserve filename
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
new filepath (or same if not incremented)
|
new filepath (or same if not incremented)
|
||||||
|
|
||||||
Note: This obviously is subject to race condition so using with caution.
|
Note: This obviously is subject to race condition so using with caution.
|
||||||
"""
|
"""
|
||||||
new_filepath, _ = increment_filename_with_count(filepath)
|
new_filepath, _ = increment_filename_with_count(filepath, lock=lock)
|
||||||
return new_filepath
|
return new_filepath
|
||||||
|
|
||||||
|
|
||||||
|
def lock_filename(filepath: Union[str, pathlib.Path]) -> bool:
|
||||||
|
"""Create empty lock file to reserve file.
|
||||||
|
Lock file will have name of filepath with .osxphotos.lock extension.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: str or pathlib.Path; full path, including file name
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
filepath if lock file created, False if lock file already exists
|
||||||
|
"""
|
||||||
|
|
||||||
|
lockfile = pathlib.Path(f"{filepath}.osxphotos.lock")
|
||||||
|
if lockfile.exists():
|
||||||
|
return False
|
||||||
|
lockfile.touch()
|
||||||
|
return filepath
|
||||||
|
|
||||||
|
|
||||||
|
def unlock_filename(filepath: Union[str, pathlib.Path]):
|
||||||
|
"""Remove lock file created by lock_filename()
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: str or pathlib.Path; full path, including file name
|
||||||
|
"""
|
||||||
|
|
||||||
|
lockfile = pathlib.Path(f"{filepath}.osxphotos.lock")
|
||||||
|
if lockfile.exists():
|
||||||
|
lockfile.unlink()
|
||||||
|
|
||||||
|
|
||||||
def extract_increment_count_from_filename(filepath: Union[str, pathlib.Path]) -> int:
|
def extract_increment_count_from_filename(filepath: Union[str, pathlib.Path]) -> int:
|
||||||
"""Extract a count from end of file name if it exists or 0 if not; count takes forms file (1).ext, file (2).ext, etc."""
|
"""Extract a count from end of file name if it exists or 0 if not; count takes forms file (1).ext, file (2).ext, etc."""
|
||||||
filepath = str(filepath)
|
filepath = str(filepath)
|
||||||
@ -501,3 +537,7 @@ def uuid_to_shortuuid(uuid: str) -> str:
|
|||||||
def shortuuid_to_uuid(short_uuid: str) -> str:
|
def shortuuid_to_uuid(short_uuid: str) -> str:
|
||||||
"""Convert shortuuid to uuid"""
|
"""Convert shortuuid to uuid"""
|
||||||
return str(shortuuid.decode(short_uuid)).upper()
|
return str(shortuuid.decode(short_uuid)).upper()
|
||||||
|
|
||||||
|
def under_test() -> bool:
|
||||||
|
"""Return True if running under pytest"""
|
||||||
|
return "pytest" in sys.modules
|
||||||
@ -9,8 +9,8 @@ import os.path
|
|||||||
import pathlib
|
import pathlib
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
@ -498,10 +498,9 @@ CLI_EXPORTED_FILENAME_TEMPLATE_FILENAMES_KEYWORD_PATHSEP = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
CLI_EXPORTED_FILENAME_TEMPLATE_LONG_DESCRIPTION = [
|
CLI_EXPORTED_FILENAME_TEMPLATE_LONG_DESCRIPTION = [
|
||||||
"Lorem ipsum dolor sit amet, consectetuer adipiscing elit. "
|
"Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo"
|
||||||
"Aenean commodo ligula eget dolor. Aenean massa. "
|
" ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis "
|
||||||
"Cum sociis natoque penatibus et magnis dis parturient montes, "
|
"dis parturient montes, nascetu. Donec quam felis, ultricies nec, "
|
||||||
"nascetur ridiculus mus. Donec quam felis, ultricies nec, "
|
|
||||||
"pellentesque eu, pretium q.tif"
|
"pellentesque eu, pretium q.tif"
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -4008,6 +4007,7 @@ def test_export_filename_template_long_description():
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
files = glob.glob("*.*")
|
||||||
for fname in CLI_EXPORTED_FILENAME_TEMPLATE_LONG_DESCRIPTION:
|
for fname in CLI_EXPORTED_FILENAME_TEMPLATE_LONG_DESCRIPTION:
|
||||||
assert pathlib.Path(fname).is_file()
|
assert pathlib.Path(fname).is_file()
|
||||||
|
|
||||||
|
|||||||
@ -1,8 +1,10 @@
|
|||||||
""" Test path_utils.py """
|
""" Test path_utils.py """
|
||||||
|
|
||||||
|
from osxphotos._constants import _OSXPHOTOS_LOCK_EXTENSION, MAX_FILENAME_LEN
|
||||||
|
from osxphotos.path_utils import sanitize_filename
|
||||||
|
|
||||||
|
|
||||||
def test_sanitize_filename():
|
def test_sanitize_filename():
|
||||||
from osxphotos.path_utils import sanitize_filename
|
|
||||||
from osxphotos._constants import MAX_FILENAME_LEN
|
|
||||||
|
|
||||||
# basic sanitize
|
# basic sanitize
|
||||||
filenames = {
|
filenames = {
|
||||||
@ -30,30 +32,33 @@ def test_sanitize_filename():
|
|||||||
filename = "foo" + "x" * 512
|
filename = "foo" + "x" * 512
|
||||||
new_filename = sanitize_filename(filename)
|
new_filename = sanitize_filename(filename)
|
||||||
assert len(new_filename) == MAX_FILENAME_LEN
|
assert len(new_filename) == MAX_FILENAME_LEN
|
||||||
assert new_filename == "foo" + "x" * 252
|
assert new_filename == "foo" + "x" * (252 - len(_OSXPHOTOS_LOCK_EXTENSION))
|
||||||
|
|
||||||
# filename too long with extension
|
# filename too long with extension
|
||||||
filename = "x" * 512 + ".jpeg"
|
filename = "x" * 512 + ".jpeg"
|
||||||
new_filename = sanitize_filename(filename)
|
new_filename = sanitize_filename(filename)
|
||||||
assert len(new_filename) == MAX_FILENAME_LEN
|
assert len(new_filename) == MAX_FILENAME_LEN
|
||||||
assert new_filename == "x" * 250 + ".jpeg"
|
assert new_filename == "x" * (250 - len(_OSXPHOTOS_LOCK_EXTENSION)) + ".jpeg"
|
||||||
|
|
||||||
# more than one extension
|
# more than one extension
|
||||||
filename = "foo.bar" + "x" * 255 + ".foo.bar.jpeg"
|
filename = "foo.bar" + "x" * 255 + ".foo.bar.jpeg"
|
||||||
new_filename = sanitize_filename(filename)
|
new_filename = sanitize_filename(filename)
|
||||||
assert len(new_filename) == MAX_FILENAME_LEN
|
assert len(new_filename) == MAX_FILENAME_LEN
|
||||||
assert new_filename == "foo.bar" + "x" * 243 + ".jpeg"
|
assert (
|
||||||
|
new_filename
|
||||||
|
== "foo.bar" + "x" * (243 - len(_OSXPHOTOS_LOCK_EXTENSION)) + ".jpeg"
|
||||||
|
)
|
||||||
|
|
||||||
# shorter than drop count
|
# shorter than drop count
|
||||||
filename = "foo." + "x" * 256
|
filename = "foo." + "x" * 256
|
||||||
new_filename = sanitize_filename(filename)
|
new_filename = sanitize_filename(filename)
|
||||||
assert len(new_filename) == MAX_FILENAME_LEN
|
assert len(new_filename) == MAX_FILENAME_LEN
|
||||||
assert new_filename == "foo." + "x" * 251
|
assert new_filename == "foo." + "x" * (251 - len(_OSXPHOTOS_LOCK_EXTENSION))
|
||||||
|
|
||||||
|
|
||||||
def test_sanitize_dirname():
|
def test_sanitize_dirname():
|
||||||
from osxphotos.path_utils import sanitize_dirname
|
|
||||||
from osxphotos._constants import MAX_DIRNAME_LEN
|
from osxphotos._constants import MAX_DIRNAME_LEN
|
||||||
|
from osxphotos.path_utils import sanitize_dirname
|
||||||
|
|
||||||
# basic sanitize
|
# basic sanitize
|
||||||
dirnames = {
|
dirnames = {
|
||||||
@ -83,9 +88,10 @@ def test_sanitize_dirname():
|
|||||||
assert len(new_dirname) == MAX_DIRNAME_LEN
|
assert len(new_dirname) == MAX_DIRNAME_LEN
|
||||||
assert new_dirname == "foo" + "x" * 252
|
assert new_dirname == "foo" + "x" * 252
|
||||||
|
|
||||||
|
|
||||||
def test_sanitize_pathpart():
|
def test_sanitize_pathpart():
|
||||||
from osxphotos.path_utils import sanitize_pathpart
|
|
||||||
from osxphotos._constants import MAX_DIRNAME_LEN
|
from osxphotos._constants import MAX_DIRNAME_LEN
|
||||||
|
from osxphotos.path_utils import sanitize_pathpart
|
||||||
|
|
||||||
# basic sanitize
|
# basic sanitize
|
||||||
dirnames = {
|
dirnames = {
|
||||||
@ -114,4 +120,3 @@ def test_sanitize_pathpart():
|
|||||||
new_dirname = sanitize_pathpart(dirname)
|
new_dirname = sanitize_pathpart(dirname)
|
||||||
assert len(new_dirname) == MAX_DIRNAME_LEN
|
assert len(new_dirname) == MAX_DIRNAME_LEN
|
||||||
assert new_dirname == "foo" + "x" * 252
|
assert new_dirname == "foo" + "x" * 252
|
||||||
|
|
||||||
|
|||||||
@ -125,6 +125,42 @@ def test_increment_filename():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_increment_filename_with_lock():
|
||||||
|
# test that increment_filename works with lock=True
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory(prefix="osxphotos_") as temp_dir:
|
||||||
|
temp_dir = pathlib.Path(temp_dir)
|
||||||
|
filename = str(temp_dir / "file.jpg")
|
||||||
|
assert increment_filename(filename, lock=True) == str(temp_dir / "file.jpg")
|
||||||
|
|
||||||
|
new_file = temp_dir / "file.jpg"
|
||||||
|
new_file.touch()
|
||||||
|
assert increment_filename(filename, lock=True) == str(temp_dir / "file (1).jpg")
|
||||||
|
|
||||||
|
# test increment_filename_with_count
|
||||||
|
filename = str(temp_dir / "file2.jpg")
|
||||||
|
assert increment_filename_with_count(filename, count=2, lock=True) == (
|
||||||
|
str(temp_dir / "file2 (2).jpg"),
|
||||||
|
2,
|
||||||
|
)
|
||||||
|
new_file = temp_dir / "file2 (2).jpg"
|
||||||
|
new_file.touch()
|
||||||
|
assert increment_filename_with_count(filename, count=2, lock=True) == (
|
||||||
|
str(temp_dir / "file2 (3).jpg"),
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_increment_filename_with_lock_exists():
|
||||||
|
# test that increment_filename works with lock=True when lock file already exists
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory(prefix="osxphotos_") as temp_dir:
|
||||||
|
temp_dir = pathlib.Path(temp_dir)
|
||||||
|
filename = str(temp_dir / "file.jpg")
|
||||||
|
pathlib.Path(f"{filename}.osxphotos.lock").touch()
|
||||||
|
assert increment_filename(filename, lock=True) == str(temp_dir / "file (1).jpg")
|
||||||
|
|
||||||
|
|
||||||
def test_shortuuid_uuid():
|
def test_shortuuid_uuid():
|
||||||
"""Test shortuuid_to_uuid and uuid_to_shortuuid"""
|
"""Test shortuuid_to_uuid and uuid_to_shortuuid"""
|
||||||
uuid = "5CF8D91E-DCEB-4CC3-BFF7-920B05564EB0"
|
uuid = "5CF8D91E-DCEB-4CC3-BFF7-920B05564EB0"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user