Implemented show command, #964 (#982)

This commit is contained in:
Rhet Turnbull 2023-02-12 08:11:38 -08:00 committed by GitHub
parent ce297ced0a
commit b03670dc70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 353 additions and 24 deletions

View File

@ -459,3 +459,7 @@ PROFILE_SORT_KEYS = [
"time", "time",
"tottime", "tottime",
] ]
UUID_PATTERN = (
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)

View File

@ -79,6 +79,7 @@ from .photo_inspect import photo_inspect
from .places import places from .places import places
from .query import query from .query import query
from .repl import repl from .repl import repl
from .show_command import show
from .snap_diff import diff, snap from .snap_diff import diff, snap
from .sync import sync from .sync import sync
from .theme import theme from .theme import theme
@ -125,6 +126,7 @@ __all__ = [
"run", "run",
"selection_command", "selection_command",
"set_debug", "set_debug",
"show",
"snap", "snap",
"tutorial", "tutorial",
"uuid", "uuid",

View File

@ -35,6 +35,7 @@ from .photo_inspect import photo_inspect
from .places import places from .places import places
from .query import query from .query import query
from .repl import repl from .repl import repl
from .show_command import show
from .snap_diff import diff, snap from .snap_diff import diff, snap
from .sync import sync from .sync import sync
from .theme import theme from .theme import theme
@ -130,6 +131,7 @@ for command in [
query, query,
repl, repl,
run, run,
show,
snap, snap,
sync, sync,
theme, theme,

View File

@ -27,10 +27,15 @@ from .cli_params import (
) )
from .click_rich_echo import rich_click_echo as echo from .click_rich_echo import rich_click_echo as echo
from .click_rich_echo import rich_echo_error as echo_error from .click_rich_echo import rich_echo_error as echo_error
from .click_rich_echo import set_rich_theme
from .color_themes import get_theme
from .verbose import verbose, verbose_print from .verbose import verbose, verbose_print
logger = logging.getLogger("osxphotos") logger = logging.getLogger("osxphotos")
# ensure echo, echo_error are configured with correct theme
set_rich_theme(get_theme())
__all__ = [ __all__ = [
"abort", "abort",
"echo", "echo",

View File

@ -14,7 +14,7 @@ from typing import Dict
import click import click
from osxphotos import PhotosDB from osxphotos import PhotosDB
from osxphotos._constants import _PHOTOS_4_VERSION from osxphotos._constants import _PHOTOS_4_VERSION, UUID_PATTERN
from osxphotos.fileutil import FileUtil from osxphotos.fileutil import FileUtil
from osxphotos.utils import increment_filename, pluralize from osxphotos.utils import increment_filename, pluralize
@ -131,8 +131,7 @@ def scan_for_files(directory: str, uuid_dict: Dict):
Note: modifies uuid_dict Note: modifies uuid_dict
""" """
uuid_pattern = r"([0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})" uuid_regex = re.compile(UUID_PATTERN)
uuid_regex = re.compile(uuid_pattern)
for dirpath, dirname, filenames in os.walk(directory): for dirpath, dirname, filenames in os.walk(directory):
for filename in filenames: for filename in filenames:
if match := uuid_regex.match(filename): if match := uuid_regex.match(filename):

View File

@ -0,0 +1,74 @@
"""osxphotos show command"""
import re
import click
from osxphotos._constants import UUID_PATTERN
from osxphotos.photoscript_utils import (
photoscript_object_from_name,
photoscript_object_from_uuid,
)
from osxphotos.photosdb.photosdb_utils import get_photos_library_version
from osxphotos.utils import get_last_library_path
from .cli_commands import echo, echo_error
from .cli_params import DB_OPTION
from .click_rich_echo import set_rich_theme
@click.command(name="show")
@DB_OPTION
@click.argument("uuid_or_name", metavar="UUID_OR_NAME", nargs=1, required=True)
@click.pass_context
def show(ctx, db, uuid_or_name):
"""Show photo, album, or folder in Photos from UUID_OR_NAME
Examples:
osxphotos show 12345678-1234-1234-1234-123456789012
osxphotos show "My Album"
osxphotos show "My Folder"
osxphotos show IMG_1234.JPG
Notes:
This command requires Photos library version 5 or higher.
Currently this command cannot be used to show subfolders in Photos.
"""
db = db or get_last_library_path()
if not db:
echo(
"Could not find Photos library. Use --library/--db to specify path to Photos library."
)
ctx.exit(1)
if get_photos_library_version(db) < 5:
echo_error("[error]show command requires Photos library version 5 or higher")
ctx.exit(1)
try:
if re.match(UUID_PATTERN, uuid_or_name):
if not (obj := photoscript_object_from_uuid(uuid_or_name, db)):
raise ValueError(
f"could not find asset with UUID [uuid]{uuid_or_name}[/]"
)
obj_type = obj.__class__.__name__
echo(f"Found [filename]{obj_type}[/] with UUID: [uuid]{uuid_or_name}[/]")
obj.spotlight()
elif obj := photoscript_object_from_name(uuid_or_name, db):
obj_type = obj.__class__.__name__
echo(
f"Found [filename]{obj_type}[/] with name: [filepath]{uuid_or_name}[/]"
)
obj.spotlight()
else:
raise ValueError(
f"could not find asset with name [filepath]{uuid_or_name}[/]"
)
except Exception as e:
echo_error(f"[error]Error finding asset [uuid]{uuid_or_name}[/]: {e}")
ctx.exit(1)

View File

@ -0,0 +1,172 @@
"""Utilities for creating photoscript objects from a name or UUID"""
from __future__ import annotations
import sqlite3
import photoscript
from ._constants import _DB_TABLE_NAMES, _PHOTOS_5_ALBUM_KIND, _PHOTOS_5_FOLDER_KIND
from .photosdb.photosdb_utils import get_db_path_for_library, get_photos_library_version
from .sqlite_utils import sqlite_open_ro
def casefold(s: str | None) -> str | None:
return s.casefold() if s else None
def photoscript_object_from_uuid(
uuid: str, photos_database: str
) -> photoscript.Photo | None:
"""Return a photoscript object from a uuid"""
photos_database = get_db_path_for_library(photos_database)
photos_version = get_photos_library_version(photos_database)
connection, cursor = sqlite_open_ro(photos_database)
uuid = uuid.upper()
if _uuid_is_asset(uuid, connection, photos_version):
return photoscript.Photo(uuid)
elif _uuid_is_album(uuid, connection, photos_version):
return photoscript.Album(uuid)
elif _uuid_is_folder(uuid, connection, photos_version):
return photoscript.Folder(uuid)
else:
return None
def photoscript_object_from_name(
name: str, photos_database: str
) -> photoscript.Photo | None:
"""Return a photoscript object from a name"""
photos_database = get_db_path_for_library(photos_database)
photos_version = get_photos_library_version(photos_database)
connection, cursor = sqlite_open_ro(photos_database)
connection.create_function("CASEFOLD", 1, casefold)
if uuid := _asset_uuid_for_name(name, connection, photos_version):
return photoscript.Photo(uuid)
elif uuid := _album_uuid_for_name(name, connection, photos_version):
return photoscript.Album(uuid)
elif uuid := _folder_uuid_for_name(name, connection, photos_version):
return photoscript.Folder(uuid)
else:
return None
def _uuid_is_asset(
uuid: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid if uuid is an asset uuid otherwise None"""
asset_table = _DB_TABLE_NAMES[version]["ASSET"]
cursor = connection.cursor()
if results := cursor.execute(
f"""
SELECT ZUUID
FROM {asset_table}
WHERE ZUUID=?
""",
(uuid,),
).fetchone():
return results[0]
else:
return None
def _uuid_is_album(
uuid: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid if uuid is an album uuid otherwise None"""
cursor = connection.cursor()
if results := cursor.execute(
"""
SELECT ZUUID
FROM ZGENERICALBUM
WHERE ZUUID=?
AND ZKIND=?
""",
(uuid, _PHOTOS_5_ALBUM_KIND),
).fetchone():
return results[0]
else:
return None
def _uuid_is_folder(
uuid: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid if uuid is an folder uuid otherwise None"""
cursor = connection.cursor()
if results := cursor.execute(
"""
SELECT ZUUID
FROM ZGENERICALBUM
WHERE ZUUID=?
AND ZKIND=?
""",
(uuid, _PHOTOS_5_FOLDER_KIND),
).fetchone():
return results[0]
else:
return None
def _asset_uuid_for_name(
name: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid for asset with name or None if not found"""
asset_table = _DB_TABLE_NAMES[version]["ASSET"]
cursor = connection.cursor()
if results := cursor.execute(
f"""
SELECT {asset_table}.ZUUID
FROM {asset_table}
JOIN ZADDITIONALASSETATTRIBUTES ON ZADDITIONALASSETATTRIBUTES.ZASSET = {asset_table}.Z_PK
WHERE CASEFOLD(ZADDITIONALASSETATTRIBUTES.ZORIGINALFILENAME)=?
ORDER BY {asset_table}.ZDATECREATED DESC
""",
(casefold(name),),
).fetchone():
return results[0]
else:
return None
def _album_uuid_for_name(
name: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid for album with name or None if not found"""
return _folder_album_uuid_for_name(name, connection, version, album=True)
def _folder_uuid_for_name(
name: str, connection: sqlite3.Connection, version: int
) -> str | None:
"""Return uuid for album with name or None if not found"""
return _folder_album_uuid_for_name(name, connection, version, folder=True)
def _folder_album_uuid_for_name(
name: str,
connection: sqlite3.Connection,
version: int,
album: bool = False,
folder: bool = False,
) -> str | None:
"""Return uuid for album with name or None if not found"""
if album and folder:
raise ValueError("album and folder cannot both be True")
if not album and not folder:
raise ValueError("album and folder cannot both be False")
kind = _PHOTOS_5_ALBUM_KIND if album else _PHOTOS_5_FOLDER_KIND
cursor = connection.cursor()
if results := cursor.execute(
"""
SELECT ZUUID
FROM ZGENERICALBUM
WHERE CASEFOLD(ZTITLE)=?
AND ZKIND=?
ORDER BY ZCREATIONDATE DESC
""",
(casefold(name), kind),
).fetchone():
return results[0]
else:
return None

View File

@ -1,5 +1,7 @@
""" utility functions used by PhotosDB """ """ utility functions used by PhotosDB """
from __future__ import annotations
import logging import logging
import pathlib import pathlib
import plistlib import plistlib
@ -63,7 +65,8 @@ def get_db_version(db_file):
if version not in _TESTED_DB_VERSIONS: if version not in _TESTED_DB_VERSIONS:
print( print(
f"WARNING: Only tested on database versions [{', '.join(_TESTED_DB_VERSIONS)}]" f"WARNING: Only tested on database versions [{', '.join(_TESTED_DB_VERSIONS)}]"
+ f" You have database version={version} which has not been tested", file=sys.stderr + f" You have database version={version} which has not been tested",
file=sys.stderr,
) )
return version return version
@ -115,11 +118,18 @@ def get_db_model_version(db_file: str) -> int:
return 8 return 8
def get_photos_library_version(library_path): def get_photos_library_version(library_path: str | pathlib.Path) -> int:
"""Return int indicating which Photos version a library was created with""" """Return int indicating which Photos version a library was created with
Args:
library_path: path to Photos library; may be path to the root of the library or the photos.db file
Returns: int of major Photos version number (e.g. 5, 6, ...)
"""
library_path = pathlib.Path(library_path) library_path = pathlib.Path(library_path)
db_ver = get_db_version(str(library_path / "database" / "photos.db")) if library_path.is_dir():
db_ver = int(db_ver) library_path = library_path / "database" / "photos.db"
db_ver = int(get_db_version(str(library_path)))
if db_ver == int(_PHOTOS_2_VERSION): if db_ver == int(_PHOTOS_2_VERSION):
return 2 return 2
if db_ver == int(_PHOTOS_3_VERSION): if db_ver == int(_PHOTOS_3_VERSION):
@ -128,7 +138,8 @@ def get_photos_library_version(library_path):
return 4 return 4
# assume it's a Photos 5+ library, get the model version to determine which version # assume it's a Photos 5+ library, get the model version to determine which version
model_ver = get_model_version(str(library_path / "database" / "Photos.sqlite")) library_path = library_path.parent / "Photos.sqlite"
model_ver = get_model_version(str(library_path))
model_ver = int(model_ver) model_ver = int(model_ver)
if _PHOTOS_5_MODEL_VERSION[0] <= model_ver <= _PHOTOS_5_MODEL_VERSION[1]: if _PHOTOS_5_MODEL_VERSION[0] <= model_ver <= _PHOTOS_5_MODEL_VERSION[1]:
return 5 return 5
@ -142,3 +153,23 @@ def get_photos_library_version(library_path):
f"Unknown db / model version: db_ver={db_ver}, model_ver={model_ver}; assuming Photos 8" f"Unknown db / model version: db_ver={db_ver}, model_ver={model_ver}; assuming Photos 8"
) )
return 8 return 8
def get_db_path_for_library(photos_library: str | pathlib.Path) -> pathlib.Path:
"""Returns path to Photos database file for Photos library
Args:
photos_library: path to Photos library; may be path to the root of the library or the photos.db file
Returns: pathlib.Path to Photos database file
"""
photos_library = pathlib.Path(photos_library)
if photos_library.is_file():
return photos_library
photos_version = get_photos_library_version(photos_library)
if photos_version < 5:
if photos_library.is_dir():
photos_library = photos_library / "database" / "photos.db"
elif photos_library.is_dir():
photos_library = photos_library / "database" / "Photos.sqlite"
return photos_library

View File

@ -11,6 +11,8 @@ from typing import Iterable, List, Optional, Tuple
import bitmath import bitmath
from ._constants import UUID_PATTERN
__all__ = ["QueryOptions", "query_options_from_kwargs", "IncompatibleQueryOptions"] __all__ = ["QueryOptions", "query_options_from_kwargs", "IncompatibleQueryOptions"]
@ -196,12 +198,12 @@ class QueryOptions:
def query_options_from_kwargs(**kwargs) -> QueryOptions: def query_options_from_kwargs(**kwargs) -> QueryOptions:
""" Validate query options and create a QueryOptions instance. """Validate query options and create a QueryOptions instance.
Note: this will block on stdin if uuid_from_file is set to "-" Note: this will block on stdin if uuid_from_file is set to "-"
so it is best to call function before creating the PhotosDB instance so it is best to call function before creating the PhotosDB instance
so that the validation of query options can happen before the database so that the validation of query options can happen before the database
is loaded. is loaded.
""" """
# sanity check input args # sanity check input args
nonexclusive = [ nonexclusive = [
"added_after", "added_after",
@ -301,7 +303,7 @@ def query_options_from_kwargs(**kwargs) -> QueryOptions:
return QueryOptions(**query_dict) return QueryOptions(**query_dict)
def load_uuid_from_file(filename: str) ->list[str]: def load_uuid_from_file(filename: str) -> list[str]:
""" """
Load UUIDs from file. Load UUIDs from file.
Does not validate UUIDs but does validate that the UUIDs are in the correct format. Does not validate UUIDs but does validate that the UUIDs are in the correct format.
@ -328,6 +330,7 @@ def load_uuid_from_file(filename: str) ->list[str]:
with open(filename, "r") as f: with open(filename, "r") as f:
return _load_uuid_from_stream(f) return _load_uuid_from_stream(f)
def _load_uuid_from_stream(stream: io.IOBase) -> list[str]: def _load_uuid_from_stream(stream: io.IOBase) -> list[str]:
""" """
Load UUIDs from stream. Load UUIDs from stream.
@ -349,10 +352,7 @@ def _load_uuid_from_stream(stream: io.IOBase) -> list[str]:
for line in stream: for line in stream:
line = line.strip() line = line.strip()
if len(line) and line[0] != "#": if len(line) and line[0] != "#":
if not re.match( if not re.match(f"^{UUID_PATTERN}$", line):
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
line,
):
raise ValueError(f"Invalid UUID: {line}") raise ValueError(f"Invalid UUID: {line}")
line = line.upper() line = line.upper()
uuid.append(line) uuid.append(line)

View File

@ -19,6 +19,7 @@ from photoscript import Photo
from pytest import MonkeyPatch, approx from pytest import MonkeyPatch, approx
from osxphotos import PhotosDB, QueryOptions from osxphotos import PhotosDB, QueryOptions
from osxphotos._constants import UUID_PATTERN
from osxphotos.cli.import_cli import import_cli from osxphotos.cli.import_cli import import_cli
from osxphotos.datetime_utils import datetime_remove_tz from osxphotos.datetime_utils import datetime_remove_tz
from osxphotos.exiftool import get_exiftool_path from osxphotos.exiftool import get_exiftool_path
@ -101,9 +102,7 @@ def parse_import_output(output: str) -> Dict[str, str]:
results = {} results = {}
for line in output.split("\n"): for line in output.split("\n"):
pattern = re.compile( pattern = re.compile(r"Imported ([\w\.]+)\s.*UUID\s(" + UUID_PATTERN + r")")
r"Imported ([\w\.]+)\s.*UUID\s([0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})"
)
if match := re.match(pattern, line): if match := re.match(pattern, line):
file = match[1] file = match[1]
uuid = match[2] uuid = match[2]

View File

@ -0,0 +1,41 @@
"""Test photosdb_utils """
import pathlib
import pytest
from osxphotos.photosdb.photosdb_utils import (
get_db_path_for_library,
get_photos_library_version,
)
LIBRARIES = {
2: pathlib.Path("tests/Test-10.12.6.photoslibrary"),
3: pathlib.Path("tests/Test-10.13.6.photoslibrary"),
4: pathlib.Path("tests/Test-10.14.6.photoslibrary"),
5: pathlib.Path("tests/Test-10.15.7.photoslibrary"),
6: pathlib.Path("tests/Test-10.16.0.photoslibrary"),
7: pathlib.Path("tests/Test-12.0.1.photoslibrary"),
8: pathlib.Path("tests/Test-13.0.0.photoslibrary"),
}
@pytest.mark.parametrize("version,library_path", list(LIBRARIES.items()))
def test_get_photos_library_version_library_path(version, library_path):
"""Test get_photos_library_version with library path"""
photos_version = get_photos_library_version(library_path)
assert photos_version == version
@pytest.mark.parametrize("version,library_path", list(LIBRARIES.items()))
def test_get_photos_library_version_db_path(version, library_path):
"""Test get_photos_library_version with database path"""
photos_version = get_photos_library_version(library_path / "database" / "photos.db")
assert photos_version == version
@pytest.mark.parametrize("library_path", list(LIBRARIES.values()))
def test_get_db_path_for_library(library_path):
"""Test get_db_path_for_library"""
db_path = get_db_path_for_library(library_path)
assert db_path.is_file()