Refactored _query to PhotosDB.query()

This commit is contained in:
Rhet Turnbull
2021-04-18 08:32:13 -07:00
parent 952f1a6c3c
commit 345c052353
7 changed files with 526 additions and 448 deletions

View File

@@ -3,6 +3,7 @@ from .photoinfo import PhotoInfo
from .photosdb import PhotosDB from .photosdb import PhotosDB
from .photosdb._photosdb_process_comments import CommentInfo, LikeInfo from .photosdb._photosdb_process_comments import CommentInfo, LikeInfo
from .phototemplate import PhotoTemplate from .phototemplate import PhotoTemplate
from .queryoptions import QueryOptions
from .utils import _debug, _get_logger, _set_debug from .utils import _debug, _get_logger, _set_debug
# TODO: Add test for imageTimeZoneOffsetSeconds = None # TODO: Add test for imageTimeZoneOffsetSeconds = None

View File

@@ -1,3 +1,3 @@
""" version info """ """ version info """
__version__ = "0.42.4" __version__ = "0.42.5"

View File

@@ -51,6 +51,7 @@ from .fileutil import FileUtil, FileUtilNoOp
from .path_utils import is_valid_filepath, sanitize_filename, sanitize_filepath from .path_utils import is_valid_filepath, sanitize_filename, sanitize_filepath
from .photoinfo import ExportResults from .photoinfo import ExportResults
from .photokit import check_photokit_authorization, request_photokit_authorization from .photokit import check_photokit_authorization, request_photokit_authorization
from .queryoptions import QueryOptions
from .utils import get_preferred_uti_extension from .utils import get_preferred_uti_extension
# global variable to control verbose output # global variable to control verbose output
@@ -72,19 +73,6 @@ def verbose_(*args, **kwargs):
click.echo(*styled_args, **kwargs) click.echo(*styled_args, **kwargs)
def normalize_unicode(value):
""" normalize unicode data """
if value is not None:
if isinstance(value, tuple):
return tuple(unicodedata.normalize(UNICODE_FORMAT, v) for v in value)
elif isinstance(value, str):
return unicodedata.normalize(UNICODE_FORMAT, value)
else:
return value
else:
return None
def get_photos_db(*db_options): def get_photos_db(*db_options):
"""Return path to photos db, select first non-None db_options """Return path to photos db, select first non-None db_options
If no db_options are non-None, try to find library to use in If no db_options are non-None, try to find library to use in
@@ -223,7 +211,7 @@ def deleted_options(f):
return f return f
def query_options(f): def QUERY_OPTIONS(f):
o = click.option o = click.option
options = [ options = [
o( o(
@@ -519,7 +507,7 @@ def cli(ctx, db, json_, debug):
@cli.command(cls=ExportCommand) @cli.command(cls=ExportCommand)
@DB_OPTION @DB_OPTION
@click.option("--verbose", "-V", "verbose", is_flag=True, help="Print verbose output.") @click.option("--verbose", "-V", "verbose", is_flag=True, help="Print verbose output.")
@query_options @QUERY_OPTIONS
@click.option( @click.option(
"--missing", "--missing",
is_flag=True, is_flag=True,
@@ -1338,11 +1326,11 @@ def export(
if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]): if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]):
verbose_(f"exiftool path: {exiftool_path}") verbose_(f"exiftool path: {exiftool_path}")
isphoto = ismovie = True # default searches for everything photos = movies = True # default searches for everything
if only_movies: if only_movies:
isphoto = False photos = False
if only_photos: if only_photos:
ismovie = False movies = False
# load UUIDs if necessary and append to any uuids passed with --uuid # load UUIDs if necessary and append to any uuids passed with --uuid
if uuid_from_file: if uuid_from_file:
@@ -1424,8 +1412,7 @@ def export(
# enable beta features if requested # enable beta features if requested
photosdb._beta = beta photosdb._beta = beta
photos = _query( query_options = QueryOptions(
photosdb=photosdb,
keyword=keyword, keyword=keyword,
person=person, person=person,
album=album, album=album,
@@ -1446,8 +1433,8 @@ def export(
not_missing=None, not_missing=None,
shared=shared, shared=shared,
not_shared=not_shared, not_shared=not_shared,
isphoto=isphoto, photos=photos,
ismovie=ismovie, movies=movies,
uti=uti, uti=uti,
burst=burst, burst=burst,
not_burst=not_burst, not_burst=not_burst,
@@ -1497,6 +1484,17 @@ def export(
query_eval=query_eval, query_eval=query_eval,
) )
try:
photos = photosdb.query(query_options)
except ValueError as e:
if "Invalid query_eval CRITERIA:" in str(e):
msg = str(e).split(":")[1]
raise click.BadOptionUsage(
"query_eval", f"Invalid query-eval CRITERIA: {msg}"
)
else:
raise ValueError(e)
if photos: if photos:
if only_new: if only_new:
# ignore previously exported files # ignore previously exported files
@@ -1679,7 +1677,7 @@ def help(ctx, topic, **kw):
@cli.command() @cli.command()
@DB_OPTION @DB_OPTION
@JSON_OPTION @JSON_OPTION
@query_options @QUERY_OPTIONS
@deleted_options @deleted_options
@click.option("--missing", is_flag=True, help="Search for photos missing from disk.") @click.option("--missing", is_flag=True, help="Search for photos missing from disk.")
@click.option( @click.option(
@@ -1847,11 +1845,11 @@ def query(
return return
# actually have something to query # actually have something to query
isphoto = ismovie = True # default searches for everything photos = movies = True # default searches for everything
if only_movies: if only_movies:
isphoto = False photos = False
if only_photos: if only_photos:
ismovie = False movies = False
# load UUIDs if necessary and append to any uuids passed with --uuid # load UUIDs if necessary and append to any uuids passed with --uuid
if uuid_from_file: if uuid_from_file:
@@ -1869,8 +1867,7 @@ def query(
return return
photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose_) photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose_)
photos = _query( query_options = QueryOptions(
photosdb=photosdb,
keyword=keyword, keyword=keyword,
person=person, person=person,
album=album, album=album,
@@ -1891,8 +1888,8 @@ def query(
not_missing=not_missing, not_missing=not_missing,
shared=shared, shared=shared,
not_shared=not_shared, not_shared=not_shared,
isphoto=isphoto, photos=photos,
ismovie=ismovie, movies=movies,
uti=uti, uti=uti,
burst=burst, burst=burst,
not_burst=not_burst, not_burst=not_burst,
@@ -1939,6 +1936,17 @@ def query(
query_eval=query_eval, query_eval=query_eval,
) )
try:
photos = photosdb.query(query_options)
except ValueError as e:
if "Invalid query_eval CRITERIA:" in str(e):
msg = str(e).split(":")[1]
raise click.BadOptionUsage(
"query_eval", f"Invalid query-eval CRITERIA: {msg}"
)
else:
raise ValueError(e)
# below needed for to make CliRunner work for testing # below needed for to make CliRunner work for testing
cli_json = cli_obj.json if cli_obj is not None else None cli_json = cli_obj.json if cli_obj is not None else None
print_photo_info(photos, cli_json or json_) print_photo_info(photos, cli_json or json_)
@@ -2048,417 +2056,6 @@ def print_photo_info(photos, json=False):
csv_writer.writerow(row) csv_writer.writerow(row)
def _query(
photosdb,
keyword=None,
person=None,
album=None,
folder=None,
uuid=None,
title=None,
no_title=None,
description=None,
no_description=None,
ignore_case=None,
edited=None,
external_edit=None,
favorite=None,
not_favorite=None,
hidden=None,
not_hidden=None,
missing=None,
not_missing=None,
shared=None,
not_shared=None,
isphoto=None,
ismovie=None,
uti=None,
burst=None,
not_burst=None,
live=None,
not_live=None,
cloudasset=None,
not_cloudasset=None,
incloud=None,
not_incloud=None,
from_date=None,
to_date=None,
from_time=None,
to_time=None,
portrait=None,
not_portrait=None,
screenshot=None,
not_screenshot=None,
slow_mo=None,
not_slow_mo=None,
time_lapse=None,
not_time_lapse=None,
hdr=None,
not_hdr=None,
selfie=None,
not_selfie=None,
panorama=None,
not_panorama=None,
has_raw=None,
place=None,
no_place=None,
label=None,
deleted=False,
deleted_only=False,
has_comment=False,
no_comment=False,
has_likes=False,
no_likes=False,
is_reference=False,
in_album=False,
not_in_album=False,
burst_photos=None,
missing_bursts=None,
name=None,
min_size=None,
max_size=None,
query_eval=None,
):
"""Run a query against PhotosDB to extract the photos based on user supply criteria used by query and export commands
Args:
photosdb: PhotosDB object
"""
if deleted or deleted_only:
photos = photosdb.photos(
uuid=uuid,
images=isphoto,
movies=ismovie,
from_date=from_date,
to_date=to_date,
intrash=True,
)
else:
photos = []
if not deleted_only:
photos += photosdb.photos(
uuid=uuid,
images=isphoto,
movies=ismovie,
from_date=from_date,
to_date=to_date,
)
person = normalize_unicode(person)
keyword = normalize_unicode(keyword)
album = normalize_unicode(album)
folder = normalize_unicode(folder)
title = normalize_unicode(title)
description = normalize_unicode(description)
place = normalize_unicode(place)
label = normalize_unicode(label)
if album:
photos = get_photos_by_attribute(photos, "albums", album, ignore_case)
if keyword:
photos = get_photos_by_attribute(photos, "keywords", keyword, ignore_case)
if person:
photos = get_photos_by_attribute(photos, "persons", person, ignore_case)
if label:
photos = get_photos_by_attribute(photos, "labels", label, ignore_case)
if folder:
# search for photos in an album in folder
# finds photos that have albums whose top level folder matches folder
photo_list = []
for f in folder:
photo_list.extend(
[
p
for p in photos
if p.album_info
and f in [a.folder_names[0] for a in p.album_info if a.folder_names]
]
)
photos = photo_list
if title:
# search title field for text
# if more than one, find photos with all title values in title
photo_list = []
if ignore_case:
# case-insensitive
for t in title:
t = t.lower()
photo_list.extend(
[p for p in photos if p.title and t in p.title.lower()]
)
else:
for t in title:
photo_list.extend([p for p in photos if p.title and t in p.title])
photos = photo_list
elif no_title:
photos = [p for p in photos if not p.title]
if description:
# search description field for text
# if more than one, find photos with all description values in description
photo_list = []
if ignore_case:
# case-insensitive
for d in description:
d = d.lower()
photo_list.extend(
[p for p in photos if p.description and d in p.description.lower()]
)
else:
for d in description:
photo_list.extend(
[p for p in photos if p.description and d in p.description]
)
photos = photo_list
elif no_description:
photos = [p for p in photos if not p.description]
if place:
# search place.names for text matching place
# if more than one place, find photos with all place values in description
if ignore_case:
# case-insensitive
for place_name in place:
place_name = place_name.lower()
photos = [
p
for p in photos
if p.place
and any(
pname
for pname in p.place.names
if any(
pvalue for pvalue in pname if place_name in pvalue.lower()
)
)
]
else:
for place_name in place:
photos = [
p
for p in photos
if p.place
and any(
pname
for pname in p.place.names
if any(pvalue for pvalue in pname if place_name in pvalue)
)
]
elif no_place:
photos = [p for p in photos if not p.place]
if edited:
photos = [p for p in photos if p.hasadjustments]
if external_edit:
photos = [p for p in photos if p.external_edit]
if favorite:
photos = [p for p in photos if p.favorite]
elif not_favorite:
photos = [p for p in photos if not p.favorite]
if hidden:
photos = [p for p in photos if p.hidden]
elif not_hidden:
photos = [p for p in photos if not p.hidden]
if missing:
photos = [p for p in photos if not p.path]
elif not_missing:
photos = [p for p in photos if p.path]
if shared:
photos = [p for p in photos if p.shared]
elif not_shared:
photos = [p for p in photos if not p.shared]
if shared:
photos = [p for p in photos if p.shared]
elif not_shared:
photos = [p for p in photos if not p.shared]
if uti:
photos = [p for p in photos if uti in p.uti_original]
if burst:
photos = [p for p in photos if p.burst]
elif not_burst:
photos = [p for p in photos if not p.burst]
if live:
photos = [p for p in photos if p.live_photo]
elif not_live:
photos = [p for p in photos if not p.live_photo]
if portrait:
photos = [p for p in photos if p.portrait]
elif not_portrait:
photos = [p for p in photos if not p.portrait]
if screenshot:
photos = [p for p in photos if p.screenshot]
elif not_screenshot:
photos = [p for p in photos if not p.screenshot]
if slow_mo:
photos = [p for p in photos if p.slow_mo]
elif not_slow_mo:
photos = [p for p in photos if not p.slow_mo]
if time_lapse:
photos = [p for p in photos if p.time_lapse]
elif not_time_lapse:
photos = [p for p in photos if not p.time_lapse]
if hdr:
photos = [p for p in photos if p.hdr]
elif not_hdr:
photos = [p for p in photos if not p.hdr]
if selfie:
photos = [p for p in photos if p.selfie]
elif not_selfie:
photos = [p for p in photos if not p.selfie]
if panorama:
photos = [p for p in photos if p.panorama]
elif not_panorama:
photos = [p for p in photos if not p.panorama]
if cloudasset:
photos = [p for p in photos if p.iscloudasset]
elif not_cloudasset:
photos = [p for p in photos if not p.iscloudasset]
if incloud:
photos = [p for p in photos if p.incloud]
elif not_incloud:
photos = [p for p in photos if not p.incloud]
if has_raw:
photos = [p for p in photos if p.has_raw]
if has_comment:
photos = [p for p in photos if p.comments]
elif no_comment:
photos = [p for p in photos if not p.comments]
if has_likes:
photos = [p for p in photos if p.likes]
elif no_likes:
photos = [p for p in photos if not p.likes]
if is_reference:
photos = [p for p in photos if p.isreference]
if in_album:
photos = [p for p in photos if p.albums]
elif not_in_album:
photos = [p for p in photos if not p.albums]
if from_time:
photos = [p for p in photos if p.date.time() >= from_time]
if to_time:
photos = [p for p in photos if p.date.time() <= to_time]
if burst_photos:
# add the burst_photos to the export set
photos_burst = [p for p in photos if p.burst]
for burst in photos_burst:
if missing_bursts:
# include burst photos that are missing
photos.extend(burst.burst_photos)
else:
# don't include missing burst images (these can't be downloaded with AppleScript)
photos.extend([p for p in burst.burst_photos if not p.ismissing])
# remove duplicates as each burst photo in the set that's selected would
# result in the entire set being added above
# can't use set() because PhotoInfo not hashable
seen_uuids = {}
for p in photos:
if p.uuid in seen_uuids:
continue
seen_uuids[p.uuid] = p
photos = list(seen_uuids.values())
if name:
# search filename fields for text
# if more than one, find photos with all title values in filename
photo_list = []
if ignore_case:
# case-insensitive
for n in name:
n = n.lower()
photo_list.extend(
[
p
for p in photos
if n in p.filename.lower() or n in p.original_filename.lower()
]
)
else:
for n in name:
photo_list.extend(
[p for p in photos if n in p.filename or n in p.original_filename]
)
photos = photo_list
if min_size:
photos = [p for p in photos if bitmath.Byte(p.original_filesize) >= min_size]
if max_size:
photos = [p for p in photos if bitmath.Byte(p.original_filesize) <= max_size]
if query_eval:
for q in query_eval:
query_string = f"[photo for photo in photos if {q}]"
try:
photos = eval(query_string)
except Exception as e:
raise click.BadOptionUsage(
"query_eval", f"Invalid query-eval CRITERIA: {e}"
)
return photos
def get_photos_by_attribute(photos, attribute, values, ignore_case):
"""Search for photos based on values being in PhotoInfo.attribute
Args:
photos: a list of PhotoInfo objects
attribute: str, name of PhotoInfo attribute to search (e.g. keywords, persons, etc)
values: list of values to search in property
ignore_case: ignore case when searching
Returns:
list of PhotoInfo objects matching search criteria
"""
photos_search = []
if ignore_case:
# case-insensitive
for x in values:
x = x.lower()
photos_search.extend(
p
for p in photos
if x in [attr.lower() for attr in getattr(p, attribute)]
)
else:
for x in values:
photos_search.extend(p for p in photos if x in getattr(p, attribute))
return photos_search
def export_photo( def export_photo(
photo=None, photo=None,
dest=None, dest=None,

View File

@@ -12,6 +12,9 @@ import sys
import tempfile import tempfile
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from pprint import pformat from pprint import pformat
from typing import List
import bitmath
from .._constants import ( from .._constants import (
_DB_TABLE_NAMES, _DB_TABLE_NAMES,
@@ -47,6 +50,7 @@ from ..utils import (
noop, noop,
normalize_unicode, normalize_unicode,
) )
from ..queryoptions import QueryOptions
from .photosdb_utils import get_db_model_version, get_db_version from .photosdb_utils import get_db_model_version, get_db_version
# TODO: Add test for imageTimeZoneOffsetSeconds = None # TODO: Add test for imageTimeZoneOffsetSeconds = None
@@ -2833,6 +2837,346 @@ class PhotosDB:
pass pass
return photos return photos
def query(self, options: QueryOptions) -> List[PhotoInfo]:
"""Run a query against PhotosDB to extract the photos based on user supplied options
Args:
options: a QueryOptions instance
"""
if options.deleted or options.deleted_only:
photos = self.photos(
uuid=options.uuid,
images=options.photos,
movies=options.movies,
from_date=options.from_date,
to_date=options.to_date,
intrash=True,
)
else:
photos = []
if not options.deleted_only:
photos += self.photos(
uuid=options.uuid,
images=options.photos,
movies=options.movies,
from_date=options.from_date,
to_date=options.to_date,
)
person = normalize_unicode(options.person)
keyword = normalize_unicode(options.keyword)
album = normalize_unicode(options.album)
folder = normalize_unicode(options.folder)
title = normalize_unicode(options.title)
description = normalize_unicode(options.description)
place = normalize_unicode(options.place)
label = normalize_unicode(options.label)
name = normalize_unicode(options.name)
if album:
photos = _get_photos_by_attribute(
photos, "albums", album, options.ignore_case
)
if keyword:
photos = _get_photos_by_attribute(
photos, "keywords", keyword, options.ignore_case
)
if person:
photos = _get_photos_by_attribute(
photos, "persons", person, options.ignore_case
)
if label:
photos = _get_photos_by_attribute(
photos, "labels", label, options.ignore_case
)
if folder:
# search for photos in an album in folder
# finds photos that have albums whose top level folder matches folder
photo_list = []
for f in folder:
photo_list.extend(
[
p
for p in photos
if p.album_info
and f
in [a.folder_names[0] for a in p.album_info if a.folder_names]
]
)
photos = photo_list
if title:
# search title field for text
# if more than one, find photos with all title values in title
photo_list = []
if options.ignore_case:
# case-insensitive
for t in title:
t = t.lower()
photo_list.extend(
[p for p in photos if p.title and t in p.title.lower()]
)
else:
for t in title:
photo_list.extend([p for p in photos if p.title and t in p.title])
photos = photo_list
elif options.no_title:
photos = [p for p in photos if not p.title]
if description:
# search description field for text
# if more than one, find photos with all description values in description
photo_list = []
if options.ignore_case:
# case-insensitive
for d in description:
d = d.lower()
photo_list.extend(
[
p
for p in photos
if p.description and d in p.description.lower()
]
)
else:
for d in description:
photo_list.extend(
[p for p in photos if p.description and d in p.description]
)
photos = photo_list
elif options.no_description:
photos = [p for p in photos if not p.description]
if place:
# search place.names for text matching place
# if more than one place, find photos with all place values in description
if options.ignore_case:
# case-insensitive
for place_name in place:
place_name = place_name.lower()
photos = [
p
for p in photos
if p.place
and any(
pname
for pname in p.place.names
if any(
pvalue
for pvalue in pname
if place_name in pvalue.lower()
)
)
]
else:
for place_name in place:
photos = [
p
for p in photos
if p.place
and any(
pname
for pname in p.place.names
if any(pvalue for pvalue in pname if place_name in pvalue)
)
]
elif options.no_place:
photos = [p for p in photos if not p.place]
if options.edited:
photos = [p for p in photos if p.hasadjustments]
if options.external_edit:
photos = [p for p in photos if p.external_edit]
if options.favorite:
photos = [p for p in photos if p.favorite]
elif options.not_favorite:
photos = [p for p in photos if not p.favorite]
if options.hidden:
photos = [p for p in photos if p.hidden]
elif options.not_hidden:
photos = [p for p in photos if not p.hidden]
if options.missing:
photos = [p for p in photos if not p.path]
elif options.not_missing:
photos = [p for p in photos if p.path]
if options.shared:
photos = [p for p in photos if p.shared]
elif options.not_shared:
photos = [p for p in photos if not p.shared]
if options.shared:
photos = [p for p in photos if p.shared]
elif options.not_shared:
photos = [p for p in photos if not p.shared]
if options.uti:
photos = [p for p in photos if options.uti in p.uti_original]
if options.burst:
photos = [p for p in photos if p.burst]
elif options.not_burst:
photos = [p for p in photos if not p.burst]
if options.live:
photos = [p for p in photos if p.live_photo]
elif options.not_live:
photos = [p for p in photos if not p.live_photo]
if options.portrait:
photos = [p for p in photos if p.portrait]
elif options.not_portrait:
photos = [p for p in photos if not p.portrait]
if options.screenshot:
photos = [p for p in photos if p.screenshot]
elif options.not_screenshot:
photos = [p for p in photos if not p.screenshot]
if options.slow_mo:
photos = [p for p in photos if p.slow_mo]
elif options.not_slow_mo:
photos = [p for p in photos if not p.slow_mo]
if options.time_lapse:
photos = [p for p in photos if p.time_lapse]
elif options.not_time_lapse:
photos = [p for p in photos if not p.time_lapse]
if options.hdr:
photos = [p for p in photos if p.hdr]
elif options.not_hdr:
photos = [p for p in photos if not p.hdr]
if options.selfie:
photos = [p for p in photos if p.selfie]
elif options.not_selfie:
photos = [p for p in photos if not p.selfie]
if options.panorama:
photos = [p for p in photos if p.panorama]
elif options.not_panorama:
photos = [p for p in photos if not p.panorama]
if options.cloudasset:
photos = [p for p in photos if p.iscloudasset]
elif options.not_cloudasset:
photos = [p for p in photos if not p.iscloudasset]
if options.incloud:
photos = [p for p in photos if p.incloud]
elif options.not_incloud:
photos = [p for p in photos if not p.incloud]
if options.has_raw:
photos = [p for p in photos if p.has_raw]
if options.has_comment:
photos = [p for p in photos if p.comments]
elif options.no_comment:
photos = [p for p in photos if not p.comments]
if options.has_likes:
photos = [p for p in photos if p.likes]
elif options.no_likes:
photos = [p for p in photos if not p.likes]
if options.is_reference:
photos = [p for p in photos if p.isreference]
if options.in_album:
photos = [p for p in photos if p.albums]
elif options.not_in_album:
photos = [p for p in photos if not p.albums]
if options.from_time:
photos = [p for p in photos if p.date.time() >= options.from_time]
if options.to_time:
photos = [p for p in photos if p.date.time() <= options.to_time]
if options.burst_photos:
# add the burst_photos to the export set
photos_burst = [p for p in photos if p.burst]
for burst in photos_burst:
if options.missing_bursts:
# include burst photos that are missing
photos.extend(burst.burst_photos)
else:
# don't include missing burst images (these can't be downloaded with AppleScript)
photos.extend([p for p in burst.burst_photos if not p.ismissing])
# remove duplicates as each burst photo in the set that's selected would
# result in the entire set being added above
# can't use set() because PhotoInfo not hashable
seen_uuids = {}
for p in photos:
if p.uuid in seen_uuids:
continue
seen_uuids[p.uuid] = p
photos = list(seen_uuids.values())
if name:
# search filename fields for text
# if more than one, find photos with all title values in filename
photo_list = []
if options.ignore_case:
# case-insensitive
for n in name:
n = n.lower()
photo_list.extend(
[
p
for p in photos
if n in p.filename.lower()
or n in p.original_filename.lower()
]
)
else:
for n in name:
photo_list.extend(
[
p
for p in photos
if n in p.filename or n in p.original_filename
]
)
photos = photo_list
if options.min_size:
photos = [
p
for p in photos
if bitmath.Byte(p.original_filesize) >= options.min_size
]
if options.max_size:
photos = [
p
for p in photos
if bitmath.Byte(p.original_filesize) <= options.max_size
]
if options.query_eval:
for q in options.query_eval:
query_string = f"[photo for photo in photos if {q}]"
try:
photos = eval(query_string)
except Exception as e:
raise ValueError(f"Invalid query_eval CRITERIA: {e}")
return photos
def __repr__(self): def __repr__(self):
return f"osxphotos.{self.__class__.__name__}(dbfile='{self.db_path}')" return f"osxphotos.{self.__class__.__name__}(dbfile='{self.db_path}')"
@@ -2848,3 +3192,32 @@ class PhotosDB:
Includes recently deleted photos and non-selected burst images Includes recently deleted photos and non-selected burst images
""" """
return len(self._dbphotos) return len(self._dbphotos)
def _get_photos_by_attribute(photos, attribute, values, ignore_case):
"""Search for photos based on values being in PhotoInfo.attribute
Args:
photos: a list of PhotoInfo objects
attribute: str, name of PhotoInfo attribute to search (e.g. keywords, persons, etc)
values: list of values to search in property
ignore_case: ignore case when searching
Returns:
list of PhotoInfo objects matching search criteria
"""
photos_search = []
if ignore_case:
# case-insensitive
for x in values:
x = x.lower()
photos_search.extend(
p
for p in photos
if x in [attr.lower() for attr in getattr(p, attribute)]
)
else:
for x in values:
photos_search.extend(p for p in photos if x in getattr(p, attribute))
return photos_search

104
osxphotos/queryoptions.py Normal file
View File

@@ -0,0 +1,104 @@
""" QueryOptions class for PhotosDB.query """
from dataclasses import dataclass
from typing import Optional, Iterable
import datetime
import bitmath
@dataclass
class QueryOptions:
keyword: Optional[Iterable[str]] = None
person: Optional[Iterable[str]] = None
album: Optional[Iterable[str]] = None
folder: Optional[Iterable[str]] = None
uuid: Optional[Iterable[str]] = None
title: Optional[Iterable[str]] = None
no_title: Optional[bool] = None
description: Optional[Iterable[str]] = None
no_description: Optional[bool] = None
ignore_case: Optional[bool] = None
edited: Optional[bool] = None
external_edit: Optional[bool] = None
favorite: Optional[bool] = None
not_favorite: Optional[bool] = None
hidden: Optional[bool] = None
not_hidden: Optional[bool] = None
missing: Optional[bool] = None
not_missing: Optional[bool] = None
shared: Optional[bool] = None
not_shared: Optional[bool] = None
photos: Optional[bool] = True
movies: Optional[bool] = True
uti: Optional[Iterable[str]] = None
burst: Optional[bool] = None
not_burst: Optional[bool] = None
live: Optional[bool] = None
not_live: Optional[bool] = None
cloudasset: Optional[bool] = None
not_cloudasset: Optional[bool] = None
incloud: Optional[bool] = None
not_incloud: Optional[bool] = None
from_date: Optional[datetime.datetime] = None
to_date: Optional[datetime.datetime] = None
from_time: Optional[datetime.time] = None
to_time: Optional[datetime.time] = None
portrait: Optional[bool] = None
not_portrait: Optional[bool] = None
screenshot: Optional[bool] = None
not_screenshot: Optional[bool] = None
slow_mo: Optional[bool] = None
not_slow_mo: Optional[bool] = None
time_lapse: Optional[bool] = None
not_time_lapse: Optional[bool] = None
hdr: Optional[bool] = None
not_hdr: Optional[bool] = None
selfie: Optional[bool] = None
not_selfie: Optional[bool] = None
panorama: Optional[bool] = None
not_panorama: Optional[bool] = None
has_raw: Optional[bool] = None
place: Optional[Iterable[str]] = None
no_place: Optional[bool] = None
label: Optional[Iterable[str]] = None
deleted: Optional[bool] = None
deleted_only: Optional[bool] = None
has_comment: Optional[bool] = None
no_comment: Optional[bool] = None
has_likes: Optional[bool] = None
no_likes: Optional[bool] = None
is_reference: Optional[bool] = None
in_album: Optional[bool] = None
not_in_album: Optional[bool] = None
burst_photos: Optional[bool] = None
missing_bursts: Optional[bool] = None
name: Optional[Iterable[str]] = None
min_size: Optional[bitmath.Byte] = None
max_size: Optional[bitmath.Byte] = None
query_eval: Optional[Iterable[str]] = None
def asdict(self):
return asdict(self)
# def init_with_values(
# self,
# keyword: Optional[Iterable[str]] = None,
# person: Optional[Iterable[str]] = None,
# album: Optional[Iterable[str]] = None,
# folder: Optional[Iterable[str]] = None,
# uuid: Optional[Iterable[str]] = None,
# title: Optional[Iterable[str]] = None,
# no_title: Optional[bool] = None,
# **kwargs
# ):
# self.keyword = keyword
# self.person = person
# self.album = album
# self.folder = folder
# self.uuid = uuid
# self.title = title
# self.no_title
# print(asdict(self))

View File

@@ -373,13 +373,16 @@ def _db_is_locked(dbname):
def normalize_unicode(value): def normalize_unicode(value):
""" normalize unicode data """ """ normalize unicode data """
if value is None: if value is not None:
if isinstance(value, (tuple, list)):
return tuple(unicodedata.normalize(UNICODE_FORMAT, v) for v in value)
elif isinstance(value, str):
return unicodedata.normalize(UNICODE_FORMAT, value)
else:
return value
else:
return None return None
if not isinstance(value, str):
raise ValueError("value must be str")
return unicodedata.normalize(UNICODE_FORMAT, value)
def increment_filename(filepath): def increment_filename(filepath):
""" Return filename (1).ext, etc if filename.ext exists """ Return filename (1).ext, etc if filename.ext exists