Refactored photosdb and photoinfo to add SearchInfo and labels

This commit is contained in:
Rhet Turnbull
2020-05-10 19:55:09 -07:00
parent 397db0d72f
commit 98b3f63a92
13 changed files with 707 additions and 21 deletions

View File

@@ -60,3 +60,6 @@ _MAX_IPTC_KEYWORD_LEN = 64
# Sentinel value for detecting if a template in keyword_template doesn't match
# If anyone has a keyword matching this, then too bad...
_OSXPHOTOS_NONE_SENTINEL = "OSXPhotosXYZZY42_Sentinel$"
# SearchInfo category IDs for Photos 5; these correspond to the categories in database/search/psi.sqlite
SEARCH_CATEGORY_LABEL = 2024

View File

@@ -1,3 +1,3 @@
""" version info """
__version__ = "0.28.14"
__version__ = "0.28.15"

View File

@@ -0,0 +1,8 @@
"""
PhotoInfo class
Represents a single photo in the Photos library and provides access to the photo's attributes
PhotosDB.photos() returns a list of PhotoInfo objects
"""
from .photoinfo import PhotoInfo

View File

@@ -19,7 +19,7 @@ from pprint import pformat
import yaml
from mako.template import Template
from ._constants import (
from .._constants import (
_MAX_IPTC_KEYWORD_LEN,
_MOVIE_TYPE,
_OSXPHOTOS_NONE_SENTINEL,
@@ -30,16 +30,16 @@ from ._constants import (
_UNKNOWN_PERSON,
_XMP_TEMPLATE_NAME,
)
from .albuminfo import AlbumInfo
from .datetime_formatter import DateTimeFormatter
from .exiftool import ExifTool
from .placeinfo import PlaceInfo4, PlaceInfo5
from .template import (
from ..albuminfo import AlbumInfo
from ..datetime_formatter import DateTimeFormatter
from ..exiftool import ExifTool
from ..placeinfo import PlaceInfo4, PlaceInfo5
from ..template import (
MULTI_VALUE_SUBSTITUTIONS,
TEMPLATE_SUBSTITUTIONS,
TEMPLATE_SUBSTITUTIONS_MULTI_VALUED,
)
from .utils import (
from ..utils import (
_hardlink_file,
_copy_file,
_export_photo_uuid_applescript,
@@ -49,8 +49,11 @@ from .utils import (
get_preferred_uti_extension,
)
# Mixins
from .photoinfo_mixin_searchinfo import PhotoInfoMixinSearchInfo, SearchInfo
class PhotoInfo:
class PhotoInfo(PhotoInfoMixinSearchInfo):
"""
Info about a specific photo, contains all the details about the photo
including keywords, persons, albums, uuid, path, etc.
@@ -808,7 +811,7 @@ class PhotoInfo:
if export_as_hardlink:
_hardlink_file(src, dest)
else:
_copy_file(src, dest, norsrc=no_xattr)
_copy_file(src, dest, norsrc=no_xattr)
exported_files.append(str(dest))
# copy live photo associated .mov if requested

View File

@@ -0,0 +1,96 @@
""" SearchInfo class exposing labels and other search info for Photos 5 databases
and
PhotoInfoMixinSearchInfo mixin class for PhotoInfo """
from .._constants import _PHOTOS_4_VERSION, SEARCH_CATEGORY_LABEL
class PhotoInfoMixinSearchInfo:
    """ Mixin for PhotoInfo that surfaces SearchInfo data such as labels

        Properties contributed to PhotoInfo (meaningful only for Photos 5):
            search_info: SearchInfo object for the photo
            labels: list of labels
            labels_normalized: list of normalized labels
    """

    @property
    def search_info(self):
        """ SearchInfo object for this photo, built on first access and then reused
            returns None when the library version is at or below the Photos 4 version
        """
        if self._db._db_version <= _PHOTOS_4_VERSION:
            return None

        # create the SearchInfo lazily and cache it on the instance
        if not hasattr(self, "_search_info"):
            self._search_info = SearchInfo(self)
        return self._search_info

    @property
    def labels(self):
        """ list of labels Photos image categorization applied to the photo
            returns an empty list when the library version is at or below the Photos 4 version
        """
        return (
            []
            if self._db._db_version <= _PHOTOS_4_VERSION
            else self.search_info.labels
        )

    @property
    def labels_normalized(self):
        """ list of normalized labels Photos image categorization applied to the photo
            returns an empty list when the library version is at or below the Photos 4 version
        """
        return (
            []
            if self._db._db_version <= _PHOTOS_4_VERSION
            else self.search_info.labels_normalized
        )
class SearchInfo:
    """ Search terms (for example machine learning labels) that Photos holds for a photo """

    def __init__(self, photo):
        """ photo: PhotoInfo object

            raises NotImplementedError when the library version is at or below the Photos 4 version
        """
        if photo._db._db_version <= _PHOTOS_4_VERSION:
            raise NotImplementedError(
                f"search info not implemented for this database version"
            )

        self._photo = photo
        self.uuid = photo.uuid

        # look up the search info records for this UUID; a photo may have none
        # (e.g. if Photo was missing or photoanalysisd not run yet), stored as None
        self._db_searchinfo = photo._db._db_searchinfo_uuid.get(self.uuid)

    def _label_values(self, field):
        """ collect the value of `field` from every record in the label category
            returns an empty list when the photo has no search info records
        """
        if not self._db_searchinfo:
            return []
        return [
            rec[field]
            for rec in self._db_searchinfo
            if rec["category"] == SEARCH_CATEGORY_LABEL
        ]

    @property
    def labels(self):
        """ list of labels associated with the photo """
        return self._label_values("content_string")

    @property
    def labels_normalized(self):
        """ list of normalized labels associated with the photo """
        return self._label_values("normalized_string")

View File

@@ -0,0 +1,6 @@
"""
PhotosDB class
Processes a Photos.app library database to extract information about photos
"""
from .photosdb import PhotosDB

View File

@@ -15,7 +15,7 @@ from datetime import datetime
from pprint import pformat
from shutil import copyfile
from ._constants import (
from .._constants import (
_MOVIE_TYPE,
_PHOTO_TYPE,
_PHOTOS_3_VERSION,
@@ -32,10 +32,10 @@ from ._constants import (
_TESTED_OS_VERSIONS,
_UNKNOWN_PERSON,
)
from ._version import __version__
from .albuminfo import AlbumInfo, FolderInfo
from .photoinfo import PhotoInfo
from .utils import (
from .._version import __version__
from ..albuminfo import AlbumInfo, FolderInfo
from ..photoinfo import PhotoInfo
from ..utils import (
_check_file_exists,
_db_is_locked,
_debug,
@@ -44,6 +44,9 @@ from .utils import (
get_last_library_path,
)
# mixins
from .photosdb_mixin_searchinfo import PhotosDBMixinSearchInfo
# TODO: Add test for imageTimeZoneOffsetSeconds = None
# TODO: Fix command line so multiple --keyword, etc. are AND (instead of OR as they are in .photos())
# Or fix the help text to match behavior
@@ -52,7 +55,7 @@ from .utils import (
# TODO: fix "if X not in y" dictionary checks to use try/except EAFP style
class PhotosDB:
class PhotosDB(PhotosDBMixinSearchInfo):
""" Processes a Photos.app library database to extract information about photos """
def __init__(self, *dbfile_, dbfile=None):
@@ -1842,6 +1845,9 @@ class PhotosDB:
# close connection and remove temporary files
conn.close()
# process search info
self._process_searchinfo()
# done processing, dump debug data if requested
if _debug():
logging.debug("Faces (_dbfaces_uuid):")

View File

@@ -0,0 +1,189 @@
""" Mixin class for PhotosDB to add Photos 5 search info such as machine learning labels """
import logging
import pathlib
import uuid as uuidlib
from pprint import pformat
from .._constants import _PHOTOS_4_VERSION, SEARCH_CATEGORY_LABEL
from ..utils import _db_is_locked, _debug, _open_sql_file
class PhotosDBMixinSearchInfo:
    """ Mixin class to extend PhotosDB to process search info terms

        This mixin adds the following method to PhotosDB:
            _process_searchinfo: process search terms from psi.sqlite

        The following properties are added to PhotosDB
            labels: list of all labels in the library
            labels_normalized: list of all labels normalized in the library
            labels_as_dict: dict of {label: count of photos} in reverse sorted order (most photos first)
            labels_normalized_as_dict: dict of {normalized label: count of photos} in reverse sorted order (most photos first)

        The following data structures are added to PhotosDB
            self._db_searchinfo_categories
            self._db_searchinfo_uuid
            self._db_searchinfo_labels
            self._db_searchinfo_labels_normalized

        These only work on Photos 5 databases. On earlier library versions the
        properties log a warning and return an empty result, and
        _process_searchinfo raises NotImplementedError.
    """

    def _process_searchinfo(self):
        """ load machine learning/search term label info from a Photos library

            Reads search/psi.sqlite located next to the library database file
            (self._dbfile) and populates self._db_searchinfo_categories,
            self._db_searchinfo_uuid, self._db_searchinfo_labels and
            self._db_searchinfo_labels_normalized.

            Raises:
                NotImplementedError: library version is at or below the Photos 4 version
                FileNotFoundError: psi.sqlite does not exist
        """

        if self._db_version <= _PHOTOS_4_VERSION:
            raise NotImplementedError(
                "search info not implemented for this database version"
            )

        search_db_path = pathlib.Path(self._dbfile).parent / "search" / "psi.sqlite"
        if not search_db_path.exists():
            raise FileNotFoundError(f"could not find search db: {search_db_path}")

        # work on a copy if the search database is locked
        if _db_is_locked(search_db_path):
            search_db = self._copy_db_file(search_db_path)
        else:
            search_db = search_db_path

        (conn, _) = _open_sql_file(search_db)
        try:
            result = conn.execute(
                """
                select
                ga.rowid,
                assets.uuid_0,
                assets.uuid_1,
                groups.rowid as groupid,
                groups.category,
                groups.owning_groupid,
                groups.content_string,
                groups.normalized_string,
                groups.lookup_identifier
                from
                ga
                join groups on groups.rowid = ga.groupid
                join assets on ga.assetid = assets.rowid
                order by
                ga.rowid
                """
            )
            cols = [col[0] for col in result.description]
            rows = result.fetchall()
        finally:
            # all rows are in memory now; do not leave the connection open
            conn.close()

        uuid_records = []
        for row in rows:
            # strings have null character appended, so strip it
            record = {
                key: value.replace("\x00", "") if isinstance(value, str) else value
                for key, value in zip(cols, row)
            }
            uuid = ints_to_uuid(record["uuid_0"], record["uuid_1"])
            uuid_records.append((uuid, record))

        (
            self._db_searchinfo_uuid,
            self._db_searchinfo_categories,
            self._db_searchinfo_labels,
            self._db_searchinfo_labels_normalized,
        ) = self._index_searchinfo_records(uuid_records, SEARCH_CATEGORY_LABEL)

        if _debug():
            logging.debug(
                "_db_searchinfo_categories: \n"
                + pformat(self._db_searchinfo_categories)
            )
            logging.debug("_db_searchinfo_uuid: \n" + pformat(self._db_searchinfo_uuid))
            logging.debug(
                "_db_searchinfo_labels: \n" + pformat(self._db_searchinfo_labels)
            )
            logging.debug(
                "_db_searchinfo_labels_normalized: \n"
                + pformat(self._db_searchinfo_labels_normalized)
            )

    @staticmethod
    def _index_searchinfo_records(uuid_records, label_category):
        """ build the search info lookup tables from (uuid, record) pairs

            uuid_records: iterable of (uuid, record) where record is a dict with
                at least "category", "content_string" and "normalized_string"
            label_category: category id whose records are treated as labels

            Returns tuple of four dicts:
                {uuid: [search info records for that uuid]}
                {category id: [normalized strings seen for that category]}
                {label: [photo uuids]}
                {normalized label: [photo uuids]}
        """
        by_uuid = {}
        categories = {}
        labels = {}
        labels_normalized = {}

        for uuid, record in uuid_records:
            by_uuid.setdefault(uuid, []).append(record)
            categories.setdefault(record["category"], []).append(
                record["normalized_string"]
            )

            if record["category"] == label_category:
                # the two indexes are updated independently: a label and its
                # normalized form do not necessarily first appear together, and
                # a shared try/except would overwrite an existing list
                labels.setdefault(record["content_string"], []).append(uuid)
                labels_normalized.setdefault(record["normalized_string"], []).append(
                    uuid
                )

        return by_uuid, categories, labels, labels_normalized

    @staticmethod
    def _label_counts(index):
        """ return {label: number of uuids} ordered by count, largest first """
        counts = {label: len(uuids) for label, uuids in index.items()}
        return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))

    @property
    def labels(self):
        """ return list of all search info labels found in the library """
        if self._db_version <= _PHOTOS_4_VERSION:
            logging.warning("SearchInfo not implemented for this library version")
            return []

        return list(self._db_searchinfo_labels)

    @property
    def labels_normalized(self):
        """ return list of all normalized search info labels found in the library """
        if self._db_version <= _PHOTOS_4_VERSION:
            logging.warning("SearchInfo not implemented for this library version")
            return []

        return list(self._db_searchinfo_labels_normalized)

    @property
    def labels_as_dict(self):
        """ return labels as dict of label: count in reverse sorted order (descending) """
        if self._db_version <= _PHOTOS_4_VERSION:
            logging.warning("SearchInfo not implemented for this library version")
            return {}

        return self._label_counts(self._db_searchinfo_labels)

    @property
    def labels_normalized_as_dict(self):
        """ return normalized labels as dict of label: count in reverse sorted order (descending) """
        if self._db_version <= _PHOTOS_4_VERSION:
            logging.warning("SearchInfo not implemented for this library version")
            return {}

        return self._label_counts(self._db_searchinfo_labels_normalized)
def ints_to_uuid(uuid_0, uuid_1):
    """ build an upper-case UUID string from two signed ints
        uuid_0, uuid_1: the two int components of an RFC 4122 UUID """

    # the uuid module is expected to be imported as uuidlib (avoids clashing with other uses of the name uuid)
    halves = (uuid_0, uuid_1)
    # each half is serialized as 8 little-endian signed bytes, uuid_0 first
    raw = b"".join(half.to_bytes(8, "little", signed=True) for half in halves)
    return str(uuidlib.UUID(bytes=raw)).upper()