Added query options to repl, #597

This commit is contained in:
Rhet Turnbull
2022-01-23 21:57:51 -08:00
parent b4bd04c146
commit 785580115b

View File

@@ -4,6 +4,7 @@ import atexit
import code import code
import cProfile import cProfile
import csv import csv
import dataclasses
import datetime import datetime
import io import io
import json import json
@@ -18,7 +19,7 @@ import subprocess
import sys import sys
import time import time
from runpy import run_module, run_path from runpy import run_module, run_path
from typing import Dict from typing import Dict, List
import bitmath import bitmath
import click import click
@@ -69,6 +70,7 @@ from .photoexporter import ExportOptions, ExportResults, PhotoExporter
from .photoinfo import PhotoInfo from .photoinfo import PhotoInfo
from .photokit import check_photokit_authorization, request_photokit_authorization from .photokit import check_photokit_authorization, request_photokit_authorization
from .photosalbum import PhotosAlbum from .photosalbum import PhotosAlbum
from .photosdb import PhotosDB
from .photosdb.photosdb_utils import get_photos_library_version from .photosdb.photosdb_utils import get_photos_library_version
from .phototemplate import PhotoTemplate, RenderOptions from .phototemplate import PhotoTemplate, RenderOptions
from .pyrepl import embed_repl from .pyrepl import embed_repl
@@ -90,7 +92,7 @@ __all__ = [
"TimeISO8601", "TimeISO8601",
"FunctionCall", "FunctionCall",
"CLI_Obj", "CLI_Obj",
"deleted_options", "DELETED_OPTIONS",
"QUERY_OPTIONS", "QUERY_OPTIONS",
"cli", "cli",
"export", "export",
@@ -261,6 +263,10 @@ class FunctionCall(click.ParamType):
return (function, value) return (function, value)
class IncompatibleQueryOptions(Exception):
pass
# Click CLI object & context settings # Click CLI object & context settings
class CLI_Obj: class CLI_Obj:
def __init__(self, db=None, json=False, debug=False): def __init__(self, db=None, json=False, debug=False):
@@ -298,7 +304,7 @@ JSON_OPTION = click.option(
) )
def deleted_options(f): def DELETED_OPTIONS(f):
o = click.option o = click.option
options = [ options = [
o( o(
@@ -677,7 +683,7 @@ def cli(ctx, db, json_, debug):
is_flag=True, is_flag=True,
help="Export only photos missing from the Photos library; must be used with --download-missing.", help="Export only photos missing from the Photos library; must be used with --download-missing.",
) )
@deleted_options @DELETED_OPTIONS
@click.option( @click.option(
"--update", "--update",
is_flag=True, is_flag=True,
@@ -1664,7 +1670,9 @@ def export(
if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]): if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]):
verbose_(f"exiftool path: {exiftool_path}") verbose_(f"exiftool path: {exiftool_path}")
photos = movies = True # default searches for everything # default searches for everything
photos = True
movies = True
if only_movies: if only_movies:
photos = False photos = False
if only_photos: if only_photos:
@@ -2138,7 +2146,7 @@ def help(ctx, topic, **kw):
@DB_OPTION @DB_OPTION
@JSON_OPTION @JSON_OPTION
@QUERY_OPTIONS @QUERY_OPTIONS
@deleted_options @DELETED_OPTIONS
@click.option("--missing", is_flag=True, help="Search for photos missing from disk.") @click.option("--missing", is_flag=True, help="Search for photos missing from disk.")
@click.option( @click.option(
"--not-missing", "--not-missing",
@@ -2327,7 +2335,9 @@ def query(
return return
# actually have something to query # actually have something to query
photos = movies = True # default searches for everything # default searches for everything
photos = True
movies = True
if only_movies: if only_movies:
photos = False photos = False
if only_photos: if only_photos:
@@ -4014,7 +4024,7 @@ def places(ctx, cli_obj, db, json_, photos_library):
@cli.command() @cli.command()
@DB_OPTION @DB_OPTION
@JSON_OPTION @JSON_OPTION
@deleted_options @DELETED_OPTIONS
@DB_ARGUMENT @DB_ARGUMENT
@click.pass_obj @click.pass_obj
@click.pass_context @click.pass_context
@@ -4216,7 +4226,7 @@ def _load_photos_db(dbpath):
return photosdb return photosdb
def _get_photos(photosdb): def _get_all_photos(photosdb):
"""get list of all photos in photosdb""" """get list of all photos in photosdb"""
photos = photosdb.photos(images=True, movies=True) photos = photosdb.photos(images=True, movies=True)
photos.extend(photosdb.photos(images=True, movies=True, intrash=True)) photos.extend(photosdb.photos(images=True, movies=True, intrash=True))
@@ -4251,7 +4261,42 @@ def _spotlight_photo(photo: PhotoInfo):
default=False, default=False,
help="Launch REPL with Emacs keybindings (default is vi bindings)", help="Launch REPL with Emacs keybindings (default is vi bindings)",
) )
def repl(ctx, cli_obj, db, emacs): @click.option(
"--beta",
is_flag=True,
default=False,
hidden=True,
help="Enable beta options.",
)
@QUERY_OPTIONS
@DELETED_OPTIONS
@click.option("--missing", is_flag=True, help="Search for photos missing from disk.")
@click.option(
"--not-missing",
is_flag=True,
help="Search for photos present on disk (e.g. not missing).",
)
@click.option(
"--cloudasset",
is_flag=True,
help="Search for photos that are part of an iCloud library",
)
@click.option(
"--not-cloudasset",
is_flag=True,
help="Search for photos that are not part of an iCloud library",
)
@click.option(
"--incloud",
is_flag=True,
help="Search for photos that are in iCloud (have been synched)",
)
@click.option(
"--not-incloud",
is_flag=True,
help="Search for photos that are not in iCloud (have not been synched)",
)
def repl(ctx, cli_obj, db, emacs, beta, **kwargs):
"""Run interactive osxphotos REPL shell (useful for debugging, prototyping, and inspecting your Photos library)""" """Run interactive osxphotos REPL shell (useful for debugging, prototyping, and inspecting your Photos library)"""
import logging import logging
@@ -4276,9 +4321,20 @@ def repl(ctx, cli_obj, db, emacs):
print(f"osxphotos version: {osxphotos._version.__version__}") print(f"osxphotos version: {osxphotos._version.__version__}")
db = db or get_photos_db() db = db or get_photos_db()
photosdb = _load_photos_db(db) photosdb = _load_photos_db(db)
# enable beta features if requested
if beta:
photosdb._beta = beta
print("Beta mode enabled")
print("Getting photos") print("Getting photos")
tic = time.perf_counter() tic = time.perf_counter()
photos = _get_photos(photosdb) try:
query_options = _query_options_from_kwargs(**kwargs)
except IncompatibleQueryOptions:
click.echo("Incompatible query options", err=True)
click.echo(cli.commands["repl"].get_help(ctx), err=True)
sys.exit(1)
photos = _query_photos(photosdb, query_options)
all_photos = _get_all_photos(photosdb)
toc = time.perf_counter() toc = time.perf_counter()
tictoc = toc - tic tictoc = toc - tic
@@ -4305,7 +4361,10 @@ def repl(ctx, cli_obj, db, emacs):
print("The following variables are defined:") print("The following variables are defined:")
print(f"- photosdb: PhotosDB() instance for {photosdb.library_path}") print(f"- photosdb: PhotosDB() instance for {photosdb.library_path}")
print( print(
f"- photos: list of PhotoInfo objects for all photos in photosdb, including those in the trash (len={len(photos)})" f"- photos: list of PhotoInfo objects for all photos filtered with any query options passed on command line (len={len(photos)})"
)
print(
f"- all_photos: list of PhotoInfo objects for all photos in photosdb, including those in the trash (len={len(all_photos)})"
) )
print( print(
f"- selected: list of PhotoInfo objects for any photos selected in Photos (len={len(selected)})" f"- selected: list of PhotoInfo objects for any photos selected in Photos (len={len(selected)})"
@@ -4623,3 +4682,106 @@ def diff(ctx, cli_obj, db, raw_output, style, db2, verbose):
def run(python_file): def run(python_file):
"""Run a python file using same environment as osxphotos""" """Run a python file using same environment as osxphotos"""
run_path(python_file, run_name="__main__") run_path(python_file, run_name="__main__")
def _query_options_from_kwargs(**kwargs) -> QueryOptions:
"""Validate query options and create a QueryOptions instance"""
# sanity check input args
nonexclusive = [
"keyword",
"person",
"album",
"folder",
"name",
"uuid",
"uuid_from_file",
"edited",
"external_edit",
"uti",
"has_raw",
"from_date",
"to_date",
"from_time",
"to_time",
"label",
"is_reference",
"query_eval",
"query_function",
"min_size",
"max_size",
"regex",
"selected",
"exif",
"duplicate",
]
exclusive = [
("favorite", "not_favorite"),
("hidden", "not_hidden"),
("missing", "not_missing"),
("only_photos", "only_movies"),
("burst", "not_burst"),
("live", "not_live"),
("cloudasset", "not_cloudasset"),
("incloud", "not_incloud"),
("portrait", "not_portrait"),
("screenshot", "not_screenshot"),
("slow_mo", "not_slow_mo"),
("time_lapse", "not_time_lapse"),
("hdr", "not_hdr"),
("selfie", "not_selfie"),
("panorama", "not_panorama"),
("deleted", "deleted_only"),
("shared", "not_shared"),
("has_comment", "no_comment"),
("has_likes", "no_likes"),
("in_album", "not_in_album"),
("location", "no_location"),
]
# print help if no non-exclusive term or a double exclusive term is given
# TODO: add option to validate requiring at least one query arg
if any(all([kwargs[b], kwargs[n]]) for b, n in exclusive) or any(
[
all([any(kwargs["title"]), kwargs["no_title"]]),
all([any(kwargs["description"]), kwargs["no_description"]]),
all([any(kwargs["place"]), kwargs["no_place"]]),
]
):
raise IncompatibleQueryOptions
# actually have something to query
include_photos = True
include_movies = True # default searches for everything
if kwargs["only_movies"]:
include_photos = False
if kwargs["only_photos"]:
include_movies = False
# load UUIDs if necessary and append to any uuids passed with --uuid
uuid = None
if kwargs["uuid_from_file"]:
uuid_list = list(kwargs["uuid"]) # Click option is a tuple
uuid_list.extend(load_uuid_from_file(kwargs["uuid_from_file"]))
uuid = tuple(uuid_list)
query_fields = [field.name for field in dataclasses.fields(QueryOptions)]
query_dict = {field: kwargs.get(field) for field in query_fields}
query_dict["photos"] = include_photos
query_dict["movies"] = include_movies
query_dict["uuid"] = uuid
return QueryOptions(**query_dict)
def _query_photos(photosdb: PhotosDB, query_options: QueryOptions) -> List:
"""Query photos given a QueryOptions instance"""
try:
photos = photosdb.query(query_options)
except ValueError as e:
if "Invalid query_eval CRITERIA:" in str(e):
msg = str(e).split(":")[1]
raise click.BadOptionUsage(
"query_eval", f"Invalid query-eval CRITERIA: {msg}"
)
else:
raise ValueError(e)
return photos