* Improved rich_echo, added rich_echo_via_pager

* Initial implementation for #647, added rich output
This commit is contained in:
Rhet Turnbull 2022-03-06 15:17:09 +00:00 committed by GitHub
parent 1227465aa7
commit 445010e7e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 869 additions and 253 deletions

View File

@ -1,23 +1,198 @@
"""click.echo replacement that supports rich text formatting"""
import inspect
import os
import typing as t
from io import StringIO
import click
from rich.console import Console
from rich.markdown import Markdown
from rich.theme import Theme
from .common import time_stamp
__all__ = [
"get_rich_console",
"get_rich_theme",
"rich_click_echo",
"rich_echo",
"rich_echo_error",
"rich_echo_via_pager",
"set_rich_console",
"set_rich_theme",
"set_rich_timestamp",
]
# TODO: this should really be a class instead of a module with a bunch of globals
# include emoji's in rich_echo_error output
ERROR_EMOJI = True
class _Console:
"""Store console object for rich output"""
def __init__(self):
self._console: t.Optional[Console] = None
@property
def console(self):
return self._console
@console.setter
def console(self, console: Console):
self._console = console
_console = _Console()
_theme = None
_timestamp = False
# set to 1 if running tests
OSXPHOTOS_IS_TESTING = bool(os.getenv("OSXPHOTOS_IS_TESTING", default=False))
def set_rich_console(console: Console) -> None:
"""Set the console object to use for rich_echo and rich_echo_via_pager"""
global _console
_console.console = console
def get_rich_console() -> Console:
"""Get console object
Returns:
Console object
"""
global _console
return _console.console
def set_rich_theme(theme: Theme) -> None:
"""Set the theme to use for rich_click_echo"""
global _theme
_theme = theme
def get_rich_theme() -> t.Optional[Theme]:
"""Get the theme to use for rich_click_echo"""
global _theme
return _theme
def set_rich_timestamp(timestamp: bool) -> None:
"""Set whether to print timestamp with rich_echo, rich_echo_error, and rich_click_error"""
global _timestamp
_timestamp = timestamp
def rich_echo(
message: t.Optional[t.Any] = None,
theme=None,
markdown=False,
highlight=False,
**kwargs: t.Any,
) -> None:
"""Echo text to the console with rich formatting.
Args:
message: The string or bytes to output. Other objects are converted to strings.
theme: optional rich.theme.Theme object to use for formatting
markdown: if True, interpret message as Markdown
highlight: if True, use automatic rich.print highlighting
kwargs: any extra arguments are passed to rich.console.Console.print() and click.echo
if kwargs contains 'file', 'nl', 'err', 'color', these are passed to click.echo,
all other values passed to rich.console.Console.print()
"""
Echo text to the console with rich formatting.
# args for click.echo that may have been passed in kwargs
echo_args = {}
for arg in ("file", "nl", "err", "color"):
val = kwargs.pop(arg, None)
if val is not None:
echo_args[arg] = val
width = kwargs.pop("width", None)
if width is None and OSXPHOTOS_IS_TESTING:
# if not outputting to terminal, use a huge width to avoid wrapping
# otherwise tests fail
width = 10_000
console = get_rich_console() or Console(theme=theme, width=width)
if markdown:
message = Markdown(message)
# Markdown always adds a new line so disable unless explicitly specified
global _timestamp
if _timestamp:
message = time_stamp() + message
console.print(message, highlight=highlight, **kwargs)
def rich_echo_error(
message: t.Optional[t.Any] = None,
theme=None,
markdown=False,
highlight=False,
**kwargs: t.Any,
) -> None:
"""Echo text to the console with rich formatting and if stdout is redirected, echo to stderr
Args:
message: The string or bytes to output. Other objects are converted to strings.
theme: optional rich.theme.Theme object to use for formatting
markdown: if True, interpret message as Markdown
highlight: if True, use automatic rich.print highlighting
kwargs: any extra arguments are passed to rich.console.Console.print() and click.echo
if kwargs contains 'file', 'nl', 'err', 'color', these are passed to click.echo,
all other values passed to rich.console.Console.print()
"""
global ERROR_EMOJI
if ERROR_EMOJI:
if "[error]" in message:
message = f":cross_mark-emoji: {message}"
elif "[warning]" in message:
message = f":warning-emoji: {message}"
console = get_rich_console() or Console(theme=theme or get_rich_theme())
if not console.is_terminal:
# if stdout is redirected, echo to stderr
rich_click_echo(
message,
theme=theme or get_rich_theme(),
markdown=markdown,
highlight=highlight,
**kwargs,
err=True,
)
else:
rich_echo(
message,
theme=theme or get_rich_theme(),
markdown=markdown,
highlight=highlight,
**kwargs,
)
def rich_click_echo(
message: t.Optional[t.Any] = None,
theme=None,
markdown=False,
highlight=False,
**kwargs: t.Any,
) -> None:
"""Echo text to the console with rich formatting using click.echo
This is a wrapper around click.echo that supports rich text formatting.
Args:
message: The string or bytes to output. Other objects are converted to strings.
theme: optional rich.theme.Theme object to use for formatting
markdown: if True, interpret message as Markdown
highlight: if True, use automatic rich.print highlighting
kwargs: any extra arguments are passed to rich.console.Console.print() and click.echo
if kwargs contains 'file', 'nl', 'err', 'color', these are passed to click.echo,
all other values passed to rich.console.Console.print()
@ -33,10 +208,65 @@ def rich_echo(
# click.echo will include "\n" so don't add it here unless specified
end = kwargs.pop("end", "")
# rich.console.Console defaults to 80 chars if it can't auto-detect, which in this case it won't
# so we need to set the width manually to a ridiculously large number
width = kwargs.pop("width", 10000)
if width := kwargs.pop("width", None) is None:
# if not outputting to terminal, use a huge width to avoid wrapping
# otherwise tests fail
temp_console = Console()
width = temp_console.width if temp_console.is_terminal else 10_000
output = StringIO()
console = Console(force_terminal=True, file=output, width=width)
console.print(message, end=end, **kwargs)
console = Console(
force_terminal=True,
theme=theme or get_rich_theme(),
file=output,
width=width,
)
if markdown:
message = Markdown(message)
# Markdown always adds a new line so disable unless explicitly specified
echo_args["nl"] = echo_args.get("nl") is True
global _timestamp
if _timestamp:
message = time_stamp() + message
console.print(message, end=end, highlight=highlight, **kwargs)
click.echo(output.getvalue(), **echo_args)
def rich_echo_via_pager(
text_or_generator: t.Union[t.Iterable[str], t.Callable[[], t.Iterable[str]], str],
theme: t.Optional[Theme] = None,
highlight=False,
markdown: bool = False,
**kwargs,
) -> None:
"""This function takes a text and shows it via an environment specific
pager on stdout.
Args:
text_or_generator: the text to page, or alternatively, a generator emitting the text to page.
theme: optional rich.theme.Theme object to use for formatting
markdown: if True, interpret message as Markdown
highlight: if True, use automatic rich.print highlighting
**kwargs: if "color" in kwargs, works the same as click.echo_via_pager(color=color)
otherwise any kwargs are passed to rich.Console.print()
"""
if inspect.isgeneratorfunction(text_or_generator):
text_or_generator = t.cast(t.Callable[[], t.Iterable[str]], text_or_generator)()
elif isinstance(text_or_generator, str):
text_or_generator = [text_or_generator]
else:
try:
text_or_generator = iter(text_or_generator)
except TypeError:
text_or_generator = [text_or_generator]
console = _console or Console(theme=theme)
color = kwargs.pop("color", None)
if color is None:
color = bool(console.color_system)
with console.pager(styles=color):
for x in text_or_generator:
if isinstance(x, str) and markdown:
x = Markdown(x)
console.print(x, highlight=highlight, **kwargs)

View File

@ -0,0 +1,124 @@
"""Support for colorized output for photos_time_warp"""
from typing import Optional
from rich.style import Style
from rich.themes import Theme
from .common import noop
from .darkmode import is_dark_mode
__all__ = ["get_theme"]
COLOR_THEMES = {
"dark": Theme(
{
# color pallette from https://github.com/dracula/dracula-theme
"color": Style(color="rgb(248,248,242)"),
"count": Style(color="rgb(139,233,253)"),
"error": Style(color="rgb(255,85,85)", bold=True),
"filename": Style(color="rgb(189,147,249)", bold=True),
"filepath": Style(color="rgb(80,250,123)", bold=True),
"highlight": Style(color="#000000", bgcolor="#d73a49", bold=True),
"num": Style(color="rgb(139,233,253)", bold=True),
"time": Style(color="rgb(139,233,253)", bold=True),
"uuid": Style(color="rgb(255,184,108)"),
"warning": Style(color="rgb(241,250,140)", bold=True),
"bar.back": Style(color="rgb(68,71,90)"),
"bar.complete": Style(color="rgb(249,38,114)"),
"bar.finished": Style(color="rgb(80,250,123)"),
"bar.pulse": Style(color="rgb(98,114,164)"),
"progress.elapsed": Style(color="rgb(139,233,253)"),
"progress.percentage": Style(color="rgb(255,121,198)"),
"progress.remaining": Style(color="rgb(139,233,253)"),
}
),
"light": Theme(
{
"color": Style(color="#000000"),
"count": Style(color="#005cc5", bold=True),
"error": Style(color="#b31d28", bold=True, underline=True, italic=True),
"filename": Style(color="#6f42c1", bold=True),
"filepath": Style(color="#22863a", bold=True),
"highlight": Style(color="#ffffff", bgcolor="#d73a49", bold=True),
"num": Style(color="#005cc5", bold=True),
"time": Style(color="#032f62", bold=True),
"uuid": Style(color="#d73a49", bold=True),
"warning": Style(color="#e36209", bold=True, underline=True, italic=True),
"bar.back": Style(color="grey23"),
"bar.complete": Style(color="rgb(249,38,114)"),
"bar.finished": Style(color="rgb(114,156,31)"),
"bar.pulse": Style(color="rgb(249,38,114)"),
"progress.elapsed": Style(color="#032f62", bold=True),
"progress.percentage": Style(color="#6f42c1", bold=True),
"progress.remaining": Style(color="#032f62", bold=True),
}
),
"mono": Theme(
{
"count": "bold",
"error": "reverse italic",
"filename": "bold",
"filepath": "bold underline",
"highlight": "reverse italic",
"num": "bold",
"time": "bold",
"uuid": "bold",
"warning": "bold italic",
"bar.back": "",
"bar.complete": "reverse",
"bar.finished": "bold",
"bar.pulse": "bold",
"progress.elapsed": "",
"progress.percentage": "bold",
"progress.remaining": "bold",
}
),
"plain": Theme(
{
"color": "",
"count": "",
"error": "",
"filename": "",
"filepath": "",
"highlight": "",
"num": "",
"time": "",
"uuid": "",
"warning": "",
"bar.back": "",
"bar.complete": "",
"bar.finished": "",
"bar.pulse": "",
"progress.elapsed": "",
"progress.percentage": "",
"progress.remaining": "",
}
),
}
def get_theme(
theme_name: Optional[str] = None,
theme_file: Optional[str] = None,
verbose=None,
):
"""Get the color theme based on the color flags or load from config file"""
if not verbose:
verbose = noop
# figure out which color theme to use
theme_name = theme_name or "default"
if theme_name == "default" and theme_file and theme_file.is_file():
# load theme from file
verbose(f"Loading color theme from {theme_file}")
try:
theme = Theme.read(theme_file)
except Exception as e:
raise ValueError(f"Error reading theme file {theme_file}: {e}")
elif theme_name == "default":
# try to auto-detect dark/light mode
theme = COLOR_THEMES["dark"] if is_dark_mode() else COLOR_THEMES["light"]
else:
theme = COLOR_THEMES[theme_name]
return theme

View File

@ -1,19 +1,17 @@
"""Globals and constants use by the CLI commands"""
import datetime
import os
import pathlib
import typing as t
from datetime import datetime
import click
import osxphotos
from osxphotos._version import __version__
from .click_rich_echo import rich_echo
from .param_types import *
# used to show/hide hidden commands
OSXPHOTOS_HIDDEN = not bool(os.getenv("OSXPHOTOS_SHOW_HIDDEN", default=False))
@ -21,64 +19,35 @@ OSXPHOTOS_HIDDEN = not bool(os.getenv("OSXPHOTOS_SHOW_HIDDEN", default=False))
OSXPHOTOS_SNAPSHOT_DIR = "/private/tmp/osxphotos_snapshots"
# where to write the crash report if osxphotos crashes
OSXPHOTOS_CRASH_LOG = os.getcwd() + "/osxphotos_crash.log"
OSXPHOTOS_CRASH_LOG = f"{os.getcwd()}/osxphotos_crash.log"
CLI_COLOR_ERROR = "red"
CLI_COLOR_WARNING = "yellow"
__all__ = [
"CLI_COLOR_ERROR",
"CLI_COLOR_WARNING",
"DB_ARGUMENT",
"DB_OPTION",
"DEBUG_OPTIONS",
"DELETED_OPTIONS",
"JSON_OPTION",
"QUERY_OPTIONS",
"get_photos_db",
"load_uuid_from_file",
"noop",
"time_stamp",
]
def noop(*args, **kwargs):
"""no-op function"""
pass
def verbose_print(
verbose: bool = True, timestamp: bool = False, rich=False, **kwargs: t.Any
) -> t.Callable:
"""Create verbose function to print output
Args:
verbose: if True, returns verbose print function otherwise returns no-op function
timestamp: if True, includes timestamp in verbose output
rich: use rich.print instead of click.echo
kwargs: any extra arguments to pass to click.echo or rich.print depending on whether rich==True
Returns:
function to print output
"""
if not verbose:
return noop
# closure to capture timestamp
def verbose_(*args):
"""print output if verbose flag set"""
styled_args = []
timestamp_str = f"{str(datetime.datetime.now())} -- " if timestamp else ""
for arg in args:
if type(arg) == str:
arg = timestamp_str + arg
if "error" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_ERROR)
elif "warning" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_WARNING)
styled_args.append(arg)
click.echo(*styled_args, **kwargs)
def rich_verbose_(*args):
"""print output if verbose flag set using rich.print"""
timestamp_str = f"{str(datetime.datetime.now())} -- " if timestamp else ""
new_args = []
for arg in args:
if type(arg) == str:
arg = timestamp_str + arg
if "error" in arg.lower():
arg = f"[{CLI_COLOR_ERROR}]{arg}[/{CLI_COLOR_ERROR}]"
elif "warning" in arg.lower():
arg = f"[{CLI_COLOR_WARNING}]{arg}[/{CLI_COLOR_WARNING}]"
new_args.append(arg)
rich_echo(*new_args, **kwargs)
return rich_verbose_ if rich else verbose_
def time_stamp() -> str:
"""return timestamp"""
return f"[time]{str(datetime.now())}[/time] -- "
def get_photos_db(*db_options):

19
osxphotos/cli/darkmode.py Normal file
View File

@ -0,0 +1,19 @@
"""Detect dark mode on MacOS >= 10.14"""
import objc
import Foundation
def theme():
with objc.autorelease_pool():
user_defaults = Foundation.NSUserDefaults.standardUserDefaults()
system_theme = user_defaults.stringForKey_("AppleInterfaceStyle")
return "dark" if system_theme == "Dark" else "light"
def is_dark_mode():
return theme() == "dark"
def is_light_mode():
return theme() == "light"

View File

@ -9,15 +9,9 @@ from rich import print
import osxphotos
from osxphotos._constants import _PHOTOS_4_VERSION, _UNKNOWN_PLACE
from .common import (
DB_ARGUMENT,
DB_OPTION,
JSON_OPTION,
OSXPHOTOS_HIDDEN,
get_photos_db,
verbose_print,
)
from .common import DB_ARGUMENT, DB_OPTION, JSON_OPTION, OSXPHOTOS_HIDDEN, get_photos_db
from .list import _list_libraries
from .verbose import verbose_print
@click.command(hidden=OSXPHOTOS_HIDDEN)

View File

@ -57,6 +57,15 @@ from osxphotos.queryoptions import QueryOptions
from osxphotos.uti import get_preferred_uti_extension
from osxphotos.utils import format_sec_to_hhmmss, normalize_fs_path
from .click_rich_echo import (
rich_click_echo,
rich_echo,
rich_echo_error,
set_rich_console,
set_rich_theme,
set_rich_timestamp,
)
from .color_themes import get_theme
from .common import (
CLI_COLOR_ERROR,
CLI_COLOR_WARNING,
@ -71,11 +80,12 @@ from .common import (
get_photos_db,
load_uuid_from_file,
noop,
verbose_print,
)
from .help import ExportCommand, get_help_msg
from .list import _list_libraries
from .param_types import ExportDBType, FunctionCall
from .rich_progress import rich_progress
from .verbose import get_verbose_console, time_stamp, verbose_print
@click.command(cls=ExportCommand)
@ -629,6 +639,14 @@ from .param_types import ExportDBType, FunctionCall
f"Can be specified multiple times. Valid options are: {PROFILE_SORT_KEYS}. "
"Default = 'cumulative'.",
)
@click.option(
"--theme",
metavar="THEME",
type=click.Choice(["dark", "light", "mono", "plain"], case_sensitive=False),
help="Specify the color theme to use for --verbose output. "
"Valid themes are 'dark', 'light', 'mono', and 'plain'. "
"Defaults to 'dark' or 'light' depending on system dark mode setting.",
)
@DEBUG_OPTIONS
@DB_ARGUMENT
@click.argument("dest", nargs=1, type=click.Path(exists=True))
@ -782,6 +800,7 @@ def export(
preview_if_missing,
profile,
profile_sort,
theme,
debug, # debug, watch, breakpoint handled in cli/__init__.py
watch,
breakpoint,
@ -830,18 +849,28 @@ def export(
ignore=["ctx", "cli_obj", "dest", "load_config", "save_config", "config_only"],
)
verbose_ = verbose_print(verbose, timestamp, rich=True, highlight=False)
color_theme = get_theme(theme)
verbose_ = verbose_print(
verbose, timestamp, rich=True, theme=color_theme, highlight=False
)
# set console for rich_echo to be same as for verbose_
set_rich_console(get_verbose_console())
set_rich_theme(color_theme)
set_rich_timestamp(timestamp)
if load_config:
try:
cfg.load_from_file(load_config)
except ConfigOptionsLoadError as e:
click.echo(
click.style(
f"Error parsing {load_config} config file: {e.message}",
fg=CLI_COLOR_ERROR,
),
err=True,
# click.echo(
# click.style(
# f"Error parsing {load_config} config file: {e.message}",
# fg=CLI_COLOR_ERROR,
# ),
# err=True,
# )
rich_click_echo(
f"[error]Error parsing {load_config} config file: {e.message}", err=True
)
sys.exit(1)
@ -963,10 +992,11 @@ def export(
skip_uuid_from_file = cfg.skip_uuid_from_file
slow_mo = cfg.slow_mo
strip = cfg.strip
tmpdir = cfg.tmpdir
theme = cfg.theme
time_lapse = cfg.time_lapse
timestamp = cfg.timestamp
title = cfg.title
tmpdir = cfg.tmpdir
to_date = cfg.to_date
to_time = cfg.to_time
touch_file = cfg.touch_file
@ -980,8 +1010,15 @@ def export(
xattr_template = cfg.xattr_template
# config file might have changed verbose
verbose_ = verbose_print(verbose, timestamp, rich=True, highlight=False)
verbose_(f"Loaded options from file {load_config}")
color_theme = get_theme(theme)
verbose_ = verbose_print(
verbose, timestamp, rich=True, theme=color_theme, highlight=False
)
# set console for rich_echo to be same as for verbose_
set_rich_console(get_verbose_console())
set_rich_timestamp(timestamp)
verbose_(f"Loaded options from file [filepath]{load_config}")
set_crash_data("cfg", cfg.asdict())
@ -1028,28 +1065,22 @@ def export(
try:
cfg.validate(exclusive=exclusive_options, dependent=dependent_options, cli=True)
except ConfigOptionsInvalidError as e:
click.echo(
click.style(
f"Incompatible export options: {e.message}", fg=CLI_COLOR_ERROR
),
rich_click_echo(
f"[error]Incompatible export options: {e.message}",
err=True,
)
sys.exit(1)
if config_only and not save_config:
click.secho(
"--config-only must be used with --save-config",
fg=CLI_COLOR_ERROR,
rich_click_echo(
"[error]--config-only must be used with --save-config",
err=True,
)
sys.exit(1)
if all(x in [s.lower() for s in sidecar] for x in ["json", "exiftool"]):
click.echo(
click.style(
"Cannot use --sidecar json with --sidecar exiftool due to name collisions",
fg=CLI_COLOR_ERROR,
),
rich_click_echo(
"[error]Cannot use --sidecar json with --sidecar exiftool due to name collisions",
err=True,
)
sys.exit(1)
@ -1057,21 +1088,18 @@ def export(
if xattr_template:
for attr, _ in xattr_template:
if attr not in EXTENDED_ATTRIBUTE_NAMES:
click.echo(
click.style(
f"Invalid attribute '{attr}' for --xattr-template; "
f"valid values are {', '.join(EXTENDED_ATTRIBUTE_NAMES_QUOTED)}",
fg=CLI_COLOR_ERROR,
),
rich_click_echo(
f"[error]Invalid attribute '{attr}' for --xattr-template; "
f"valid values are {', '.join(EXTENDED_ATTRIBUTE_NAMES_QUOTED)}",
err=True,
)
sys.exit(1)
if save_config:
verbose_(f"Saving options to config file '{save_config}'")
verbose_(f"Saving options to config file '[filepath]{save_config}'")
cfg.write_to_file(save_config)
if config_only:
click.echo(f"Saved config file to '{save_config}'")
rich_echo(f"Saved config file to '[filepath]{save_config}'")
sys.exit(0)
# set defaults for options that need them
@ -1086,18 +1114,14 @@ def export(
retry = 0 if not retry else retry
if not os.path.isdir(dest):
click.echo(
click.style(f"DEST {dest} must be valid path", fg=CLI_COLOR_ERROR), err=True
)
rich_click_echo(f"[error]DEST {dest} must be valid path", err=True)
sys.exit(1)
dest = str(pathlib.Path(dest).resolve())
if report and os.path.isdir(report):
click.echo(
click.style(
f"report is a directory, must be file name", fg=CLI_COLOR_ERROR
),
rich_click_echo(
f"[error]report is a directory, must be file name",
err=True,
)
sys.exit(1)
@ -1130,18 +1154,15 @@ def export(
try:
exiftool_path = get_exiftool_path()
except FileNotFoundError:
click.echo(
click.style(
"Could not find exiftool. Please download and install"
" from https://exiftool.org/",
fg=CLI_COLOR_ERROR,
),
rich_click_echo(
"[error]Could not find exiftool. Please download and install"
" from https://exiftool.org/",
err=True,
)
ctx.exit(2)
ctx.exit(1)
if any([exiftool, exiftool_merge_keywords, exiftool_merge_persons]):
verbose_(f"exiftool path: {exiftool_path}")
verbose_(f"exiftool path: [filepath]{exiftool_path}")
# default searches for everything
photos = True
@ -1161,26 +1182,24 @@ def export(
cli_db = cli_obj.db if cli_obj is not None else None
db = get_photos_db(*photos_library, db, cli_db)
if not db:
click.echo(get_help_msg(export), err=True)
click.echo("\n\nLocated the following Photos library databases: ", err=True)
rich_click_echo(get_help_msg(export), err=True)
rich_click_echo(
"\n\nLocated the following Photos library databases: ", err=True
)
_list_libraries()
return
# sanity check exportdb
if exportdb and exportdb != OSXPHOTOS_EXPORT_DB:
if pathlib.Path(pathlib.Path(dest) / OSXPHOTOS_EXPORT_DB).exists():
click.echo(
click.style(
f"Warning: export database is '{exportdb}' but found '{OSXPHOTOS_EXPORT_DB}' in {dest}; using '{exportdb}'",
fg=CLI_COLOR_WARNING,
)
rich_click_echo(
f"[warning]Warning: export database is '{exportdb}' but found '{OSXPHOTOS_EXPORT_DB}' in {dest}; using '{exportdb}'",
err=True,
)
if pathlib.Path(exportdb).resolve().parent != pathlib.Path(dest):
click.echo(
click.style(
f"Warning: export database '{pathlib.Path(exportdb).resolve()}' is in a different directory than export destination '{dest}'",
fg=CLI_COLOR_WARNING,
)
rich_click_echo(
f"[warning]Warning: export database '{pathlib.Path(exportdb).resolve()}' is in a different directory than export destination '{dest}'",
err=True,
)
# open export database
@ -1189,21 +1208,18 @@ def export(
# check that export isn't in the parent or child of a previously exported library
other_db_files = find_files_in_branch(dest, OSXPHOTOS_EXPORT_DB)
if other_db_files:
click.echo(
click.style(
"WARNING: found other export database files in this destination directory branch. "
+ "This likely means you are attempting to export files into a directory "
+ "that is either the parent or a child directory of a previous export. "
+ "Proceeding may cause your exported files to be overwritten.",
fg=CLI_COLOR_WARNING,
),
rich_click_echo(
"[warning]WARNING: found other export database files in this destination directory branch. "
+ "This likely means you are attempting to export files into a directory "
+ "that is either the parent or a child directory of a previous export. "
+ "Proceeding may cause your exported files to be overwritten.",
err=True,
)
click.echo(
rich_click_echo(
f"You are exporting to {dest}, found {OSXPHOTOS_EXPORT_DB} files in:"
)
for other_db in other_db_files:
click.echo(f"{other_db}")
rich_click_echo(f"{other_db}")
click.confirm("Do you want to continue?", abort=True)
if dry_run:
@ -1219,19 +1235,21 @@ def export(
if verbose_:
if export_db.was_created:
verbose_(f"Created export database {export_db_path}")
verbose_(f"Created export database [filepath]{export_db_path}")
else:
verbose_(f"Using export database {export_db_path}")
verbose_(f"Using export database [filepath]{export_db_path}")
upgraded = export_db.was_upgraded
if upgraded:
verbose_(
f"Upgraded export database {export_db_path} from version {upgraded[0]} to {upgraded[1]}"
f"Upgraded export database [filepath]{export_db_path}[/] from version [num]{upgraded[0]}[/] to [num]{upgraded[1]}[/]"
)
# save config to export_db
export_db.set_config(cfg.write_to_str())
photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose_, exiftool=exiftool_path)
photosdb = osxphotos.PhotosDB(
dbfile=db, verbose=verbose_, exiftool=exiftool_path, rich=True
)
# enable beta features if requested
photosdb._beta = beta
@ -1345,7 +1363,9 @@ def export(
num_photos = len(photos)
# TODO: photos or photo appears several times, pull into a separate function
photo_str = "photos" if num_photos > 1 else "photo"
click.echo(f"Exporting {num_photos} {photo_str} to {dest}...")
rich_echo(
f"Exporting [num]{num_photos}[/num] {photo_str} to [filepath]{dest}[/]..."
)
start_time = time.perf_counter()
# though the command line option is current_name, internally all processing
# logic uses original_name which is the boolean inverse of current_name
@ -1370,10 +1390,11 @@ def export(
)
photo_num = 0
# send progress bar output to /dev/null if verbose to hide the progress bar
fp = open(os.devnull, "w") if verbose else None
with click.progressbar(photos, show_pos=True, file=fp) as bar:
for p in bar:
with rich_progress(console=get_verbose_console()) as progress:
task = progress.add_task(
f"Exporting [num]{num_photos}[/] photos", total=num_photos
)
for p in photos:
photo_num += 1
export_results = export_photo(
photo=p,
@ -1430,15 +1451,13 @@ def export(
if post_function:
for function in post_function:
# post function is tuple of (function, filename.py::function_name)
verbose_(f"Calling post-function {function[1]}")
verbose_(f"Calling post-function [bold]{function[1]}")
if not dry_run:
try:
function[0](p, export_results, verbose_)
except Exception as e:
click.secho(
f"Error running post-function {function[1]}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
rich_echo_error(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
)
run_post_command(
@ -1536,32 +1555,31 @@ def export(
results.xattr_written.extend(xattr_written)
results.xattr_skipped.extend(xattr_skipped)
if fp is not None:
fp.close()
progress.advance(task)
photo_str_total = "photos" if len(photos) != 1 else "photo"
if update or force_update:
summary = (
f"Processed: {len(photos)} {photo_str_total}, "
f"exported: {len(results.new)}, "
f"updated: {len(results.updated)}, "
f"skipped: {len(results.skipped)}, "
f"updated EXIF data: {len(results.exif_updated)}, "
f"Processed: [num]{len(photos)}[/] {photo_str_total}, "
f"exported: [num]{len(results.new)}[/], "
f"updated: [num]{len(results.updated)}[/], "
f"skipped: [num]{len(results.skipped)}[/], "
f"updated EXIF data: [num]{len(results.exif_updated)}[/], "
)
else:
summary = (
f"Processed: {len(photos)} {photo_str_total}, "
f"exported: {len(results.exported)}, "
f"Processed: [num]{len(photos)}[/] {photo_str_total}, "
f"exported: [num]{len(results.exported)}[/], "
)
summary += f"missing: {len(results.missing)}, "
summary += f"error: {len(results.error)}"
summary += f"missing: [num]{len(results.missing)}[/], "
summary += f"error: [num]{len(results.error)}[/]"
if touch_file:
summary += f", touched date: {len(results.touched)}"
click.echo(summary)
summary += f", touched date: [num]{len(results.touched)}[/]"
rich_echo(summary)
stop_time = time.perf_counter()
click.echo(f"Elapsed time: {format_sec_to_hhmmss(stop_time-start_time)}")
rich_echo(f"Elapsed time: [time]{format_sec_to_hhmmss(stop_time-start_time)}")
else:
click.echo("Did not find any photos to export")
rich_echo("Did not find any photos to export")
# cleanup files and do report if needed
if cleanup:
@ -1587,25 +1605,25 @@ def export(
+ [r[0] for r in results.error]
+ db_files
)
click.echo(f"Cleaning up {dest}")
rich_echo(f"Cleaning up [filepath]{dest}")
cleaned_files, cleaned_dirs = cleanup_files(
dest, all_files, fileutil, verbose_=verbose_
)
file_str = "files" if len(cleaned_files) != 1 else "file"
dir_str = "directories" if len(cleaned_dirs) != 1 else "directory"
click.echo(
f"Deleted: {len(cleaned_files)} {file_str}, {len(cleaned_dirs)} {dir_str}"
rich_echo(
f"Deleted: [num]{len(cleaned_files)}[/num] {file_str}, [num]{len(cleaned_dirs)}[/num] {dir_str}"
)
results.deleted_files = cleaned_files
results.deleted_directories = cleaned_dirs
if report:
verbose_(f"Writing export report to {report}")
verbose_(f"Writing export report to [filepath]{report}")
write_export_report(report, results)
# close export_db and write changes if needed
if ramdb and not dry_run:
verbose_(f"Writing export database changes back to {export_db.path}")
verbose_(f"Writing export database changes back to [filepath]{export_db.path}")
export_db.write_to_disk()
export_db.close()
@ -1742,7 +1760,7 @@ def export_photo(
export_original = True
export_edited = False
verbose_(
f"Edited file for {photo.original_filename} is missing, exporting original"
f"Edited file for [filename]{photo.original_filename}[/] is missing, exporting original"
)
# check for missing photos before downloading
@ -1838,7 +1856,7 @@ def export_photo(
original_filename = str(original_filename)
verbose_(
f"Exporting {photo.original_filename} ({photo.filename}) as {original_filename} ({photo_num}/{num_photos})"
f"Exporting [filename]{photo.original_filename}[/] ([filename]{photo.filename}[/]) as [filepath]{original_filename}[/] ([count]{photo_num}/{num_photos}[/])"
)
results += export_photo_to_directory(
@ -1952,7 +1970,7 @@ def export_photo(
)
verbose_(
f"Exporting edited version of {photo.original_filename} ({photo.filename}) as {edited_filename}"
f"Exporting edited version of [filename]{photo.original_filename}[/filename] ([filename]{photo.filename}[/filename]) as [filepath]{edited_filename}[/filepath]"
)
results += export_photo_to_directory(
@ -2103,7 +2121,7 @@ def export_photo_to_directory(
render_options = RenderOptions(export_dir=export_dir, dest_path=dest_path)
if not export_original and not edited:
verbose_(f"Skipping original version of {photo.original_filename}")
verbose_(f"Skipping original version of [filename]{photo.original_filename}")
return results
tries = 0
@ -2147,69 +2165,61 @@ def export_photo_to_directory(
use_photos_export=use_photos_export,
verbose=verbose_,
tmpdir=tmpdir,
rich=True,
)
exporter = PhotoExporter(photo)
export_results = exporter.export(
dest=dest_path, filename=filename, options=export_options
)
for warning_ in export_results.exiftool_warning:
verbose_(f"exiftool warning for file {warning_[0]}: {warning_[1]}")
verbose_(
f"[warning]exiftool warning for file {warning_[0]}: {warning_[1]}"
)
for error_ in export_results.exiftool_error:
click.echo(
click.style(
f"exiftool error for file {error_[0]}: {error_[1]}",
fg=CLI_COLOR_ERROR,
),
err=True,
rich_echo_error(
f"[error]exiftool error for file {error_[0]}: {error_[1]}"
)
for error_ in export_results.error:
click.echo(
click.style(
f"Error exporting photo ({photo.uuid}: {photo.original_filename}) as {error_[0]}: {error_[1]}",
fg=CLI_COLOR_ERROR,
),
err=True,
rich_echo_error(
f"[error]Error exporting photo ({photo.uuid}: {photo.original_filename}) as {error_[0]}: {error_[1]}"
)
error += 1
if not error or tries > retry:
results += export_results
break
else:
click.echo(
f"Retrying export for photo ({photo.uuid}: {photo.original_filename})"
rich_echo(
f"Retrying export for photo ([uuid]{photo.uuid}[/uuid]: [filename]{photo.original_filename}[/filename])"
)
except Exception as e:
if is_debug():
# if debug mode, don't swallow the exceptions
raise e
click.echo(
click.style(
f"Error exporting photo ({photo.uuid}: {photo.original_filename}) as {filename}: {e}",
fg=CLI_COLOR_ERROR,
),
rich_echo(
f"[error]Error exporting photo ([uuid]{photo.uuid}[/uuid]: [filename]{photo.original_filename}[/filename]) as [filepath]{filename}[/filepath]: {e}",
err=True,
)
if tries > retry:
results.error.append((str(pathlib.Path(dest) / filename), e))
break
else:
click.echo(
f"Retrying export for photo ({photo.uuid}: {photo.original_filename})"
rich_echo(
f"Retrying export for photo ([uuid]{photo.uuid}[/uuid]: [filename]{photo.original_filename}[/filename])"
)
if verbose_:
if update or force_update:
for new in results.new:
verbose_(f"Exported new file {new}")
verbose_(f"Exported new file [filepath]{new}")
for updated in results.updated:
verbose_(f"Exported updated file {updated}")
verbose_(f"Exported updated file [filepath]{updated}")
for skipped in results.skipped:
verbose_(f"Skipped up to date file {skipped}")
verbose_(f"Skipped up to date file [filepath]{skipped}")
else:
for exported in results.exported:
verbose_(f"Exported {exported}")
verbose_(f"Exported [filepath]{exported}")
for touched in results.touched:
verbose_(f"Touched date on file {touched}")
verbose_(f"Touched date on file [filepath]{touched}")
return results
@ -2519,10 +2529,7 @@ def write_export_report(report_file, results):
for data in [result for result in all_results.values()]:
writer.writerow(data)
except IOError:
click.echo(
click.style("Could not open output file for writing", fg=CLI_COLOR_ERROR),
err=True,
)
rich_echo_error("[error]Could not open output file for writing"),
sys.exit(1)
@ -2545,7 +2552,7 @@ def cleanup_files(dest_path, files_to_keep, fileutil, verbose_):
deleted_files = []
for p in pathlib.Path(dest_path).rglob("*"):
if p.is_file() and normalize_fs_path(str(p).lower()) not in keepers:
verbose_(f"Deleting {p}")
verbose_(f"Deleting [filepath]{p}")
fileutil.unlink(p)
deleted_files.append(str(p))
@ -2603,6 +2610,7 @@ def write_finder_tags(
use_persons_as_keywords=person_keyword,
keyword_template=keyword_template,
merge_exif_keywords=exiftool_merge_keywords,
rich=True,
)
exif = PhotoExporter(photo)._exiftool_dict(options=export_options)
try:
@ -2628,12 +2636,8 @@ def write_finder_tags(
)
if unmatched:
click.echo(
click.style(
f"Warning: unknown field for template: {template_str} unknown field = {unmatched}",
fg=CLI_COLOR_WARNING,
),
err=True,
rich_echo(
f"[warning]Warning: unknown field for template: {template_str} unknown field = {unmatched}"
)
rendered_tags.extend(rendered)
@ -2692,12 +2696,8 @@ def write_extended_attributes(
f"Invalid template for --xattr-template '{template_str}': {e}",
)
if unmatched:
click.echo(
click.style(
f"Warning: unmatched template substitution for template: {template_str} unknown field={unmatched}",
fg=CLI_COLOR_WARNING,
),
err=True,
rich_echo(
f"[warning]Warning: unmatched template substitution for template: {template_str} unknown field={unmatched}"
)
# filter out any template values that didn't match by looking for sentinel
@ -2775,10 +2775,6 @@ def run_post_command(
finally:
run_error = run_error or run_results.returncode
if run_error:
click.echo(
click.style(
f'Error running command "{command}": {run_error}',
fg=CLI_COLOR_ERROR,
),
err=True,
rich_echo_error(
f'[error]Error running command "{command}": {run_error}'
)

View File

@ -19,7 +19,8 @@ from osxphotos.export_db_utils import (
export_db_vacuum,
)
from .common import OSXPHOTOS_HIDDEN, verbose_print
from .common import OSXPHOTOS_HIDDEN
from .verbose import verbose_print
@click.command(name="exportdb", hidden=OSXPHOTOS_HIDDEN)

View File

@ -10,8 +10,6 @@ import osxmetadata
from rich.console import Console
from rich.markdown import Markdown
from .click_rich_echo import rich_echo
from osxphotos._constants import (
EXTENDED_ATTRIBUTE_NAMES,
EXTENDED_ATTRIBUTE_NAMES_QUOTED,
@ -25,6 +23,9 @@ from osxphotos.phototemplate import (
get_template_help,
)
from .click_rich_echo import rich_echo
from .color_themes import get_theme
__all__ = [
"ExportCommand",
"template_help",
@ -57,8 +58,11 @@ def help(ctx, topic, subtopic, **kw):
if subtopic:
cmd = ctx.obj.group.commands[topic]
theme = get_theme("light")
rich_echo(
get_subtopic_help(cmd, ctx, subtopic), width=click.HelpFormatter().width
get_subtopic_help(cmd, ctx, subtopic),
theme=theme,
width=click.HelpFormatter().width,
)
return
@ -93,15 +97,11 @@ def get_subtopic_help(cmd: click.Command, ctx: click.Context, subtopic: str):
formatter.write_paragraph()
if options:
option_str = format_options_help(options, ctx, highlight=subtopic)
formatter.write(
f"Options that match '[{HIGHLIGHT_COLOR}]{subtopic}[/{HIGHLIGHT_COLOR}]':\n"
)
formatter.write(f"Options that match '[highlight]{subtopic}[/highlight]':\n")
formatter.write_paragraph()
formatter.write(option_str)
else:
formatter.write(
f"No options match '[{HIGHLIGHT_COLOR}]{subtopic}[/{HIGHLIGHT_COLOR}]'"
)
formatter.write(f"No options match '[highlight]{subtopic}[/highlight]'")
return formatter.getvalue()
@ -150,16 +150,18 @@ def format_options_help(
for record in opt_help:
record[0] = re.sub(
f"({highlight})",
f"[{HIGHLIGHT_COLOR}]\\1" + f"[/{HIGHLIGHT_COLOR}]",
"[highlight]\\1[/highlight]",
record[0],
re.IGNORECASE,
)
record[1] = re.sub(
f"({highlight})",
f"[{HIGHLIGHT_COLOR}]\\1" + f"[/{HIGHLIGHT_COLOR}]",
"[highlight]\\1[/highlight]",
record[1],
re.IGNORECASE,
)
# convert back to list of tuples as that's what write_dl expects
opt_help = [tuple(opt) for opt in opt_help]
formatter.write_dl(opt_help)

View File

@ -0,0 +1,68 @@
"""rich Progress bar factory that can return a rich Progress bar or a mock Progress bar"""
import os
from typing import Any, Optional, Union
from rich.console import Console
from rich.progress import GetTimeCallable, Progress, ProgressColumn, TaskID
# set to 1 if running tests
OSXPHOTOS_IS_TESTING = bool(os.getenv("OSXPHOTOS_IS_TESTING", default=False))
class MockProgress:
def __init__(self):
pass
def add_task(
self,
description: str,
start: bool = True,
total: float = 100.0,
completed: int = 0,
visible: bool = True,
**fields: Any,
) -> TaskID:
pass
def advance(self, task_id: TaskID, advance: float = 1) -> None:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def rich_progress(
*columns: Union[str, ProgressColumn],
console: Optional[Console] = None,
auto_refresh: bool = True,
refresh_per_second: float = 10,
speed_estimate_period: float = 30.0,
transient: bool = False,
redirect_stdout: bool = True,
redirect_stderr: bool = True,
get_time: Optional[GetTimeCallable] = None,
disable: bool = False,
expand: bool = False,
mock: bool = False,
) -> None:
"""Return a rich.progress.Progress object unless mock=True or os.getenv("OSXPHOTOS_IS_TESTING") is set"""
# if OSXPHOTOS_IS_TESTING is set or mock=True, return a MockProgress object
if mock or OSXPHOTOS_IS_TESTING:
return MockProgress()
return Progress(
*columns,
console=console,
auto_refresh=auto_refresh,
refresh_per_second=refresh_per_second,
speed_estimate_period=speed_estimate_period,
transient=transient,
redirect_stdout=redirect_stdout,
redirect_stderr=redirect_stderr,
get_time=get_time,
disable=disable,
expand=expand,
)

View File

@ -12,7 +12,8 @@ from rich.syntax import Syntax
import osxphotos
from .common import DB_OPTION, OSXPHOTOS_SNAPSHOT_DIR, get_photos_db, verbose_print
from .common import DB_OPTION, OSXPHOTOS_SNAPSHOT_DIR, get_photos_db
from .verbose import verbose_print
@click.command(name="snap")

144
osxphotos/cli/verbose.py Normal file
View File

@ -0,0 +1,144 @@
"""helper functions for printing verbose output"""
import os
import typing as t
from datetime import datetime
import click
from rich.console import Console
from rich.theme import Theme
from .click_rich_echo import rich_click_echo
from .common import CLI_COLOR_ERROR, CLI_COLOR_WARNING, time_stamp
# set to 1 if running tests
OSXPHOTOS_IS_TESTING = bool(os.getenv("OSXPHOTOS_IS_TESTING", default=False))
# include error/warning emoji's in verbose output
ERROR_EMOJI = True
__all__ = ["get_verbose_console", "verbose_print"]
class _Console:
"""Store console object for verbose output"""
def __init__(self):
self._console: t.Optional[Console] = None
@property
def console(self):
return self._console
@console.setter
def console(self, console: Console):
self._console = console
_console = _Console()
def noop(*args, **kwargs):
"""no-op function"""
pass
def get_verbose_console() -> Console:
"""Get console object
Returns:
Console object
"""
global _console
if _console.console is None:
_console.console = Console(force_terminal=True)
return _console.console
def verbose_print(
verbose: bool = True,
timestamp: bool = False,
rich: bool = False,
highlight: bool = False,
theme: t.Optional[Theme] = None,
**kwargs: t.Any,
) -> t.Callable:
"""Create verbose function to print output
Args:
verbose: if True, returns verbose print function otherwise returns no-op function
timestamp: if True, includes timestamp in verbose output
rich: use rich.print instead of click.echo
highlight: if True, use automatic rich.print highlighting
theme: optional rich.theme.Theme object to use for formatting
kwargs: any extra arguments to pass to click.echo or rich.print depending on whether rich==True
Returns:
function to print output
"""
if not verbose:
return noop
global _console
width = 10_000 if OSXPHOTOS_IS_TESTING else None
_console.console = Console(theme=theme, width=width)
# closure to capture timestamp
def verbose_(*args):
"""print output if verbose flag set"""
styled_args = []
timestamp_str = f"{str(datetime.now())} -- " if timestamp else ""
for arg in args:
if type(arg) == str:
arg = timestamp_str + arg
if "error" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_ERROR)
elif "warning" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_WARNING)
styled_args.append(arg)
click.echo(*styled_args, **kwargs)
def rich_verbose_(*args):
"""rich.print output if verbose flag set"""
global ERROR_EMOJI
timestamp_str = time_stamp() if timestamp else ""
new_args = []
for arg in args:
if type(arg) == str:
if "error" in arg.lower():
arg = f"[error]{arg}"
if ERROR_EMOJI:
arg = f":cross_mark-emoji: {arg}"
elif "warning" in arg.lower():
arg = f"[warning]{arg}"
if ERROR_EMOJI:
arg = f":warning-emoji: {arg}"
arg = timestamp_str + arg
new_args.append(arg)
_console.console.print(*new_args, highlight=highlight, **kwargs)
def rich_verbose_testing_(*args):
"""print output if verbose flag set using rich.print"""
global ERROR_EMOJI
timestamp_str = time_stamp() if timestamp else ""
new_args = []
for arg in args:
if type(arg) == str:
if "error" in arg.lower():
arg = f"[error]{arg}"
if ERROR_EMOJI:
arg = f":cross_mark-emoji: {arg}"
elif "warning" in arg.lower():
arg = f"[warning]{arg}"
if ERROR_EMOJI:
arg = f":warning-emoji: {arg}"
arg = timestamp_str + arg
new_args.append(arg)
rich_click_echo(*new_args, theme=theme, **kwargs)
if rich and not OSXPHOTOS_IS_TESTING:
return rich_verbose_
elif rich:
return rich_verbose_testing_
else:
return verbose_

View File

@ -43,6 +43,7 @@ from .photokit import (
PhotoLibrary,
)
from .phototemplate import RenderOptions
from .rich_utils import add_rich_markup_tag
from .uti import get_preferred_uti_extension
from .utils import increment_filename, lineno, list_directory
@ -102,6 +103,7 @@ class ExportOptions:
raw_photo (bool, default=False): if True, will also export the associated RAW photo
render_options (RenderOptions): t.Optional osxphotos.phototemplate.RenderOptions instance to specify options for rendering templates
replace_keywords (bool): if True, keyword_template replaces any keywords, otherwise it's additive
rich (bool): if True, will use rich markup with verbose output
sidecar_drop_ext (bool, default=False): if True, drops the photo's extension from sidecar filename (e.g. 'IMG_1234.json' instead of 'IMG_1234.JPG.json')
sidecar: bit field (int): set to one or more of SIDECAR_XMP, SIDECAR_JSON, SIDECAR_EXIFTOOL
- SIDECAR_JSON: if set will write a json sidecar with data in format readable by exiftool sidecar filename will be dest/filename.json;
@ -150,6 +152,7 @@ class ExportOptions:
raw_photo: bool = False
render_options: t.Optional[RenderOptions] = None
replace_keywords: bool = False
rich: bool = False
sidecar_drop_ext: bool = False
sidecar: int = 0
strip: bool = False
@ -366,6 +369,12 @@ class PhotoExporter:
self._render_options = RenderOptions()
self._verbose = self.photo._verbose
# define functions for adding markup
self._filepath = add_rich_markup_tag("filepath", rich=False)
self._filename = add_rich_markup_tag("filename", rich=False)
self._uuid = add_rich_markup_tag("uuid", rich=False)
self._num = add_rich_markup_tag("num", rich=False)
# temp directory for staging downloaded missing files
self._temp_dir = None
self._temp_dir_path = None
@ -406,6 +415,12 @@ class PhotoExporter:
if verbose and not callable(verbose):
raise TypeError("verbose must be callable")
# define functions for adding markup
self._filepath = add_rich_markup_tag("filepath", rich=options.rich)
self._filename = add_rich_markup_tag("filename", rich=options.rich)
self._uuid = add_rich_markup_tag("uuid", rich=options.rich)
self._num = add_rich_markup_tag("num", rich=options.rich)
# can't use export_as_hardlink with download_missing, use_photos_export as can't hardlink the temporary files downloaded
if options.export_as_hardlink and options.download_missing:
raise ValueError(
@ -465,7 +480,7 @@ class PhotoExporter:
)
else:
verbose(
f"Skipping missing {'edited' if options.edited else 'original'} photo {self.photo.original_filename} ({self.photo.uuid})"
f"Skipping missing {'edited' if options.edited else 'original'} photo {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
)
all_results.missing.append(dest)
@ -482,7 +497,7 @@ class PhotoExporter:
)
else:
verbose(
f"Skipping missing live photo for {self.photo.original_filename} ({self.photo.uuid})"
f"Skipping missing live photo for {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
)
all_results.missing.append(live_name)
@ -498,7 +513,7 @@ class PhotoExporter:
)
else:
verbose(
f"Skipping missing edited live photo for {self.photo.original_filename} ({self.photo.uuid})"
f"Skipping missing edited live photo for {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
)
all_results.missing.append(live_name)
@ -519,7 +534,7 @@ class PhotoExporter:
raw_name = dest.parent / f"{dest.stem}.{raw_ext}"
all_results.missing.append(raw_name)
verbose(
f"Skipping missing raw photo for {self.photo.original_filename} ({self.photo.uuid})"
f"Skipping missing raw photo for {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
)
# copy preview image if requested
@ -550,7 +565,7 @@ class PhotoExporter:
preview_name = dest.parent / f"{dest.stem}{options.preview_suffix}.jpeg"
all_results.missing.append(preview_name)
verbose(
f"Skipping missing preview photo for {self.photo.original_filename} ({self.photo.uuid})"
f"Skipping missing preview photo for {self._filename(self.photo.original_filename)} ({self._uuid(self.photo.uuid)})"
)
all_results += self._write_sidecar_files(dest=dest, options=options)
@ -1201,7 +1216,9 @@ class PhotoExporter:
raise ValueError("edited or original must be True but not both")
# export to a subdirectory of tmpdir
tmpdir = self.fileutil.tmpdir("osxphotos_applescript_export_", dir=self._temp_dir_path)
tmpdir = self.fileutil.tmpdir(
"osxphotos_applescript_export_", dir=self._temp_dir_path
)
exported_files = []
filename = None
@ -1229,7 +1246,11 @@ class PhotoExporter:
exported_paths = []
for fname in exported_files:
path = pathlib.Path(tmpdir.name) / fname
if len(exported_files) > 1 and not live_photo and path.suffix.lower() == ".mov":
if (
len(exported_files) > 1
and not live_photo
and path.suffix.lower() == ".mov"
):
# it's the .mov part of live photo but not requested, so don't export
continue
if len(exported_files) > 1 and burst and path.stem != filename_stem:
@ -1248,8 +1269,6 @@ class PhotoExporter:
exported_paths.append(str(dest_new))
return exported_paths
def _write_sidecar_files(
self,
dest: pathlib.Path,
@ -1348,14 +1367,18 @@ class PhotoExporter:
)
)
if write_sidecar:
verbose(f"Writing {sidecar_type} sidecar {sidecar_filename}")
verbose(
f"Writing {sidecar_type} sidecar {self._filepath(sidecar_filename)}"
)
files_written.append(str(sidecar_filename))
if not options.dry_run:
self._write_sidecar(sidecar_filename, sidecar_str)
sidecar_record.digest = sidecar_digest
sidecar_record.dest_sig = fileutil.file_sig(sidecar_filename)
else:
verbose(f"Skipped up to date {sidecar_type} sidecar {sidecar_filename}")
verbose(
f"Skipped up to date {sidecar_type} sidecar {self._filepath(sidecar_filename)}"
)
files_skipped.append(str(sidecar_filename))
results = ExportResults(
@ -1418,7 +1441,9 @@ class PhotoExporter:
# determine if we need to write the exif metadata
# if we are not updating, we always write
# else, need to check the database to determine if we need to write
verbose(f"Writing metadata with exiftool for {pathlib.Path(dest).name}")
verbose(
f"Writing metadata with exiftool for {self._filepath(pathlib.Path(dest).name)}"
)
if not options.dry_run:
warning_, error_ = self._write_exif_data(src, options=options)
if warning_:
@ -1975,7 +2000,6 @@ def hexdigest(strval):
return h.hexdigest()
def _check_export_suffix(src, dest, edited):
"""Helper function for exporting photos to check file extensions of destination path.

