More refactoring of export code, #462
This commit is contained in:
@@ -40,6 +40,7 @@ from .fileutil import FileUtil
|
||||
from .photokit import (
|
||||
PHOTOS_VERSION_CURRENT,
|
||||
PHOTOS_VERSION_ORIGINAL,
|
||||
PHOTOS_VERSION_UNADJUSTED,
|
||||
PhotoKitFetchFailed,
|
||||
PhotoLibrary,
|
||||
)
|
||||
@@ -104,7 +105,8 @@ class ExportOptions:
|
||||
update (bool, default=False): if True export will run in update mode, that is, it will not export the photo if the current version already exists in the destination
|
||||
use_albums_as_keywords (bool, default = False): if True, will include album names in keywords when exporting metadata with exiftool or sidecar
|
||||
use_persons_as_keywords (bool, default = False): if True, will include person names in keywords when exporting metadata with exiftool or sidecar
|
||||
use_photos_export (bool, default=False): if True will attempt to export photo via applescript interaction with Photos
|
||||
use_photos_export (bool, default=False): if True will attempt to export photo via applescript interaction with Photos (see also use_photokit)
|
||||
use_photokit (bool, default=False): if True, will use photokit to export photos when use_photos_export is True
|
||||
verbose (Callable): optional callable function to use for printing verbose text during processing; if None (default), does not print output.
|
||||
"""
|
||||
|
||||
@@ -150,6 +152,54 @@ class ExportOptions:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class StagedFiles:
|
||||
"""Represents files staged for export"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
original: Optional[str] = None,
|
||||
original_live: Optional[str] = None,
|
||||
edited: Optional[str] = None,
|
||||
edited_live: Optional[str] = None,
|
||||
preview: Optional[str] = None,
|
||||
raw: Optional[str] = None,
|
||||
error: Optional[List[str]] = None,
|
||||
):
|
||||
self.original = original
|
||||
self.original_live = original_live
|
||||
self.edited = edited
|
||||
self.edited_live = edited_live
|
||||
self.preview = preview
|
||||
self.raw = raw
|
||||
self.error = error or []
|
||||
|
||||
# TODO: bursts?
|
||||
|
||||
def __ior__(self, other):
|
||||
self.original = self.original or other.original
|
||||
self.original_live = self.original_live or other.original_live
|
||||
self.edited = self.edited or other.edited
|
||||
self.edited_live = self.edited_live or other.edited_live
|
||||
self.preview = self.preview or other.preview
|
||||
self.raw = self.raw or other.raw
|
||||
self.error += other.error
|
||||
return self
|
||||
|
||||
def __str__(self):
|
||||
return str(self.asdict())
|
||||
|
||||
def asdict(self):
|
||||
return {
|
||||
"original": self.original,
|
||||
"original_live": self.original_live,
|
||||
"edited": self.edited,
|
||||
"edited_live": self.edited_live,
|
||||
"preview": self.preview,
|
||||
"raw": self.raw,
|
||||
"error": self.error,
|
||||
}
|
||||
|
||||
|
||||
class ExportResults:
|
||||
"""Results class which holds export results for export2"""
|
||||
|
||||
@@ -291,6 +341,12 @@ class PhotoExporter:
|
||||
self._render_options = RenderOptions()
|
||||
self._verbose = self.photo._verbose
|
||||
|
||||
# temp directory for staging downloaded missing files
|
||||
self._temp_dir = tempfile.TemporaryDirectory(
|
||||
prefix=f"osxphotos_photo_exporter_{self.photo.uuid}_"
|
||||
)
|
||||
self._temp_dir_path = pathlib.Path(self._temp_dir.name)
|
||||
|
||||
def export(
|
||||
self,
|
||||
dest,
|
||||
@@ -414,6 +470,8 @@ class PhotoExporter:
|
||||
options: Optional[ExportOptions] = None,
|
||||
):
|
||||
"""export photo, like export but with update and dry_run options
|
||||
|
||||
Args:
|
||||
dest: must be valid destination path or exception raised
|
||||
filename: (optional): name of exported picture; if not provided, will use current filename
|
||||
**NOTE**: if provided, user must ensure file extension (suffix) is correct.
|
||||
@@ -423,28 +481,9 @@ class PhotoExporter:
|
||||
in which case export will use the extension provided by Photos upon export.
|
||||
e.g. to get the extension of the edited photo,
|
||||
reference PhotoInfo.path_edited
|
||||
options (ExportOptions): optional ExportOptions instance
|
||||
|
||||
Returns: ExportResults class
|
||||
ExportResults has attributes:
|
||||
"exported",
|
||||
"new",
|
||||
"updated",
|
||||
"skipped",
|
||||
"exif_updated",
|
||||
"touched",
|
||||
"converted_to_jpeg",
|
||||
"sidecar_json_written",
|
||||
"sidecar_json_skipped",
|
||||
"sidecar_exiftool_written",
|
||||
"sidecar_exiftool_skipped",
|
||||
"sidecar_xmp_written",
|
||||
"sidecar_xmp_skipped",
|
||||
"missing",
|
||||
"error",
|
||||
"error_str",
|
||||
"exiftool_warning",
|
||||
"exiftool_error",
|
||||
|
||||
Returns: ExportResults instance
|
||||
|
||||
Note: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db,
|
||||
and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) in options.export_db and options.fileutil respectively
|
||||
@@ -452,6 +491,14 @@ class PhotoExporter:
|
||||
|
||||
options = options or ExportOptions()
|
||||
|
||||
verbose = options.verbose or self._verbose
|
||||
if verbose and not callable(verbose):
|
||||
raise TypeError("verbose must be callable")
|
||||
|
||||
# can't use export_as_hardlink with use_photos_export as can't hardlink the temporary files downloaded
|
||||
if options.export_as_hardlink and options.use_photos_export:
|
||||
raise ValueError("Cannot use export_as_hardlink with use_photos_export")
|
||||
|
||||
# when called from export(), won't get an export_db, so use no-op version
|
||||
options.export_db = options.export_db or ExportDBNoOp()
|
||||
export_db = options.export_db
|
||||
@@ -460,10 +507,6 @@ class PhotoExporter:
|
||||
options.fileutil = options.fileutil or FileUtil
|
||||
fileutil = options.fileutil
|
||||
|
||||
verbose = options.verbose or self._verbose
|
||||
if verbose and not callable(verbose):
|
||||
raise TypeError("verbose must be callable")
|
||||
|
||||
self._render_options = options.render_options or RenderOptions()
|
||||
|
||||
# export_original, and export_edited are just used for clarity in the code
|
||||
@@ -515,18 +558,9 @@ class PhotoExporter:
|
||||
self._render_options.filepath = str(dest)
|
||||
all_results = ExportResults()
|
||||
|
||||
if options.use_photos_export:
|
||||
self._export_photo_with_photos_export(
|
||||
dest=dest, all_results=all_results, options=options
|
||||
)
|
||||
else:
|
||||
# find the source file on disk and export
|
||||
# get path to source file and verify it's not None and is valid file
|
||||
# TODO: how to handle ismissing or not hasadjustments and edited=True cases?
|
||||
src = self.photo.path_edited if options.edited else self.photo.path
|
||||
if src and not pathlib.Path(src).is_file():
|
||||
raise FileNotFoundError(f"{src} does not appear to exist")
|
||||
staged_files = self._stage_photos_for_export(options)
|
||||
|
||||
src = staged_files.edited if options.edited else staged_files.original
|
||||
if src:
|
||||
# found source now try to find right destination
|
||||
if options.update and dest.exists():
|
||||
@@ -585,10 +619,10 @@ class PhotoExporter:
|
||||
export_original
|
||||
and options.live_photo
|
||||
and self.photo.live_photo
|
||||
and self.photo.path_live_photo
|
||||
and staged_files.original_live
|
||||
):
|
||||
live_name = dest.parent / f"{dest.stem}.mov"
|
||||
src_live = self.photo.path_live_photo
|
||||
src_live = staged_files.original_live
|
||||
results = self._export_photo(
|
||||
src_live,
|
||||
live_name,
|
||||
@@ -601,10 +635,10 @@ class PhotoExporter:
|
||||
export_edited
|
||||
and options.live_photo
|
||||
and self.photo.live_photo
|
||||
and self.photo.path_edited_live_photo
|
||||
and staged_files.edited_live
|
||||
):
|
||||
live_name = dest.parent / f"{dest.stem}.mov"
|
||||
src_live = self.photo.path_edited_live_photo
|
||||
src_live = staged_files.edited_live
|
||||
results = self._export_photo(
|
||||
src_live,
|
||||
live_name,
|
||||
@@ -614,8 +648,8 @@ class PhotoExporter:
|
||||
all_results += results
|
||||
|
||||
# copy associated RAW image if requested
|
||||
if options.raw_photo and self.photo.has_raw and self.photo.path_raw:
|
||||
raw_path = pathlib.Path(self.photo.path_raw)
|
||||
if options.raw_photo and self.photo.has_raw and staged_files.raw:
|
||||
raw_path = pathlib.Path(staged_files.raw)
|
||||
raw_ext = raw_path.suffix
|
||||
raw_name = dest.parent / f"{dest.stem}{raw_ext}"
|
||||
if raw_path is not None:
|
||||
@@ -627,10 +661,10 @@ class PhotoExporter:
|
||||
all_results += results
|
||||
|
||||
# copy preview image if requested
|
||||
if options.preview and self.photo.path_derivatives:
|
||||
if options.preview and staged_files.preview:
|
||||
# Photos keeps multiple different derivatives and path_derivatives returns list of them
|
||||
# first derivative is the largest so export that one
|
||||
preview_path = pathlib.Path(self.photo.path_derivatives[0])
|
||||
preview_path = pathlib.Path(staged_files.preview)
|
||||
preview_ext = preview_path.suffix
|
||||
preview_name = (
|
||||
dest.parent / f"{dest.stem}{options.preview_suffix}{preview_ext}"
|
||||
@@ -721,25 +755,275 @@ class PhotoExporter:
|
||||
)
|
||||
return dest, count
|
||||
|
||||
def _export_photo_with_photos_export(
|
||||
def _stage_photos_for_export(self, options: ExportOptions) -> StagedFiles:
|
||||
"""Stages photos for export
|
||||
|
||||
If photo is present on disk in the library, uses path to the photo on disk.
|
||||
If photo is missing and use_photos_export is true, downloads the photo from iCloud to temporary location.
|
||||
"""
|
||||
|
||||
# TODO: this changes behavior in that Photos download is only called if file is actually missing
|
||||
# Need an option to force download if user wants to only use Photos export
|
||||
|
||||
staged = StagedFiles()
|
||||
|
||||
if options.raw_photo and self.photo.has_raw:
|
||||
staged.raw = self.photo.path_raw
|
||||
|
||||
if options.preview and self.photo.path_derivatives:
|
||||
staged.preview = self.photo.path_derivatives[0]
|
||||
|
||||
if not options.edited:
|
||||
# original file
|
||||
if self.photo.path:
|
||||
staged.original = self.photo.path
|
||||
if options.live_photo and self.photo.live_photo:
|
||||
staged.original_live = self.photo.path_live_photo
|
||||
|
||||
if options.edited:
|
||||
# edited file
|
||||
staged.edited = self.photo.path_edited
|
||||
if options.live_photo and self.photo.live_photo:
|
||||
staged.edited_live = self.photo.path_edited_live_photo
|
||||
|
||||
# download any missing files
|
||||
if options.use_photos_export:
|
||||
live_photo = staged.edited_live if options.edited else staged.original_live
|
||||
missing_options = ExportOptions(
|
||||
edited=options.edited,
|
||||
# TODO: missing previews are not generated/downloaded
|
||||
preview=options.preview and not staged.preview,
|
||||
raw_photo=options.raw_photo and not staged.raw,
|
||||
live_photo=options.live_photo and not live_photo,
|
||||
)
|
||||
if options.use_photokit:
|
||||
missing_staged = self._stage_photo_for_export_with_photokit(
|
||||
options=missing_options
|
||||
)
|
||||
else:
|
||||
missing_staged = self._stage_photo_for_export_with_applescript(
|
||||
options=missing_options
|
||||
)
|
||||
staged |= missing_staged
|
||||
return staged
|
||||
|
||||
# def _export_photo_with_photos_export(
|
||||
# self,
|
||||
# dest: pathlib.Path,
|
||||
# all_results: ExportResults,
|
||||
# options: ExportOptions,
|
||||
# ):
|
||||
# # TODO: if using applescript and exporting edited with live_photo doesn't seem to export the edited live photo
|
||||
# # this does work with photokit, but not with applescript
|
||||
# # TODO: duplicative code with the if edited/else--remove it
|
||||
# fileutil = options.fileutil
|
||||
# export_db = options.export_db
|
||||
|
||||
# # export live_photo .mov file?
|
||||
# live_photo = bool(options.live_photo and self.photo.live_photo)
|
||||
# overwrite = options.overwrite or options.update
|
||||
# if options.edited or self.photo.shared:
|
||||
# # exported edited version and not original
|
||||
# # shared photos (in shared albums) show up as not having adjustments (not edited)
|
||||
# # but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud
|
||||
# # so tell Photos to export the current version in this case
|
||||
# # didn't get passed a filename, add _edited
|
||||
# uti = (
|
||||
# self.photo.uti_edited
|
||||
# if options.edited and self.photo.uti_edited
|
||||
# else self.photo.uti
|
||||
# )
|
||||
# ext = get_preferred_uti_extension(uti)
|
||||
# dest = dest.parent / f"{dest.stem}.{ext}"
|
||||
|
||||
# if options.use_photokit:
|
||||
# photolib = PhotoLibrary()
|
||||
# photo = None
|
||||
# try:
|
||||
# photo = photolib.fetch_uuid(self.photo.uuid)
|
||||
# except PhotoKitFetchFailed as e:
|
||||
# # if failed to find UUID, might be a burst photo
|
||||
# if self.photo.burst and self.photo._info["burstUUID"]:
|
||||
# bursts = photolib.fetch_burst_uuid(
|
||||
# self.photo._info["burstUUID"], all=True
|
||||
# )
|
||||
# # PhotoKit UUIDs may contain "/L0/001" so only look at beginning
|
||||
# photo = [
|
||||
# p for p in bursts if p.uuid.startswith(self.photo.uuid)
|
||||
# ]
|
||||
# photo = photo[0] if photo else None
|
||||
# if not photo:
|
||||
# all_results.error.append(
|
||||
# (
|
||||
# str(dest),
|
||||
# f"PhotoKitFetchFailed exception exporting photo {self.photo.uuid}: {e} ({lineno(__file__)})",
|
||||
# )
|
||||
# )
|
||||
# if photo:
|
||||
# if options.dry_run:
|
||||
# # dry_run, don't actually export
|
||||
# all_results.exported.append(str(dest))
|
||||
# else:
|
||||
# try:
|
||||
# exported = photo.export(
|
||||
# dest.parent,
|
||||
# dest.name,
|
||||
# version=PHOTOS_VERSION_CURRENT,
|
||||
# overwrite=overwrite,
|
||||
# video=live_photo,
|
||||
# )
|
||||
# all_results.exported.extend(exported)
|
||||
# except Exception as e:
|
||||
# all_results.error.append(
|
||||
# (str(dest), f"{e} ({lineno(__file__)})")
|
||||
# )
|
||||
# else:
|
||||
# try:
|
||||
# exported = _export_photo_uuid_applescript(
|
||||
# self.photo.uuid,
|
||||
# dest.parent,
|
||||
# filestem=dest.stem,
|
||||
# original=False,
|
||||
# edited=True,
|
||||
# live_photo=live_photo,
|
||||
# timeout=options.timeout,
|
||||
# burst=self.photo.burst,
|
||||
# dry_run=options.dry_run,
|
||||
# overwrite=overwrite,
|
||||
# )
|
||||
# all_results.exported.extend(exported)
|
||||
# except ExportError as e:
|
||||
# all_results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
# else:
|
||||
# # export original version and not edited
|
||||
# if options.use_photokit:
|
||||
# photolib = PhotoLibrary()
|
||||
# photo = None
|
||||
# try:
|
||||
# photo = photolib.fetch_uuid(self.photo.uuid)
|
||||
# except PhotoKitFetchFailed:
|
||||
# # if failed to find UUID, might be a burst photo
|
||||
# if self.photo.burst and self.photo._info["burstUUID"]:
|
||||
# bursts = photolib.fetch_burst_uuid(
|
||||
# self.photo._info["burstUUID"], all=True
|
||||
# )
|
||||
# # PhotoKit UUIDs may contain "/L0/001" so only look at beginning
|
||||
# photo = [
|
||||
# p for p in bursts if p.uuid.startswith(self.photo.uuid)
|
||||
# ]
|
||||
# photo = photo[0] if photo else None
|
||||
# if photo:
|
||||
# if not options.dry_run:
|
||||
# try:
|
||||
# exported = photo.export(
|
||||
# dest.parent,
|
||||
# dest.name,
|
||||
# version=PHOTOS_VERSION_ORIGINAL,
|
||||
# overwrite=overwrite,
|
||||
# video=live_photo,
|
||||
# )
|
||||
# all_results.exported.extend(exported)
|
||||
# except Exception as e:
|
||||
# all_results.error.append(
|
||||
# (str(dest), f"{e} ({lineno(__file__)})")
|
||||
# )
|
||||
# else:
|
||||
# # dry_run, don't actually export
|
||||
# all_results.exported.append(str(dest))
|
||||
# else:
|
||||
# try:
|
||||
# exported = _export_photo_uuid_applescript(
|
||||
# self.photo.uuid,
|
||||
# dest.parent,
|
||||
# filestem=dest.stem,
|
||||
# original=True,
|
||||
# edited=False,
|
||||
# live_photo=live_photo,
|
||||
# timeout=options.timeout,
|
||||
# burst=self.photo.burst,
|
||||
# dry_run=options.dry_run,
|
||||
# overwrite=overwrite,
|
||||
# )
|
||||
# all_results.exported.extend(exported)
|
||||
# except ExportError as e:
|
||||
# all_results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
# if all_results.exported:
|
||||
# for idx, photopath in enumerate(all_results.exported):
|
||||
# converted_stat = (None, None, None)
|
||||
# photopath = pathlib.Path(photopath)
|
||||
# if (
|
||||
# options.convert_to_jpeg
|
||||
# and self.photo.isphoto
|
||||
# and photopath.suffix.lower() not in LIVE_VIDEO_EXTENSIONS
|
||||
# ):
|
||||
# dest_str = photopath.parent / f"{photopath.stem}.jpeg"
|
||||
# fileutil.convert_to_jpeg(
|
||||
# photopath,
|
||||
# dest_str,
|
||||
# compression_quality=options.jpeg_quality,
|
||||
# )
|
||||
# converted_stat = fileutil.file_sig(dest_str)
|
||||
# fileutil.unlink(photopath)
|
||||
# all_results.exported[idx] = dest_str
|
||||
# all_results.converted_to_jpeg.append(dest_str)
|
||||
# photopath = dest_str
|
||||
|
||||
# photopath = str(photopath)
|
||||
# export_db.set_data(
|
||||
# filename=photopath,
|
||||
# uuid=self.photo.uuid,
|
||||
# orig_stat=fileutil.file_sig(photopath),
|
||||
# exif_stat=(None, None, None),
|
||||
# converted_stat=converted_stat,
|
||||
# edited_stat=(None, None, None),
|
||||
# info_json=self.photo.json(),
|
||||
# exif_json=None,
|
||||
# )
|
||||
|
||||
# # todo: handle signatures
|
||||
# if options.jpeg_ext:
|
||||
# # use_photos_export (both PhotoKit and AppleScript) don't use the
|
||||
# # file extension provided (instead they use extension for UTI)
|
||||
# # so if jpeg_ext is set, rename any non-conforming jpegs
|
||||
# all_results.exported = rename_jpeg_files(
|
||||
# all_results.exported, options.jpeg_ext, fileutil
|
||||
# )
|
||||
# if options.touch_file:
|
||||
# for exported_file in all_results.exported:
|
||||
# all_results.touched.append(exported_file)
|
||||
# ts = int(self.photo.date.timestamp())
|
||||
# fileutil.utime(exported_file, (ts, ts))
|
||||
# if options.update:
|
||||
# all_results.new.extend(all_results.exported)
|
||||
|
||||
def _stage_photo_for_export_with_photokit(
|
||||
self,
|
||||
dest: str,
|
||||
all_results: ExportResults,
|
||||
options: ExportOptions,
|
||||
):
|
||||
# TODO: duplicative code with the if edited/else--remove it
|
||||
fileutil = options.fileutil
|
||||
export_db = options.export_db
|
||||
) -> StagedFiles:
|
||||
"""Stage a photo for export with photokit to a temporary directory"""
|
||||
|
||||
if options.edited and not self.photo.hasadjustments:
|
||||
raise ValueError("Edited version requested but photo has no adjustments")
|
||||
|
||||
dest = self._temp_dir_path / self.photo.original_filename
|
||||
|
||||
# export live_photo .mov file?
|
||||
live_photo = bool(options.live_photo and self.photo.live_photo)
|
||||
|
||||
overwrite = options.overwrite or options.update
|
||||
|
||||
# figure out which photo version to request
|
||||
if options.edited or self.photo.shared:
|
||||
# exported edited version and not original
|
||||
# shared photos (in shared albums) show up as not having adjustments (not edited)
|
||||
# but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud
|
||||
# so tell Photos to export the current version in this case
|
||||
# didn't get passed a filename, add _edited
|
||||
photos_version = PHOTOS_VERSION_CURRENT
|
||||
elif self.photo.has_raw:
|
||||
# PhotoKit always returns the raw photo of raw+jpeg pair for PHOTOS_VERSION_ORIGINAL even if JPEG is the original
|
||||
photos_version = PHOTOS_VERSION_UNADJUSTED
|
||||
else:
|
||||
photos_version = PHOTOS_VERSION_ORIGINAL
|
||||
|
||||
uti = (
|
||||
self.photo.uti_edited
|
||||
if options.edited and self.photo.uti_edited
|
||||
@@ -748,8 +1032,8 @@ class PhotoExporter:
|
||||
ext = get_preferred_uti_extension(uti)
|
||||
dest = dest.parent / f"{dest.stem}.{ext}"
|
||||
|
||||
if options.use_photokit:
|
||||
photolib = PhotoLibrary()
|
||||
results = StagedFiles()
|
||||
photo = None
|
||||
try:
|
||||
photo = photolib.fetch_uuid(self.photo.uuid)
|
||||
@@ -760,153 +1044,143 @@ class PhotoExporter:
|
||||
self.photo._info["burstUUID"], all=True
|
||||
)
|
||||
# PhotoKit UUIDs may contain "/L0/001" so only look at beginning
|
||||
photo = [
|
||||
p for p in bursts if p.uuid.startswith(self.photo.uuid)
|
||||
]
|
||||
photo = [p for p in bursts if p.uuid.startswith(self.photo.uuid)]
|
||||
photo = photo[0] if photo else None
|
||||
if not photo:
|
||||
all_results.error.append(
|
||||
results.error.append(
|
||||
(
|
||||
str(dest),
|
||||
f"PhotoKitFetchFailed exception exporting photo {self.photo.uuid}: {e} ({lineno(__file__)})",
|
||||
)
|
||||
)
|
||||
if photo:
|
||||
if options.dry_run:
|
||||
# dry_run, don't actually export
|
||||
all_results.exported.append(str(dest))
|
||||
else:
|
||||
return results
|
||||
|
||||
# now export the requested version of the photo
|
||||
try:
|
||||
exported = photo.export(
|
||||
dest.parent,
|
||||
dest.name,
|
||||
version=PHOTOS_VERSION_CURRENT,
|
||||
version=photos_version,
|
||||
overwrite=overwrite,
|
||||
video=live_photo,
|
||||
)
|
||||
all_results.exported.extend(exported)
|
||||
except Exception as e:
|
||||
all_results.error.append(
|
||||
(str(dest), f"{e} ({lineno(__file__)})")
|
||||
if len(exported) == 1:
|
||||
results_attr = "edited" if options.edited else "original"
|
||||
setattr(results, results_attr, exported[0])
|
||||
elif len(exported) == 2:
|
||||
for exported_file in exported:
|
||||
if exported_file.lower().endswith(".mov"):
|
||||
# live photo
|
||||
results_attr = (
|
||||
"edited_live" if options.edited else "original_live"
|
||||
)
|
||||
else:
|
||||
results_attr = "edited" if options.edited else "original"
|
||||
setattr(results, results_attr, exported_file)
|
||||
except Exception as e:
|
||||
results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
|
||||
if options.raw_photo and self.photo.has_raw:
|
||||
# also request the raw photo
|
||||
try:
|
||||
exported = photo.export(
|
||||
dest.parent,
|
||||
dest.name,
|
||||
version=photos_version,
|
||||
raw=True,
|
||||
overwrite=overwrite,
|
||||
video=live_photo,
|
||||
)
|
||||
if exported:
|
||||
results.raw = exported[0]
|
||||
except Exception as e:
|
||||
results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
|
||||
return results
|
||||
|
||||
def _stage_photo_for_export_with_applescript(
|
||||
self,
|
||||
options: ExportOptions,
|
||||
) -> StagedFiles:
|
||||
"""Stage a photo for export with AppleScript to a temporary directory
|
||||
|
||||
Note: If exporting an edited live photo, the associated live video will not be exported.
|
||||
This is a limitation of the Photos AppleScript interface and Photos behaves the same way."""
|
||||
|
||||
if options.edited and not self.photo.hasadjustments:
|
||||
raise ValueError("Edited version requested but photo has no adjustments")
|
||||
|
||||
dest = self._temp_dir_path / self.photo.original_filename
|
||||
dest = pathlib.Path(increment_filename(dest))
|
||||
|
||||
# export live_photo .mov file?
|
||||
live_photo = bool(options.live_photo and self.photo.live_photo)
|
||||
overwrite = options.overwrite or options.update
|
||||
edited_version = options.edited or self.photo.shared
|
||||
# shared photos (in shared albums) show up as not having adjustments (not edited)
|
||||
# but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud
|
||||
# so tell Photos to export the current version in this case
|
||||
uti = (
|
||||
self.photo.uti_edited
|
||||
if options.edited and self.photo.uti_edited
|
||||
else self.photo.uti
|
||||
)
|
||||
ext = get_preferred_uti_extension(uti)
|
||||
dest = dest.parent / f"{dest.stem}.{ext}"
|
||||
|
||||
results = StagedFiles()
|
||||
|
||||
try:
|
||||
exported = _export_photo_uuid_applescript(
|
||||
self.photo.uuid,
|
||||
dest.parent,
|
||||
filestem=dest.stem,
|
||||
original=False,
|
||||
edited=True,
|
||||
original=not edited_version,
|
||||
edited=edited_version,
|
||||
live_photo=live_photo,
|
||||
timeout=options.timeout,
|
||||
burst=self.photo.burst,
|
||||
dry_run=options.dry_run,
|
||||
overwrite=overwrite,
|
||||
)
|
||||
all_results.exported.extend(exported)
|
||||
except ExportError as e:
|
||||
all_results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
else:
|
||||
# export original version and not edited
|
||||
if options.use_photokit:
|
||||
photolib = PhotoLibrary()
|
||||
photo = None
|
||||
try:
|
||||
photo = photolib.fetch_uuid(self.photo.uuid)
|
||||
except PhotoKitFetchFailed:
|
||||
# if failed to find UUID, might be a burst photo
|
||||
if self.photo.burst and self.photo._info["burstUUID"]:
|
||||
bursts = photolib.fetch_burst_uuid(
|
||||
self.photo._info["burstUUID"], all=True
|
||||
)
|
||||
# PhotoKit UUIDs may contain "/L0/001" so only look at beginning
|
||||
photo = [
|
||||
p for p in bursts if p.uuid.startswith(self.photo.uuid)
|
||||
]
|
||||
photo = photo[0] if photo else None
|
||||
if photo:
|
||||
if not options.dry_run:
|
||||
try:
|
||||
exported = photo.export(
|
||||
dest.parent,
|
||||
dest.name,
|
||||
version=PHOTOS_VERSION_ORIGINAL,
|
||||
overwrite=overwrite,
|
||||
video=live_photo,
|
||||
)
|
||||
all_results.exported.extend(exported)
|
||||
except Exception as e:
|
||||
all_results.error.append(
|
||||
(str(dest), f"{e} ({lineno(__file__)})")
|
||||
)
|
||||
else:
|
||||
# dry_run, don't actually export
|
||||
all_results.exported.append(str(dest))
|
||||
else:
|
||||
try:
|
||||
exported = _export_photo_uuid_applescript(
|
||||
self.photo.uuid,
|
||||
dest.parent,
|
||||
filestem=dest.stem,
|
||||
original=True,
|
||||
edited=False,
|
||||
live_photo=live_photo,
|
||||
timeout=options.timeout,
|
||||
burst=self.photo.burst,
|
||||
dry_run=options.dry_run,
|
||||
overwrite=overwrite,
|
||||
)
|
||||
all_results.exported.extend(exported)
|
||||
except ExportError as e:
|
||||
all_results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
if all_results.exported:
|
||||
for idx, photopath in enumerate(all_results.exported):
|
||||
converted_stat = (None, None, None)
|
||||
photopath = pathlib.Path(photopath)
|
||||
if (
|
||||
options.convert_to_jpeg
|
||||
and self.photo.isphoto
|
||||
and photopath.suffix.lower() not in LIVE_VIDEO_EXTENSIONS
|
||||
):
|
||||
dest_str = photopath.parent / f"{photopath.stem}.jpeg"
|
||||
fileutil.convert_to_jpeg(
|
||||
photopath,
|
||||
dest_str,
|
||||
compression_quality=options.jpeg_quality,
|
||||
)
|
||||
converted_stat = fileutil.file_sig(dest_str)
|
||||
fileutil.unlink(photopath)
|
||||
all_results.exported[idx] = dest_str
|
||||
all_results.converted_to_jpeg.append(dest_str)
|
||||
photopath = dest_str
|
||||
results.error.append((str(dest), f"{e} ({lineno(__file__)})"))
|
||||
return results
|
||||
|
||||
photopath = str(photopath)
|
||||
export_db.set_data(
|
||||
filename=photopath,
|
||||
uuid=self.photo.uuid,
|
||||
orig_stat=fileutil.file_sig(photopath),
|
||||
exif_stat=(None, None, None),
|
||||
converted_stat=converted_stat,
|
||||
edited_stat=(None, None, None),
|
||||
info_json=self.photo.json(),
|
||||
exif_json=None,
|
||||
if len(exported) == 1:
|
||||
results_attr = "edited" if options.edited else "original"
|
||||
setattr(results, results_attr, exported[0])
|
||||
elif len(exported) == 2:
|
||||
# could be live or raw+jpeg
|
||||
for exported_file in exported:
|
||||
if exported_file.lower().endswith(".mov"):
|
||||
# live photo
|
||||
results_attr = (
|
||||
"edited_live"
|
||||
if live_photo and options.edited
|
||||
else "original_live"
|
||||
if live_photo
|
||||
else None
|
||||
)
|
||||
elif self.photo.has_raw and pathlib.Path(
|
||||
exported_file.lower()
|
||||
).suffix not in [
|
||||
".jpg",
|
||||
".jpeg",
|
||||
".heic",
|
||||
]:
|
||||
# assume raw photo if not a common non-raw image format
|
||||
results_attr = "raw" if options.raw_photo else None
|
||||
else:
|
||||
results_attr = "edited" if options.edited else "original"
|
||||
if results_attr:
|
||||
setattr(results, results_attr, exported_file)
|
||||
|
||||
# todo: handle signatures
|
||||
if options.jpeg_ext:
|
||||
# use_photos_export (both PhotoKit and AppleScript) don't use the
|
||||
# file extension provided (instead they use extension for UTI)
|
||||
# so if jpeg_ext is set, rename any non-conforming jpegs
|
||||
all_results.exported = rename_jpeg_files(
|
||||
all_results.exported, options.jpeg_ext, fileutil
|
||||
)
|
||||
if options.touch_file:
|
||||
for exported_file in all_results.exported:
|
||||
all_results.touched.append(exported_file)
|
||||
ts = int(self.photo.date.timestamp())
|
||||
fileutil.utime(exported_file, (ts, ts))
|
||||
if options.update:
|
||||
all_results.new.extend(all_results.exported)
|
||||
return results
|
||||
|
||||
def _is_temp_file(self, filepath: str) -> bool:
|
||||
"""Returns True if file is in the PhotosExporter temp directory otherwise False"""
|
||||
filepath = pathlib.Path(filepath)
|
||||
return filepath.parent == self._temp_dir_path
|
||||
|
||||
def _export_photo(
|
||||
self,
|
||||
@@ -918,7 +1192,7 @@ class PhotoExporter:
|
||||
Does the actual copy or hardlink taking the appropriate
|
||||
action depending on update, overwrite, export_as_hardlink
|
||||
Assumes destination is the right destination (e.g. UUID matches)
|
||||
sets UUID and JSON info foo exported file using set_uuid_for_file, set_info_for_uuid
|
||||
sets UUID and JSON info for exported file using set_uuid_for_file, set_info_for_uuid
|
||||
|
||||
Args:
|
||||
src (str): src path
|
||||
@@ -937,6 +1211,9 @@ class PhotoExporter:
|
||||
"export_as_hardlink and convert_to_jpeg cannot both be True"
|
||||
)
|
||||
|
||||
if options.export_as_hardlink and self._is_temp_file(src):
|
||||
raise ValueError("export_as_hardlink cannot be used with temp files")
|
||||
|
||||
exported_files = []
|
||||
update_updated_files = []
|
||||
update_new_files = []
|
||||
|
||||
Reference in New Issue
Block a user