diff --git a/osxphotos/photoexporter.py b/osxphotos/photoexporter.py index e6313494..33e81c7c 100644 --- a/osxphotos/photoexporter.py +++ b/osxphotos/photoexporter.py @@ -40,6 +40,7 @@ from .fileutil import FileUtil from .photokit import ( PHOTOS_VERSION_CURRENT, PHOTOS_VERSION_ORIGINAL, + PHOTOS_VERSION_UNADJUSTED, PhotoKitFetchFailed, PhotoLibrary, ) @@ -104,7 +105,8 @@ class ExportOptions: update (bool, default=False): if True export will run in update mode, that is, it will not export the photo if the current version already exists in the destination use_albums_as_keywords (bool, default = False): if True, will include album names in keywords when exporting metadata with exiftool or sidecar use_persons_as_keywords (bool, default = False): if True, will include person names in keywords when exporting metadata with exiftool or sidecar - use_photos_export (bool, default=False): if True will attempt to export photo via applescript interaction with Photos + use_photos_export (bool, default=False): if True will attempt to export photo via applescript interaction with Photos (see also use_photokit) + use_photokit (bool, default=False): if True, will use photokit to export photos when use_photos_export is True verbose (Callable): optional callable function to use for printing verbose text during processing; if None (default), does not print output. """ @@ -150,6 +152,54 @@ class ExportOptions: return asdict(self) +class StagedFiles: + """Represents files staged for export""" + + def __init__( + self, + original: Optional[str] = None, + original_live: Optional[str] = None, + edited: Optional[str] = None, + edited_live: Optional[str] = None, + preview: Optional[str] = None, + raw: Optional[str] = None, + error: Optional[List[str]] = None, + ): + self.original = original + self.original_live = original_live + self.edited = edited + self.edited_live = edited_live + self.preview = preview + self.raw = raw + self.error = error or [] + + # TODO: bursts? + + def __ior__(self, other): + self.original = self.original or other.original + self.original_live = self.original_live or other.original_live + self.edited = self.edited or other.edited + self.edited_live = self.edited_live or other.edited_live + self.preview = self.preview or other.preview + self.raw = self.raw or other.raw + self.error += other.error + return self + + def __str__(self): + return str(self.asdict()) + + def asdict(self): + return { + "original": self.original, + "original_live": self.original_live, + "edited": self.edited, + "edited_live": self.edited_live, + "preview": self.preview, + "raw": self.raw, + "error": self.error, + } + + class ExportResults: """Results class which holds export results for export2""" @@ -291,6 +341,12 @@ class PhotoExporter: self._render_options = RenderOptions() self._verbose = self.photo._verbose + # temp directory for staging downloaded missing files + self._temp_dir = tempfile.TemporaryDirectory( + prefix=f"osxphotos_photo_exporter_{self.photo.uuid}_" + ) + self._temp_dir_path = pathlib.Path(self._temp_dir.name) + def export( self, dest, @@ -414,8 +470,10 @@ class PhotoExporter: options: Optional[ExportOptions] = None, ): """export photo, like export but with update and dry_run options - dest: must be valid destination path or exception raised - filename: (optional): name of exported picture; if not provided, will use current filename + + Args: + dest: must be valid destination path or exception raised + filename: (optional): name of exported picture; if not provided, will use current filename **NOTE**: if provided, user must ensure file extension (suffix) is correct. For example, if photo is .CR2 file, edited image may be .jpeg. If you provide an extension different than what the actual file is, @@ -423,28 +481,9 @@ class PhotoExporter: in which case export will use the extension provided by Photos upon export. e.g. to get the extension of the edited photo, reference PhotoInfo.path_edited + options (ExportOptions): optional ExportOptions instance - Returns: ExportResults class - ExportResults has attributes: - "exported", - "new", - "updated", - "skipped", - "exif_updated", - "touched", - "converted_to_jpeg", - "sidecar_json_written", - "sidecar_json_skipped", - "sidecar_exiftool_written", - "sidecar_exiftool_skipped", - "sidecar_xmp_written", - "sidecar_xmp_skipped", - "missing", - "error", - "error_str", - "exiftool_warning", - "exiftool_error", - + Returns: ExportResults instance Note: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db, and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) in options.export_db and options.fileutil respectively @@ -452,6 +491,14 @@ class PhotoExporter: options = options or ExportOptions() + verbose = options.verbose or self._verbose + if verbose and not callable(verbose): + raise TypeError("verbose must be callable") + + # can't use export_as_hardlink with use_photos_export as can't hardlink the temporary files downloaded + if options.export_as_hardlink and options.use_photos_export: + raise ValueError("Cannot use export_as_hardlink with use_photos_export") + # when called from export(), won't get an export_db, so use no-op version options.export_db = options.export_db or ExportDBNoOp() export_db = options.export_db @@ -460,10 +507,6 @@ class PhotoExporter: options.fileutil = options.fileutil or FileUtil fileutil = options.fileutil - verbose = options.verbose or self._verbose - if verbose and not callable(verbose): - raise TypeError("verbose must be callable") - self._render_options = options.render_options or RenderOptions() # export_original, and export_edited are just used for clarity in the code @@ -515,142 +558,133 @@ class PhotoExporter: self._render_options.filepath = str(dest) all_results = ExportResults() - if options.use_photos_export: - self._export_photo_with_photos_export( - dest=dest, all_results=all_results, options=options + staged_files = self._stage_photos_for_export(options) + + src = staged_files.edited if options.edited else staged_files.original + if src: + # found source now try to find right destination + if options.update and dest.exists(): + # destination exists, check to see if destination is the right UUID + dest_uuid = export_db.get_uuid_for_file(dest) + if dest_uuid is None and fileutil.cmp(src, dest): + # might be exporting into a pre-ExportDB folder or the DB got deleted + dest_uuid = self.photo.uuid + export_db.set_data( + filename=dest, + uuid=self.photo.uuid, + orig_stat=fileutil.file_sig(dest), + exif_stat=(None, None, None), + converted_stat=(None, None, None), + edited_stat=(None, None, None), + info_json=self.photo.json(), + exif_json=None, + ) + if dest_uuid != self.photo.uuid: + # not the right file, find the right one + glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}") + dest_files = glob.glob(glob_str) + for file_ in dest_files: + dest_uuid = export_db.get_uuid_for_file(file_) + if dest_uuid == self.photo.uuid: + dest = pathlib.Path(file_) + break + elif dest_uuid is None and fileutil.cmp(src, file_): + # files match, update the UUID + dest = pathlib.Path(file_) + export_db.set_data( + filename=dest, + uuid=self.photo.uuid, + orig_stat=fileutil.file_sig(dest), + exif_stat=(None, None, None), + converted_stat=(None, None, None), + edited_stat=(None, None, None), + info_json=self.photo.json(), + exif_json=None, + ) + break + else: + # increment the destination file + dest = pathlib.Path(increment_filename(dest)) + + # export the dest file + results = self._export_photo( + src, + dest, + options=options, ) - else: - # find the source file on disk and export - # get path to source file and verify it's not None and is valid file - # TODO: how to handle ismissing or not hasadjustments and edited=True cases? - src = self.photo.path_edited if options.edited else self.photo.path - if src and not pathlib.Path(src).is_file(): - raise FileNotFoundError(f"{src} does not appear to exist") + all_results += results - if src: - # found source now try to find right destination - if options.update and dest.exists(): - # destination exists, check to see if destination is the right UUID - dest_uuid = export_db.get_uuid_for_file(dest) - if dest_uuid is None and fileutil.cmp(src, dest): - # might be exporting into a pre-ExportDB folder or the DB got deleted - dest_uuid = self.photo.uuid - export_db.set_data( - filename=dest, - uuid=self.photo.uuid, - orig_stat=fileutil.file_sig(dest), - exif_stat=(None, None, None), - converted_stat=(None, None, None), - edited_stat=(None, None, None), - info_json=self.photo.json(), - exif_json=None, - ) - if dest_uuid != self.photo.uuid: - # not the right file, find the right one - glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}") - dest_files = glob.glob(glob_str) - for file_ in dest_files: - dest_uuid = export_db.get_uuid_for_file(file_) - if dest_uuid == self.photo.uuid: - dest = pathlib.Path(file_) - break - elif dest_uuid is None and fileutil.cmp(src, file_): - # files match, update the UUID - dest = pathlib.Path(file_) - export_db.set_data( - filename=dest, - uuid=self.photo.uuid, - orig_stat=fileutil.file_sig(dest), - exif_stat=(None, None, None), - converted_stat=(None, None, None), - edited_stat=(None, None, None), - info_json=self.photo.json(), - exif_json=None, - ) - break - else: - # increment the destination file - dest = pathlib.Path(increment_filename(dest)) + # copy live photo associated .mov if requested + if ( + export_original + and options.live_photo + and self.photo.live_photo + and staged_files.original_live + ): + live_name = dest.parent / f"{dest.stem}.mov" + src_live = staged_files.original_live + results = self._export_photo( + src_live, + live_name, + # don't try to convert the live photo + options=dataclasses.replace(options, convert_to_jpeg=False), + ) + all_results += results - # export the dest file + if ( + export_edited + and options.live_photo + and self.photo.live_photo + and staged_files.edited_live + ): + live_name = dest.parent / f"{dest.stem}.mov" + src_live = staged_files.edited_live + results = self._export_photo( + src_live, + live_name, + # don't try to convert the live photo + options=dataclasses.replace(options, convert_to_jpeg=False), + ) + all_results += results + + # copy associated RAW image if requested + if options.raw_photo and self.photo.has_raw and staged_files.raw: + raw_path = pathlib.Path(staged_files.raw) + raw_ext = raw_path.suffix + raw_name = dest.parent / f"{dest.stem}{raw_ext}" + if raw_path is not None: results = self._export_photo( - src, - dest, + raw_path, + raw_name, options=options, ) all_results += results - # copy live photo associated .mov if requested - if ( - export_original - and options.live_photo - and self.photo.live_photo - and self.photo.path_live_photo - ): - live_name = dest.parent / f"{dest.stem}.mov" - src_live = self.photo.path_live_photo + # copy preview image if requested + if options.preview and staged_files.preview: + # Photos keeps multiple different derivatives and path_derivatives returns list of them + # first derivative is the largest so export that one + preview_path = pathlib.Path(staged_files.preview) + preview_ext = preview_path.suffix + preview_name = ( + dest.parent / f"{dest.stem}{options.preview_suffix}{preview_ext}" + ) + # if original is missing, the filename won't have been incremented so + # need to check here to make sure there aren't duplicate preview files in + # the export directory + preview_name = ( + preview_name + if options.overwrite or options.update + else pathlib.Path(increment_filename(preview_name)) + ) + if preview_path is not None: results = self._export_photo( - src_live, - live_name, - # don't try to convert the live photo - options=dataclasses.replace(options, convert_to_jpeg=False), + preview_path, + preview_name, + options=options, ) all_results += results - if ( - export_edited - and options.live_photo - and self.photo.live_photo - and self.photo.path_edited_live_photo - ): - live_name = dest.parent / f"{dest.stem}.mov" - src_live = self.photo.path_edited_live_photo - results = self._export_photo( - src_live, - live_name, - # don't try to convert the live photo - options=dataclasses.replace(options, convert_to_jpeg=False), - ) - all_results += results - - # copy associated RAW image if requested - if options.raw_photo and self.photo.has_raw and self.photo.path_raw: - raw_path = pathlib.Path(self.photo.path_raw) - raw_ext = raw_path.suffix - raw_name = dest.parent / f"{dest.stem}{raw_ext}" - if raw_path is not None: - results = self._export_photo( - raw_path, - raw_name, - options=options, - ) - all_results += results - - # copy preview image if requested - if options.preview and self.photo.path_derivatives: - # Photos keeps multiple different derivatives and path_derivatives returns list of them - # first derivative is the largest so export that one - preview_path = pathlib.Path(self.photo.path_derivatives[0]) - preview_ext = preview_path.suffix - preview_name = ( - dest.parent / f"{dest.stem}{options.preview_suffix}{preview_ext}" - ) - # if original is missing, the filename won't have been incremented so - # need to check here to make sure there aren't duplicate preview files in - # the export directory - preview_name = ( - preview_name - if options.overwrite or options.update - else pathlib.Path(increment_filename(preview_name)) - ) - if preview_path is not None: - results = self._export_photo( - preview_path, - preview_name, - options=options, - ) - all_results += results - results = self._write_sidecar_files(dest=dest, options=options) all_results += results @@ -721,192 +755,432 @@ class PhotoExporter: ) return dest, count - def _export_photo_with_photos_export( + def _stage_photos_for_export(self, options: ExportOptions) -> StagedFiles: + """Stages photos for export + + If photo is present on disk in the library, uses path to the photo on disk. + If photo is missing and use_photos_export is true, downloads the photo from iCloud to temporary location. + """ + + # TODO: this changes behavior in that Photos download is only called if file is actually missing + # Need an option to force download if user wants to only use Photos export + + staged = StagedFiles() + + if options.raw_photo and self.photo.has_raw: + staged.raw = self.photo.path_raw + + if options.preview and self.photo.path_derivatives: + staged.preview = self.photo.path_derivatives[0] + + if not options.edited: + # original file + if self.photo.path: + staged.original = self.photo.path + if options.live_photo and self.photo.live_photo: + staged.original_live = self.photo.path_live_photo + + if options.edited: + # edited file + staged.edited = self.photo.path_edited + if options.live_photo and self.photo.live_photo: + staged.edited_live = self.photo.path_edited_live_photo + + # download any missing files + if options.use_photos_export: + live_photo = staged.edited_live if options.edited else staged.original_live + missing_options = ExportOptions( + edited=options.edited, + # TODO: missing previews are not generated/downloaded + preview=options.preview and not staged.preview, + raw_photo=options.raw_photo and not staged.raw, + live_photo=options.live_photo and not live_photo, + ) + if options.use_photokit: + missing_staged = self._stage_photo_for_export_with_photokit( + options=missing_options + ) + else: + missing_staged = self._stage_photo_for_export_with_applescript( + options=missing_options + ) + staged |= missing_staged + return staged + + # def _export_photo_with_photos_export( + # self, + # dest: pathlib.Path, + # all_results: ExportResults, + # options: ExportOptions, + # ): + # # TODO: if using applescript and exporting edited with live_photo doesn't seem to export the edited live photo + # # this does work with photokit, but not with applescript + # # TODO: duplicative code with the if edited/else--remove it + # fileutil = options.fileutil + # export_db = options.export_db + + # # export live_photo .mov file? + # live_photo = bool(options.live_photo and self.photo.live_photo) + # overwrite = options.overwrite or options.update + # if options.edited or self.photo.shared: + # # exported edited version and not original + # # shared photos (in shared albums) show up as not having adjustments (not edited) + # # but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud + # # so tell Photos to export the current version in this case + # # didn't get passed a filename, add _edited + # uti = ( + # self.photo.uti_edited + # if options.edited and self.photo.uti_edited + # else self.photo.uti + # ) + # ext = get_preferred_uti_extension(uti) + # dest = dest.parent / f"{dest.stem}.{ext}" + + # if options.use_photokit: + # photolib = PhotoLibrary() + # photo = None + # try: + # photo = photolib.fetch_uuid(self.photo.uuid) + # except PhotoKitFetchFailed as e: + # # if failed to find UUID, might be a burst photo + # if self.photo.burst and self.photo._info["burstUUID"]: + # bursts = photolib.fetch_burst_uuid( + # self.photo._info["burstUUID"], all=True + # ) + # # PhotoKit UUIDs may contain "/L0/001" so only look at beginning + # photo = [ + # p for p in bursts if p.uuid.startswith(self.photo.uuid) + # ] + # photo = photo[0] if photo else None + # if not photo: + # all_results.error.append( + # ( + # str(dest), + # f"PhotoKitFetchFailed exception exporting photo {self.photo.uuid}: {e} ({lineno(__file__)})", + # ) + # ) + # if photo: + # if options.dry_run: + # # dry_run, don't actually export + # all_results.exported.append(str(dest)) + # else: + # try: + # exported = photo.export( + # dest.parent, + # dest.name, + # version=PHOTOS_VERSION_CURRENT, + # overwrite=overwrite, + # video=live_photo, + # ) + # all_results.exported.extend(exported) + # except Exception as e: + # all_results.error.append( + # (str(dest), f"{e} ({lineno(__file__)})") + # ) + # else: + # try: + # exported = _export_photo_uuid_applescript( + # self.photo.uuid, + # dest.parent, + # filestem=dest.stem, + # original=False, + # edited=True, + # live_photo=live_photo, + # timeout=options.timeout, + # burst=self.photo.burst, + # dry_run=options.dry_run, + # overwrite=overwrite, + # ) + # all_results.exported.extend(exported) + # except ExportError as e: + # all_results.error.append((str(dest), f"{e} ({lineno(__file__)})")) + # else: + # # export original version and not edited + # if options.use_photokit: + # photolib = PhotoLibrary() + # photo = None + # try: + # photo = photolib.fetch_uuid(self.photo.uuid) + # except PhotoKitFetchFailed: + # # if failed to find UUID, might be a burst photo + # if self.photo.burst and self.photo._info["burstUUID"]: + # bursts = photolib.fetch_burst_uuid( + # self.photo._info["burstUUID"], all=True + # ) + # # PhotoKit UUIDs may contain "/L0/001" so only look at beginning + # photo = [ + # p for p in bursts if p.uuid.startswith(self.photo.uuid) + # ] + # photo = photo[0] if photo else None + # if photo: + # if not options.dry_run: + # try: + # exported = photo.export( + # dest.parent, + # dest.name, + # version=PHOTOS_VERSION_ORIGINAL, + # overwrite=overwrite, + # video=live_photo, + # ) + # all_results.exported.extend(exported) + # except Exception as e: + # all_results.error.append( + # (str(dest), f"{e} ({lineno(__file__)})") + # ) + # else: + # # dry_run, don't actually export + # all_results.exported.append(str(dest)) + # else: + # try: + # exported = _export_photo_uuid_applescript( + # self.photo.uuid, + # dest.parent, + # filestem=dest.stem, + # original=True, + # edited=False, + # live_photo=live_photo, + # timeout=options.timeout, + # burst=self.photo.burst, + # dry_run=options.dry_run, + # overwrite=overwrite, + # ) + # all_results.exported.extend(exported) + # except ExportError as e: + # all_results.error.append((str(dest), f"{e} ({lineno(__file__)})")) + # if all_results.exported: + # for idx, photopath in enumerate(all_results.exported): + # converted_stat = (None, None, None) + # photopath = pathlib.Path(photopath) + # if ( + # options.convert_to_jpeg + # and self.photo.isphoto + # and photopath.suffix.lower() not in LIVE_VIDEO_EXTENSIONS + # ): + # dest_str = photopath.parent / f"{photopath.stem}.jpeg" + # fileutil.convert_to_jpeg( + # photopath, + # dest_str, + # compression_quality=options.jpeg_quality, + # ) + # converted_stat = fileutil.file_sig(dest_str) + # fileutil.unlink(photopath) + # all_results.exported[idx] = dest_str + # all_results.converted_to_jpeg.append(dest_str) + # photopath = dest_str + + # photopath = str(photopath) + # export_db.set_data( + # filename=photopath, + # uuid=self.photo.uuid, + # orig_stat=fileutil.file_sig(photopath), + # exif_stat=(None, None, None), + # converted_stat=converted_stat, + # edited_stat=(None, None, None), + # info_json=self.photo.json(), + # exif_json=None, + # ) + + # # todo: handle signatures + # if options.jpeg_ext: + # # use_photos_export (both PhotoKit and AppleScript) don't use the + # # file extension provided (instead they use extension for UTI) + # # so if jpeg_ext is set, rename any non-conforming jpegs + # all_results.exported = rename_jpeg_files( + # all_results.exported, options.jpeg_ext, fileutil + # ) + # if options.touch_file: + # for exported_file in all_results.exported: + # all_results.touched.append(exported_file) + # ts = int(self.photo.date.timestamp()) + # fileutil.utime(exported_file, (ts, ts)) + # if options.update: + # all_results.new.extend(all_results.exported) + + def _stage_photo_for_export_with_photokit( self, - dest: str, - all_results: ExportResults, options: ExportOptions, - ): - # TODO: duplicative code with the if edited/else--remove it - fileutil = options.fileutil - export_db = options.export_db + ) -> StagedFiles: + """Stage a photo for export with photokit to a temporary directory""" + + if options.edited and not self.photo.hasadjustments: + raise ValueError("Edited version requested but photo has no adjustments") + + dest = self._temp_dir_path / self.photo.original_filename + + # export live_photo .mov file? + live_photo = bool(options.live_photo and self.photo.live_photo) + + overwrite = options.overwrite or options.update + + # figure out which photo version to request + if options.edited or self.photo.shared: + # shared photos (in shared albums) show up as not having adjustments (not edited) + # but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud + # so tell Photos to export the current version in this case + photos_version = PHOTOS_VERSION_CURRENT + elif self.photo.has_raw: + # PhotoKit always returns the raw photo of raw+jpeg pair for PHOTOS_VERSION_ORIGINAL even if JPEG is the original + photos_version = PHOTOS_VERSION_UNADJUSTED + else: + photos_version = PHOTOS_VERSION_ORIGINAL + + uti = ( + self.photo.uti_edited + if options.edited and self.photo.uti_edited + else self.photo.uti + ) + ext = get_preferred_uti_extension(uti) + dest = dest.parent / f"{dest.stem}.{ext}" + + photolib = PhotoLibrary() + results = StagedFiles() + photo = None + try: + photo = photolib.fetch_uuid(self.photo.uuid) + except PhotoKitFetchFailed as e: + # if failed to find UUID, might be a burst photo + if self.photo.burst and self.photo._info["burstUUID"]: + bursts = photolib.fetch_burst_uuid( + self.photo._info["burstUUID"], all=True + ) + # PhotoKit UUIDs may contain "/L0/001" so only look at beginning + photo = [p for p in bursts if p.uuid.startswith(self.photo.uuid)] + photo = photo[0] if photo else None + if not photo: + results.error.append( + ( + str(dest), + f"PhotoKitFetchFailed exception exporting photo {self.photo.uuid}: {e} ({lineno(__file__)})", + ) + ) + return results + + # now export the requested version of the photo + try: + exported = photo.export( + dest.parent, + dest.name, + version=photos_version, + overwrite=overwrite, + video=live_photo, + ) + if len(exported) == 1: + results_attr = "edited" if options.edited else "original" + setattr(results, results_attr, exported[0]) + elif len(exported) == 2: + for exported_file in exported: + if exported_file.lower().endswith(".mov"): + # live photo + results_attr = ( + "edited_live" if options.edited else "original_live" + ) + else: + results_attr = "edited" if options.edited else "original" + setattr(results, results_attr, exported_file) + except Exception as e: + results.error.append((str(dest), f"{e} ({lineno(__file__)})")) + + if options.raw_photo and self.photo.has_raw: + # also request the raw photo + try: + exported = photo.export( + dest.parent, + dest.name, + version=photos_version, + raw=True, + overwrite=overwrite, + video=live_photo, + ) + if exported: + results.raw = exported[0] + except Exception as e: + results.error.append((str(dest), f"{e} ({lineno(__file__)})")) + + return results + + def _stage_photo_for_export_with_applescript( + self, + options: ExportOptions, + ) -> StagedFiles: + """Stage a photo for export with AppleScript to a temporary directory + + Note: If exporting an edited live photo, the associated live video will not be exported. + This is a limitation of the Photos AppleScript interface and Photos behaves the same way.""" + + if options.edited and not self.photo.hasadjustments: + raise ValueError("Edited version requested but photo has no adjustments") + + dest = self._temp_dir_path / self.photo.original_filename + dest = pathlib.Path(increment_filename(dest)) # export live_photo .mov file? live_photo = bool(options.live_photo and self.photo.live_photo) overwrite = options.overwrite or options.update - if options.edited or self.photo.shared: - # exported edited version and not original - # shared photos (in shared albums) show up as not having adjustments (not edited) - # but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud - # so tell Photos to export the current version in this case - # didn't get passed a filename, add _edited - uti = ( - self.photo.uti_edited - if options.edited and self.photo.uti_edited - else self.photo.uti + edited_version = options.edited or self.photo.shared + # shared photos (in shared albums) show up as not having adjustments (not edited) + # but Photos is unable to export the "original" as only a jpeg copy is shared in iCloud + # so tell Photos to export the current version in this case + uti = ( + self.photo.uti_edited + if options.edited and self.photo.uti_edited + else self.photo.uti + ) + ext = get_preferred_uti_extension(uti) + dest = dest.parent / f"{dest.stem}.{ext}" + + results = StagedFiles() + + try: + exported = _export_photo_uuid_applescript( + self.photo.uuid, + dest.parent, + filestem=dest.stem, + original=not edited_version, + edited=edited_version, + live_photo=live_photo, + timeout=options.timeout, + burst=self.photo.burst, + overwrite=overwrite, ) - ext = get_preferred_uti_extension(uti) - dest = dest.parent / f"{dest.stem}.{ext}" + except ExportError as e: + results.error.append((str(dest), f"{e} ({lineno(__file__)})")) + return results - if options.use_photokit: - photolib = PhotoLibrary() - photo = None - try: - photo = photolib.fetch_uuid(self.photo.uuid) - except PhotoKitFetchFailed as e: - # if failed to find UUID, might be a burst photo - if self.photo.burst and self.photo._info["burstUUID"]: - bursts = photolib.fetch_burst_uuid( - self.photo._info["burstUUID"], all=True - ) - # PhotoKit UUIDs may contain "/L0/001" so only look at beginning - photo = [ - p for p in bursts if p.uuid.startswith(self.photo.uuid) - ] - photo = photo[0] if photo else None - if not photo: - all_results.error.append( - ( - str(dest), - f"PhotoKitFetchFailed exception exporting photo {self.photo.uuid}: {e} ({lineno(__file__)})", - ) - ) - if photo: - if options.dry_run: - # dry_run, don't actually export - all_results.exported.append(str(dest)) - else: - try: - exported = photo.export( - dest.parent, - dest.name, - version=PHOTOS_VERSION_CURRENT, - overwrite=overwrite, - video=live_photo, - ) - all_results.exported.extend(exported) - except Exception as e: - all_results.error.append( - (str(dest), f"{e} ({lineno(__file__)})") - ) - else: - try: - exported = _export_photo_uuid_applescript( - self.photo.uuid, - dest.parent, - filestem=dest.stem, - original=False, - edited=True, - live_photo=live_photo, - timeout=options.timeout, - burst=self.photo.burst, - dry_run=options.dry_run, - overwrite=overwrite, + if len(exported) == 1: + results_attr = "edited" if options.edited else "original" + setattr(results, results_attr, exported[0]) + elif len(exported) == 2: + # could be live or raw+jpeg + for exported_file in exported: + if exported_file.lower().endswith(".mov"): + # live photo + results_attr = ( + "edited_live" + if live_photo and options.edited + else "original_live" + if live_photo + else None ) - all_results.exported.extend(exported) - except ExportError as e: - all_results.error.append((str(dest), f"{e} ({lineno(__file__)})")) - else: - # export original version and not edited - if options.use_photokit: - photolib = PhotoLibrary() - photo = None - try: - photo = photolib.fetch_uuid(self.photo.uuid) - except PhotoKitFetchFailed: - # if failed to find UUID, might be a burst photo - if self.photo.burst and self.photo._info["burstUUID"]: - bursts = photolib.fetch_burst_uuid( - self.photo._info["burstUUID"], all=True - ) - # PhotoKit UUIDs may contain "/L0/001" so only look at beginning - photo = [ - p for p in bursts if p.uuid.startswith(self.photo.uuid) - ] - photo = photo[0] if photo else None - if photo: - if not options.dry_run: - try: - exported = photo.export( - dest.parent, - dest.name, - version=PHOTOS_VERSION_ORIGINAL, - overwrite=overwrite, - video=live_photo, - ) - all_results.exported.extend(exported) - except Exception as e: - all_results.error.append( - (str(dest), f"{e} ({lineno(__file__)})") - ) - else: - # dry_run, don't actually export - all_results.exported.append(str(dest)) - else: - try: - exported = _export_photo_uuid_applescript( - self.photo.uuid, - dest.parent, - filestem=dest.stem, - original=True, - edited=False, - live_photo=live_photo, - timeout=options.timeout, - burst=self.photo.burst, - dry_run=options.dry_run, - overwrite=overwrite, - ) - all_results.exported.extend(exported) - except ExportError as e: - all_results.error.append((str(dest), f"{e} ({lineno(__file__)})")) - if all_results.exported: - for idx, photopath in enumerate(all_results.exported): - converted_stat = (None, None, None) - photopath = pathlib.Path(photopath) - if ( - options.convert_to_jpeg - and self.photo.isphoto - and photopath.suffix.lower() not in LIVE_VIDEO_EXTENSIONS - ): - dest_str = photopath.parent / f"{photopath.stem}.jpeg" - fileutil.convert_to_jpeg( - photopath, - dest_str, - compression_quality=options.jpeg_quality, - ) - converted_stat = fileutil.file_sig(dest_str) - fileutil.unlink(photopath) - all_results.exported[idx] = dest_str - all_results.converted_to_jpeg.append(dest_str) - photopath = dest_str + elif self.photo.has_raw and pathlib.Path( + exported_file.lower() + ).suffix not in [ + ".jpg", + ".jpeg", + ".heic", + ]: + # assume raw photo if not a common non-raw image format + results_attr = "raw" if options.raw_photo else None + else: + results_attr = "edited" if options.edited else "original" + if results_attr: + setattr(results, results_attr, exported_file) - photopath = str(photopath) - export_db.set_data( - filename=photopath, - uuid=self.photo.uuid, - orig_stat=fileutil.file_sig(photopath), - exif_stat=(None, None, None), - converted_stat=converted_stat, - edited_stat=(None, None, None), - info_json=self.photo.json(), - exif_json=None, - ) + return results - # todo: handle signatures - if options.jpeg_ext: - # use_photos_export (both PhotoKit and AppleScript) don't use the - # file extension provided (instead they use extension for UTI) - # so if jpeg_ext is set, rename any non-conforming jpegs - all_results.exported = rename_jpeg_files( - all_results.exported, options.jpeg_ext, fileutil - ) - if options.touch_file: - for exported_file in all_results.exported: - all_results.touched.append(exported_file) - ts = int(self.photo.date.timestamp()) - fileutil.utime(exported_file, (ts, ts)) - if options.update: - all_results.new.extend(all_results.exported) + def _is_temp_file(self, filepath: str) -> bool: + """Returns True if file is in the PhotosExporter temp directory otherwise False""" + filepath = pathlib.Path(filepath) + return filepath.parent == self._temp_dir_path def _export_photo( self, @@ -918,7 +1192,7 @@ class PhotoExporter: Does the actual copy or hardlink taking the appropriate action depending on update, overwrite, export_as_hardlink Assumes destination is the right destination (e.g. UUID matches) - sets UUID and JSON info foo exported file using set_uuid_for_file, set_info_for_uuid + sets UUID and JSON info for exported file using set_uuid_for_file, set_info_for_uuid Args: src (str): src path @@ -937,6 +1211,9 @@ class PhotoExporter: "export_as_hardlink and convert_to_jpeg cannot both be True" ) + if options.export_as_hardlink and self._is_temp_file(src): + raise ValueError("export_as_hardlink cannot be used with temp files") + exported_files = [] update_updated_files = [] update_new_files = []