More refactoring of export code, #462

This commit is contained in:
Rhet Turnbull 2022-01-02 22:38:22 -08:00
parent a73dc72558
commit 147b30f973
3 changed files with 237 additions and 225 deletions

View File

@ -4207,8 +4207,12 @@ def repl(ctx, cli_obj, db, emacs):
from osxphotos import ExifTool, PhotoInfo, PhotosDB
from osxphotos.albuminfo import AlbumInfo
from osxphotos.momentinfo import MomentInfo
from osxphotos.photoexporter import ExportResults, PhotoExporter
from osxphotos.placeinfo import PlaceInfo
from osxphotos.queryoptions import QueryOptions
from osxphotos.scoreinfo import ScoreInfo
from osxphotos.searchinfo import SearchInfo
logger = logging.getLogger()
logger.disabled = True
@ -4241,7 +4245,9 @@ def repl(ctx, cli_obj, db, emacs):
print(f"Found {len(photos)} photos in {tictoc:0.2f} seconds\n")
print("The following classes have been imported from osxphotos:")
print("- AlbumInfo, ExifTool, PhotoInfo, PhotosDB, PlaceInfo, QueryOptions\n")
print(
"- AlbumInfo, ExifTool, PhotoInfo, PhotoExporter, ExportResults, PhotosDB, PlaceInfo, QueryOptions, MomentInfo, ScoreInfo, SearchInfo\n"
)
print("The following variables are defined:")
print(f"- photosdb: PhotosDB() instance for {photosdb.library_path}")
print(

View File

@ -369,7 +369,7 @@ class PhotoExporter:
e.g. to get the extension of the edited photo,
reference PhotoInfo.path_edited
original: (boolean, default=True); if True, will export the original version of the photo
edited: (boolean, default=False); if True will export the edited version of the photo
edited: (boolean, default=False); if True will export the edited version of the photo (only one of original or edited can be used)
live_photo: (boolean, default=False); if True, will also export the associated .mov for live photos
raw_photo: (boolean, default=False); if True, will also export the associated RAW photo
export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them
@ -445,7 +445,6 @@ class PhotoExporter:
# NOTE: This function is very complex and does a lot of things.
# Don't modify this code if you don't fully understand everything it does.
# TODO: This is a good candidate for refactoring.
# when called from export(), won't get an export_db, so use no-op version
if export_db is None:
@ -461,7 +460,9 @@ class PhotoExporter:
export_original = original
export_edited = edited
if edited and not self.photo.hasadjustments:
if export_original and export_edited:
raise ValueError("Cannot export both original and edited photos")
if export_edited and not self.photo.hasadjustments:
raise ValueError(
"Photo does not have adjustments, cannot export edited version"
)
@ -475,24 +476,12 @@ class PhotoExporter:
original_filename = original_filename or self.photo.original_filename
dest_original = pathlib.Path(dest) / original_filename
if not edited_filename:
if not edited:
edited_filename = self.photo.original_filename
else:
original_name = pathlib.Path(self.photo.original_filename)
if self.photo.path_edited:
ext = pathlib.Path(self.photo.path_edited).suffix
else:
uti = (
self.photo.uti_edited
if edited and self.photo.uti_edited
else self.photo.uti
)
ext = get_preferred_uti_extension(uti)
ext = "." + ext
edited_filename = original_name.stem + "_edited" + ext
edited_filename = edited_filename or self._get_edited_filename(
original_filename
)
dest_edited = pathlib.Path(dest) / edited_filename
# Is there something to convert?
if convert_to_jpeg and self.photo.isphoto:
something_to_convert = False
ext = "." + jpeg_ext if jpeg_ext else ".jpeg"
@ -508,42 +497,21 @@ class PhotoExporter:
else:
convert_to_jpeg = False
# check to see if file exists and if so, add (1), (2), etc until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
# e.g. exporting sidecar for file1.png and file1.jpeg
# if file1.png exists and exporting file1.jpeg,
# dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision
increment_file_count = 0
if increment and not update and not overwrite:
dest_original, increment_file_count = increment_filename_with_count(
dest_original
)
dest_original = pathlib.Path(dest_original)
# if overwrite==False and #increment==False, export should fail if file exists
if (
dest_original.exists()
and export_original
and not update
and not overwrite
and not increment
):
raise FileExistsError(
f"destination exists ({dest_original}); overwrite={overwrite}, increment={increment}"
)
# TODO: need to look at this to see what happens if original not being exported but edited exists and already has an increment
dest_original, increment_file_count = self._validate_dest_path(
dest_original, increment=increment, update=update, overwrite=overwrite
)
dest_original = pathlib.Path(dest_original)
if export_edited:
if increment and not update and not overwrite:
dest_edited, increment_file_count = increment_filename_with_count(
dest_edited, increment_file_count
)
dest_edited = pathlib.Path(dest_edited)
# if overwrite==False and #increment==False, export should fail if file exists
if dest_edited.exists() and not update and not overwrite and not increment:
raise FileExistsError(
f"destination exists ({dest_edited}); overwrite={overwrite}, increment={increment}"
)
dest_edited, increment_file_count = self._validate_dest_path(
dest_edited,
increment=increment,
update=update,
overwrite=overwrite,
count=increment_file_count,
)
dest_edited = pathlib.Path(dest_edited)
self._render_options.filepath = (
str(dest_original) if export_original else str(dest_edited)
@ -551,43 +519,23 @@ class PhotoExporter:
all_results = ExportResults()
if use_photos_export:
# TODO: collapse these into a single call (refactor _export_photo_with_photos_export)
if original:
self._export_photo_with_photos_export(
dest_original,
all_results,
fileutil,
export_db,
use_photokit=use_photokit,
dry_run=dry_run,
timeout=timeout,
jpeg_ext=jpeg_ext,
touch_file=touch_file,
update=update,
overwrite=overwrite,
live_photo=live_photo,
edited=False,
convert_to_jpeg=convert_to_jpeg,
jpeg_quality=jpeg_quality,
)
if edited:
self._export_photo_with_photos_export(
dest_edited,
all_results,
fileutil,
export_db,
use_photokit=use_photokit,
dry_run=dry_run,
timeout=timeout,
jpeg_ext=jpeg_ext,
touch_file=touch_file,
update=update,
overwrite=overwrite,
live_photo=live_photo,
edited=True,
convert_to_jpeg=convert_to_jpeg,
jpeg_quality=jpeg_quality,
)
self._export_photo_with_photos_export(
dest=dest_original if export_original else dest_edited,
all_results=all_results,
fileutil=fileutil,
export_db=export_db,
use_photokit=use_photokit,
dry_run=dry_run,
timeout=timeout,
jpeg_ext=jpeg_ext,
touch_file=touch_file,
update=update,
overwrite=overwrite,
live_photo=live_photo,
edited=export_edited,
convert_to_jpeg=convert_to_jpeg,
jpeg_quality=jpeg_quality,
)
else:
# find the source file on disk and export
# get path to source file and verify it's not None and is valid file
@ -598,6 +546,7 @@ class PhotoExporter:
elif not edited and self.photo.path is not None:
export_src_dest.append((self.photo.path, dest_original))
# TODO: this for loop not necessary
for src, dest in export_src_dest:
if not pathlib.Path(src).is_file():
raise FileNotFoundError(f"{src} does not appear to exist")
@ -1044,6 +993,49 @@ class PhotoExporter:
return all_results
def _get_edited_filename(self, original_filename):
"""Return the filename for the exported edited photo
(used when filename isn't provided in call to export2)"""
# need to get the right extension for edited file
original_filename = pathlib.Path(original_filename)
if self.photo.path_edited:
ext = pathlib.Path(self.photo.path_edited).suffix
else:
uti = self.photo.uti_edited if self.photo.uti_edited else self.photo.uti
ext = get_preferred_uti_extension(uti)
ext = "." + ext
edited_filename = original_filename.stem + "_edited" + ext
return edited_filename
def _validate_dest_path(self, dest, increment, update, overwrite, count=0):
"""If destination exists, add (1), (2), and so on to filename to get a valid destination
Args:
dest (str): Destination path
increment (bool): Whether to increment the filename if it already exists
update (bool): Whether running in update mode
overwrite (bool): Whether running in overwrite mode
count: optional counter to start from (if 0, start from 1)
Returns:
new dest path (pathlib.Path), increment count (int)
"""
# check to see if file exists and if so, add (1), (2), etc until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
# e.g. exporting sidecar for file1.png and file1.jpeg
# if file1.png exists and exporting file1.jpeg,
# dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision
if increment and not update and not overwrite:
dest, count = increment_filename_with_count(dest, count=count)
dest = pathlib.Path(dest)
# if overwrite==False and #increment==False, export should fail if file exists
if dest.exists() and all([not x for x in [increment, update, overwrite]]):
raise FileExistsError(
f"destination exists ({dest}); overwrite={overwrite}, increment={increment}"
)
return dest, count
def _export_photo_with_photos_export(
self,
dest,

View File

@ -30,6 +30,7 @@ import Photos
import Quartz
from Foundation import NSNotificationCenter, NSObject
from PyObjCTools import AppHelper
from wurlitzer import pipes
from .fileutil import FileUtil
from .uti import get_preferred_uti_extension
@ -495,69 +496,73 @@ class PhotoAsset:
"""
with objc.autorelease_pool():
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
with pipes() as (out, err):
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
output_file = None
if self.isphoto:
# will hold exported image data and needs to be cleaned up at end
imagedata = None
if raw:
# export the raw component
resources = self._resources()
for resource in resources:
if resource.type() == Photos.PHAssetResourceTypeAlternatePhoto:
data = self._request_resource_data(resource)
ext = pathlib.Path(self.raw_filename).suffix[1:]
break
output_file = None
if self.isphoto:
# will hold exported image data and needs to be cleaned up at end
imagedata = None
if raw:
# export the raw component
resources = self._resources()
for resource in resources:
if (
resource.type()
== Photos.PHAssetResourceTypeAlternatePhoto
):
data = self._request_resource_data(resource)
ext = pathlib.Path(self.raw_filename).suffix[1:]
break
else:
raise PhotoKitExportError(
"Could not get image data for RAW photo"
)
else:
raise PhotoKitExportError(
"Could not get image data for RAW photo"
)
else:
# TODO: if user has selected use RAW as original, this returns the RAW
# can get the jpeg with resource.type() == Photos.PHAssetResourceTypePhoto
imagedata = self._request_image_data(version=version)
if not imagedata.image_data:
raise PhotoKitExportError("Could not get image data")
ext = get_preferred_uti_extension(imagedata.uti)
data = imagedata.image_data
# TODO: if user has selected use RAW as original, this returns the RAW
# can get the jpeg with resource.type() == Photos.PHAssetResourceTypePhoto
imagedata = self._request_image_data(version=version)
if not imagedata.image_data:
raise PhotoKitExportError("Could not get image data")
ext = get_preferred_uti_extension(imagedata.uti)
data = imagedata.image_data
output_file = dest / f"{filename.stem}.{ext}"
output_file = dest / f"{filename.stem}.{ext}"
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
with open(output_file, "wb") as fd:
fd.write(data)
with open(output_file, "wb") as fd:
fd.write(data)
if imagedata:
del imagedata
elif self.ismovie:
videodata = self._request_video_data(version=version)
if videodata.asset is None:
raise PhotoKitExportError("Could not get video for asset")
if imagedata:
del imagedata
elif self.ismovie:
videodata = self._request_video_data(version=version)
if videodata.asset is None:
raise PhotoKitExportError("Could not get video for asset")
url = videodata.asset.URL()
path = pathlib.Path(NSURL_to_path(url))
if not path.is_file():
raise FileNotFoundError("Could not get path to video file")
ext = path.suffix
output_file = dest / f"{filename.stem}{ext}"
url = videodata.asset.URL()
path = pathlib.Path(NSURL_to_path(url))
if not path.is_file():
raise FileNotFoundError("Could not get path to video file")
ext = path.suffix
output_file = dest / f"{filename.stem}{ext}"
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
FileUtil.copy(path, output_file)
FileUtil.copy(path, output_file)
return [str(output_file)]
return [str(output_file)]
def _request_image_data(self, version=PHOTOS_VERSION_ORIGINAL):
"""Request image data and metadata for self._phasset
@ -807,42 +812,46 @@ class VideoAsset(PhotoAsset):
"""
with objc.autorelease_pool():
if self.slow_mo and version == PHOTOS_VERSION_CURRENT:
return [
self._export_slow_mo(
dest, filename=filename, version=version, overwrite=overwrite
)
]
with pipes() as (out, err):
if self.slow_mo and version == PHOTOS_VERSION_CURRENT:
return [
self._export_slow_mo(
dest,
filename=filename,
version=version,
overwrite=overwrite,
)
]
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
output_file = None
videodata = self._request_video_data(version=version)
if videodata.asset is None:
raise PhotoKitExportError("Could not get video for asset")
output_file = None
videodata = self._request_video_data(version=version)
if videodata.asset is None:
raise PhotoKitExportError("Could not get video for asset")
url = videodata.asset.URL()
path = pathlib.Path(NSURL_to_path(url))
del videodata
if not path.is_file():
raise FileNotFoundError("Could not get path to video file")
ext = path.suffix
output_file = dest / f"{filename.stem}{ext}"
url = videodata.asset.URL()
path = pathlib.Path(NSURL_to_path(url))
del videodata
if not path.is_file():
raise FileNotFoundError("Could not get path to video file")
ext = path.suffix
output_file = dest / f"{filename.stem}{ext}"
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
if not overwrite:
output_file = pathlib.Path(increment_filename(output_file))
FileUtil.copy(path, output_file)
FileUtil.copy(path, output_file)
return [str(output_file)]
return [str(output_file)]
def _export_slow_mo(
self, dest, filename=None, version=PHOTOS_VERSION_CURRENT, overwrite=False
@ -1053,64 +1062,69 @@ class LivePhotoAsset(PhotoAsset):
"""
with objc.autorelease_pool():
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
request = LivePhotoRequest.alloc().initWithManager_Asset_(
self._manager, self.phasset
)
resources = request.requestLivePhotoResources(version=version)
video_resource = None
photo_resource = None
for resource in resources:
if resource.type() == Photos.PHAssetResourceTypePairedVideo:
video_resource = resource
elif resource.type() == Photos.PHAssetMediaTypeImage:
photo_resource = resource
if not video_resource or not photo_resource:
raise PhotoKitExportError(
"Did not find photo/video resources for live photo"
with pipes() as (out, err):
filename = (
pathlib.Path(filename)
if filename
else pathlib.Path(self.original_filename)
)
photo_ext = get_preferred_uti_extension(
photo_resource.uniformTypeIdentifier()
)
photo_output_file = dest / f"{filename.stem}.{photo_ext}"
video_ext = get_preferred_uti_extension(
video_resource.uniformTypeIdentifier()
)
video_output_file = dest / f"{filename.stem}.{video_ext}"
dest = pathlib.Path(dest)
if not dest.is_dir():
raise ValueError("dest must be a valid directory: {dest}")
if not overwrite:
photo_output_file = pathlib.Path(increment_filename(photo_output_file))
video_output_file = pathlib.Path(increment_filename(video_output_file))
request = LivePhotoRequest.alloc().initWithManager_Asset_(
self._manager, self.phasset
)
resources = request.requestLivePhotoResources(version=version)
exported = []
if photo:
data = self._request_resource_data(photo_resource)
# image_data = self.request_image_data(version=version)
with open(photo_output_file, "wb") as fd:
fd.write(data)
exported.append(str(photo_output_file))
del data
if video:
data = self._request_resource_data(video_resource)
with open(video_output_file, "wb") as fd:
fd.write(data)
exported.append(str(video_output_file))
del data
video_resource = None
photo_resource = None
for resource in resources:
if resource.type() == Photos.PHAssetResourceTypePairedVideo:
video_resource = resource
elif resource.type() == Photos.PHAssetMediaTypeImage:
photo_resource = resource
request.dealloc()
return exported
if not video_resource or not photo_resource:
raise PhotoKitExportError(
"Did not find photo/video resources for live photo"
)
photo_ext = get_preferred_uti_extension(
photo_resource.uniformTypeIdentifier()
)
photo_output_file = dest / f"{filename.stem}.{photo_ext}"
video_ext = get_preferred_uti_extension(
video_resource.uniformTypeIdentifier()
)
video_output_file = dest / f"{filename.stem}.{video_ext}"
if not overwrite:
photo_output_file = pathlib.Path(
increment_filename(photo_output_file)
)
video_output_file = pathlib.Path(
increment_filename(video_output_file)
)
exported = []
if photo:
data = self._request_resource_data(photo_resource)
# image_data = self.request_image_data(version=version)
with open(photo_output_file, "wb") as fd:
fd.write(data)
exported.append(str(photo_output_file))
del data
if video:
data = self._request_resource_data(video_resource)
with open(video_output_file, "wb") as fd:
fd.write(data)
exported.append(str(video_output_file))
del data
request.dealloc()
return exported
class PhotoLibrary: