Added MomentInfo for Photos 5+, #71

This commit is contained in:
Rhet Turnbull 2021-12-05 19:12:29 -08:00
parent 3e038bf124
commit a52b4d2f43
3 changed files with 195 additions and 0 deletions

69
osxphotos/momentinfo.py Normal file
View File

@ -0,0 +1,69 @@
"""MomentInfo class with details about photo moments."""
class MomentInfo:
"""Info about a photo moment"""
def __init__(self, db, moment_pk):
"""Initialize with a moment PK; returns None if PK not found."""
self._db = db
self._pk = moment_pk
self._moment = self._db._db_moment_pk.get(moment_pk)
if not self._moment:
raise ValueError(f"No moment with PK {moment_pk}")
@property
def pk(self):
"""Primary key of the moment."""
return self._pk
@property
def location(self):
"""Location of the moment."""
return (self._moment.get("latitude"), self._moment.get("longitude"))
@property
def title(self):
"""Title of the moment."""
return self._moment.get("title")
@property
def subtitle(self):
"""Subtitle of the moment."""
return self._moment.get("subtitle")
@property
def start_date(self):
"""Start date of the moment."""
return self._moment.get("startDate")
@property
def end_date(self):
"""Stop date of the moment."""
return self._moment.get("endDate")
@property
def date(self):
"""Date of the moment."""
return self._moment.get("representativeDate")
@property
def modification_date(self):
"""Modification date of the moment."""
return self._moment.get("modificationDate")
@property
def photos(self):
"""All photos in this moment"""
try:
return self._photos
except AttributeError:
photo_uuids = [
uuid
for uuid, photo in self._db._dbphotos.items()
if photo["momentID"] == self._pk
]
self._photos = self._db.photos_by_uuid(photo_uuids)
return self._photos

View File

@ -35,6 +35,7 @@ from .._constants import (
)
from ..adjustmentsinfo import AdjustmentsInfo
from ..albuminfo import AlbumInfo, ImportInfo
from ..momentinfo import MomentInfo
from ..personinfo import FaceInfo, PersonInfo
from ..phototemplate import PhotoTemplate, RenderOptions
from ..placeinfo import PlaceInfo4, PlaceInfo5
@ -494,6 +495,18 @@ class PhotoInfo:
self._faceinfo = []
return self._faceinfo
@property
def moment(self):
"""Moment photo belongs to"""
try:
return self._moment
except AttributeError:
try:
self._moment = MomentInfo(db=self._db, moment_pk=self._info["momentID"])
except ValueError:
self._moment = None
return self._moment
@property
def albums(self):
"""list of albums picture is contained in"""

View File

@ -18,6 +18,7 @@ from typing import List
import bitmath
import photoscript
from rich import print
from .._constants import (
_DB_TABLE_NAMES,
@ -250,6 +251,10 @@ class PhotosDB:
# Dict to hold information on volume names (Photos 5+)
self._db_filesystem_volumes = {}
# Dict to hold information on moments (Photos 5+)
# key is Z_PK of ZMOMENT table and values are the moment info
self._db_moment_pk = {}
if _debug():
logging.debug(f"dbfile = {dbfile}")
@ -2491,6 +2496,10 @@ class PhotosDB:
verbose("Processing comments and likes for shared photos.")
self._process_comments()
# process moments
verbose("Processing moments.")
self._process_moments()
# done processing, dump debug data if requested
verbose("Done processing details from Photos library.")
if _debug():
@ -2536,6 +2545,109 @@ class PhotosDB:
logging.debug("Burst Photos (dbphotos_burst:")
logging.debug(pformat(self._dbphotos_burst))
def _process_moments(self):
"""Process data from ZMOMENT table"""
# _db_moment_pk is dict in form {pk: {moment info}} by ZMOMENT.Z_PK
if self._db_version <= _PHOTOS_4_VERSION:
raise NotImplementedError(
f"Moment info implemented for this database version"
)
else:
self._process_moment_5()
def _process_moment_5(self):
"""Process moment info for Photos 5 databases"""
self._db_moment_pk = {}
results = self.execute(
f"""
SELECT
Z_PK,
ZTIMEZONEOFFSET,
ZTRASHEDSTATE,
ZAPPROXIMATELATITUDE,
ZAPPROXIMATELONGITUDE,
ZENDDATE,
ZMODIFICATIONDATE,
ZREPRESENTATIVEDATE,
ZSTARTDATE,
ZSUBTITLE,
ZTITLE,
ZUUID
FROM ZMOMENT"""
)
# results
# 0 Z_PK,
# 1 ZTIMEZONEOFFSET,
# 2 ZTRASHEDSTATE,
# 3 ZAPPROXIMATELATITUDE,
# 4 ZAPPROXIMATELONGITUDE,
# 5 ZENDDATE,
# 6 ZMODIFICATIONDATE,
# 7 ZREPRESENTATIVEDATE,
# 8 ZSTARTDATE,
# 9 ZSUBTITLE,
# 10 ZTITLE,
# 11 ZUUID
for row in results:
moment_info = {}
moment_info["pk"] = row[0]
moment_info["timezoneOffset"] = row[1]
moment_info["trashedState"] = row[2]
moment_info["approximateLatitude"] = row[3]
moment_info["approximateLongitude"] = row[4]
moment_info["endDate"] = row[5]
moment_info["modificationDate"] = row[6]
moment_info["representativeDate"] = row[7]
moment_info["startDate"] = row[8]
moment_info["subtitle"] = row[9]
moment_info["title"] = row[10]
moment_info["uuid"] = row[11]
# if both lat/lon == -180, then it means location undefined
if (
moment_info["approximateLatitude"] == -180.0
and moment_info["approximateLongitude"] == -180.0
):
moment_info["latitude"] = None
moment_info["longitude"] = None
else:
moment_info["latitude"] = moment_info["approximateLatitude"]
moment_info["longitude"] = moment_info["approximateLongitude"]
# process date stamps
offset_seconds = moment_info["timezoneOffset"] or 0
delta = timedelta(seconds=offset_seconds)
tz = timezone(delta)
for date_name in [
"startDate",
"endDate",
"modificationDate",
"representativeDate",
]:
date_stamp = moment_info[date_name]
try:
moment_date = datetime.fromtimestamp(date_stamp + TIME_DELTA)
# save raw time stamp valu
moment_info[date_name + "_stamp"] = moment_info[date_name]
moment_info[date_name] = moment_date.astimezone(tz=tz)
except ValueError:
# sometimes imageDate is invalid so use 1 Jan 1970 in UTC as image date
moment_date = datetime(1970, 1, 1)
tz = timezone(timedelta(0))
moment_info[date_name + "_stamp"] = date_stamp
moment_info[date_name] = moment_date.astimezone(tz=tz)
# process title/subtitle
moment_info["title"] = moment_info["title"] or ""
moment_info["subtitle"] = moment_info["subtitle"] or ""
self._db_moment_pk[moment_info["pk"]] = moment_info
def _build_album_folder_hierarchy_5(self, uuid, folders=None):
"""recursively build folder/album hierarchy
uuid: uuid of the album/folder being processed
@ -3366,6 +3478,7 @@ class PhotosDB:
def execute(self, sql):
"""Execute sql statement and return cursor"""
self._db_connection, _ = self.get_db_connection()
return self._db_connection.cursor().execute(sql)
def _duplicate_signature(self, uuid):