diff --git a/osxphotos/_version.py b/osxphotos/_version.py index 775b76b7..fc0d502f 100644 --- a/osxphotos/_version.py +++ b/osxphotos/_version.py @@ -1,3 +1,3 @@ """ version info """ -__version__ = "0.27.5" +__version__ = "0.27.6" diff --git a/osxphotos/photoinfo.py b/osxphotos/photoinfo.py index 94882912..63247744 100644 --- a/osxphotos/photoinfo.py +++ b/osxphotos/photoinfo.py @@ -34,6 +34,8 @@ from .utils import ( _export_photo_uuid_applescript, _get_resource_loc, dd_to_dms_str, + findfiles, + get_preferred_uti_extension, ) @@ -239,6 +241,60 @@ class PhotoInfo: return photopath + @property + def path_raw(self): + """ absolute path of associated RAW image or None if there is not one """ + + # In Photos 5, raw is in same folder as original but with _4.ext + # Unless "Copy Items to the Photos Library" is not checked + # then RAW image is not renamed but has same name is jpeg buth with raw extension + # Current implementation uses findfiles to find images with the correct raw UTI extension + # in same folder as the original and with same stem as original in form: original_stem*.raw_ext + # TODO: I don't like this -- would prefer a more deterministic approach but until I have more + # data on how Photos stores and retrieves RAW images, this seems to be working + + if self._db._db_version < _PHOTOS_5_VERSION: + logging.warning("Not yet implemented for Photos version < 5") + return None + + if self._info["isMissing"] == 1: + return None # path would be meaningless until downloaded + + if not self.has_raw: + return None # no raw image to get path for + + # if self._info["shared"]: + # # shared photo + # photopath = os.path.join( + # self._db._library_path, + # _PHOTOS_5_SHARED_PHOTO_PATH, + # self._info["directory"], + # self._info["filename"], + # ) + # return photopath + + filestem = pathlib.Path(self._info["filename"]).stem + raw_ext = get_preferred_uti_extension(self._info["UTI_raw"]) + + if self._info["directory"].startswith("/"): + filepath = self._info["directory"] + else: + filepath = os.path.join(self._db._masters_path, self._info["directory"]) + + glob_str = f"{filestem}*.{raw_ext}" + raw_file = findfiles(glob_str, filepath) + if len(raw_file) != 1: + logging.warning(f"Error getting path to RAW file: {filepath}/{glob_str}") + photopath = None + else: + photopath = os.path.join(filepath, raw_file[0]) + if not os.path.isfile(photopath): + logging.debug( + f"MISSING PATH: RAW photo for UUID {self._uuid} should be at {photopath} but does not appear to exist" + ) + photopath = None + return photopath + @property def description(self): """ long / extended description of picture """ @@ -341,6 +397,14 @@ class PhotoInfo: """ return self._info["UTI"] + @property + def uti_raw(self): + """ Returns Uniform Type Identifier (UTI) for the RAW image if there is one + for example: com.canon.cr2-raw-image + Returns None if no associated RAW image + """ + return self._info["UTI_raw"] + @property def ismovie(self): """ Returns True if file is a movie, otherwise False @@ -521,6 +585,22 @@ class PhotoInfo: self._place = None return self._place + @property + def has_raw(self): + """ returns True if photo has an associated RAW image, otherwise False """ + if self._db._db_version < _PHOTOS_5_VERSION: + logging.warning("Not yet implemented for Photos version < 5") + return None + + return self._info["has_raw"] + + @property + def raw_original(self): + """ returns True if associated RAW image and the RAW image is selected in Photos + via "Use RAW as Original " + otherwise returns False """ + return True if self._info["original_resource_choice"] == 1 else False + def export( self, dest, diff --git a/osxphotos/photosdb.py b/osxphotos/photosdb.py index 1b9d6085..9694b57d 100644 --- a/osxphotos/photosdb.py +++ b/osxphotos/photosdb.py @@ -780,6 +780,18 @@ class PhotosDB: self._dbphotos[uuid]["cloudAvailable"] = None self._dbphotos[uuid]["incloud"] = None + # TODO: NOT YET USED -- PLACEHOLDER for RAW processing (currently only in _process_database5) + # original resource choice (e.g. RAW or jpeg) + self._dbphotos[uuid]["original_resource_choice"] = None + + # associated RAW image info + # will be filled in later + self._dbphotos[uuid]["has_raw"] = None + self._dbphotos[uuid]["UTI_raw"] = None + self._dbphotos[uuid]["raw_data_length"] = None + self._dbphotos[uuid]["resource_type"] = None + self._dbphotos[uuid]["datastore_subtype"] = None + # get details needed to find path of the edited photos c.execute( """ SELECT RKVersion.uuid, RKVersion.adjustmentUuid, RKModelResource.modelId, @@ -1230,7 +1242,8 @@ class PhotosDB: ZADDITIONALASSETATTRIBUTES.ZCAMERACAPTUREDEVICE, ZGENERICASSET.ZCLOUDASSETGUID, ZADDITIONALASSETATTRIBUTES.ZREVERSELOCATIONDATA, - ZGENERICASSET.ZMOMENT + ZGENERICASSET.ZMOMENT, + ZADDITIONALASSETATTRIBUTES.ZORIGINALRESOURCECHOICE FROM ZGENERICASSET JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 @@ -1265,6 +1278,7 @@ class PhotosDB: # (e.g. user has "iCloud Photos" checked in Photos preferences) # 25 ZADDITIONALASSETATTRIBUTES.ZREVERSELOCATIONDATA -- reverse geolocation data # 26 ZGENERICASSET.ZMOMENT -- FK for ZMOMENT.Z_PK + # 27 ZADDITIONALASSETATTRIBUTES.ZORIGINALRESOURCECHOICE -- 1 if associated RAW image is original else 0 for row in c: uuid = row[0] @@ -1399,6 +1413,17 @@ class PhotosDB: # moment info info["momentID"] = row[26] + # original resource choice (e.g. RAW or jpeg) + info["original_resource_choice"] = row[27] + + # associated RAW image info + # will be filled in later + info["has_raw"] = False + info["raw_data_length"] = None + info["UTI_raw"] = None + info["datastore_subtype"] = None + info["resource_type"] = None + self._dbphotos[uuid] = info # # if row[19] is not None and ((row[20] == 2) or (row[20] == 4)): @@ -1535,6 +1560,31 @@ class PhotosDB: self._dbphotos[uuid]["cloudLocalState"] = row[1] self._dbphotos[uuid]["incloud"] = True if row[1] == 3 else False + # get information about associted RAW images + # RAW images have ZDATASTORESUBTYPE = 17 + c.execute( + """ SELECT + ZGENERICASSET.ZUUID, + ZINTERNALRESOURCE.ZDATALENGTH, + ZUNIFORMTYPEIDENTIFIER.ZIDENTIFIER, + ZINTERNALRESOURCE.ZDATASTORESUBTYPE, + ZINTERNALRESOURCE.ZRESOURCETYPE + FROM ZGENERICASSET + JOIN ZINTERNALRESOURCE ON ZINTERNALRESOURCE.ZASSET = ZADDITIONALASSETATTRIBUTES.ZASSET + JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK + JOIN ZUNIFORMTYPEIDENTIFIER ON ZUNIFORMTYPEIDENTIFIER.Z_PK = ZINTERNALRESOURCE.ZUNIFORMTYPEIDENTIFIER + WHERE ZINTERNALRESOURCE.ZDATASTORESUBTYPE = 17 + """ + ) + for row in c: + uuid = row[0] + if uuid in self._dbphotos: + self._dbphotos[uuid]["has_raw"] = True + self._dbphotos[uuid]["raw_data_length"] = row[1] + self._dbphotos[uuid]["UTI_raw"] = row[2] + self._dbphotos[uuid]["datastore_subtype"] = row[3] + self._dbphotos[uuid]["resource_type"] = row[4] + # add faces and keywords to photo data for uuid in self._dbphotos: # keywords @@ -1709,565 +1759,6 @@ class PhotosDB: hierarchy = _recurse_folder_hierarchy(folders) return hierarchy - def _process_database5X(self): - """ ALPHA: TESTING using SimpleNamespace to clean up code for info, DO NOT CALL THIS METHOD """ - """ Needs to be updated for changes in process_database5 due to adding PlaceInfo """ - """ process the Photos database to extract info """ - """ works on Photos version >= 5.0 """ - - if _debug(): - logging.debug(f"_process_database5X") - - from types import SimpleNamespace - - _DB_FIELD_NAMES = [ - "adjustment_format_id", - "adjustment_uuid", - "albums", - "burst_key", - "burst_pick_type", - "burst_uuid", - "burst", - "cloud_asset_guid", - "cloud_available", - "cloud_batch_publish_date", - "cloud_library_state", - "cloud_local_state", - "cloud_status", - "custom_rendered_value", - "directory", - "extended_description", - "favorite", - "filename", - "has_adjustments", - "has_albums", - "has_keywords", - "has_persons", - "hdr", - "hidden", - "image_date", - "image_tz_offset_seconds", - "in_cloud", - "is_missing", - "keywords", - "last_modified_date", - "latitude", - "live_photo", - "local_availability", - "longitude", - "master_fingerprint", - "master_uuid", - "model_id", - "name", - "original_filename", - "panorama", - "portrait", - "remote_availability", - "screenshot", - "selfie", - "shared", - "slow_mo", - "subtype", - "time_lapse", - "title", - "type", - "uti", - "uuid", - ] - _DB_FIELDS = {field: None for field in _DB_FIELD_NAMES} - - # Epoch is Jan 1, 2001 - td = (datetime(2001, 1, 1, 0, 0) - datetime(1970, 1, 1, 0, 0)).total_seconds() - - (conn, c) = _open_sql_file(self._tmp_db) - - # Look for all combinations of persons and pictures - if _debug(): - logging.debug(f"Getting information about persons") - - c.execute( - "SELECT ZPERSON.ZFULLNAME, ZGENERICASSET.ZUUID " - "FROM ZPERSON, ZDETECTEDFACE, ZGENERICASSET " - "WHERE ZDETECTEDFACE.ZPERSON = ZPERSON.Z_PK AND ZDETECTEDFACE.ZASSET = ZGENERICASSET.Z_PK " - "AND ZGENERICASSET.ZTRASHEDSTATE = 0" - ) - for person in c: - if person[0] is None: - continue - person_name = person[0] if person[0] != "" else _UNKNOWN_PERSON - if not person[1] in self._dbfaces_uuid: - self._dbfaces_uuid[person[1]] = [] - if not person_name in self._dbfaces_person: - self._dbfaces_person[person_name] = [] - self._dbfaces_uuid[person[1]].append(person_name) - self._dbfaces_person[person_name].append(person[1]) - - if _debug(): - logging.debug(f"Finished walking through persons") - logging.debug(pformat(self._dbfaces_person)) - logging.debug(self._dbfaces_uuid) - - c.execute( - "SELECT ZGENERICALBUM.ZUUID, ZGENERICASSET.ZUUID " - "FROM ZGENERICASSET " - "JOIN Z_26ASSETS ON Z_26ASSETS.Z_34ASSETS = ZGENERICASSET.Z_PK " - "JOIN ZGENERICALBUM ON ZGENERICALBUM.Z_PK = Z_26ASSETS.Z_26ALBUMS " - "WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 " - ) - for album in c: - # store by uuid in _dbalbums_uuid and by album in _dbalbums_album - if not album[1] in self._dbalbums_uuid: - self._dbalbums_uuid[album[1]] = [] - if not album[0] in self._dbalbums_album: - self._dbalbums_album[album[0]] = [] - self._dbalbums_uuid[album[1]].append(album[0]) - self._dbalbums_album[album[0]].append(album[1]) - - # now get additional details about albums - c.execute( - "SELECT " - "ZUUID, " # 0 - "ZTITLE, " # 1 - "ZCLOUDLOCALSTATE, " # 2 - "ZCLOUDOWNERFIRSTNAME, " # 3 - "ZCLOUDOWNERLASTNAME, " # 4 - "ZCLOUDOWNERHASHEDPERSONID " # 5 - "FROM ZGENERICALBUM" - ) - for album in c: - self._dbalbum_details[album[0]] = { - "title": album[1], - "cloudlocalstate": album[2], - "cloudownerfirstname": album[3], - "cloudownderlastname": album[4], - "cloudownerhashedpersonid": album[5], - "cloudlibrarystate": None, # Photos 4 - "cloudidentifier": None, # Photos4 - } - - if _debug(): - logging.debug(f"Finished walking through albums") - logging.debug(pformat(self._dbalbums_album)) - logging.debug(pformat(self._dbalbums_uuid)) - logging.debug(pformat(self._dbalbum_details)) - - # get details on keywords - c.execute( - "SELECT ZKEYWORD.ZTITLE, ZGENERICASSET.ZUUID " - "FROM ZGENERICASSET " - "JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK " - "JOIN Z_1KEYWORDS ON Z_1KEYWORDS.Z_1ASSETATTRIBUTES = ZADDITIONALASSETATTRIBUTES.Z_PK " - "JOIN ZKEYWORD ON ZKEYWORD.Z_PK = Z_1KEYWORDS.Z_37KEYWORDS " - "WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 " - ) - for keyword in c: - if not keyword[1] in self._dbkeywords_uuid: - self._dbkeywords_uuid[keyword[1]] = [] - if not keyword[0] in self._dbkeywords_keyword: - self._dbkeywords_keyword[keyword[0]] = [] - self._dbkeywords_uuid[keyword[1]].append(keyword[0]) - self._dbkeywords_keyword[keyword[0]].append(keyword[1]) - - if _debug(): - logging.debug(f"Finished walking through keywords") - logging.debug(pformat(self._dbkeywords_keyword)) - logging.debug(pformat(self._dbkeywords_uuid)) - - # get details on disk volumes - c.execute("SELECT ZUUID, ZNAME from ZFILESYSTEMVOLUME") - for vol in c: - self._dbvolumes[vol[0]] = vol[1] - - if _debug(): - logging.debug(f"Finished walking through volumes") - logging.debug(self._dbvolumes) - - # get details about photos - logging.debug(f"Getting information about photos") - c.execute( - """SELECT ZGENERICASSET.ZUUID, - ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT, - ZADDITIONALASSETATTRIBUTES.ZTITLE, - ZADDITIONALASSETATTRIBUTES.ZORIGINALFILENAME, - ZGENERICASSET.ZMODIFICATIONDATE, - ZGENERICASSET.ZDATECREATED, - ZADDITIONALASSETATTRIBUTES.ZTIMEZONEOFFSET, - ZADDITIONALASSETATTRIBUTES.ZINFERREDTIMEZONEOFFSET, - ZADDITIONALASSETATTRIBUTES.ZTIMEZONENAME, - ZGENERICASSET.ZHIDDEN, - ZGENERICASSET.ZFAVORITE, - ZGENERICASSET.ZDIRECTORY, - ZGENERICASSET.ZFILENAME, - ZGENERICASSET.ZLATITUDE, - ZGENERICASSET.ZLONGITUDE, - ZGENERICASSET.ZHASADJUSTMENTS, - ZGENERICASSET.ZCLOUDBATCHPUBLISHDATE, - ZGENERICASSET.ZKIND, - ZGENERICASSET.ZUNIFORMTYPEIDENTIFIER, - ZGENERICASSET.ZAVALANCHEUUID, - ZGENERICASSET.ZAVALANCHEPICKTYPE, - ZGENERICASSET.ZKINDSUBTYPE, - ZGENERICASSET.ZCUSTOMRENDEREDVALUE, - ZADDITIONALASSETATTRIBUTES.ZCAMERACAPTUREDEVICE, - ZGENERICASSET.ZCLOUDASSETGUID - FROM ZGENERICASSET - JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK - WHERE ZGENERICASSET.ZTRASHEDSTATE = 0 - ORDER BY ZGENERICASSET.ZUUID """ - ) - # Order of results - # 0 SELECT ZGENERICASSET.ZUUID, - # 1 ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT, - # 2 ZADDITIONALASSETATTRIBUTES.ZTITLE, - # 3 ZADDITIONALASSETATTRIBUTES.ZORIGINALFILENAME, - # 4 ZGENERICASSET.ZMODIFICATIONDATE, - # 5 ZGENERICASSET.ZDATECREATED, - # 6 ZADDITIONALASSETATTRIBUTES.ZTIMEZONEOFFSET, - # 7 ZADDITIONALASSETATTRIBUTES.ZINFERREDTIMEZONEOFFSET, - # 8 ZADDITIONALASSETATTRIBUTES.ZTIMEZONENAME, - # 9 ZGENERICASSET.ZHIDDEN, - # 10 ZGENERICASSET.ZFAVORITE, - # 11 ZGENERICASSET.ZDIRECTORY, - # 12 ZGENERICASSET.ZFILENAME, - # 13 ZGENERICASSET.ZLATITUDE, - # 14 ZGENERICASSET.ZLONGITUDE, - # 15 ZGENERICASSET.ZHASADJUSTMENTS - # 16 ZCLOUDBATCHPUBLISHDATE -- If not null, indicates a shared photo - # 17 ZKIND, -- 0 = photo, 1 = movie - # 18 ZUNIFORMTYPEIDENTIFIER -- UTI - # 19 ZGENERICASSET.ZAVALANCHEUUID, -- if not NULL, is burst photo - # 20 ZGENERICASSET.ZAVALANCHEPICKTYPE -- if not 2, is a selected burst photo - # 21 ZGENERICASSET.ZKINDSUBTYPE -- determine if live photos, etc - # 22 ZGENERICASSET.ZCUSTOMRENDEREDVALUE -- determine if HDR photo - # 23 ZADDITIONALASSETATTRIBUTES.ZCAMERACAPTUREDEVICE -- 1 if selfie (front facing camera) - # 25 ZGENERICASSET.ZCLOUDASSETGUID -- not null if asset is cloud asset - # (e.g. user has "iCloud Photos" checked in Photos preferences) - - for row in c: - info = SimpleNamespace(**_DB_FIELDS) - info.uuid = uuid = row[0] # stored here for easier debugging - info.master_fingerprint = row[1] - info.title = info.name = row[2] # TODO: replace all uses of name with title - - # There are sometimes negative values for lastmodifieddate in the database - # I don't know what these mean but they will raise exception in datetime if - # not accounted for - if row[4] is not None and row[4] >= 0: - info.last_modified_date = datetime.fromtimestamp(row[4] + td) - else: - info.last_modified_dat = None - - info.image_date = datetime.fromtimestamp(row[5] + td) - info.image_tz_offset_seconds = row[6] - info.hidden = row[9] - info.favorite = row[10] - info.original_filename = row[3] - info.filename = row[12] - info.directory = row[11] - - # set latitude and longitude - # if both latitude and longitude = -180.0, then they are NULL - if row[13] == -180.0 and row[14] == -180.0: - info.latitude = None - info.longitude = None - else: - info.latitude = row[13] - info.longitude = row[14] - - info.has_adjustments = row[15] - - info.cloud_batch_publish_date = row[16] - info.shared = True if row[16] is not None else False - - # these will get filled in later - # init to avoid key errors - # info.extended_description = None # fill this in later - # info.local_availability = None - # info.remote_availability = None - # info.is_missing = None - # info.has_adjustments = None - # info.adjustment_format_id = None - - # find type - if row[17] == 0: - info.type = _PHOTO_TYPE - elif row[17] == 1: - info.type = _MOVIE_TYPE - else: - if _debug(): - logging.debug(f"WARNING: {uuid} found unknown type {row[17]}") - info.type = None - - info.uti = row[18] - - # handle burst photos - # if burst photo, determine whether or not it's a selected burst photo - # in Photos 5, burstUUID is called avalancheUUID - info.burst_uuid = row[19] # avalancheUUID - info.burst_pick_type = row[20] # avalanchePickType - if row[19] is not None: - # it's a burst photo - info.burst = True - burst_uuid = row[19] - if burst_uuid not in self._dbphotos_burst: - self._dbphotos_burst[burst_uuid] = set() - self._dbphotos_burst[burst_uuid].add(uuid) - if row[20] != 2 and row[20] != 4: - info.burst_key = True # it's a key photo (selected from the burst) - else: - info.burst_key = ( - False # it's a burst photo but not one that's selected - ) - else: - # not a burst photo - info.burst = False - info.burst_key = None - - # Info on sub-type (live photo, panorama, etc) - # ZGENERICASSET.ZKINDSUBTYPE - # 1 == panorama - # 2 == live photo - # 10 = screenshot - # 100 = shared movie (MP4) ?? - # 101 = slow-motion video - # 102 = Time lapse video - info.subtype = row[21] - info.live_photo = True if row[21] == 2 else False - info.screenshot = True if row[21] == 10 else False - info.slow_mo = True if row[21] == 101 else False - info.time_lapse = True if row[21] == 102 else False - - # Handle HDR photos and portraits - # ZGENERICASSET.ZCUSTOMRENDEREDVALUE - # 3 = HDR photo - # 4 = non-HDR version of the photo - # 6 = panorama - # 8 = portrait - info.custom_rendered_value = row[22] - info.hdr = True if row[22] == 3 else False - info.portrait = True if row[22] == 8 else False - - # Set panorama from either KindSubType or RenderedValue - info.panorama = True if row[21] == 1 or row[22] == 6 else False - - # Handle selfies (front facing camera, ZCAMERACAPTUREDEVICE=1) - info.selfie = True if row[23] == 1 else False - - # Determine if photo is part of cloud library (ZGENERICASSET.ZCLOUDASSETGUID not NULL) - # Initialize cloud fields that will filled in later - info.cloud_asset_guid = row[24] - info.cloud_local_state = None - info.in_cloud = None - info.cloud_library_state = None # Photos 4 - info.cloud_status = None # Photos 4 - info.cloud_available = None # Photos 4 - - self._dbphotos[uuid] = info - - # # if row[19] is not None and ((row[20] == 2) or (row[20] == 4)): - # # burst photo - # if row[19] is not None: - # # burst photo, add to _dbphotos_burst - # info["burst"] = True - # burst_uuid = row[19] - # if burst_uuid not in self._dbphotos_burst: - # self._dbphotos_burst[burst_uuid] = {} - # self._dbphotos_burst[burst_uuid][uuid] = info - # else: - # info["burst"] = False - - # Get extended description - c.execute( - "SELECT ZGENERICASSET.ZUUID, " - "ZASSETDESCRIPTION.ZLONGDESCRIPTION " - "FROM ZGENERICASSET " - "JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK " - "JOIN ZASSETDESCRIPTION ON ZASSETDESCRIPTION.Z_PK = ZADDITIONALASSETATTRIBUTES.ZASSETDESCRIPTION " - "ORDER BY ZGENERICASSET.ZUUID " - ) - for row in c: - uuid = row[0] - if uuid in self._dbphotos: - self._dbphotos[uuid].extended_description = row[1] - else: - if _debug(): - logging.debug( - f"WARNING: found description {row[1]} but no photo for {uuid}" - ) - - # get information about adjusted/edited photos - c.execute( - "SELECT ZGENERICASSET.ZUUID, " - "ZGENERICASSET.ZHASADJUSTMENTS, " - "ZUNMANAGEDADJUSTMENT.ZADJUSTMENTFORMATIDENTIFIER " - "FROM ZGENERICASSET, ZUNMANAGEDADJUSTMENT " - "JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK " - "WHERE ZADDITIONALASSETATTRIBUTES.ZUNMANAGEDADJUSTMENT = ZUNMANAGEDADJUSTMENT.Z_PK " - "AND ZGENERICASSET.ZTRASHEDSTATE = 0 " - ) - for row in c: - uuid = row[0] - if uuid in self._dbphotos: - self._dbphotos[uuid].adjustment_format_id = row[2] - else: - if _debug(): - logging.debug( - f"WARNING: found adjustmentformatidentifier {row[2]} but no photo for uuid {row[0]}" - ) - - # Find missing photos - # TODO: this code is very kludgy and I had to make lots of assumptions - # it's probably wrong and needs to be re-worked once I figure out how to reliably - # determine if a photo is missing in Photos 5 - - # Get info on remote/local availability for photos in shared albums - # Shared photos have a null fingerprint (and some other photos do too) - # TODO: There may be a bug here, perhaps ZDATASTORESUBTYPE should be 1 --> it's the longest ZDATALENGTH (is this the original) - c.execute( - """ SELECT - ZGENERICASSET.ZUUID, - ZINTERNALRESOURCE.ZLOCALAVAILABILITY, - ZINTERNALRESOURCE.ZREMOTEAVAILABILITY - FROM ZGENERICASSET - JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK - JOIN ZINTERNALRESOURCE ON ZINTERNALRESOURCE.ZASSET = ZADDITIONALASSETATTRIBUTES.ZASSET - WHERE ZDATASTORESUBTYPE = 0 OR ZDATASTORESUBTYPE = 3 """ - # WHERE ZDATASTORESUBTYPE = 1 OR ZDATASTORESUBTYPE = 3 """ - # WHERE ZDATASTORESUBTYPE = 0 OR ZDATASTORESUBTYPE = 3 """ - # WHERE ZINTERNALRESOURCE.ZFINGERPRINT IS NULL AND ZINTERNALRESOURCE.ZDATASTORESUBTYPE = 3 """ - ) - - for row in c: - uuid = row[0] - if uuid in self._dbphotos: - # and self._dbphotos[uuid]["isMissing"] is None: - self._dbphotos[uuid].local_availability = row[1] - self._dbphotos[uuid].remote_availability = row[2] - - # old = self._dbphotos[uuid]["isMissing"] - - if row[1] != 1: - self._dbphotos[uuid].is_missing = 1 - else: - self._dbphotos[uuid].is_missing = 0 - - # if old is not None and old != self._dbphotos[uuid]["isMissing"]: - # logging.warning( - # f"{uuid} isMissing changed: {old} {self._dbphotos[uuid]['isMissing']}" - # ) - - # get information on local/remote availability - c.execute( - """ SELECT ZGENERICASSET.ZUUID, - ZINTERNALRESOURCE.ZLOCALAVAILABILITY, - ZINTERNALRESOURCE.ZREMOTEAVAILABILITY - FROM ZGENERICASSET - JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = ZGENERICASSET.Z_PK - JOIN ZINTERNALRESOURCE ON ZINTERNALRESOURCE.ZFINGERPRINT = ZADDITIONALASSETATTRIBUTES.ZMASTERFINGERPRINT """ - ) - - for row in c: - uuid = row[0] - if uuid in self._dbphotos: - self._dbphotos[uuid].local_availability = row[1] - self._dbphotos[uuid].remote_availability = row[2] - - # old = self._dbphotos[uuid]["isMissing"] - - if row[1] != 1: - self._dbphotos[uuid].is_missing = 1 - else: - self._dbphotos[uuid].is_missing = 0 - - # if old is not None and old != self._dbphotos[uuid]["isMissing"]: - # logging.warning( - # f"{uuid} isMissing changed: {old} {self._dbphotos[uuid]['isMissing']}" - # ) - - # get information about cloud sync state - c.execute( - """ SELECT - ZGENERICASSET.ZUUID, - ZCLOUDMASTER.ZCLOUDLOCALSTATE - FROM ZCLOUDMASTER, ZGENERICASSET - WHERE ZGENERICASSET.ZMASTER = ZCLOUDMASTER.Z_PK """ - ) - for row in c: - uuid = row[0] - if uuid in self._dbphotos: - self._dbphotos[uuid].cloud_local_state = row[1] - self._dbphotos[uuid].in_cloud = True if row[1] == 3 else False - - # add faces and keywords to photo data - for uuid in self._dbphotos: - # keywords - if uuid in self._dbkeywords_uuid: - self._dbphotos[uuid].has_keywords = 1 - self._dbphotos[uuid].keywords = self._dbkeywords_uuid[uuid] - else: - self._dbphotos[uuid].has_keywords = 0 - self._dbphotos[uuid].keywords = [] - - if uuid in self._dbfaces_uuid: - self._dbphotos[uuid].has_persons = 1 - self._dbphotos[uuid].persons = self._dbfaces_uuid[uuid] - else: - self._dbphotos[uuid].has_persons = 0 - self._dbphotos[uuid].persons = [] - - if uuid in self._dbalbums_uuid: - self._dbphotos[uuid].has_albums = 1 - self._dbphotos[uuid].albums = self._dbalbums_uuid[uuid] - else: - self._dbphotos[uuid].has_albums = 0 - self._dbphotos[uuid].albums = [] - - # build album_titles dictionary - for album_id in self._dbalbum_details: - title = self._dbalbum_details[album_id]["title"] - if title in self._dbalbum_titles: - self._dbalbum_titles[title].append(album_id) - else: - self._dbalbum_titles[title] = [album_id] - - # close connection and remove temporary files - conn.close() - - # done processing, dump debug data if requested - if _debug(): - logging.debug("Faces (_dbfaces_uuid):") - logging.debug(pformat(self._dbfaces_uuid)) - - logging.debug("Faces by person (_dbfaces_person):") - logging.debug(pformat(self._dbfaces_person)) - - logging.debug("Keywords by uuid (_dbkeywords_uuid):") - logging.debug(pformat(self._dbkeywords_uuid)) - - logging.debug("Keywords by keyword (_dbkeywords_keywords):") - logging.debug(pformat(self._dbkeywords_keyword)) - - logging.debug("Albums by uuid (_dbalbums_uuid):") - logging.debug(pformat(self._dbalbums_uuid)) - - logging.debug("Albums by album (_dbalbums_albums):") - logging.debug(pformat(self._dbalbums_album)) - - logging.debug("Album details (_dbalbum_details):") - logging.debug(pformat(self._dbalbum_details)) - - logging.debug("Album titles (_dbalbum_titles):") - logging.debug(pformat(self._dbalbum_titles)) - - logging.debug("Volumes (_dbvolumes):") - logging.debug(pformat(self._dbvolumes)) - - logging.debug("Photos (_dbphotos):") - logging.debug(pformat(self._dbphotos)) - - logging.debug("Burst Photos (dbphotos_burst:") - logging.debug(pformat(self._dbphotos_burst)) - def photos( self, keywords=None, diff --git a/osxphotos/utils.py b/osxphotos/utils.py index ed1f72a3..ef1aede5 100644 --- a/osxphotos/utils.py +++ b/osxphotos/utils.py @@ -1,8 +1,11 @@ +import fnmatch import glob import logging +import os import os.path import pathlib import platform +import re import sqlite3 import subprocess import sys @@ -11,6 +14,7 @@ import urllib.parse from plistlib import load as plistload import CoreFoundation +import CoreServices import objc from Foundation import * @@ -302,6 +306,28 @@ def create_path_by_date(dest, dt): return new_dest +def get_preferred_uti_extension(uti): + """ get preferred extension for a UTI type + uti: UTI str, e.g. 'public.jpeg' + returns: preferred extension as str """ + + # reference: https://developer.apple.com/documentation/coreservices/1442744-uttypecopypreferredtagwithclass?language=objc + + ext = CoreServices.UTTypeCopyPreferredTagWithClass( + uti, CoreServices.kUTTagClassFilenameExtension + ) + return ext + + +def findfiles(pattern, path_): + """Returns list of filenames from path_ matched by pattern + shell pattern. Matching is case-insensitive.""" + # See: https://gist.github.com/techtonik/5694830 + + rule = re.compile(fnmatch.translate(pattern), re.IGNORECASE) + return [name for name in os.listdir(path_) if rule.match(name)] + + # TODO: this doesn't always work, still looking for a way to # force Photos to open the library being operated on # def _open_photos_library_applescript(library_path): diff --git a/tests/test_utils.py b/tests/test_utils.py index a534b76b..fe3872e4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,6 +4,8 @@ DB_LOCKED_10_12 = "./tests/Test-Lock-10_12.photoslibrary/database/photos.db" DB_LOCKED_10_15 = "./tests/Test-Lock-10_15_1.photoslibrary/database/Photos.sqlite" DB_UNLOCKED_10_15 = "./tests/Test-10.15.1.photoslibrary/database/photos.db" +UTI_DICT = {"public.jpeg": "jpeg", "com.canon.cr2-raw-image": "cr2"} + def test_debug_enable(): import osxphotos @@ -89,3 +91,26 @@ def test_copy_file_norsrc(): result = _copy_file(src, temp_dir.name, norsrc=True) assert result == 0 assert os.path.isfile(os.path.join(temp_dir.name, "wedding.jpg")) + + +def test_get_preferred_uti_extension(): + from osxphotos.utils import get_preferred_uti_extension + + for uti, extension in UTI_DICT.items(): + assert get_preferred_uti_extension(uti) == extension + + +def test_findfiles(): + import tempfile + import os.path + from osxphotos.utils import findfiles + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + fd = open(os.path.join(temp_dir.name, "file1.jpg"), "w+") + fd.close + fd = open(os.path.join(temp_dir.name, "file2.JPG"), "w+") + fd.close + files = findfiles("*.jpg", temp_dir.name) + assert len(files) == 2 + assert "file1.jpg" in files + assert "file2.JPG" in files