Feature UUID from stdin 965 (#979)
* Allow --uuid-from-file to read from stdin, #965 * Load query options before opening the database
This commit is contained in:
parent
f7ca3977a9
commit
97a0a65d8a
@ -135,8 +135,6 @@ def add_locations(
|
||||
|
||||
verbose("Searching for photos with missing location data...")
|
||||
|
||||
# load photos database
|
||||
photosdb = osxphotos.PhotosDB(verbose=verbose)
|
||||
try:
|
||||
query_options = query_options_from_kwargs(**kwargs)
|
||||
except IncompatibleQueryOptions as e:
|
||||
@ -144,6 +142,7 @@ def add_locations(
|
||||
echo_error(ctx.obj.group.commands["repl"].get_help(ctx))
|
||||
ctx.exit(1)
|
||||
|
||||
photosdb = osxphotos.PhotosDB(verbose=verbose)
|
||||
photos = photosdb.query(query_options)
|
||||
|
||||
# sort photos by date
|
||||
|
||||
@ -59,9 +59,9 @@ def config_verbose_callback(ctx: click.Context, param: click.Parameter, value: t
|
||||
|
||||
def get_photos_for_query(ctx: click.Context):
|
||||
"""Return list of PhotoInfo objects for the photos matching the query options in ctx.params"""
|
||||
options = query_options_from_kwargs(**ctx.params)
|
||||
db = ctx.params.get("db")
|
||||
photosdb = PhotosDB(dbfile=db, verbose=verbose)
|
||||
options = query_options_from_kwargs(**ctx.params)
|
||||
return photosdb.query(options=options)
|
||||
|
||||
|
||||
|
||||
@ -196,8 +196,9 @@ _QUERY_PARAMETERS_DICT = {
|
||||
default=None,
|
||||
multiple=False,
|
||||
help="Search for photos with UUID(s) loaded from FILE. "
|
||||
"Format is a single UUID per line. Lines preceded with # are ignored.",
|
||||
type=click.Path(exists=True),
|
||||
"Format is a single UUID per line. Lines preceded with # are ignored. "
|
||||
"If FILE is '-', read UUIDs from stdin.",
|
||||
type=PathOrStdin(exists=True),
|
||||
),
|
||||
"--title": click.Option(
|
||||
["--title"],
|
||||
|
||||
@ -57,6 +57,8 @@ def debug_dump(
|
||||
_list_libraries()
|
||||
return
|
||||
|
||||
query_options = query_options_from_kwargs(**kwargs)
|
||||
|
||||
start_t = time.perf_counter()
|
||||
print(f"Opening database: {db}")
|
||||
photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose)
|
||||
@ -90,7 +92,6 @@ def debug_dump(
|
||||
print("_dbpersons_fullname:")
|
||||
pprint.pprint(photosdb._dbpersons_fullname)
|
||||
elif attr == "photos":
|
||||
query_options = query_options_from_kwargs(**kwargs)
|
||||
photos = photosdb.query(options=query_options)
|
||||
uuid = [photo.uuid for photo in photos]
|
||||
for uuid_ in uuid:
|
||||
|
||||
@ -1336,13 +1336,6 @@ def export(
|
||||
# save config to export_db
|
||||
export_db.set_config(cfg.write_to_str())
|
||||
|
||||
photosdb = osxphotos.PhotosDB(
|
||||
dbfile=db, verbose=verbose, exiftool=exiftool_path, rich=True
|
||||
)
|
||||
|
||||
# enable beta features if requested
|
||||
photosdb._beta = beta
|
||||
|
||||
query_kwargs = locals()
|
||||
# skip missing bursts if using --download-missing by itself as AppleScript otherwise causes errors
|
||||
query_kwargs["missing_bursts"] = (
|
||||
@ -1350,6 +1343,14 @@ def export(
|
||||
)
|
||||
query_kwargs["burst_photos"] = export_bursts
|
||||
query_options = query_options_from_kwargs(**query_kwargs)
|
||||
|
||||
photosdb = osxphotos.PhotosDB(
|
||||
dbfile=db, verbose=verbose, exiftool=exiftool_path, rich=True
|
||||
)
|
||||
|
||||
# enable beta features if requested
|
||||
photosdb._beta = beta
|
||||
|
||||
try:
|
||||
photos = photosdb.query(query_options)
|
||||
except ValueError as e:
|
||||
|
||||
@ -23,6 +23,7 @@ __all__ = [
|
||||
"DeprecatedPath",
|
||||
"ExportDBType",
|
||||
"FunctionCall",
|
||||
"PathOrStdin",
|
||||
"StrpDateTimePattern",
|
||||
"TemplateString",
|
||||
"TimeISO8601",
|
||||
@ -52,6 +53,18 @@ class DeprecatedPath(click.Path):
|
||||
return super().convert(value, param, ctx)
|
||||
|
||||
|
||||
class PathOrStdin(click.Path):
|
||||
"""A click.Path or "-" to represent STDIN."""
|
||||
|
||||
name = "PATH_OR_STDIN"
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def convert(self, value, param, ctx):
|
||||
return value if value == "-" else super().convert(value, param, ctx)
|
||||
|
||||
|
||||
class DateTimeISO8601(click.ParamType):
|
||||
|
||||
name = "DATETIME"
|
||||
|
||||
@ -102,12 +102,13 @@ def query(
|
||||
_list_libraries()
|
||||
return
|
||||
|
||||
photosdb = osxphotos.PhotosDB(dbfile=db)
|
||||
try:
|
||||
query_options = query_options_from_kwargs(**kwargs)
|
||||
except Exception as e:
|
||||
raise click.BadOptionUsage("query", str(e)) from e
|
||||
|
||||
photosdb = osxphotos.PhotosDB(dbfile=db)
|
||||
|
||||
try:
|
||||
photos = photosdb.query(query_options)
|
||||
except ValueError as e:
|
||||
|
||||
@ -763,7 +763,7 @@ def sync(
|
||||
print_import_summary(results)
|
||||
|
||||
if export_path:
|
||||
photosdb = PhotosDB(dbfile=db, verbose=verbose)
|
||||
query_options = query_options_from_kwargs(**kwargs)
|
||||
photosdb = PhotosDB(dbfile=db, verbose=verbose)
|
||||
photos = photosdb.query(query_options)
|
||||
export_metadata(photos, export_path, verbose)
|
||||
|
||||
@ -2,12 +2,14 @@
|
||||
|
||||
import dataclasses
|
||||
import datetime
|
||||
import io
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import asdict, dataclass
|
||||
from typing import Iterable, List, Optional, Tuple
|
||||
|
||||
import bitmath
|
||||
import click
|
||||
|
||||
__all__ = ["QueryOptions", "query_options_from_kwargs", "IncompatibleQueryOptions"]
|
||||
|
||||
@ -194,7 +196,12 @@ class QueryOptions:
|
||||
|
||||
|
||||
def query_options_from_kwargs(**kwargs) -> QueryOptions:
|
||||
"""Validate query options and create a QueryOptions instance"""
|
||||
""" Validate query options and create a QueryOptions instance.
|
||||
Note: this will block on stdin if uuid_from_file is set to "-"
|
||||
so it is best to call function before creating the PhotosDB instance
|
||||
so that the validation of query options can happen before the database
|
||||
is loaded.
|
||||
"""
|
||||
# sanity check input args
|
||||
nonexclusive = [
|
||||
"added_after",
|
||||
@ -294,10 +301,12 @@ def query_options_from_kwargs(**kwargs) -> QueryOptions:
|
||||
return QueryOptions(**query_dict)
|
||||
|
||||
|
||||
def load_uuid_from_file(filename):
|
||||
"""Load UUIDs from file. Does not validate UUIDs.
|
||||
Format is 1 UUID per line, any line beginning with # is ignored.
|
||||
Whitespace is stripped.
|
||||
def load_uuid_from_file(filename: str) ->list[str]:
|
||||
"""
|
||||
Load UUIDs from file.
|
||||
Does not validate UUIDs but does validate that the UUIDs are in the correct format.
|
||||
Format is 1 UUID per line, any line beginning with # is ignored.
|
||||
Whitespace is stripped.
|
||||
|
||||
Arguments:
|
||||
filename: file name of the file containing UUIDs
|
||||
@ -307,15 +316,44 @@ def load_uuid_from_file(filename):
|
||||
|
||||
Raises:
|
||||
FileNotFoundError if file does not exist
|
||||
ValueError if UUID is not in correct format
|
||||
"""
|
||||
|
||||
if filename == "-":
|
||||
return _load_uuid_from_stream(sys.stdin)
|
||||
|
||||
if not pathlib.Path(filename).is_file():
|
||||
raise FileNotFoundError(f"Could not find file {filename}")
|
||||
|
||||
with open(filename, "r") as f:
|
||||
return _load_uuid_from_stream(f)
|
||||
|
||||
def _load_uuid_from_stream(stream: io.IOBase) -> list[str]:
|
||||
"""
|
||||
Load UUIDs from stream.
|
||||
Does not validate UUIDs but does validate that the UUIDs are in the correct format.
|
||||
Format is 1 UUID per line, any line beginning with # is ignored.
|
||||
Whitespace is stripped.
|
||||
|
||||
Arguments:
|
||||
filename: file name of the file containing UUIDs
|
||||
|
||||
Returns:
|
||||
list of UUIDs or empty list of no UUIDs in file
|
||||
|
||||
Raises:
|
||||
ValueError if UUID is not in correct format
|
||||
"""
|
||||
|
||||
uuid = []
|
||||
with open(filename, "r") as uuid_file:
|
||||
for line in uuid_file:
|
||||
line = line.strip()
|
||||
if len(line) and line[0] != "#":
|
||||
uuid.append(line)
|
||||
for line in stream:
|
||||
line = line.strip()
|
||||
if len(line) and line[0] != "#":
|
||||
if not re.match(
|
||||
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
|
||||
line,
|
||||
):
|
||||
raise ValueError(f"Invalid UUID: {line}")
|
||||
line = line.upper()
|
||||
uuid.append(line)
|
||||
return uuid
|
||||
|
||||
@ -1176,6 +1176,25 @@ def test_query_uuid_from_file_1():
|
||||
assert sorted(UUID_EXPECTED_FROM_FILE) == sorted(uuid_got)
|
||||
|
||||
|
||||
def test_query_uuid_from_file_stdin():
|
||||
"""Test query with --uuid-from-file reading from stdin"""
|
||||
|
||||
runner = CliRunner()
|
||||
cwd = os.getcwd()
|
||||
input_text = open(UUID_FILE, "r").read()
|
||||
result = runner.invoke(
|
||||
query,
|
||||
["--json", "--db", os.path.join(cwd, PHOTOS_DB_15_7), "--uuid-from-file", "-"],
|
||||
input=input_text,
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
|
||||
# build list of uuids we got from the output JSON
|
||||
json_got = json.loads(result.output)
|
||||
uuid_got = [photo["uuid"] for photo in json_got]
|
||||
assert sorted(UUID_EXPECTED_FROM_FILE) == sorted(uuid_got)
|
||||
|
||||
|
||||
def test_query_has_comment():
|
||||
"""Test query with --has-comment"""
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user