Performance improvements and refactoring, #462, partial for #591

This commit is contained in:
Rhet Turnbull 2022-01-27 06:28:12 -08:00
parent 3bc53fd92b
commit 22964afc69
2 changed files with 194 additions and 185 deletions

View File

@ -1,7 +1,6 @@
""" PhotoExport class to export photos
"""
# TODO: the various sidecar_json, sidecar_xmp, etc args should all be collapsed to a sidecar param using a bit mask
import dataclasses
import glob
@ -14,7 +13,7 @@ import re
import tempfile
from collections import namedtuple # pylint: disable=syntax-error
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Callable, List, Optional
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple
import photoscript
from mako.template import Template
@ -222,6 +221,7 @@ class ExportResults:
skipped=None,
exif_updated=None,
touched=None,
to_touch=None,
converted_to_jpeg=None,
sidecar_json_written=None,
sidecar_json_skipped=None,
@ -247,6 +247,7 @@ class ExportResults:
self.skipped = skipped or []
self.exif_updated = exif_updated or []
self.touched = touched or []
self.to_touch = to_touch or []
self.converted_to_jpeg = converted_to_jpeg or []
self.sidecar_json_written = sidecar_json_written or []
self.sidecar_json_skipped = sidecar_json_skipped or []
@ -298,6 +299,7 @@ class ExportResults:
self.skipped += other.skipped
self.exif_updated += other.exif_updated
self.touched += other.touched
self.to_touch += other.to_touch
self.converted_to_jpeg += other.converted_to_jpeg
self.sidecar_json_written += other.sidecar_json_written
self.sidecar_json_skipped += other.sidecar_json_skipped
@ -326,6 +328,7 @@ class ExportResults:
+ f",skipped={self.skipped}"
+ f",exif_updated={self.exif_updated}"
+ f",touched={self.touched}"
+ f",to_touch={self.to_touch}"
+ f",converted_to_jpeg={self.converted_to_jpeg}"
+ f",sidecar_json_written={self.sidecar_json_written}"
+ f",sidecar_json_skipped={self.sidecar_json_skipped}"
@ -520,11 +523,9 @@ class PhotoExporter:
# when called from export(), won't get an export_db, so use no-op version
options.export_db = options.export_db or ExportDBNoOp()
export_db = options.export_db
# ensure there's a FileUtil class to use
options.fileutil = options.fileutil or FileUtil
fileutil = options.fileutil
self._render_options = options.render_options or RenderOptions()
@ -551,88 +552,24 @@ class PhotoExporter:
dest = pathlib.Path(dest) / filename
# Is there something to convert with convert_to_jpeg?
if options.convert_to_jpeg and self.photo.isphoto:
something_to_convert = False
ext = "." + options.jpeg_ext if options.jpeg_ext else ".jpeg"
if export_original and self.photo.uti_original != "public.jpeg":
# not a jpeg but will convert to jpeg upon export so fix file extension
something_to_convert = True
dest = dest.parent / f"{dest.stem}{ext}"
if export_edited and self.photo.uti != "public.jpeg":
# in Big Sur+, edited HEICs are HEIC
something_to_convert = True
dest = dest.parent / f"{dest.stem}{ext}"
convert_to_jpeg = something_to_convert
else:
convert_to_jpeg = False
options = dataclasses.replace(options, convert_to_jpeg=convert_to_jpeg)
dest, options = self._should_convert_to_jpeg(dest, options)
dest, _ = self._validate_dest_path(
dest,
increment=options.increment,
update=options.update,
overwrite=options.overwrite,
)
dest = pathlib.Path(dest)
# stage files for export by finding path in local library or downloading from iCloud as appropriate
staged_files = self._stage_photos_for_export(options)
src = staged_files.edited if options.edited else staged_files.original
# get the right destination path depending on options.update, etc.
dest = self._get_dest_path(src, dest, options)
self._render_options.filepath = str(dest)
all_results = ExportResults()
staged_files = self._stage_photos_for_export(options)
src = staged_files.edited if options.edited else staged_files.original
if src:
# found source now try to find right destination
if options.update and dest.exists():
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
if dest_uuid is None and fileutil.cmp(src, dest):
# might be exporting into a pre-ExportDB folder or the DB got deleted
dest_uuid = self.photo.uuid
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
exif_stat=(None, None, None),
converted_stat=(None, None, None),
edited_stat=(None, None, None),
info_json=self.photo.json(),
exif_json=None,
)
if dest_uuid != self.photo.uuid:
# not the right file, find the right one
glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}")
# TODO: use the normalized code in utils
dest_files = glob.glob(glob_str)
for file_ in dest_files:
dest_uuid = export_db.get_uuid_for_file(file_)
if dest_uuid == self.photo.uuid:
dest = pathlib.Path(file_)
break
elif dest_uuid is None and fileutil.cmp(src, file_):
# files match, update the UUID
dest = pathlib.Path(file_)
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
exif_stat=(None, None, None),
converted_stat=(None, None, None),
edited_stat=(None, None, None),
info_json=self.photo.json(),
exif_json=None,
)
break
else:
# increment the destination file
dest = pathlib.Path(increment_filename(dest))
# export the dest file
results = self._export_photo(
all_results += self._export_photo(
src,
dest,
options=options,
)
all_results += results
# copy live photo associated .mov if requested
if (
@ -643,13 +580,12 @@ class PhotoExporter:
):
live_name = dest.parent / f"{dest.stem}.mov"
src_live = staged_files.original_live
results = self._export_photo(
all_results += self._export_photo(
src_live,
live_name,
# don't try to convert the live photo
options=dataclasses.replace(options, convert_to_jpeg=False),
)
all_results += results
if (
export_edited
@ -659,26 +595,23 @@ class PhotoExporter:
):
live_name = dest.parent / f"{dest.stem}.mov"
src_live = staged_files.edited_live
results = self._export_photo(
all_results += self._export_photo(
src_live,
live_name,
# don't try to convert the live photo
options=dataclasses.replace(options, convert_to_jpeg=False),
)
all_results += results
# copy associated RAW image if requested
if options.raw_photo and self.photo.has_raw and staged_files.raw:
raw_path = pathlib.Path(staged_files.raw)
raw_ext = raw_path.suffix
raw_name = dest.parent / f"{dest.stem}{raw_ext}"
if raw_path is not None:
results = self._export_photo(
raw_path,
raw_name,
options=options,
)
all_results += results
all_results += self._export_photo(
raw_path,
raw_name,
options=options,
)
# copy preview image if requested
if options.preview and staged_files.preview:
@ -697,41 +630,40 @@ class PhotoExporter:
if options.overwrite or options.update
else pathlib.Path(increment_filename(preview_name))
)
if preview_path is not None:
results = self._export_photo(
preview_path,
preview_name,
options=options,
)
all_results += results
all_results += self._export_photo(
preview_path,
preview_name,
options=options,
)
results = self._write_sidecar_files(dest=dest, options=options)
all_results += results
all_results += self._write_sidecar_files(dest=dest, options=options)
# if exiftool, write the metadata
if options.exiftool:
exif_files = (
all_results.new + all_results.updated + all_results.skipped
if options.update
else all_results.exported
all_results += self._write_exif_metadata_to_files(
all_results, options=options
)
for exported_file in exif_files:
results = self._write_exif_metadata_to_files(
exported_file=exported_file, options=options
)
all_results += results
if options.touch_file:
for exif_file in all_results.exif_updated:
verbose(f"Updating file modification time for {exif_file}")
all_results.touched.append(exif_file)
ts = int(self.photo.date.timestamp())
fileutil.utime(exif_file, (ts, ts))
all_results.touched = list(set(all_results.touched))
all_results += self._touch_files(all_results, options)
return all_results
def _touch_files(
self, results: ExportResults, options: ExportOptions
) -> ExportResults:
"""touch file date/time to match photo creation date/time"""
# verbose = options.verbose or self._verbose
fileutil = options.fileutil
touch_files = set(results.to_touch)
touch_results = ExportResults()
for touch_file in touch_files:
# verbose(f"Updating file modification time for {touch_file}")
ts = int(self.photo.date.timestamp())
fileutil.utime(touch_file, (ts, ts))
touch_results.touched.append(touch_file)
return touch_results
def _get_edited_filename(self, original_filename):
"""Return the filename for the exported edited photo
(used when filename isn't provided in call to export2)"""
@ -746,34 +678,86 @@ class PhotoExporter:
edited_filename = original_filename.stem + "_edited" + ext
return edited_filename
def _validate_dest_path(self, dest, increment, update, overwrite, count=0):
"""If destination exists, add (1), (2), and so on to filename to get a valid destination
def _get_dest_path(
self, src: str, dest: pathlib.Path, options: ExportOptions
) -> pathlib.Path:
"""If destination exists find match in ExportDB, on disk, or add (1), (2), and so on to filename to get a valid destination
Args:
dest (str): Destination path
increment (bool): Whether to increment the filename if it already exists
update (bool): Whether running in update mode
overwrite (bool): Whether running in overwrite mode
count: optional counter to start from (if 0, start from 1)
src (str): source file path
dest (str): destination path
options (ExportOptions): Export options
Returns:
new dest path (pathlib.Path), increment count (int)
new dest path (pathlib.Path)
"""
# check to see if file exists and if so, add (1), (2), etc until we find one that works
# if overwrite==False and #increment==False, export should fail if file exists
if dest.exists() and not any(
[options.increment, options.update, options.overwrite]
):
raise FileExistsError(
f"destination exists ({dest}); overwrite={options.overwrite}, increment={options.increment}"
)
# if not update or overwrite, check to see if file exists and if so, add (1), (2), etc
# until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
# e.g. exporting sidecar for file1.png and file1.jpeg
# if file1.png exists and exporting file1.jpeg,
# dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision
if increment and not update and not overwrite:
dest, count = increment_filename_with_count(dest, count=count)
dest = pathlib.Path(dest)
if options.increment and not options.update and not options.overwrite:
return pathlib.Path(increment_filename(dest))
# if overwrite==False and #increment==False, export should fail if file exists
if dest.exists() and all([not x for x in [increment, update, overwrite]]):
raise FileExistsError(
f"destination exists ({dest}); overwrite={overwrite}, increment={increment}"
)
return dest, count
# if update and file exists, need to check to see if it's the write file by checking export db
if options.update and dest.exists() and src:
export_db = options.export_db
fileutil = options.fileutil
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
if dest_uuid is None and fileutil.cmp(src, dest):
# might be exporting into a pre-ExportDB folder or the DB got deleted
dest_uuid = self.photo.uuid
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
exif_stat=(None, None, None),
converted_stat=(None, None, None),
edited_stat=(None, None, None),
info_json=self.photo.json(),
exif_json=None,
)
if dest_uuid != self.photo.uuid:
# not the right file, find the right one
glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}")
# TODO: use the normalized code in utils
dest_files = glob.glob(glob_str)
for file_ in dest_files:
dest_uuid = export_db.get_uuid_for_file(file_)
if dest_uuid == self.photo.uuid:
dest = pathlib.Path(file_)
break
elif dest_uuid is None and fileutil.cmp(src, file_):
# files match, update the UUID
dest = pathlib.Path(file_)
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
exif_stat=(None, None, None),
converted_stat=(None, None, None),
edited_stat=(None, None, None),
info_json=self.photo.json(),
exif_json=None,
)
break
else:
# increment the destination file
dest = pathlib.Path(increment_filename(dest))
# either dest was updated in the if clause above or not updated at all
return dest
def _stage_photos_for_export(self, options: ExportOptions) -> StagedFiles:
"""Stages photos for export
@ -1013,6 +997,28 @@ class PhotoExporter:
return results
def _should_convert_to_jpeg(
self, dest: pathlib.Path, options: ExportOptions
) -> Tuple[pathlib.Path, ExportOptions]:
"""Determine if a file really should be converted to jpeg or not
and return the new destination and ExportOptions instance with the convert_to_jpeg flag set appropriately
"""
if not (options.convert_to_jpeg and self.photo.isphoto):
# nothing to convert
return dest, dataclasses.replace(options, convert_to_jpeg=False)
convert_to_jpeg = False
ext = "." + options.jpeg_ext if options.jpeg_ext else ".jpeg"
if not options.edited and self.photo.uti_original != "public.jpeg":
# not a jpeg but will convert to jpeg upon export so fix file extension
convert_to_jpeg = True
dest = dest.parent / f"{dest.stem}{ext}"
elif options.edited and self.photo.uti != "public.jpeg":
# in Big Sur+, edited HEICs are HEIC
convert_to_jpeg = True
dest = dest.parent / f"{dest.stem}{ext}"
return dest, dataclasses.replace(options, convert_to_jpeg=convert_to_jpeg)
def _is_temp_file(self, filepath: str) -> bool:
"""Returns True if file is in the PhotosExporter temp directory otherwise False"""
filepath = pathlib.Path(filepath)
@ -1191,16 +1197,12 @@ class PhotoExporter:
exif_json=None,
)
if touched_files:
ts = int(self.photo.date.timestamp())
fileutil.utime(dest, (ts, ts))
return ExportResults(
exported=exported_files + update_new_files + update_updated_files,
new=update_new_files,
updated=update_updated_files,
skipped=update_skipped_files,
touched=touched_files,
to_touch=touched_files,
converted_to_jpeg=converted_to_jpeg_files,
)
@ -1321,7 +1323,7 @@ class PhotoExporter:
def _write_exif_metadata_to_files(
self,
exported_file: str,
results: ExportResults,
options: ExportOptions,
) -> ExportResults:
"""Write exif metadata to files using exiftool."""
@ -1330,31 +1332,66 @@ class PhotoExporter:
fileutil = options.fileutil
verbose = options.verbose or self._verbose
results = ExportResults()
if options.update:
files_are_different = False
old_data = export_db.get_exifdata_for_file(exported_file)
if old_data is not None:
old_data = json.loads(old_data)[0]
current_data = json.loads(self._exiftool_json_sidecar(options=options))[
0
]
if old_data != current_data:
files_are_different = True
exif_files = (
results.new + results.updated + results.skipped
if options.update
else results.exported
)
if old_data is None or files_are_different:
# didn't have old data, assume we need to write it
# or files were different
exiftool_results = ExportResults()
for exported_file in exif_files:
if options.update:
files_are_different = False
old_data = export_db.get_exifdata_for_file(exported_file)
if old_data is not None:
old_data = json.loads(old_data)[0]
current_data = json.loads(
self._exiftool_json_sidecar(options=options)
)[0]
if old_data != current_data:
files_are_different = True
if old_data is None or files_are_different:
# didn't have old data, assume we need to write it
# or files were different
verbose(f"Writing metadata with exiftool for {exported_file}")
if not options.dry_run:
warning_, error_ = self._write_exif_data(
exported_file, options=options
)
if warning_:
exiftool_results.exiftool_warning.append(
(exported_file, warning_)
)
if error_:
exiftool_results.exiftool_error.append(
(exported_file, error_)
)
exiftool_results.error.append((exported_file, error_))
export_db.set_exifdata_for_file(
exported_file, self._exiftool_json_sidecar(options=options)
)
export_db.set_stat_exif_for_file(
exported_file, fileutil.file_sig(exported_file)
)
exiftool_results.exif_updated.append(exported_file)
exiftool_results.to_touch.append(exported_file)
else:
verbose(f"Skipped up to date exiftool metadata for {exported_file}")
else:
verbose(f"Writing metadata with exiftool for {exported_file}")
if not options.dry_run:
warning_, error_ = self._write_exif_data(
exported_file, options=options
)
if warning_:
results.exiftool_warning.append((exported_file, warning_))
exiftool_results.exiftool_warning.append(
(exported_file, warning_)
)
if error_:
results.exiftool_error.append((exported_file, error_))
results.error.append((exported_file, error_))
exiftool_results.exiftool_error.append((exported_file, error_))
exiftool_results.error.append((exported_file, error_))
export_db.set_exifdata_for_file(
exported_file, self._exiftool_json_sidecar(options=options)
@ -1362,27 +1399,9 @@ class PhotoExporter:
export_db.set_stat_exif_for_file(
exported_file, fileutil.file_sig(exported_file)
)
results.exif_updated.append(exported_file)
else:
verbose(f"Skipped up to date exiftool metadata for {exported_file}")
else:
verbose(f"Writing metadata with exiftool for {exported_file}")
if not options.dry_run:
warning_, error_ = self._write_exif_data(exported_file, options=options)
if warning_:
results.exiftool_warning.append((exported_file, warning_))
if error_:
results.exiftool_error.append((exported_file, error_))
results.error.append((exported_file, error_))
export_db.set_exifdata_for_file(
exported_file, self._exiftool_json_sidecar(options=options)
)
export_db.set_stat_exif_for_file(
exported_file, fileutil.file_sig(exported_file)
)
results.exif_updated.append(exported_file)
return results
exiftool_results.exif_updated.append(exported_file)
exiftool_results.to_touch.append(exported_file)
return exiftool_results
def _write_exif_data(self, filepath: str, options: ExportOptions):
"""write exif data to image file at filepath

View File

@ -93,7 +93,7 @@ def test_exportresults_iadd():
def test_all_files():
""" test ExportResults.all_files() """
"""test ExportResults.all_files()"""
results = ExportResults()
for x in EXPORT_RESULT_ATTRIBUTES:
setattr(results, x, [f"{x}1"])
@ -106,13 +106,3 @@ def test_all_files():
assert sorted(
results.all_files() + results.deleted_files + results.deleted_directories
) == sorted([f"{x}1" for x in EXPORT_RESULT_ATTRIBUTES])
def test_str():
""" test ExportResults.__str__ """
results = ExportResults()
assert (
str(results)
== "ExportResults(exported=[],new=[],updated=[],skipped=[],exif_updated=[],touched=[],converted_to_jpeg=[],sidecar_json_written=[],sidecar_json_skipped=[],sidecar_exiftool_written=[],sidecar_exiftool_skipped=[],sidecar_xmp_written=[],sidecar_xmp_skipped=[],missing=[],error=[],exiftool_warning=[],exiftool_error=[],deleted_files=[],deleted_directories=[],exported_album=[],skipped_album=[],missing_album=[])"
)