This commit is contained in:
Rhet Turnbull
2022-05-23 22:53:39 -07:00
parent bc32b1827f
commit e9cc6ce137
26 changed files with 224 additions and 110 deletions

View File

@@ -1,3 +1,3 @@
""" version info """
__version__ = "0.49.6"
__version__ = "0.49.7"

Binary file not shown.

View File

@@ -6,8 +6,10 @@ import gzip
import json
import logging
import os
import os.path
import pathlib
import pickle
import re
import sqlite3
import sys
import time
@@ -15,7 +17,7 @@ from contextlib import suppress
from io import StringIO
from sqlite3 import Error
from tempfile import TemporaryDirectory
from typing import Any, Optional, Tuple, Union, List
from typing import Any, List, Optional, Tuple, Union
from tenacity import retry, stop_after_attempt
@@ -209,6 +211,35 @@ class ExportDB:
except Error as e:
logging.warning(e)
def get_target_for_file(
self, uuid: str, filename: Union[str, pathlib.Path]
) -> Optional[str]:
"""query database for file matching file name and return the matching filename if there is one;
otherwise return None; looks for file.ext, file (1).ext, file (2).ext and so on to find the
actual target name that was used to export filename
Returns: the matching filename or None if no match found
"""
conn = self._conn
c = conn.cursor()
filepath_normalized = self._normalize_filepath_relative(filename)
filepath_stem = os.path.splitext(filepath_normalized)[0]
c.execute(
"SELECT uuid, filepath, filepath_normalized FROM export_data WHERE uuid = ? AND filepath_normalized LIKE ?",
(
uuid,
f"{filepath_stem}%",
),
)
results = c.fetchall()
for result in results:
filepath_normalized = os.path.splitext(result[2])[0]
if re.match(re.escape(filepath_stem) + r"(\s\(\d+\))?$", filepath_normalized):
return os.path.join(self.export_dir, result[1])
return None
def get_previous_uuids(self):
"""returns list of UUIDs of previously exported photos found in export database"""
conn = self._conn

View File

@@ -44,7 +44,13 @@ from .photokit import (
from .phototemplate import RenderOptions
from .rich_utils import add_rich_markup_tag
from .uti import get_preferred_uti_extension
from .utils import hexdigest, increment_filename, lineno, list_directory
from .utils import (
hexdigest,
increment_filename,
increment_filename_with_count,
lineno,
list_directory,
)
__all__ = [
"ExportError",
@@ -488,7 +494,7 @@ class PhotoExporter:
src = staged_files.edited if options.edited else staged_files.original
# get the right destination path depending on options.update, etc.
dest = self._get_dest_path(src, dest, options)
dest = self._get_dest_path(dest, options)
self._render_options.filepath = str(dest)
all_results = ExportResults()
@@ -642,12 +648,11 @@ class PhotoExporter:
return edited_filename
def _get_dest_path(
self, src: str, dest: pathlib.Path, options: ExportOptions
self, dest: pathlib.Path, options: ExportOptions
) -> pathlib.Path:
"""If destination exists find match in ExportDB, on disk, or add (1), (2), and so on to filename to get a valid destination
Args:
src (str): source file path
dest (str): destination path
options (ExportOptions): Export options
@@ -663,6 +668,10 @@ class PhotoExporter:
f"destination exists ({dest}); overwrite={options.overwrite}, increment={options.increment}"
)
# if overwrite, we don't care if the file exists or not
if options.overwrite:
return dest
# if not update or overwrite, check to see if file exists and if so, add (1), (2), etc
# until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
@@ -675,29 +684,36 @@ class PhotoExporter:
return pathlib.Path(increment_filename(dest))
# if update and file exists, need to check to see if it's the right file by checking export db
if (options.update or options.force_update) and dest.exists() and src:
if options.update or options.force_update:
export_db = options.export_db
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
if dest_uuid != self.photo.uuid:
# not the right file, find the right one
# find files that match "dest_name (*.ext" (e.g. "dest_name (1).jpg", "dest_name (2).jpg)", ...)
dest_files = list_directory(
dest.parent,
startswith=f"{dest.stem} (",
endswith=dest.suffix,
include_path=True,
)
for file_ in dest_files:
dest_uuid = export_db.get_uuid_for_file(file_)
if dest_uuid == self.photo.uuid:
dest = pathlib.Path(file_)
break
else:
# increment the destination file
dest = pathlib.Path(increment_filename(dest))
if dest_uuid is None and not dest.exists():
# destination doesn't exist in export db and doesn't exist on disk
# so we can just use it
return dest
# either dest was updated in the if clause above or not updated at all
if dest_uuid == self.photo.uuid:
# destination is the right file
return dest
# either dest_uuid is wrong or file exists and there's no associated UUID, so find a name that matches
# or create a new name if no match
# find files that match "dest_name (*.ext" (e.g. "dest_name (1).jpg", "dest_name (2).jpg)", ...)
# first, find all matching files in export db and see if there's a match
if dest_target := export_db.get_target_for_file(self.photo.uuid, dest):
# there's a match so use that
return pathlib.Path(dest_target)
# no match so need to create a new name
# increment the destination file until we find one that doesn't exist and doesn't match another uuid in the database
count = 0
dest, count = increment_filename_with_count(dest, count)
count += 1
while export_db.get_uuid_for_file(dest) is not None:
dest, count = increment_filename_with_count(dest, count)
return pathlib.Path(dest)
# fail safe...I can't think of a case that gets here
return dest
def _should_update_photo(
@@ -1036,9 +1052,9 @@ class PhotoExporter:
def _export_photo(
self,
src,
dest,
options,
src: str,
dest: pathlib.Path,
options: ExportOptions,
):
"""Helper function for export()
Does the actual copy or hardlink taking the appropriate
@@ -1444,7 +1460,7 @@ class PhotoExporter:
) -> ExportResults:
"""Write exif metadata to src file using exiftool
Caution: This method modifies *src*, not *dest*,
Caution: This method modifies *src*, not *dest*,
so src must be a copy of the original file if you don't want the source modified;
it also does not write to dest (dest is the intended destination for purposes of
referencing the export database. This allows the exiftool update to be done on the

View File

@@ -411,7 +411,7 @@ def normalize_unicode(value):
def increment_filename_with_count(
filepath: Union[str, pathlib.Path], count: int = 0
) -> str:
) -> Tuple[str, int]:
"""Return filename (1).ext, etc if filename.ext exists
If file exists in filename's parent folder with same stem as filename,
@@ -447,6 +447,7 @@ def increment_filename(filepath: Union[str, pathlib.Path]) -> str:
Args:
filepath: str or pathlib.Path; full path, including file name
force: force the file count to increment by at least 1 even if filepath doesn't exist
Returns:
new filepath (or same if not incremented)
@@ -457,6 +458,13 @@ def increment_filename(filepath: Union[str, pathlib.Path]) -> str:
return new_filepath
def extract_increment_count_from_filename(filepath: Union[str, pathlib.Path]) -> int:
"""Extract a count from end of file name if it exists or 0 if not; count takes forms file (1).ext, file (2).ext, etc."""
filepath = str(filepath)
match = re.search(r"(?s:.*)\((\d+)\)", filepath)
return int(match[1]) if match else 0
def expand_and_validate_filepath(path: str) -> str:
"""validate and expand ~ in filepath, also un-escapes spaces