Feature phototables (#1059)
* Added tables() method to PhotoInfo to get access to underlying tables * This time with the phototables code...
This commit is contained in:
parent
1b0c91db97
commit
ed543aa2d0
@ -19,6 +19,7 @@ from .photoinfo import PhotoInfo
|
||||
from .photosalbum import PhotosAlbum, PhotosAlbumPhotoScript
|
||||
from .photosdb import PhotosDB
|
||||
from .photosdb._photosdb_process_comments import CommentInfo, LikeInfo
|
||||
from .phototables import PhotoTables
|
||||
from .phototemplate import PhotoTemplate
|
||||
from .placeinfo import PlaceInfo
|
||||
from .queryoptions import QueryOptions
|
||||
@ -53,6 +54,7 @@ __all__ = [
|
||||
"PersonInfo",
|
||||
"PhotoExporter",
|
||||
"PhotoInfo",
|
||||
"PhotoTables",
|
||||
"PhotoTemplate",
|
||||
"PhotosAlbum",
|
||||
"PhotosAlbumPhotoScript",
|
||||
|
||||
@ -1,14 +1,17 @@
|
||||
"""repl command for osxphotos CLI"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import click
|
||||
import photoscript
|
||||
from applescript import ScriptError
|
||||
from rich import pretty, print
|
||||
|
||||
import osxphotos
|
||||
@ -191,10 +194,16 @@ def _get_selected(photosdb):
|
||||
"""get list of PhotoInfo objects for photos selected in Photos"""
|
||||
|
||||
def get_selected():
|
||||
selected = photoscript.PhotosLibrary().selection
|
||||
if not selected:
|
||||
return []
|
||||
return photosdb.photos(uuid=[p.uuid for p in selected])
|
||||
try:
|
||||
selected = photoscript.PhotosLibrary().selection
|
||||
except ScriptError as e:
|
||||
# some photos (e.g. shared items) can't be selected and raise ScriptError:
|
||||
# applescript.ScriptError: Photos got an error: Can’t get media item id "34C26DFA-0CEA-4DB7-8FDA-B87789B3209D/L0/001". (-1728) app='Photos' range=16820-16873
|
||||
# In this case, we can parse the UUID from the error (though this only works for a single selected item)
|
||||
if match := re.match(r".*Can’t get media item id \"(.*)\".*", str(e)):
|
||||
uuid = match[1].split("/")[0]
|
||||
return photosdb.photos(uuid=[uuid])
|
||||
return photosdb.photos(uuid=[p.uuid for p in selected]) if selected else []
|
||||
|
||||
return get_selected
|
||||
|
||||
|
||||
@ -59,6 +59,7 @@ from .exiftool import ExifToolCaching, get_exiftool_path
|
||||
from .momentinfo import MomentInfo
|
||||
from .personinfo import FaceInfo, PersonInfo
|
||||
from .photoexporter import ExportOptions, PhotoExporter
|
||||
from .phototables import PhotoTables
|
||||
from .phototemplate import PhotoTemplate, RenderOptions
|
||||
from .placeinfo import PlaceInfo4, PlaceInfo5
|
||||
from .query_builder import get_query
|
||||
@ -1893,6 +1894,10 @@ class PhotoInfo:
|
||||
dict_data[k] = sorted(v, key=lambda v: v if v is not None else "")
|
||||
return json.dumps(dict_data, sort_keys=True, default=default, indent=indent)
|
||||
|
||||
def tables(self) -> PhotoTables:
|
||||
"""Return PhotoTables object to provide access database tables associated with this photo (Photos 5+)"""
|
||||
return None if self._db._photos_ver < 5 else PhotoTables(self)
|
||||
|
||||
def _json_hexdigest(self):
|
||||
"""JSON for use by hexdigest()"""
|
||||
|
||||
|
||||
230
osxphotos/phototables.py
Normal file
230
osxphotos/phototables.py
Normal file
@ -0,0 +1,230 @@
|
||||
"""Provide direct access to the database tables associated with a photo."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from typing import Any
|
||||
|
||||
import osxphotos
|
||||
|
||||
from ._constants import _DB_TABLE_NAMES
|
||||
|
||||
|
||||
def get_table_columns(conn: sqlite3.Connection, table_name: str) -> list[str]:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
return [col[1] for col in cursor.fetchall()]
|
||||
|
||||
|
||||
class PhotoTables:
|
||||
def __init__(self, photo: osxphotos.PhotoInfo):
|
||||
"""Create a PhotoTables object.
|
||||
|
||||
Args:
|
||||
db: PhotosDB object
|
||||
uuid: The UUID of the photo.
|
||||
"""
|
||||
self.db = photo._db
|
||||
self.photo = photo
|
||||
self.uuid = photo.uuid
|
||||
self.version = self.db._photos_ver
|
||||
|
||||
@property
|
||||
def ZASSET(self) -> Table:
|
||||
"""Return the ZASSET table."""
|
||||
return AssetTable(self.db, self.version, self.uuid)
|
||||
|
||||
@property
|
||||
def ZADDITIONALASSETATTRIBUTES(self) -> Table:
|
||||
"""Return the ZADDITIONALASSETATTRIBUTES table."""
|
||||
return AdditionalAttributesTable(self.db, self.version, self.uuid)
|
||||
|
||||
@property
|
||||
def ZDETECTEDFACE(self) -> Table:
|
||||
"""Return the ZDETECTEDFACE table."""
|
||||
return DetectedFaceTable(self.db, self.version, self.uuid)
|
||||
|
||||
@property
|
||||
def ZPERSON(self) -> Table:
|
||||
"""Return the ZPERSON table."""
|
||||
return PersonTable(self.db, self.version, self.uuid)
|
||||
|
||||
|
||||
class Table:
|
||||
def __init__(self, db: osxphotos.PhotosDB, version: int, uuid: str):
|
||||
"""Create a Table object.
|
||||
|
||||
Args:
|
||||
db: PhotosDB object
|
||||
table_name: The name of the table.
|
||||
"""
|
||||
self.db = db
|
||||
self.conn, _ = self.db.get_db_connection()
|
||||
self.version = version
|
||||
self.uuid = uuid
|
||||
self.asset_table = _DB_TABLE_NAMES[self.version]["ASSET"]
|
||||
self.columns = [] # must be set in subclass
|
||||
self.table_name = "" # must be set in subclass
|
||||
|
||||
def rows(self) -> list[tuple[Any]]:
|
||||
"""Return rows for this photo from the table."""
|
||||
# this should be implemented in the subclass
|
||||
raise NotImplementedError
|
||||
|
||||
def rows_dict(self) -> list[dict[str, Any]]:
|
||||
"""Return rows for this photo from the table as a list of dicts."""
|
||||
rows = self.rows()
|
||||
return [dict(zip(self.columns, row)) for row in rows] if rows else []
|
||||
|
||||
def _get_column(self, column: str):
|
||||
"""Get column value for this photo from the table."""
|
||||
# this should be implemented in the subclass
|
||||
raise NotImplementedError
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""Get column value for this photo from the table."""
|
||||
if name not in self.__dict__ and name in self.columns:
|
||||
return self._get_column(name)
|
||||
else:
|
||||
raise AttributeError(f"Table {self.table_name} has no column {name}")
|
||||
|
||||
|
||||
class AssetTable(Table):
|
||||
"""ZASSET table."""
|
||||
|
||||
def __init__(self, db: osxphotos.PhotosDB, version: int, uuid: str):
|
||||
"""Create a Table object."""
|
||||
super().__init__(db, version, uuid)
|
||||
self.columns = get_table_columns(self.conn, self.asset_table)
|
||||
self.table_name = self.asset_table
|
||||
|
||||
def rows(self) -> list[Any]:
|
||||
"""Return row2 for this photo from the ZASSET table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
cursor.execute(
|
||||
f"SELECT * FROM {self.asset_table} WHERE ZUUID = ?", (self.uuid,)
|
||||
)
|
||||
return result if (result := cursor.fetchall()) else []
|
||||
|
||||
def _get_column(self, column: str) -> tuple[Any]:
|
||||
"""Get column value for this photo from the ZASSET table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
cursor.execute(
|
||||
f"SELECT {column} FROM {self.asset_table} WHERE ZUUID = ?",
|
||||
(self.uuid,),
|
||||
)
|
||||
return (
|
||||
tuple(result[0] for result in results)
|
||||
if (results := cursor.fetchall())
|
||||
else ()
|
||||
)
|
||||
|
||||
|
||||
class AdditionalAttributesTable(Table):
|
||||
"""ZADDITIONALASSETATTRIBUTES table."""
|
||||
|
||||
def __init__(self, db: osxphotos.PhotosDB, version: int, uuid: str):
|
||||
"""Create a Table object."""
|
||||
super().__init__(db, version, uuid)
|
||||
self.columns = get_table_columns(self.conn, "ZADDITIONALASSETATTRIBUTES")
|
||||
self.table_name = "ZADDITIONALASSETATTRIBUTES"
|
||||
|
||||
def rows(self) -> list[tuple[Any]]:
|
||||
"""Return rows for this photo from the ZADDITIONALASSETATTRIBUTES table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZADDITIONALASSETATTRIBUTES.*
|
||||
FROM ZADDITIONALASSETATTRIBUTES
|
||||
JOIN {self.asset_table} ON {self.asset_table}.Z_PK = ZADDITIONALASSETATTRIBUTES.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return result if (result := cursor.fetchall()) else []
|
||||
|
||||
def _get_column(self, column: str) -> tuple[Any]:
|
||||
"""Get column value for this photo from the ZADDITIONALASSETATTRIBUTES table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZADDITIONALASSETATTRIBUTES.{column}
|
||||
FROM ZADDITIONALASSETATTRIBUTES
|
||||
JOIN {self.asset_table} ON {self.asset_table}.Z_PK = ZADDITIONALASSETATTRIBUTES.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return (
|
||||
tuple(result[0] for result in results)
|
||||
if (results := cursor.fetchall())
|
||||
else ()
|
||||
)
|
||||
|
||||
|
||||
class DetectedFaceTable(Table):
|
||||
"""ZDETECTEDFACE table."""
|
||||
|
||||
def __init__(self, db: osxphotos.PhotosDB, version: int, uuid: str):
|
||||
"""Create a Table object."""
|
||||
super().__init__(db, version, uuid)
|
||||
self.columns = get_table_columns(self.conn, "ZDETECTEDFACE")
|
||||
self.table_name = "ZDETECTEDFACE"
|
||||
|
||||
def rows(self) -> list[tuple[Any]]:
|
||||
"""Return rows for this photo from the ZDETECTEDFACE table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZDETECTEDFACE.*
|
||||
FROM ZDETECTEDFACE
|
||||
JOIN {self.asset_table} ON {self.asset_table}.Z_PK = ZDETECTEDFACE.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return result if (result := cursor.fetchall()) else []
|
||||
|
||||
def _get_column(self, column: str) -> tuple[Any]:
|
||||
"""Get column value for this photo from the ZDETECTEDFACE table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZDETECTEDFACE.{column}
|
||||
FROM ZDETECTEDFACE
|
||||
JOIN {self.asset_table} ON {self.asset_table}.Z_PK = ZDETECTEDFACE.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return (
|
||||
tuple(result[0] for result in results)
|
||||
if (results := cursor.fetchall())
|
||||
else ()
|
||||
)
|
||||
|
||||
|
||||
class PersonTable(Table):
|
||||
"""ZPERSON table."""
|
||||
|
||||
def __init__(self, db: osxphotos.PhotosDB, version: int, uuid: str):
|
||||
"""Create a Table object."""
|
||||
super().__init__(db, version, uuid)
|
||||
self.columns = get_table_columns(self.conn, "ZPERSON")
|
||||
self.table_name = "ZPERSON"
|
||||
|
||||
def rows(self) -> list[tuple[Any]]:
|
||||
"""Return rows for this photo from the ZPERSON table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZPERSON.*
|
||||
FROM ZPERSON
|
||||
JOIN ZDETECTEDFACE ON ZDETECTEDFACE.ZPERSON = ZPERSON.Z_PK
|
||||
JOIN ZASSET ON ZASSET.Z_PK = ZDETECTEDFACE.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return result if (result := cursor.fetchall()) else []
|
||||
|
||||
def _get_column(self, column: str) -> tuple[Any]:
|
||||
"""Get column value for this photo from the ZPERSON table."""
|
||||
conn, cursor = self.db.get_db_connection()
|
||||
sql = f""" SELECT ZPERSON.{column}
|
||||
FROM ZPERSON
|
||||
JOIN ZDETECTEDFACE ON ZDETECTEDFACE.ZPERSON = ZPERSON.Z_PK
|
||||
JOIN ZASSET ON ZASSET.Z_PK = ZDETECTEDFACE.ZASSET
|
||||
WHERE {self.asset_table}.ZUUID = ?;
|
||||
"""
|
||||
cursor.execute(sql, (self.uuid,))
|
||||
return (
|
||||
tuple(result[0] for result in results)
|
||||
if (results := cursor.fetchall())
|
||||
else ()
|
||||
)
|
||||
@ -5,6 +5,7 @@ from collections import namedtuple
|
||||
|
||||
import pytest
|
||||
|
||||
import osxphotos
|
||||
from osxphotos._constants import _UNKNOWN_PERSON
|
||||
|
||||
PHOTOS_DB = "./tests/Test-10.14.6.photoslibrary/database/photos.db"
|
||||
@ -531,9 +532,10 @@ def test_photosdb_repr():
|
||||
|
||||
|
||||
def test_photosinfo_repr():
|
||||
import osxphotos
|
||||
import datetime # needed for eval to work
|
||||
|
||||
import osxphotos
|
||||
|
||||
photosdb = osxphotos.PhotosDB(dbfile=PHOTOS_DB)
|
||||
photos = photosdb.photos(uuid=[UUID_DICT["favorite"]])
|
||||
photo = photos[0]
|
||||
@ -689,3 +691,10 @@ def test_fingerprint(photosdb):
|
||||
for uuid, fingerprint in UUID_FINGERPRINT.items():
|
||||
photo = photosdb.get_photo(uuid)
|
||||
assert photo.fingerprint == fingerprint
|
||||
|
||||
|
||||
def test_tables(photosdb: osxphotos.PhotosDB):
|
||||
"""Test PhotoInfo.tables"""
|
||||
photo = photosdb.get_photo(UUID_DICT["favorite"])
|
||||
tables = photo.tables()
|
||||
assert tables is None
|
||||
|
||||
@ -1304,3 +1304,14 @@ def test_photosdb_photos_by_uuid(photosdb: osxphotos.PhotosDB):
|
||||
assert len(photos) == len(UUID_DICT)
|
||||
for photo in photos:
|
||||
assert photo.uuid in UUID_DICT.values()
|
||||
|
||||
|
||||
def test_tables(photosdb: osxphotos.PhotosDB):
|
||||
"""Test PhotoInfo.tables"""
|
||||
photo = photosdb.get_photo(UUID_DICT["in_album"])
|
||||
tables = photo.tables()
|
||||
assert isinstance(tables, osxphotos.PhotoTables)
|
||||
assert tables.ZASSET.ZUUID[0] == photo.uuid
|
||||
assert tables.ZADDITIONALASSETATTRIBUTES.ZTITLE[0] == photo.title
|
||||
assert len(tables.ZASSET.rows()) == 1
|
||||
assert tables.ZASSET.rows_dict()[0]["ZUUID"] == photo.uuid
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user