Fixed bug in --download-missing to fix issue #64

This commit is contained in:
Rhet Turnbull
2020-02-07 22:20:05 -08:00
parent 1e013b6802
commit c654e3dc61
8 changed files with 807 additions and 46 deletions

View File

@@ -655,13 +655,14 @@ Returns the path to the live video component of a [live photo](#live_photo). If
#### `json()`
Returns a JSON representation of all photo info
#### `export(dest, *filename, edited=False, overwrite=False, increment=True, sidecar_json=False, sidecar_xmp=False, use_photos_export=False, timeout=120,)`
#### `export(dest, *filename, edited=False, live_photo=False, overwrite=False, increment=True, sidecar_json=False, sidecar_xmp=False, use_photos_export=False, timeout=120,)`
Export photo from the Photos library to another destination on disk.
- dest: must be valid destination path as str (or exception raised).
- *filename (optional): name of picture as str; if not provided, will use current filename
- edited: boolean; if True (default=False), will export the edited version of the photo (or raise exception if no edited version)
- overwrite: boolean; if True (default=False), will overwrite files if they alreay exist
- live_photo: boolean; if True (default=False), will also export the associted .mov for live photos; exported live photo will be named filename.mov
- increment: boolean; if True (default=True), will increment file name until a non-existent name is found
- sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool; sidecar filename will be dest/filename.json where filename is the stem of the photo name
- sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data; sidecar filename will be dest/filename.xmp where filename is the stem of the photo name

View File

@@ -1136,6 +1136,7 @@ def export_photo(
filename,
sidecar_json=sidecar_json,
sidecar_xmp=sidecar_xmp,
live_photo=export_live,
overwrite=overwrite,
use_photos_export=download_missing,
)
@@ -1160,22 +1161,22 @@ def export_photo(
else:
click.echo(f"Skipping missing edited photo for {filename}")
if export_live and photo.live_photo and photo.path_live_photo is not None:
# if destination exists, will be overwritten regardless of overwrite
# so that name matches name of live photo
live_name = pathlib.Path(photo_path)
live_name = f"{live_name.stem}.mov"
# if export_live and photo.live_photo and photo.path_live_photo is not None:
# # if destination exists, will be overwritten regardless of overwrite
# # so that name matches name of live photo
# live_name = pathlib.Path(photo_path)
# live_name = f"{live_name.stem}.mov"
src_live = photo.path_live_photo
dest_live = pathlib.Path(photo_path).parent / pathlib.Path(live_name)
# src_live = photo.path_live_photo
# dest_live = pathlib.Path(photo_path).parent / pathlib.Path(live_name)
if src_live is not None:
if verbose:
click.echo(f"Exporting live photo video of {filename} as {live_name}")
# if src_live is not None:
# if verbose:
# click.echo(f"Exporting live photo video of {filename} as {live_name}")
_copy_file(src_live, str(dest_live))
else:
click.echo(f"Skipping missing live movie for {filename}")
# _copy_file(src_live, str(dest_live))
# else:
# click.echo(f"Skipping missing live movie for {filename}")
return photo_path

View File

@@ -1,3 +1,3 @@
""" version info """
__version__ = "0.22.9"
__version__ = "0.22.10"

View File

@@ -458,6 +458,7 @@ class PhotoInfo:
dest,
*filename,
edited=False,
live_photo=False,
overwrite=False,
increment=True,
sidecar_json=False,
@@ -470,6 +471,7 @@ class PhotoInfo:
filename: (optional): name of picture; if not provided, will use current filename
edited: (boolean, default=False); if True will export the edited version of the photo
(or raise exception if no edited version)
live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos
overwrite: (boolean, default=False); if True will overwrite files if they alreay exist
increment: (boolean, default=True); if True, will increment file name until a non-existant name is found
if overwrite=False and increment=False, export will fail if destination file already exists
@@ -503,7 +505,7 @@ class PhotoInfo:
# no filename provided so use the default
# if edited file requested, use filename but add _edited
# need to use file extension from edited file as Photos saves a jpeg once edited
if edited:
if edited and not use_photos_export:
# verify we have a valid path_edited and use that to get filename
if not self.path_edited:
raise FileNotFoundError(
@@ -584,18 +586,53 @@ class PhotoInfo:
# copy the file, _copy_file uses ditto to preserve Mac extended attributes
_copy_file(src, dest)
# copy live photo associated .mov if requested
if live_photo and self.live_photo:
live_name = dest.parent / f"{dest.stem}.mov"
src_live = self.path_live_photo
if src_live is not None:
logging.debug(
f"Exporting live photo video of {filename} as {live_name.name}"
)
_copy_file(src_live, str(live_name))
else:
logging.warning(f"Skipping missing live movie for {filename}")
else:
# use_photo_export
exported = None
# export live_photo .mov file?
live_photo = True if live_photo and self.live_photo else False
if edited:
# exported edited version and not original
exported = _export_photo_uuid_applescript(
self.uuid, dest, original=False, edited=True, timeout=timeout
)
if filename:
# use filename stem provided
filestem = dest.stem
else:
# didn't get passed a filename, add _edited
# TODO: add a test for this
filestem = f"{dest.stem}_edited"
exported = _export_photo_uuid_applescript(
self.uuid,
dest.parent,
filestem=filestem,
original=False,
edited=True,
live_photo=live_photo,
timeout=timeout,
)
else:
# export original version and not edited
filestem = dest.stem
exported = _export_photo_uuid_applescript(
self.uuid, dest, original=True, edited=False, timeout=timeout
self.uuid,
dest.parent,
filestem=filestem,
original=True,
edited=False,
live_photo=live_photo,
timeout=timeout,
)
if exported is None:
@@ -701,13 +738,15 @@ class PhotoInfo:
def _xmp_sidecar(self):
""" returns string for XMP sidecar """
# TODO: add additional fields to XMP file?
xmp_template = Template(
filename=os.path.join(_TEMPLATE_DIR, _XMP_TEMPLATE_NAME)
)
xmp_str = xmp_template.render(photo=self)
# remove extra lines that mako inserts from template
xmp_str = "\n".join([line for line in xmp_str.split("\n") if line.strip() != ""])
xmp_str = "\n".join(
[line for line in xmp_str.split("\n") if line.strip() != ""]
)
return xmp_str
def _write_sidecar(self, filename, sidecar_str):

View File

@@ -374,7 +374,7 @@ class PhotosDB:
""" returns the name of the temp file """
""" If sqlite shared memory and write-ahead log files exist, those are copied too """
# required because python's sqlite3 implementation can't read a locked file
_, suffix = os.path.splitext(fname)
# _, suffix = os.path.splitext(fname)
try:
dest_name = pathlib.Path(fname).name
dest_path = os.path.join(self._tempdir_name, dest_name)
@@ -1391,6 +1391,564 @@ class PhotosDB:
logging.debug("Burst Photos (dbphotos_burst:")
logging.debug(pformat(self._dbphotos_burst))
def _process_database5X(self):
""" ALPHA: TESTING using SimpleNamespace to clean up code for info, DO NOT CALL THIS METHOD """
""" process the Photos database to extract info """
""" works on Photos version >= 5.0 """
if _debug():
logging.debug(f"_process_database5X")
from types import SimpleNamespace
_DB_FIELD_NAMES = [
"adjustment_format_id",
"adjustment_uuid",
"albums",
"burst_key",
"burst_pick_type",
"burst_uuid",
"burst",
"cloud_asset_guid",
"cloud_available",
"cloud_batch_publish_date",
"cloud_library_state",
"cloud_local_state",
"cloud_status",
"custom_rendered_value",
"directory",
"extended_description",
"favorite",
"filename",
"has_adjustments",
"has_albums",
"has_keywords",
"has_persons",
"hdr",
"hidden",
"image_date",
"image_tz_offset_seconds",
"in_cloud",
"is_missing",
"keywords",
"last_modified_date",
"latitude",
"live_photo",
"local_availability",
"longitude",
"master_fingerprint",
"master_uuid",
"model_id",
"name",
"original_filename",
"panorama",
"portrait",
"remote_availability",
"screenshot",
"selfie",
"shared",
"slow_mo",
"subtype",
"time_lapse",
"title",
"type",
"uti",
"uuid",
]
_DB_FIELDS = {field: None for field in _DB_FIELD_NAMES}
# Epoch is Jan 1, 2001
td = (datetime(2001, 1, 1, 0, 0) - datetime(1970, 1, 1, 0, 0)).total_seconds()
(conn, c) = _open_sql_file(self._tmp_db)
# Look for all combinations of persons and pictures
if _debug():
logging.debug(f"Getting information about persons")
c.execute(
"SELECT ZPERSON.ZFULLNAME, ZGENERICASSET.ZUUID "
"FROM ZPERSON, ZDETECTEDFACE, ZGENERICASSET "
"WHERE ZDETECTEDFACE.ZPERSON = ZPERSON.Z_PK AND ZDETECTEDFACE.ZASSET = ZGENERICASSET.Z_PK "
"AND ZGENERICASSET.ZTRASHEDSTATE = 0"
)
for person in c:
if person[0] is None:
continue
person_name = person[0] if person[0] != "" else _UNKNOWN_PERSON
if not person[1] in self._dbfaces_uuid:
self._dbfaces_uuid[person[1]] = []
if not person_name in self._dbfaces_person:
self._dbfaces_person[person_name] = []
self._dbfaces_uuid[person[1]].append(person_name)
self._dbfaces_person[person_name].append(person[1])
if _debug():
logging.debug(f"Finished walking through persons")
logging.debug(pformat(self._dbfaces_person))
logging.debug(self._dbfaces_uuid)
c.execute(
"SELECT ZGENERICALBUM.ZUUID, ZGENERICASSET.ZUUID "
"FROM ZGENERICASSET "
"JOIN Z_26ASSETS ON Z_26ASSETS.Z_34ASSETS = ZGENERICASSET.Z_PK "
"JOIN ZGENERICALBUM ON ZGENERICALBUM.Z_PK = Z_26ASSETS.Z_26ALBUMS "
"WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 "
)
for album in c:
# store by uuid in _dbalbums_uuid and by album in _dbalbums_album
if not album[1] in self._dbalbums_uuid:
self._dbalbums_uuid[album[1]] = []
if not album[0] in self._dbalbums_album:
self._dbalbums_album[album[0]] = []
self._dbalbums_uuid[album[1]].append(album[0])
self._dbalbums_album[album[0]].append(album[1])
# now get additional details about albums
c.execute(
"SELECT "
"ZUUID, " # 0
"ZTITLE, " # 1
"ZCLOUDLOCALSTATE, " # 2
"ZCLOUDOWNERFIRSTNAME, " # 3
"ZCLOUDOWNERLASTNAME, " # 4
"ZCLOUDOWNERHASHEDPERSONID " # 5
"FROM ZGENERICALBUM"
)
for album in c:
self._dbalbum_details[album[0]] = {
"title": album[1],
"cloudlocalstate": album[2],
"cloudownerfirstname": album[3],
"cloudownderlastname": album[4],
"cloudownerhashedpersonid": album[5],
"cloudlibrarystate": None, # Photos 4
"cloudidentifier": None, # Photos4
}
if _debug():
logging.debug(f"Finished walking through albums")
logging.debug(pformat(self._dbalbums_album))
logging.debug(pformat(self._dbalbums_uuid))
logging.debug(pformat(self._dbalbum_details))
# get details on keywords
c.execute(
"SELECT ZKEYWORD.ZTITLE, ZGENERICASSET.ZUUID "
"FROM ZGENERICASSET "
"JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK "
"JOIN Z_1KEYWORDS ON Z_1KEYWORDS.Z_1ASSETATTRIBUTES = ZADDITIONALASSETATTRIBUTES.Z_PK "
"JOIN ZKEYWORD ON ZKEYWORD.Z_PK = Z_1KEYWORDS.Z_37KEYWORDS "
"WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 "
)
for keyword in c:
if not keyword[1] in self._dbkeywords_uuid:
self._dbkeywords_uuid[keyword[1]] = []
if not keyword[0] in self._dbkeywords_keyword:
self._dbkeywords_keyword[keyword[0]] = []
self._dbkeywords_uuid[keyword[1]].append(keyword[0])
self._dbkeywords_keyword[keyword[0]].append(keyword[1])
if _debug():
logging.debug(f"Finished walking through keywords")
logging.debug(pformat(self._dbkeywords_keyword))
logging.debug(pformat(self._dbkeywords_uuid))
# get details on disk volumes
c.execute("SELECT ZUUID, ZNAME from ZFILESYSTEMVOLUME")
for vol in c:
self._dbvolumes[vol[0]] = vol[1]
if _debug():
logging.debug(f"Finished walking through volumes")
logging.debug(self._dbvolumes)
# get details about photos
logging.debug(f"Getting information about photos")
c.execute(
"""SELECT ZGENERICASSET.ZUUID,
ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT,
ZADDITIONALASSETATTRIBUTES.ZTITLE,
ZADDITIONALASSETATTRIBUTES.ZORIGINALFILENAME,
ZGENERICASSET.ZMODIFICATIONDATE,
ZGENERICASSET.ZDATECREATED,
ZADDITIONALASSETATTRIBUTES.ZTIMEZONEOFFSET,
ZADDITIONALASSETATTRIBUTES.ZINFERREDTIMEZONEOFFSET,
ZADDITIONALASSETATTRIBUTES.ZTIMEZONENAME,
ZGENERICASSET.ZHIDDEN,
ZGENERICASSET.ZFAVORITE,
ZGENERICASSET.ZDIRECTORY,
ZGENERICASSET.ZFILENAME,
ZGENERICASSET.ZLATITUDE,
ZGENERICASSET.ZLONGITUDE,
ZGENERICASSET.ZHASADJUSTMENTS,
ZGENERICASSET.ZCLOUDBATCHPUBLISHDATE,
ZGENERICASSET.ZKIND,
ZGENERICASSET.ZUNIFORMTYPEIDENTIFIER,
ZGENERICASSET.ZAVALANCHEUUID,
ZGENERICASSET.ZAVALANCHEPICKTYPE,
ZGENERICASSET.ZKINDSUBTYPE,
ZGENERICASSET.ZCUSTOMRENDEREDVALUE,
ZADDITIONALASSETATTRIBUTES.ZCAMERACAPTUREDEVICE,
ZGENERICASSET.ZCLOUDASSETGUID
FROM ZGENERICASSET
JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK
WHERE ZGENERICASSET.ZTRASHEDSTATE = 0
ORDER BY ZGENERICASSET.ZUUID """
)
# Order of results
# 0 SELECT ZGENERICASSET.ZUUID,
# 1 ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT,
# 2 ZADDITIONALASSETATTRIBUTES.ZTITLE,
# 3 ZADDITIONALASSETATTRIBUTES.ZORIGINALFILENAME,
# 4 ZGENERICASSET.ZMODIFICATIONDATE,
# 5 ZGENERICASSET.ZDATECREATED,
# 6 ZADDITIONALASSETATTRIBUTES.ZTIMEZONEOFFSET,
# 7 ZADDITIONALASSETATTRIBUTES.ZINFERREDTIMEZONEOFFSET,
# 8 ZADDITIONALASSETATTRIBUTES.ZTIMEZONENAME,
# 9 ZGENERICASSET.ZHIDDEN,
# 10 ZGENERICASSET.ZFAVORITE,
# 11 ZGENERICASSET.ZDIRECTORY,
# 12 ZGENERICASSET.ZFILENAME,
# 13 ZGENERICASSET.ZLATITUDE,
# 14 ZGENERICASSET.ZLONGITUDE,
# 15 ZGENERICASSET.ZHASADJUSTMENTS
# 16 ZCLOUDBATCHPUBLISHDATE -- If not null, indicates a shared photo
# 17 ZKIND, -- 0 = photo, 1 = movie
# 18 ZUNIFORMTYPEIDENTIFIER -- UTI
# 19 ZGENERICASSET.ZAVALANCHEUUID, -- if not NULL, is burst photo
# 20 ZGENERICASSET.ZAVALANCHEPICKTYPE -- if not 2, is a selected burst photo
# 21 ZGENERICASSET.ZKINDSUBTYPE -- determine if live photos, etc
# 22 ZGENERICASSET.ZCUSTOMRENDEREDVALUE -- determine if HDR photo
# 23 ZADDITIONALASSETATTRIBUTES.ZCAMERACAPTUREDEVICE -- 1 if selfie (front facing camera)
# 25 ZGENERICASSET.ZCLOUDASSETGUID -- not null if asset is cloud asset
# (e.g. user has "iCloud Photos" checked in Photos preferences)
for row in c:
info = SimpleNamespace(**_DB_FIELDS)
info.uuid = uuid = row[0] # stored here for easier debugging
info.master_fingerprint = row[1]
info.title = info.name = row[2] # TODO: replace all uses of name with title
# There are sometimes negative values for lastmodifieddate in the database
# I don't know what these mean but they will raise exception in datetime if
# not accounted for
if row[4] is not None and row[4] >= 0:
info.last_modified_date = datetime.fromtimestamp(row[4] + td)
else:
info.last_modified_dat = None
info.image_date = datetime.fromtimestamp(row[5] + td)
info.image_tz_offset_seconds = row[6]
info.hidden = row[9]
info.favorite = row[10]
info.original_filename = row[3]
info.filename = row[12]
info.directory = row[11]
# set latitude and longitude
# if both latitude and longitude = -180.0, then they are NULL
if row[13] == -180.0 and row[14] == -180.0:
info.latitude = None
info.longitude = None
else:
info.latitude = row[13]
info.longitude = row[14]
info.has_adjustments = row[15]
info.cloud_batch_publish_date = row[16]
info.shared = True if row[16] is not None else False
# these will get filled in later
# init to avoid key errors
# info.extended_description = None # fill this in later
# info.local_availability = None
# info.remote_availability = None
# info.is_missing = None
# info.has_adjustments = None
# info.adjustment_format_id = None
# find type
if row[17] == 0:
info.type = _PHOTO_TYPE
elif row[17] == 1:
info.type = _MOVIE_TYPE
else:
if _debug():
logging.debug(f"WARNING: {uuid} found unknown type {row[17]}")
info.type = None
info.uti = row[18]
# handle burst photos
# if burst photo, determine whether or not it's a selected burst photo
# in Photos 5, burstUUID is called avalancheUUID
info.burst_uuid = row[19] # avalancheUUID
info.burst_pick_type = row[20] # avalanchePickType
if row[19] is not None:
# it's a burst photo
info.burst = True
burst_uuid = row[19]
if burst_uuid not in self._dbphotos_burst:
self._dbphotos_burst[burst_uuid] = set()
self._dbphotos_burst[burst_uuid].add(uuid)
if row[20] != 2 and row[20] != 4:
info.burst_key = True # it's a key photo (selected from the burst)
else:
info.burst_key = (
False
) # it's a burst photo but not one that's selected
else:
# not a burst photo
info.burst = False
info.burst_key = None
# Info on sub-type (live photo, panorama, etc)
# ZGENERICASSET.ZKINDSUBTYPE
# 1 == panorama
# 2 == live photo
# 10 = screenshot
# 100 = shared movie (MP4) ??
# 101 = slow-motion video
# 102 = Time lapse video
info.subtype = row[21]
info.live_photo = True if row[21] == 2 else False
info.screenshot = True if row[21] == 10 else False
info.slow_mo = True if row[21] == 101 else False
info.time_lapse = True if row[21] == 102 else False
# Handle HDR photos and portraits
# ZGENERICASSET.ZCUSTOMRENDEREDVALUE
# 3 = HDR photo
# 4 = non-HDR version of the photo
# 6 = panorama
# 8 = portrait
info.custom_rendered_value = row[22]
info.hdr = True if row[22] == 3 else False
info.portrait = True if row[22] == 8 else False
# Set panorama from either KindSubType or RenderedValue
info.panorama = True if row[21] == 1 or row[22] == 6 else False
# Handle selfies (front facing camera, ZCAMERACAPTUREDEVICE=1)
info.selfie = True if row[23] == 1 else False
# Determine if photo is part of cloud library (ZGENERICASSET.ZCLOUDASSETGUID not NULL)
# Initialize cloud fields that will filled in later
info.cloud_asset_guid = row[24]
info.cloud_local_state = None
info.in_cloud = None
info.cloud_library_state = None # Photos 4
info.cloud_status = None # Photos 4
info.cloud_available = None # Photos 4
self._dbphotos[uuid] = info
# # if row[19] is not None and ((row[20] == 2) or (row[20] == 4)):
# # burst photo
# if row[19] is not None:
# # burst photo, add to _dbphotos_burst
# info["burst"] = True
# burst_uuid = row[19]
# if burst_uuid not in self._dbphotos_burst:
# self._dbphotos_burst[burst_uuid] = {}
# self._dbphotos_burst[burst_uuid][uuid] = info
# else:
# info["burst"] = False
# Get extended description
c.execute(
"SELECT ZGENERICASSET.ZUUID, "
"ZASSETDESCRIPTION.ZLONGDESCRIPTION "
"FROM ZGENERICASSET "
"JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK "
"JOIN ZASSETDESCRIPTION ON ZASSETDESCRIPTION.Z_PK = ZADDITIONALASSETATTRIBUTES.ZASSETDESCRIPTION "
"ORDER BY ZGENERICASSET.ZUUID "
)
for row in c:
uuid = row[0]
if uuid in self._dbphotos:
self._dbphotos[uuid].extended_description = row[1]
else:
if _debug():
logging.debug(
f"WARNING: found description {row[1]} but no photo for {uuid}"
)
# get information about adjusted/edited photos
c.execute(
"SELECT ZGENERICASSET.ZUUID, "
"ZGENERICASSET.ZHASADJUSTMENTS, "
"ZUNMANAGEDADJUSTMENT.ZADJUSTMENTFORMATIDENTIFIER "
"FROM ZGENERICASSET, ZUNMANAGEDADJUSTMENT "
"JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK "
"WHERE ZADDITIONALASSETATTRIBUTES.ZUNMANAGEDADJUSTMENT = ZUNMANAGEDADJUSTMENT.Z_PK "
"AND ZGENERICASSET.ZTRASHEDSTATE = 0 "
)
for row in c:
uuid = row[0]
if uuid in self._dbphotos:
self._dbphotos[uuid].adjustment_format_id = row[2]
else:
if _debug():
logging.debug(
f"WARNING: found adjustmentformatidentifier {row[2]} but no photo for uuid {row[0]}"
)
# Find missing photos
# TODO: this code is very kludgy and I had to make lots of assumptions
# it's probably wrong and needs to be re-worked once I figure out how to reliably
# determine if a photo is missing in Photos 5
# Get info on remote/local availability for photos in shared albums
# Shared photos have a null fingerprint (and some other photos do too)
# TODO: There may be a bug here, perhaps ZDATASTORESUBTYPE should be 1 --> it's the longest ZDATALENGTH (is this the original)
c.execute(
""" SELECT
ZGENERICASSET.ZUUID,
ZINTERNALRESOURCE.ZLOCALAVAILABILITY,
ZINTERNALRESOURCE.ZREMOTEAVAILABILITY
FROM ZGENERICASSET
JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK
JOIN ZINTERNALRESOURCE ON ZINTERNALRESOURCE.ZASSET = ZADDITIONALASSETATTRIBUTES.ZASSET
WHERE ZDATASTORESUBTYPE = 0 OR ZDATASTORESUBTYPE = 3 """
# WHERE ZDATASTORESUBTYPE = 1 OR ZDATASTORESUBTYPE = 3 """
# WHERE ZDATASTORESUBTYPE = 0 OR ZDATASTORESUBTYPE = 3 """
# WHERE ZINTERNALRESOURCE.ZFINGERPRINT IS NULL AND ZINTERNALRESOURCE.ZDATASTORESUBTYPE = 3 """
)
for row in c:
uuid = row[0]
if uuid in self._dbphotos:
# and self._dbphotos[uuid]["isMissing"] is None:
self._dbphotos[uuid].local_availability = row[1]
self._dbphotos[uuid].remote_availability = row[2]
# old = self._dbphotos[uuid]["isMissing"]
if row[1] != 1:
self._dbphotos[uuid].is_missing = 1
else:
self._dbphotos[uuid].is_missing = 0
# if old is not None and old != self._dbphotos[uuid]["isMissing"]:
# logging.warning(
# f"{uuid} isMissing changed: {old} {self._dbphotos[uuid]['isMissing']}"
# )
# get information on local/remote availability
c.execute(
""" SELECT ZGENERICASSET.ZUUID,
ZINTERNALRESOURCE.ZLOCALAVAILABILITY,
ZINTERNALRESOURCE.ZREMOTEAVAILABILITY
FROM ZGENERICASSET
JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK
JOIN ZINTERNALRESOURCE ON ZINTERNALRESOURCE.ZFINGERPRINT = ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT """
)
for row in c:
uuid = row[0]
if uuid in self._dbphotos:
self._dbphotos[uuid].local_availability = row[1]
self._dbphotos[uuid].remote_availability = row[2]
# old = self._dbphotos[uuid]["isMissing"]
if row[1] != 1:
self._dbphotos[uuid].is_missing = 1
else:
self._dbphotos[uuid].is_missing = 0
# if old is not None and old != self._dbphotos[uuid]["isMissing"]:
# logging.warning(
# f"{uuid} isMissing changed: {old} {self._dbphotos[uuid]['isMissing']}"
# )
# get information about cloud sync state
c.execute(
""" SELECT
ZGENERICASSET.ZUUID,
ZCLOUDMASTER.ZCLOUDLOCALSTATE
FROM ZCLOUDMASTER, ZGENERICASSET
WHERE ZGENERICASSET.ZMASTER = ZCLOUDMASTER.Z_PK """
)
for row in c:
uuid = row[0]
if uuid in self._dbphotos:
self._dbphotos[uuid].cloud_local_state = row[1]
self._dbphotos[uuid].in_cloud = True if row[1] == 3 else False
# add faces and keywords to photo data
for uuid in self._dbphotos:
# keywords
if uuid in self._dbkeywords_uuid:
self._dbphotos[uuid].has_keywords = 1
self._dbphotos[uuid].keywords = self._dbkeywords_uuid[uuid]
else:
self._dbphotos[uuid].has_keywords = 0
self._dbphotos[uuid].keywords = []
if uuid in self._dbfaces_uuid:
self._dbphotos[uuid].has_persons = 1
self._dbphotos[uuid].persons = self._dbfaces_uuid[uuid]
else:
self._dbphotos[uuid].has_persons = 0
self._dbphotos[uuid].persons = []
if uuid in self._dbalbums_uuid:
self._dbphotos[uuid].has_albums = 1
self._dbphotos[uuid].albums = self._dbalbums_uuid[uuid]
else:
self._dbphotos[uuid].has_albums = 0
self._dbphotos[uuid].albums = []
# build album_titles dictionary
for album_id in self._dbalbum_details:
title = self._dbalbum_details[album_id]["title"]
if title in self._dbalbum_titles:
self._dbalbum_titles[title].append(album_id)
else:
self._dbalbum_titles[title] = [album_id]
# close connection and remove temporary files
conn.close()
# done processing, dump debug data if requested
if _debug():
logging.debug("Faces (_dbfaces_uuid):")
logging.debug(pformat(self._dbfaces_uuid))
logging.debug("Faces by person (_dbfaces_person):")
logging.debug(pformat(self._dbfaces_person))
logging.debug("Keywords by uuid (_dbkeywords_uuid):")
logging.debug(pformat(self._dbkeywords_uuid))
logging.debug("Keywords by keyword (_dbkeywords_keywords):")
logging.debug(pformat(self._dbkeywords_keyword))
logging.debug("Albums by uuid (_dbalbums_uuid):")
logging.debug(pformat(self._dbalbums_uuid))
logging.debug("Albums by album (_dbalbums_albums):")
logging.debug(pformat(self._dbalbums_album))
logging.debug("Album details (_dbalbum_details):")
logging.debug(pformat(self._dbalbum_details))
logging.debug("Album titles (_dbalbum_titles):")
logging.debug(pformat(self._dbalbum_titles))
logging.debug("Volumes (_dbvolumes):")
logging.debug(pformat(self._dbvolumes))
logging.debug("Photos (_dbphotos):")
logging.debug(pformat(self._dbphotos))
logging.debug("Burst Photos (dbphotos_burst:")
logging.debug(pformat(self._dbphotos_burst))
def photos(
self,
keywords=None,

View File

@@ -309,17 +309,32 @@ def create_path_by_date(dest, dt):
def _export_photo_uuid_applescript(
uuid, dest, original=True, edited=False, timeout=120
uuid,
dest,
filestem=None,
original=True,
edited=False,
live_photo=False,
timeout=120,
):
""" Export photo to dest path using applescript to control Photos
If photo is a live photo, exports both the photo and associated .mov file
uuid: UUID of photo to export
dest: destination path to export to; may be either a directory or a filename
if filename provided and file exists, exiting file will be overwritten
dest: destination path to export to
filestem: (string) if provided, exported filename will be named stem.ext
where ext is extension of the file exported by photos (e.g. .jpeg, .mov, etc)
If not provided, file will be named with whatever name Photos uses
If filestem.ext exists, it wil be overwritten
original: (boolean) if True, export original image; default = True
edited: (boolean) if True, export edited photo; default = False
will produce an error if image does not have edits/adjustments
will produce an error if image does not have edits/adjustments
*Note*: must be called with either edited or original but not both,
will raise error if called with both edited and original = True
live_photo: (boolean) if True, export associated .mov live photo; default = False
timeout: timeout value in seconds; export will fail if applescript run time exceeds timeout
Returns: path to exported file or None if export failed
Returns: list of paths to exported file(s) or None if export failed
Note: For Live Photos, if edited=True, will export a jpeg but not the movie, even if photo
has not been edited. This is due to how Photos Applescript interface works.
"""
# setup the applescript to do the export
@@ -352,6 +367,13 @@ def _export_photo_uuid_applescript(
"""
)
dest = pathlib.Path(dest)
if not dest.is_dir:
raise ValueError(f"dest {dest} must be a directory")
if not original ^ edited:
raise ValueError(f"edited or original must be True but not both")
tmpdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
# export original
@@ -366,15 +388,26 @@ def _export_photo_uuid_applescript(
if filename is not None:
# need to find actual filename as sometimes Photos renames JPG to jpeg on export
# this assumes only a single file in export folder, which should be true as
# may be more than one file exported (e.g. if Live Photo, Photos exports both .jpeg and .mov)
# TemporaryDirectory will cleanup on return
path = glob.glob(os.path.join(tmpdir.name, "*"))[0]
_copy_file(path, dest)
if os.path.isdir(dest):
new_path = os.path.join(dest, filename)
else:
new_path = dest
return new_path
files = glob.glob(os.path.join(tmpdir.name, "*"))
exported_paths = []
for fname in files:
path = pathlib.Path(fname)
if len(files) > 1 and not live_photo and path.suffix.lower() == ".mov":
# it's the .mov part of live photo but not requested, so don't export
logging.debug(f"Skipping live photo file {path}")
continue
if filestem:
# rename the file based on filestem, keeping original extension
dest_new = dest / f"{filestem}{path.suffix}"
else:
# use the name Photos provided
dest_new = dest / path.name
logging.debug(f"exporting {path} to dest_new: {dest_new}")
_copy_file(str(path), str(dest_new))
exported_paths.append(str(dest_new))
return exported_paths
else:
return None

View File

@@ -1,6 +1,9 @@
import pytest
from click.testing import CliRunner
CLI_PHOTOS_DB = "tests/Test-10.15.1.photoslibrary"
LIVE_PHOTOS_DB = "tests/Test-Cloud-10.15.1.photoslibrary/database/photos.db"
CLI_OUTPUT_NO_SUBCOMMAND = [
"Options:",
"--db <Photos database path> Specify Photos database path. Path to Photos",
@@ -37,7 +40,14 @@ CLI_EXPORT_FILENAMES = [
CLI_EXPORT_UUID = "D79B8D77-BFFC-460B-9312-034F2877D35B"
CLI_EXPORT_SIDECAR_FILENAMES = ["Pumkins2.jpg", "Pumpkins2.json", "Pumpkins2.xmp"]
CLI_EXPORT_SIDECAR_FILENAMES = ["Pumkins2.jpg", "Pumkins2.json", "Pumkins2.xmp"]
CLI_EXPORT_LIVE = [
"51F2BEF7-431A-4D31-8AC1-3284A57826AE.jpeg",
"51F2BEF7-431A-4D31-8AC1-3284A57826AE.mov",
]
CLI_EXPORT_LIVE_ORIGINAL = ["IMG_0728.JPG", "IMG_0728.mov"]
def test_osxphotos():
@@ -54,16 +64,20 @@ def test_osxphotos():
def test_query_uuid():
import json
import os
import os.path
import osxphotos
from osxphotos.__main__ import query
runner = CliRunner()
cwd = os.getcwd()
result = runner.invoke(
query,
[
"--json",
"--db",
"./tests/Test-10.15.1.photoslibrary",
os.path.join(cwd, CLI_PHOTOS_DB),
# "./tests/Test-10.15.1.photoslibrary",
"--uuid",
"D79B8D77-BFFC-460B-9312-034F2877D35B",
],
@@ -98,7 +112,7 @@ def test_export():
result = runner.invoke(
export,
[
os.path.join(cwd, "tests/Test-10.15.1.photoslibrary"),
os.path.join(cwd, CLI_PHOTOS_DB),
".",
"--original-name",
"--export-edited",
@@ -106,26 +120,32 @@ def test_export():
],
)
files = glob.glob("*.jpg")
assert files.sort() == CLI_EXPORT_FILENAMES.sort()
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
def test_query_date():
import json
import osxphotos
import os
import os.path
from osxphotos.__main__ import query
runner = CliRunner()
cwd = os.getcwd()
result = runner.invoke(
query,
[
"--json",
"--db",
"./tests/Test-10.15.1.photoslibrary",
os.path.join(cwd, CLI_PHOTOS_DB),
"--from-date=2018-09-28",
"--to-date=2018-09-28T23:00:00",
],
)
assert result.exit_code == 0
import logging
logging.warning(result.output)
json_got = json.loads(result.output)
assert len(json_got) == 4
@@ -136,6 +156,35 @@ def test_export_sidecar():
import os
import os.path
import osxphotos
from osxphotos.__main__ import cli
runner = CliRunner()
cwd = os.getcwd()
with runner.isolated_filesystem():
result = runner.invoke(
cli,
[
"export",
"--db",
os.path.join(cwd, CLI_PHOTOS_DB),
".",
"--original-name",
"--sidecar=json",
"--sidecar=xmp",
f"--uuid={CLI_EXPORT_UUID}",
"-V",
],
)
files = glob.glob("*.*")
assert sorted(files) == sorted(CLI_EXPORT_SIDECAR_FILENAMES)
def test_export_live():
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
runner = CliRunner()
@@ -144,14 +193,14 @@ def test_export_sidecar():
result = runner.invoke(
export,
[
os.path.join(cwd, "tests/Test-10.15.1.photoslibrary"),
os.path.join(cwd, LIVE_PHOTOS_DB),
".",
"--live",
"--original-name",
"--export-live",
"-V",
"--sidecar json",
"--sidecar xmp",
f"--uuid {CLI_EXPORT_UUID}",
],
)
files = glob.glob("*")
assert files.sort() == CLI_EXPORT_SIDECAR_FILENAMES.sort()
assert sorted(files) == sorted(CLI_EXPORT_LIVE_ORIGINAL)

View File

@@ -28,3 +28,83 @@ def test_not_live_photo():
assert not photos[0].live_photo
assert photos[0].path_live_photo is None
def test_export_live_1():
# export a live photo and associated .mov
import glob
import os.path
import pathlib
import tempfile
import osxphotos
dest = tempfile.TemporaryDirectory(prefix="osxphotos_")
photosdb = osxphotos.PhotosDB(dbfile=PHOTOS_DB)
photos = photosdb.photos(uuid=[UUID_DICT["live"]])
filename = photos[0].filename
expected_dest = os.path.join(dest.name, filename)
got_dest = photos[0].export(dest.name, live_photo=True)
got_movie = f"{pathlib.Path(got_dest).parent / pathlib.Path(got_dest).stem}.mov"
expected_dest = os.path.join(dest.name, filename)
files = glob.glob(os.path.join(dest.name, "*"))
assert len(files) == 2
assert expected_dest == got_dest
assert expected_dest in files
assert got_movie in files
def test_export_live_2():
# don't export the live photo
import glob
import os.path
import pathlib
import tempfile
import osxphotos
dest = tempfile.TemporaryDirectory(prefix="osxphotos_")
photosdb = osxphotos.PhotosDB(dbfile=PHOTOS_DB)
photos = photosdb.photos(uuid=[UUID_DICT["live"]])
filename = photos[0].filename
expected_dest = os.path.join(dest.name, filename)
got_dest = photos[0].export(dest.name, live_photo=False)
got_movie = f"{pathlib.Path(got_dest).parent / pathlib.Path(got_dest).stem}.mov"
files = glob.glob(os.path.join(dest.name, "*"))
assert len(files) == 1
assert expected_dest == got_dest
assert expected_dest in files
assert got_movie not in files
# def test_export_live_3():
# # export a live photo and associated .mov and edited file
# import glob
# import os.path
# import pathlib
# import tempfile
# import osxphotos
# dest = tempfile.TemporaryDirectory(prefix="osxphotos_")
# photosdb = osxphotos.PhotosDB(dbfile=PHOTOS_DB)
# photos = photosdb.photos(uuid=[UUID_DICT["live"]])
# filename = photos[0].filename
# expected_dest = os.path.join(dest.name, filename)
# got_dest = photos[0].export(dest.name, live_photo=True, edited=True)
# got_movie = f"{pathlib.Path(got_dest).parent / pathlib.Path(got_dest).stem}.mov"
# expected_dest = os.path.join(dest.name, filename)
# files = glob.glob(os.path.join(dest.name, "*"))
# assert len(files) == 2
# assert expected_dest == got_dest
# assert expected_dest in files
# assert got_movie in files