From 785580115b29f5ccb895de22be1243f56dbb43dc Mon Sep 17 00:00:00 2001 From: Rhet Turnbull Date: Sun, 23 Jan 2022 21:57:51 -0800 Subject: [PATCH] Added query options to repl, #597 --- osxphotos/cli.py | 186 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 174 insertions(+), 12 deletions(-) diff --git a/osxphotos/cli.py b/osxphotos/cli.py index 02c92cc7..2f6ad38d 100644 --- a/osxphotos/cli.py +++ b/osxphotos/cli.py @@ -4,6 +4,7 @@ import atexit import code import cProfile import csv +import dataclasses import datetime import io import json @@ -18,7 +19,7 @@ import subprocess import sys import time from runpy import run_module, run_path -from typing import Dict +from typing import Dict, List import bitmath import click @@ -69,6 +70,7 @@ from .photoexporter import ExportOptions, ExportResults, PhotoExporter from .photoinfo import PhotoInfo from .photokit import check_photokit_authorization, request_photokit_authorization from .photosalbum import PhotosAlbum +from .photosdb import PhotosDB from .photosdb.photosdb_utils import get_photos_library_version from .phototemplate import PhotoTemplate, RenderOptions from .pyrepl import embed_repl @@ -90,7 +92,7 @@ __all__ = [ "TimeISO8601", "FunctionCall", "CLI_Obj", - "deleted_options", + "DELETED_OPTIONS", "QUERY_OPTIONS", "cli", "export", @@ -261,6 +263,10 @@ class FunctionCall(click.ParamType): return (function, value) +class IncompatibleQueryOptions(Exception): + pass + + # Click CLI object & context settings class CLI_Obj: def __init__(self, db=None, json=False, debug=False): @@ -298,7 +304,7 @@ JSON_OPTION = click.option( ) -def deleted_options(f): +def DELETED_OPTIONS(f): o = click.option options = [ o( @@ -677,7 +683,7 @@ def cli(ctx, db, json_, debug): is_flag=True, help="Export only photos missing from the Photos library; must be used with --download-missing.", ) -@deleted_options +@DELETED_OPTIONS @click.option( "--update", is_flag=True, @@ -1664,7 +1670,9 @@ def export( if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]): verbose_(f"exiftool path: {exiftool_path}") - photos = movies = True # default searches for everything + # default searches for everything + photos = True + movies = True if only_movies: photos = False if only_photos: @@ -2138,7 +2146,7 @@ def help(ctx, topic, **kw): @DB_OPTION @JSON_OPTION @QUERY_OPTIONS -@deleted_options +@DELETED_OPTIONS @click.option("--missing", is_flag=True, help="Search for photos missing from disk.") @click.option( "--not-missing", @@ -2327,7 +2335,9 @@ def query( return # actually have something to query - photos = movies = True # default searches for everything + # default searches for everything + photos = True + movies = True if only_movies: photos = False if only_photos: @@ -4014,7 +4024,7 @@ def places(ctx, cli_obj, db, json_, photos_library): @cli.command() @DB_OPTION @JSON_OPTION -@deleted_options +@DELETED_OPTIONS @DB_ARGUMENT @click.pass_obj @click.pass_context @@ -4216,7 +4226,7 @@ def _load_photos_db(dbpath): return photosdb -def _get_photos(photosdb): +def _get_all_photos(photosdb): """get list of all photos in photosdb""" photos = photosdb.photos(images=True, movies=True) photos.extend(photosdb.photos(images=True, movies=True, intrash=True)) @@ -4251,7 +4261,42 @@ def _spotlight_photo(photo: PhotoInfo): default=False, help="Launch REPL with Emacs keybindings (default is vi bindings)", ) -def repl(ctx, cli_obj, db, emacs): +@click.option( + "--beta", + is_flag=True, + default=False, + hidden=True, + help="Enable beta options.", +) +@QUERY_OPTIONS +@DELETED_OPTIONS +@click.option("--missing", is_flag=True, help="Search for photos missing from disk.") +@click.option( + "--not-missing", + is_flag=True, + help="Search for photos present on disk (e.g. not missing).", +) +@click.option( + "--cloudasset", + is_flag=True, + help="Search for photos that are part of an iCloud library", +) +@click.option( + "--not-cloudasset", + is_flag=True, + help="Search for photos that are not part of an iCloud library", +) +@click.option( + "--incloud", + is_flag=True, + help="Search for photos that are in iCloud (have been synched)", +) +@click.option( + "--not-incloud", + is_flag=True, + help="Search for photos that are not in iCloud (have not been synched)", +) +def repl(ctx, cli_obj, db, emacs, beta, **kwargs): """Run interactive osxphotos REPL shell (useful for debugging, prototyping, and inspecting your Photos library)""" import logging @@ -4276,9 +4321,20 @@ def repl(ctx, cli_obj, db, emacs): print(f"osxphotos version: {osxphotos._version.__version__}") db = db or get_photos_db() photosdb = _load_photos_db(db) + # enable beta features if requested + if beta: + photosdb._beta = beta + print("Beta mode enabled") print("Getting photos") tic = time.perf_counter() - photos = _get_photos(photosdb) + try: + query_options = _query_options_from_kwargs(**kwargs) + except IncompatibleQueryOptions: + click.echo("Incompatible query options", err=True) + click.echo(cli.commands["repl"].get_help(ctx), err=True) + sys.exit(1) + photos = _query_photos(photosdb, query_options) + all_photos = _get_all_photos(photosdb) toc = time.perf_counter() tictoc = toc - tic @@ -4305,7 +4361,10 @@ def repl(ctx, cli_obj, db, emacs): print("The following variables are defined:") print(f"- photosdb: PhotosDB() instance for {photosdb.library_path}") print( - f"- photos: list of PhotoInfo objects for all photos in photosdb, including those in the trash (len={len(photos)})" + f"- photos: list of PhotoInfo objects for all photos filtered with any query options passed on command line (len={len(photos)})" + ) + print( + f"- all_photos: list of PhotoInfo objects for all photos in photosdb, including those in the trash (len={len(all_photos)})" ) print( f"- selected: list of PhotoInfo objects for any photos selected in Photos (len={len(selected)})" @@ -4623,3 +4682,106 @@ def diff(ctx, cli_obj, db, raw_output, style, db2, verbose): def run(python_file): """Run a python file using same environment as osxphotos""" run_path(python_file, run_name="__main__") + + +def _query_options_from_kwargs(**kwargs) -> QueryOptions: + """Validate query options and create a QueryOptions instance""" + # sanity check input args + nonexclusive = [ + "keyword", + "person", + "album", + "folder", + "name", + "uuid", + "uuid_from_file", + "edited", + "external_edit", + "uti", + "has_raw", + "from_date", + "to_date", + "from_time", + "to_time", + "label", + "is_reference", + "query_eval", + "query_function", + "min_size", + "max_size", + "regex", + "selected", + "exif", + "duplicate", + ] + exclusive = [ + ("favorite", "not_favorite"), + ("hidden", "not_hidden"), + ("missing", "not_missing"), + ("only_photos", "only_movies"), + ("burst", "not_burst"), + ("live", "not_live"), + ("cloudasset", "not_cloudasset"), + ("incloud", "not_incloud"), + ("portrait", "not_portrait"), + ("screenshot", "not_screenshot"), + ("slow_mo", "not_slow_mo"), + ("time_lapse", "not_time_lapse"), + ("hdr", "not_hdr"), + ("selfie", "not_selfie"), + ("panorama", "not_panorama"), + ("deleted", "deleted_only"), + ("shared", "not_shared"), + ("has_comment", "no_comment"), + ("has_likes", "no_likes"), + ("in_album", "not_in_album"), + ("location", "no_location"), + ] + # print help if no non-exclusive term or a double exclusive term is given + # TODO: add option to validate requiring at least one query arg + if any(all([kwargs[b], kwargs[n]]) for b, n in exclusive) or any( + [ + all([any(kwargs["title"]), kwargs["no_title"]]), + all([any(kwargs["description"]), kwargs["no_description"]]), + all([any(kwargs["place"]), kwargs["no_place"]]), + ] + ): + raise IncompatibleQueryOptions + + # actually have something to query + include_photos = True + include_movies = True # default searches for everything + if kwargs["only_movies"]: + include_photos = False + if kwargs["only_photos"]: + include_movies = False + + # load UUIDs if necessary and append to any uuids passed with --uuid + uuid = None + if kwargs["uuid_from_file"]: + uuid_list = list(kwargs["uuid"]) # Click option is a tuple + uuid_list.extend(load_uuid_from_file(kwargs["uuid_from_file"])) + uuid = tuple(uuid_list) + + query_fields = [field.name for field in dataclasses.fields(QueryOptions)] + query_dict = {field: kwargs.get(field) for field in query_fields} + query_dict["photos"] = include_photos + query_dict["movies"] = include_movies + query_dict["uuid"] = uuid + return QueryOptions(**query_dict) + + +def _query_photos(photosdb: PhotosDB, query_options: QueryOptions) -> List: + """Query photos given a QueryOptions instance""" + try: + photos = photosdb.query(query_options) + except ValueError as e: + if "Invalid query_eval CRITERIA:" in str(e): + msg = str(e).split(":")[1] + raise click.BadOptionUsage( + "query_eval", f"Invalid query-eval CRITERIA: {msg}" + ) + else: + raise ValueError(e) + + return photos