View File

@ -56,6 +56,7 @@ from ..personinfo import PersonInfo
from ..photoinfo import PhotoInfo
from ..phototemplate import RenderOptions
from ..queryoptions import QueryOptions
from ..rich_utils import add_rich_markup_tag
from ..utils import (
_check_file_exists,
_db_is_locked,
@ -90,13 +91,14 @@ class PhotosDB:
labels_normalized_as_dict,
)
def __init__(self, dbfile=None, verbose=None, exiftool=None):
def __init__(self, dbfile=None, verbose=None, exiftool=None, rich=None):
"""Create a new PhotosDB object.
Args:
dbfile: specify full path to photos library or photos.db; if None, will attempt to locate last library opened by Photos.
verbose: optional callable function to use for printing verbose text during processing; if None (default), does not print output.
exiftool: optional path to exiftool for methods that require this (e.g. PhotoInfo.exiftool); if not provided, will search PATH
rich: use rich with verbose output
Raises:
FileNotFoundError if dbfile is not a valid Photos library.
@ -119,6 +121,12 @@ class PhotosDB:
raise TypeError("verbose must be callable")
self._verbose = verbose
# define functions for adding markup
self._filepath = add_rich_markup_tag("filepath", rich=rich)
self._filename = add_rich_markup_tag("filename", rich=rich)
self._uuid = add_rich_markup_tag("uuid", rich=rich)
self._num = add_rich_markup_tag("num", rich=rich)
# enable beta features
self._beta = False
@ -295,7 +303,7 @@ class PhotosDB:
# or photosanalysisd
self._dbfile = self._dbfile_actual = self._tmp_db = os.path.abspath(dbfile)
verbose(f"Processing database {self._dbfile}")
verbose(f"Processing database {self._filepath(self._dbfile)}")
# if database is exclusively locked, make a copy of it and use the copy
# Photos maintains an exclusive lock on the database file while Photos is open
@ -315,7 +323,7 @@ class PhotosDB:
raise FileNotFoundError(f"dbfile {dbfile} does not exist", dbfile)
else:
self._dbfile_actual = self._tmp_db = dbfile
verbose(f"Processing database {self._dbfile_actual}")
verbose(f"Processing database {self._filepath(self._dbfile_actual)}")
# if database is exclusively locked, make a copy of it and use the copy
if _db_is_locked(self._dbfile_actual):
verbose(f"Database locked, creating temporary copy.")
@ -630,7 +638,7 @@ class PhotosDB:
verbose = self._verbose
verbose("Processing database.")
verbose(f"Database version: {self._db_version}.")
verbose(f"Database version: {self._num(self._db_version)}.")
self._photos_ver = 4 # only used in Photos 5+
@ -1590,7 +1598,9 @@ class PhotosDB:
# some of the tables/columns have different names in different versions of Photos
photos_ver = get_db_model_version(self._tmp_db)
self._photos_ver = photos_ver
verbose(f"Database version: {self._db_version}, {photos_ver}.")
verbose(
f"Database version: {self._num(self._db_version)}, {self._num(photos_ver)}."
)
asset_table = _DB_TABLE_NAMES[photos_ver]["ASSET"]
keyword_join = _DB_TABLE_NAMES[photos_ver]["KEYWORD_JOIN"]
asset_album_table = _DB_TABLE_NAMES[photos_ver]["ASSET_ALBUM_TABLE"]

24
osxphotos/rich_utils.py Normal file
View File

@ -0,0 +1,24 @@
"""utilities for working with rich markup"""
from typing import Callable
def add_rich_markup_tag(tag: str, rich=True) -> Callable:
"""Returns function that rich markup tags to string"""
if not rich:
return no_markup
def add_tag(msg: str) -> str:
"""Add tag to string"""
return f"[{tag}]{msg}[/{tag}]"
return add_tag
def no_markup(msg: str) -> str:
"""Return msg without markup"""
return msg
__all__ = ["add_rich_markup_tag", "no_markup"]

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = -p tests.plugins.env_vars

View File

View File

@ -0,0 +1,8 @@
import os
import pytest
@pytest.hookimpl(tryfirst=True)
def pytest_load_initial_conftests(args, early_config, parser):
os.environ["OSXPHOTOS_IS_TESTING"] = "1"