Added better exiftool error handling, closes #300

This commit is contained in:
Rhet Turnbull
2020-12-20 20:37:23 -08:00
parent 8d1ccda0c8
commit e0e8850e56
32 changed files with 458 additions and 264 deletions

View File

@@ -19,6 +19,8 @@ from ._constants import (
_EXIF_TOOL_URL, _EXIF_TOOL_URL,
_PHOTOS_4_VERSION, _PHOTOS_4_VERSION,
_UNKNOWN_PLACE, _UNKNOWN_PLACE,
CLI_COLOR_ERROR,
CLI_COLOR_WARNING,
DEFAULT_JPEG_QUALITY, DEFAULT_JPEG_QUALITY,
DEFAULT_EDITED_SUFFIX, DEFAULT_EDITED_SUFFIX,
DEFAULT_ORIGINAL_SUFFIX, DEFAULT_ORIGINAL_SUFFIX,
@@ -50,7 +52,15 @@ OSXPHOTOS_EXPORT_DB = ".osxphotos_export.db"
def verbose_(*args, **kwargs): def verbose_(*args, **kwargs):
""" print output if verbose flag set """ """ print output if verbose flag set """
if VERBOSE: if VERBOSE:
click.echo(*args, **kwargs) styled_args = []
for arg in args:
if type(arg) == str:
if "error" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_ERROR)
elif "warning" in arg.lower():
arg = click.style(arg, fg=CLI_COLOR_WARNING)
styled_args.append(arg)
click.echo(*styled_args, **kwargs)
def normalize_unicode(value): def normalize_unicode(value):
@@ -1605,7 +1615,11 @@ def export(
cfg.load_from_file(load_config) cfg.load_from_file(load_config)
except ConfigOptionsLoadError as e: except ConfigOptionsLoadError as e:
click.echo( click.echo(
f"Error parsing {load_config} config file: {e.message}", err=True click.style(
f"Error parsing {load_config} config file: {e.message}",
fg=CLI_COLOR_ERROR,
),
err=True,
) )
raise click.Abort() raise click.Abort()
@@ -1737,7 +1751,12 @@ def export(
try: try:
cfg.validate(exclusive=exclusive_options, dependent=dependent_options, cli=True) cfg.validate(exclusive=exclusive_options, dependent=dependent_options, cli=True)
except ConfigOptionsInvalidError as e: except ConfigOptionsInvalidError as e:
click.echo(f"Incompatible export options: {e.message}", err=True) click.echo(
click.style(
f"Incompatible export options: {e.message}", fg=CLI_COLOR_ERROR
),
err=True,
)
raise click.Abort() raise click.Abort()
if save_config: if save_config:
@@ -1752,13 +1771,20 @@ def export(
) )
if not os.path.isdir(dest): if not os.path.isdir(dest):
click.echo(f"DEST {dest} must be valid path", err=True) click.echo(
click.style(f"DEST {dest} must be valid path", fg=CLI_COLOR_ERROR), err=True
)
raise click.Abort() raise click.Abort()
dest = str(pathlib.Path(dest).resolve()) dest = str(pathlib.Path(dest).resolve())
if report and os.path.isdir(report): if report and os.path.isdir(report):
click.echo(f"report is a directory, must be file name", err=True) click.echo(
click.style(
f"report is a directory, must be file name", fg=CLI_COLOR_ERROR
),
err=True,
)
raise click.Abort() raise click.Abort()
# if use_photokit and not check_photokit_authorization(): # if use_photokit and not check_photokit_authorization():
@@ -1785,8 +1811,11 @@ def export(
_ = get_exiftool_path() _ = get_exiftool_path()
except FileNotFoundError: except FileNotFoundError:
click.echo( click.echo(
"Could not find exiftool. Please download and install" click.style(
" from https://exiftool.org/", "Could not find exiftool. Please download and install"
" from https://exiftool.org/",
fg=CLI_COLOR_ERROR,
),
err=True, err=True,
) )
ctx.exit(2) ctx.exit(2)
@@ -1819,10 +1848,13 @@ def export(
other_db_files = find_files_in_branch(dest, OSXPHOTOS_EXPORT_DB) other_db_files = find_files_in_branch(dest, OSXPHOTOS_EXPORT_DB)
if other_db_files: if other_db_files:
click.echo( click.echo(
"WARNING: found other export database files in this destination directory branch. " click.style(
+ "This likely means you are attempting to export files into a directory " "WARNING: found other export database files in this destination directory branch. "
+ "that is either the parent or a child directory of a previous export. " + "This likely means you are attempting to export files into a directory "
+ "Proceeding may cause your exported files to be overwritten.", + "that is either the parent or a child directory of a previous export. "
+ "Proceeding may cause your exported files to be overwritten.",
fg=CLI_COLOR_WARNING,
),
err=True, err=True,
) )
click.echo( click.echo(
@@ -1929,22 +1961,10 @@ def export(
# because the original code used --original-name as an option # because the original code used --original-name as an option
original_name = not current_name original_name = not current_name
results_exported = [] results = ExportResults()
results_new = []
results_updated = []
results_skipped = []
results_exif_updated = []
results_touched = []
results_converted = []
results_sidecar_json_written = []
results_sidecar_json_skipped = []
results_sidecar_xmp_written = []
results_sidecar_xmp_skipped = []
results_missing = []
results_error = []
if verbose: if verbose:
for p in photos: for p in photos:
results = export_photo( export_results = export_photo(
photo=p, photo=p,
dest=dest, dest=dest,
verbose=verbose, verbose=verbose,
@@ -1979,19 +1999,7 @@ def export(
ignore_date_modified=ignore_date_modified, ignore_date_modified=ignore_date_modified,
use_photokit=use_photokit, use_photokit=use_photokit,
) )
results_exported.extend(results.exported) results += export_results
results_new.extend(results.new)
results_updated.extend(results.updated)
results_skipped.extend(results.skipped)
results_exif_updated.extend(results.exif_updated)
results_touched.extend(results.touched)
results_converted.extend(results.converted_to_jpeg)
results_sidecar_json_written.extend(results.sidecar_json_written)
results_sidecar_json_skipped.extend(results.sidecar_json_skipped)
results_sidecar_xmp_written.extend(results.sidecar_xmp_written)
results_sidecar_xmp_skipped.extend(results.sidecar_xmp_skipped)
results_missing.extend(results.missing)
results_error.extend(results.error)
# if convert_to_jpeg and p.isphoto and p.uti != "public.jpeg": # if convert_to_jpeg and p.isphoto and p.uti != "public.jpeg":
# for photo_file in set( # for photo_file in set(
@@ -2003,7 +2011,7 @@ def export(
# show progress bar # show progress bar
with click.progressbar(photos) as bar: with click.progressbar(photos) as bar:
for p in bar: for p in bar:
results = export_photo( export_results = export_photo(
photo=p, photo=p,
dest=dest, dest=dest,
verbose=verbose, verbose=verbose,
@@ -2038,19 +2046,7 @@ def export(
ignore_date_modified=ignore_date_modified, ignore_date_modified=ignore_date_modified,
use_photokit=use_photokit, use_photokit=use_photokit,
) )
results_exported.extend(results.exported) results += export_results
results_new.extend(results.new)
results_updated.extend(results.updated)
results_skipped.extend(results.skipped)
results_exif_updated.extend(results.exif_updated)
results_touched.extend(results.touched)
results_converted.extend(results.converted_to_jpeg)
results_sidecar_json_written.extend(results.sidecar_json_written)
results_sidecar_json_skipped.extend(results.sidecar_json_skipped)
results_sidecar_xmp_written.extend(results.sidecar_xmp_written)
results_sidecar_xmp_skipped.extend(results.sidecar_xmp_skipped)
results_missing.extend(results.missing)
results_error.extend(results.error)
# print summary results # print summary results
# print(f"results_exported: {results_exported}") # print(f"results_exported: {results_exported}")
@@ -2069,19 +2065,19 @@ def export(
if cleanup: if cleanup:
all_files = ( all_files = (
results_exported results.exported
+ results_skipped + results.skipped
+ results_exif_updated + results.exif_updated
+ results_touched + results.touched
+ results_converted + results.converted_to_jpeg
+ results_sidecar_json_written + results.sidecar_json_written
+ results_sidecar_json_skipped + results.sidecar_json_skipped
+ results_sidecar_xmp_written + results.sidecar_xmp_written
+ results_sidecar_xmp_skipped + results.sidecar_xmp_skipped
# include missing so a file that was already in export directory # include missing so a file that was already in export directory
# but was missing on --update doesn't get deleted # but was missing on --update doesn't get deleted
# (better to have old version than none) # (better to have old version than none)
+ results_missing + results.missing
+ [str(pathlib.Path(export_db_path).resolve())] + [str(pathlib.Path(export_db_path).resolve())]
) )
click.echo(f"Cleaning up {dest}") click.echo(f"Cleaning up {dest}")
@@ -2092,41 +2088,26 @@ def export(
if report: if report:
verbose_(f"Writing export report to {report}") verbose_(f"Writing export report to {report}")
write_export_report( write_export_report(report, results)
report,
results_exported=results_exported,
results_new=results_new,
results_updated=results_updated,
results_skipped=results_skipped,
results_exif_updated=results_exif_updated,
results_touched=results_touched,
results_converted=results_converted,
results_sidecar_json_written=results_sidecar_json_written,
results_sidecar_json_skipped=results_sidecar_json_skipped,
results_sidecar_xmp_written=results_sidecar_xmp_written,
results_sidecar_xmp_skipped=results_sidecar_xmp_skipped,
results_missing=results_missing,
results_error=results_error,
)
photo_str_total = "photos" if len(photos) != 1 else "photo" photo_str_total = "photos" if len(photos) != 1 else "photo"
if update: if update:
summary = ( summary = (
f"Processed: {len(photos)} {photo_str_total}, " f"Processed: {len(photos)} {photo_str_total}, "
f"exported: {len(results_new)}, " f"exported: {len(results.new)}, "
f"updated: {len(results_updated)}, " f"updated: {len(results.updated)}, "
f"skipped: {len(results_skipped)}, " f"skipped: {len(results.skipped)}, "
f"updated EXIF data: {len(results_exif_updated)}, " f"updated EXIF data: {len(results.exif_updated)}, "
) )
else: else:
summary = ( summary = (
f"Processed: {len(photos)} {photo_str_total}, " f"Processed: {len(photos)} {photo_str_total}, "
f"exported: {len(results_exported)}, " f"exported: {len(results.exported)}, "
) )
summary += f"missing: {len(results_missing)}, " summary += f"missing: {len(results.missing)}, "
summary += f"error: {len(results_error)}" summary += f"error: {len(results.error)}"
if touch_file: if touch_file:
summary += f", touched date: {len(results_touched)}" summary += f", touched date: {len(results.touched)}"
click.echo(summary) click.echo(summary)
stop_time = time.perf_counter() stop_time = time.perf_counter()
click.echo(f"Elapsed time: {(stop_time-start_time):.3f} seconds") click.echo(f"Elapsed time: {(stop_time-start_time):.3f} seconds")
@@ -2651,19 +2632,7 @@ def export_photo(
global VERBOSE global VERBOSE
VERBOSE = bool(verbose) VERBOSE = bool(verbose)
results_exported = [] results = ExportResults()
results_new = []
results_updated = []
results_skipped = []
results_exif_updated = []
results_touched = []
results_converted = []
results_sidecar_json_written = []
results_sidecar_json_skipped = []
results_sidecar_xmp_written = []
results_sidecar_xmp_skipped = []
results_error = []
results_missing = []
export_original = not (skip_original_if_edited and photo.hasadjustments) export_original = not (skip_original_if_edited and photo.hasadjustments)
@@ -2767,7 +2736,7 @@ def export_photo(
if missing_original: if missing_original:
space = " " if not verbose else "" space = " " if not verbose else ""
verbose_(f"{space}Skipping missing photo {photo.original_filename}") verbose_(f"{space}Skipping missing photo {photo.original_filename}")
results_missing.append( results.missing.append(
str(pathlib.Path(dest_path) / original_filename) str(pathlib.Path(dest_path) / original_filename)
) )
else: else:
@@ -2799,33 +2768,21 @@ def export_photo(
use_photokit=use_photokit, use_photokit=use_photokit,
verbose=verbose_, verbose=verbose_,
) )
results += export_results
for warning_ in export_results.exiftool_warning:
verbose_(f"exiftool warning for file {warning_[0]}: {warning_[1]}")
for error_ in export_results.exiftool_error:
click.echo(click.style(f"exiftool error for file {error_[0]}: {error_[1]}", fg=CLI_COLOR_ERROR),err=True)
results_exported.extend(export_results.exported) except Exception as e:
results_new.extend(export_results.new)
results_updated.extend(export_results.updated)
results_skipped.extend(export_results.skipped)
results_exif_updated.extend(export_results.exif_updated)
results_touched.extend(export_results.touched)
results_converted.extend(export_results.converted_to_jpeg)
results_sidecar_json_written.extend(
export_results.sidecar_json_written
)
results_sidecar_json_skipped.extend(
export_results.sidecar_json_skipped
)
results_sidecar_xmp_written.extend(
export_results.sidecar_xmp_written
)
results_sidecar_xmp_skipped.extend(
export_results.sidecar_xmp_skipped
)
except Exception:
click.echo( click.echo(
f"Error exporting photo {photo.original_filename} ({photo.filename}) as {original_filename}", click.style(
f"Error exporting photo {photo.original_filename} ({photo.filename}) as {original_filename}: {e}",
fg=CLI_COLOR_ERROR,
),
err=True, err=True,
) )
results_error.extend( results.error.append(
str(pathlib.Path(dest) / original_filename) str(pathlib.Path(dest) / original_filename)
) )
else: else:
@@ -2875,7 +2832,7 @@ def export_photo(
if missing_edited: if missing_edited:
space = " " if not verbose else "" space = " " if not verbose else ""
verbose_(f"{space}Skipping missing edited photo for {filename}") verbose_(f"{space}Skipping missing edited photo for {filename}")
results_missing.append( results.missing.append(
str(pathlib.Path(dest_path) / edited_filename) str(pathlib.Path(dest_path) / edited_filename)
) )
else: else:
@@ -2906,65 +2863,36 @@ def export_photo(
use_photokit=use_photokit, use_photokit=use_photokit,
verbose=verbose_, verbose=verbose_,
) )
results += export_results_edited
results_exported.extend(export_results_edited.exported) for warning_ in export_results_edited.exiftool_warning:
results_new.extend(export_results_edited.new) verbose_(f"exiftool warning for file {warning_[0]}: {warning_[1]}")
results_updated.extend(export_results_edited.updated) for error_ in export_results_edited.exiftool_error:
results_skipped.extend(export_results_edited.skipped) click.echo(click.style(f"exiftool error for file {error_[0]}: {error_[1]}", fg=CLI_COLOR_ERROR),err=True)
results_exif_updated.extend(export_results_edited.exif_updated) except Exception as e:
results_touched.extend(export_results_edited.touched)
results_converted.extend(
export_results_edited.converted_to_jpeg
)
results_sidecar_json_written.extend(
export_results_edited.sidecar_json_written
)
results_sidecar_json_skipped.extend(
export_results_edited.sidecar_json_skipped
)
results_sidecar_xmp_written.extend(
export_results_edited.sidecar_xmp_written
)
results_sidecar_xmp_skipped.extend(
export_results_edited.sidecar_xmp_skipped
)
except Exception:
click.echo( click.echo(
f"Error exporting photo {filename} as {edited_filename}", click.style(
f"Error exporting photo {filename} as {edited_filename}",
fg=CLI_COLOR_ERROR,
),
err=True, err=True,
) )
results_error.extend(str(pathlib.Path(dest) / edited_filename)) results.error.append(str(pathlib.Path(dest) / edited_filename))
if verbose: if verbose:
if update: if update:
for new in results_new: for new in results.new:
verbose_(f"Exported new file {new}") verbose_(f"Exported new file {new}")
for updated in results_updated: for updated in results.updated:
verbose_(f"Exported updated file {updated}") verbose_(f"Exported updated file {updated}")
for skipped in results_skipped: for skipped in results.skipped:
verbose_(f"Skipped up to date file {skipped}") verbose_(f"Skipped up to date file {skipped}")
else: else:
for exported in results_exported: for exported in results.exported:
verbose_(f"Exported {exported}") verbose_(f"Exported {exported}")
for touched in results_touched: for touched in results.touched:
verbose_(f"Touched date on file {touched}") verbose_(f"Touched date on file {touched}")
return ExportResults( return results
exported=results_exported,
new=results_new,
updated=results_updated,
skipped=results_skipped,
exif_updated=results_exif_updated,
touched=results_touched,
converted_to_jpeg=results_converted,
sidecar_json_written=results_sidecar_json_written,
sidecar_json_skipped=results_sidecar_json_skipped,
sidecar_xmp_written=results_sidecar_xmp_written,
sidecar_xmp_skipped=results_sidecar_xmp_skipped,
missing=results_missing,
error=results_error,
)
def get_filenames_from_template(photo, filename_template, original_name): def get_filenames_from_template(photo, filename_template, original_name):
@@ -3115,24 +3043,14 @@ def load_uuid_from_file(filename):
return uuid return uuid
def write_export_report( def write_export_report(report_file, results):
report_file,
results_exported,
results_new,
results_updated,
results_skipped,
results_exif_updated,
results_touched,
results_converted,
results_sidecar_json_written,
results_sidecar_json_skipped,
results_sidecar_xmp_written,
results_sidecar_xmp_skipped,
results_missing,
results_error,
):
""" write CSV report with results from export """ """ write CSV report with results from export
Args:
report_file: path to report file
results: ExportResults object
"""
# Collect results for reporting # Collect results for reporting
# TODO: pull this in a separate write_report function # TODO: pull this in a separate write_report function
@@ -3150,65 +3068,61 @@ def write_export_report(
"sidecar_json": 0, "sidecar_json": 0,
"missing": 0, "missing": 0,
"error": 0, "error": 0,
"exiftool_warning": "",
"exiftool_error": "",
} }
for result in results_exported for result in results.all_files()
+ results_new
+ results_updated
+ results_skipped
+ results_exif_updated
+ results_touched
+ results_converted
+ results_sidecar_json_written
+ results_sidecar_json_skipped
+ results_sidecar_xmp_written
+ results_sidecar_xmp_skipped
+ results_missing
+ results_error
} }
for result in results_exported: for result in results.exported:
all_results[result]["exported"] = 1 all_results[result]["exported"] = 1
for result in results_new: for result in results.new:
all_results[result]["new"] = 1 all_results[result]["new"] = 1
for result in results_updated: for result in results.updated:
all_results[result]["updated"] = 1 all_results[result]["updated"] = 1
for result in results_skipped: for result in results.skipped:
all_results[result]["skipped"] = 1 all_results[result]["skipped"] = 1
for result in results_exif_updated: for result in results.exif_updated:
all_results[result]["exif_updated"] = 1 all_results[result]["exif_updated"] = 1
for result in results_touched: for result in results.touched:
all_results[result]["touched"] = 1 all_results[result]["touched"] = 1
for result in results_converted: for result in results.converted_to_jpeg:
all_results[result]["converted_to_jpeg"] = 1 all_results[result]["converted_to_jpeg"] = 1
for result in results_sidecar_xmp_written: for result in results.sidecar_xmp_written:
all_results[result]["sidecar_xmp"] = 1 all_results[result]["sidecar_xmp"] = 1
all_results[result]["exported"] = 1 all_results[result]["exported"] = 1
for result in results_sidecar_xmp_skipped: for result in results.sidecar_xmp_skipped:
all_results[result]["sidecar_xmp"] = 1 all_results[result]["sidecar_xmp"] = 1
all_results[result]["skipped"] = 1 all_results[result]["skipped"] = 1
for result in results_sidecar_json_written: for result in results.sidecar_json_written:
all_results[result]["sidecar_json"] = 1 all_results[result]["sidecar_json"] = 1
all_results[result]["exported"] = 1 all_results[result]["exported"] = 1
for result in results_sidecar_json_skipped: for result in results.sidecar_json_skipped:
all_results[result]["sidecar_json"] = 1 all_results[result]["sidecar_json"] = 1
all_results[result]["skipped"] = 1 all_results[result]["skipped"] = 1
for result in results_missing: for result in results.missing:
all_results[result]["missing"] = 1 all_results[result]["missing"] = 1
for result in results_error: for result in results.error:
all_results[result]["error"] = 1 all_results[result]["error"] = 1
for result in results.exiftool_warning:
all_results[result[0]]["exiftool_warning"] = result[1]
for result in results.exiftool_error:
all_results[result[0]]["exiftool_error"] = result[1]
report_columns = [ report_columns = [
"filename", "filename",
"exported", "exported",
@@ -3222,6 +3136,8 @@ def write_export_report(
"sidecar_json", "sidecar_json",
"missing", "missing",
"error", "error",
"exiftool_warning",
"exiftool_error",
] ]
try: try:
@@ -3231,7 +3147,10 @@ def write_export_report(
for data in [result for result in all_results.values()]: for data in [result for result in all_results.values()]:
writer.writerow(data) writer.writerow(data)
except IOError: except IOError:
click.echo("Could not open output file for writing", err=True) click.echo(
click.style("Could not open output file for writing", fg=CLI_COLOR_ERROR),
err=True,
)
raise click.Abort() raise click.Abort()

View File

@@ -118,4 +118,6 @@ DEFAULT_EDITED_SUFFIX = "_edited"
# Default suffix to add to original images # Default suffix to add to original images
DEFAULT_ORIGINAL_SUFFIX = "" DEFAULT_ORIGINAL_SUFFIX = ""
# Colors for print CLI messages
CLI_COLOR_ERROR = 'red'
CLI_COLOR_WARNING = 'yellow'

View File

@@ -145,6 +145,7 @@ class ExifTool:
self.file = filepath self.file = filepath
self.overwrite = overwrite self.overwrite = overwrite
self.data = {} self.data = {}
self.warning = None
self.error = None self.error = None
# if running as a context manager, self._context_mgr will be True # if running as a context manager, self._context_mgr will be True
self._context_mgr = False self._context_mgr = False
@@ -163,6 +164,7 @@ class ExifTool:
True if success otherwise False True if success otherwise False
If error generated by exiftool, returns False and sets self.error to error string If error generated by exiftool, returns False and sets self.error to error string
If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
If called in context manager, returns True (execution is delayed until exiting context manager) If called in context manager, returns True (execution is delayed until exiting context manager)
""" """
@@ -175,8 +177,8 @@ class ExifTool:
self._commands.extend(command) self._commands.extend(command)
return True return True
else: else:
_, self.error = self.run_commands(*command) _, _, error = self.run_commands(*command)
return self.error is None return error is None
def addvalues(self, tag, *values): def addvalues(self, tag, *values):
""" Add one or more value(s) to tag """ Add one or more value(s) to tag
@@ -190,6 +192,7 @@ class ExifTool:
True if success otherwise False True if success otherwise False
If error generated by exiftool, returns False and sets self.error to error string If error generated by exiftool, returns False and sets self.error to error string
If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
If called in context manager, returns True (execution is delayed until exiting context manager) If called in context manager, returns True (execution is delayed until exiting context manager)
Notes: exiftool may add duplicate values for some tags so the caller must ensure Notes: exiftool may add duplicate values for some tags so the caller must ensure
@@ -216,8 +219,8 @@ class ExifTool:
self._commands.extend(command) self._commands.extend(command)
return True return True
else: else:
_, self.error = self.run_commands(*command) _, _, error = self.run_commands(*command)
return self.error is None return error is None
def run_commands(self, *commands, no_file=False): def run_commands(self, *commands, no_file=False):
""" Run commands in the exiftool process and return result. """ Run commands in the exiftool process and return result.
@@ -228,11 +231,12 @@ class ExifTool:
by default, all commands will be run against self.file by default, all commands will be run against self.file
use no_file=True to run a command without passing the filename use no_file=True to run a command without passing the filename
Returns: Returns:
(output, errror) (output, warning, errror)
output: bytes is containing output of exiftool commands output: bytes is containing output of exiftool commands
error: if exiftool generated an error, bytes containing error string otherwise None warning: if exiftool generated warnings, string containing warning otherwise empty string
error: if exiftool generated errors, string containing otherwise empty string
Note: Also sets self.error if error generated. Note: Also sets self.warning and self.error if warning or error generated.
""" """
if not (hasattr(self, "_process") and self._process): if not (hasattr(self, "_process") and self._process):
raise ValueError("exiftool process is not running") raise ValueError("exiftool process is not running")
@@ -259,16 +263,21 @@ class ExifTool:
# read the output # read the output
output = b"" output = b""
warning = b""
error = b"" error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output): while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = self._process.stdout.readline() line = self._process.stdout.readline()
if line.startswith(b"Warning"): if line.startswith(b"Warning"):
error += line warning += line.strip()
elif line.startswith(b"Error"):
error += line.strip()
else: else:
output += line.strip() output += line.strip()
error = None if error == b"" else error warning = "" if warning == b"" else warning.decode("utf-8")
error = "" if error == b"" else error.decode("utf-8")
self.warning = warning
self.error = error self.error = error
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], error return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
@property @property
def pid(self): def pid(self):
@@ -278,14 +287,14 @@ class ExifTool:
@property @property
def version(self): def version(self):
""" returns exiftool version """ """ returns exiftool version """
ver, _ = self.run_commands("-ver", no_file=True) ver, _, _ = self.run_commands("-ver", no_file=True)
return ver.decode("utf-8") return ver.decode("utf-8")
def asdict(self): def asdict(self):
""" return dictionary of all EXIF tags and values from exiftool """ return dictionary of all EXIF tags and values from exiftool
returns empty dict if no tags returns empty dict if no tags
""" """
json_str, _ = self.run_commands("-json") json_str, _, _ = self.run_commands("-json")
if json_str: if json_str:
exifdict = json.loads(json_str) exifdict = json.loads(json_str)
return exifdict[0] return exifdict[0]
@@ -294,7 +303,7 @@ class ExifTool:
def json(self): def json(self):
""" returns JSON string containing all EXIF tags and values from exiftool """ """ returns JSON string containing all EXIF tags and values from exiftool """
json, _ = self.run_commands("-json") json, _, _ = self.run_commands("-json")
return json return json
def _read_exif(self): def _read_exif(self):
@@ -314,4 +323,5 @@ class ExifTool:
if exc_type: if exc_type:
return False return False
elif self._commands: elif self._commands:
_, self.error = self.run_commands(*self._commands) # run_commands sets self.warning and self.error as needed
self.run_commands(*self._commands)

View File

@@ -45,24 +45,105 @@ from ..photokit import (
) )
from ..utils import dd_to_dms_str, findfiles, noop from ..utils import dd_to_dms_str, findfiles, noop
ExportResults = namedtuple(
"ExportResults", class ExportResults:
[ """ holds export results for export2 """
"exported",
"new", def __init__(
"updated", self,
"skipped", exported=None,
"exif_updated", new=None,
"touched", updated=None,
"converted_to_jpeg", skipped=None,
"sidecar_json_written", exif_updated=None,
"sidecar_json_skipped", touched=None,
"sidecar_xmp_written", converted_to_jpeg=None,
"sidecar_xmp_skipped", sidecar_json_written=None,
"missing", sidecar_json_skipped=None,
"error", sidecar_xmp_written=None,
], sidecar_xmp_skipped=None,
) missing=None,
error=None,
exiftool_warning=None,
exiftool_error=None,
):
self.exported = exported or []
self.new = new or []
self.updated = updated or []
self.skipped = skipped or []
self.exif_updated = exif_updated or []
self.touched = touched or []
self.converted_to_jpeg = converted_to_jpeg or []
self.sidecar_json_written = sidecar_json_written or []
self.sidecar_json_skipped = sidecar_json_skipped or []
self.sidecar_xmp_written = sidecar_xmp_written or []
self.sidecar_xmp_skipped = sidecar_xmp_skipped or []
self.missing = missing or []
self.error = error or []
self.exiftool_warning = exiftool_warning or []
self.exiftool_error = exiftool_error or []
def all_files(self):
""" return all filenames contained in results """
files = (
self.exported
+ self.new
+ self.updated
+ self.skipped
+ self.exif_updated
+ self.touched
+ self.converted_to_jpeg
+ self.sidecar_json_written
+ self.sidecar_json_skipped
+ self.sidecar_xmp_written
+ self.sidecar_xmp_skipped
+ self.missing
+ self.error
)
files += [x[0] for x in self.exiftool_warning]
files += [x[0] for x in self.exiftool_error]
files = list(set(files))
return files
def __iadd__(self, other):
self.exported += other.exported
self.new += other.new
self.updated += other.updated
self.skipped += other.skipped
self.exif_updated += other.exif_updated
self.touched += other.touched
self.converted_to_jpeg += other.converted_to_jpeg
self.sidecar_json_written += other.sidecar_json_written
self.sidecar_json_skipped += other.sidecar_json_skipped
self.sidecar_xmp_written += other.sidecar_xmp_written
self.sidecar_xmp_skipped += other.sidecar_xmp_skipped
self.missing += other.missing
self.error += other.error
self.exiftool_warning += other.exiftool_warning
self.exiftool_error += other.exiftool_error
return self
def __str__(self):
return (
"ExportResults("
+ f"exported={self.exported}"
+ f",new={self.new}"
+ f",updated={self.updated}"
+ f",skipped={self.skipped}"
+ f",exif_updated={self.exif_updated}"
+ f",touched={self.touched}"
+ f",converted_to_jpeg={self.converted_to_jpeg}"
+ f",sidecar_json_written={self.sidecar_json_written}"
+ f",sidecar_json_skipped={self.sidecar_json_skipped}"
+ f",sidecar_xmp_written={self.sidecar_xmp_written}"
+ f",sidecar_xmp_skipped={self.sidecar_xmp_skipped}"
+ f",missing={self.missing}"
+ f",error={self.error}"
+ f",exiftool_warning={self.exiftool_warning}"
+ f",exiftool_error={self.exiftool_error}"
+ ")"
)
# hexdigest is not a class method, don't import this into PhotoInfo # hexdigest is not a class method, don't import this into PhotoInfo
@@ -389,7 +470,8 @@ def export2(
ignore_date_modified: for use with sidecar and exiftool; if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set ignore_date_modified: for use with sidecar and exiftool; if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set
verbose: optional callable function to use for printing verbose text during processing; if None (default), does not print output. verbose: optional callable function to use for printing verbose text during processing; if None (default), does not print output.
Returns: ExportResults namedtuple with fields: Returns: ExportResults class
ExportResults has attributes:
"exported", "exported",
"new", "new",
"updated", "updated",
@@ -402,7 +484,10 @@ def export2(
"sidecar_xmp_written", "sidecar_xmp_written",
"sidecar_xmp_skipped", "sidecar_xmp_skipped",
"missing", "missing",
"error" "error",
"exiftool_warning",
"exiftool_error",
Note: to use dry run mode, you must set dry_run=True and also pass in memory version of export_db, Note: to use dry run mode, you must set dry_run=True and also pass in memory version of export_db,
and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp)
@@ -853,6 +938,10 @@ def export2(
exif_files = exported_files exif_files = exported_files
exif_files_updated = [] exif_files_updated = []
exiftool_warning = []
exiftool_error = []
errors = []
# TODO: remove duplicative code from below
if exiftool and update and exif_files: if exiftool and update and exif_files:
for exported_file in exif_files: for exported_file in exif_files:
files_are_different = False files_are_different = False
@@ -876,7 +965,7 @@ def export2(
# or files were different # or files were different
verbose(f"Writing metadata with exiftool for {exported_file}") verbose(f"Writing metadata with exiftool for {exported_file}")
if not dry_run: if not dry_run:
self._write_exif_data( warning_, error_ = self._write_exif_data(
exported_file, exported_file,
use_albums_as_keywords=use_albums_as_keywords, use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords, use_persons_as_keywords=use_persons_as_keywords,
@@ -884,6 +973,12 @@ def export2(
description_template=description_template, description_template=description_template,
ignore_date_modified=ignore_date_modified, ignore_date_modified=ignore_date_modified,
) )
if warning_:
exiftool_warning.append((exported_file, warning_))
if error_:
exiftool_error.append((exported_file, error_))
errors.append(exported_file)
export_db.set_exifdata_for_file( export_db.set_exifdata_for_file(
exported_file, exported_file,
self._exiftool_json_sidecar( self._exiftool_json_sidecar(
@@ -904,7 +999,7 @@ def export2(
for exported_file in exif_files: for exported_file in exif_files:
verbose(f"Writing metadata with exiftool for {exported_file}") verbose(f"Writing metadata with exiftool for {exported_file}")
if not dry_run: if not dry_run:
self._write_exif_data( warning_, error_ = self._write_exif_data(
exported_file, exported_file,
use_albums_as_keywords=use_albums_as_keywords, use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords, use_persons_as_keywords=use_persons_as_keywords,
@@ -912,6 +1007,11 @@ def export2(
description_template=description_template, description_template=description_template,
ignore_date_modified=ignore_date_modified, ignore_date_modified=ignore_date_modified,
) )
if warning_:
exiftool_warning.append((exported_file, warning_))
if error_:
exiftool_error.append((exported_file, error_))
errors.append(exported_file)
export_db.set_exifdata_for_file( export_db.set_exifdata_for_file(
exported_file, exported_file,
@@ -949,8 +1049,9 @@ def export2(
sidecar_json_skipped=sidecar_json_files_skipped, sidecar_json_skipped=sidecar_json_files_skipped,
sidecar_xmp_written=sidecar_xmp_files_written, sidecar_xmp_written=sidecar_xmp_files_written,
sidecar_xmp_skipped=sidecar_xmp_files_skipped, sidecar_xmp_skipped=sidecar_xmp_files_skipped,
missing=[], error=errors,
error=[], exiftool_error=exiftool_error,
exiftool_warning=exiftool_warning,
) )
return results return results
@@ -1152,6 +1253,9 @@ def _write_exif_data(
use_persons_as_keywords: treat person names as keywords use_persons_as_keywords: treat person names as keywords
keyword_template: (list of strings); list of template strings to render as keywords keyword_template: (list of strings); list of template strings to render as keywords
ignore_date_modified: if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set ignore_date_modified: if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set
Returns:
(warning, error) of warning and error strings if exiftool produces warnings or errors
""" """
if not os.path.exists(filepath): if not os.path.exists(filepath):
raise FileNotFoundError(f"Could not find file {filepath}") raise FileNotFoundError(f"Could not find file {filepath}")
@@ -1170,6 +1274,7 @@ def _write_exif_data(
exiftool.setvalue(exiftag, v) exiftool.setvalue(exiftag, v)
else: else:
exiftool.setvalue(exiftag, val) exiftool.setvalue(exiftag, val)
return exiftool.warning, exiftool.error
def _exiftool_dict( def _exiftool_dict(

View File

@@ -7,7 +7,7 @@
<key>hostuuid</key> <key>hostuuid</key>
<string>9575E48B-8D5F-5654-ABAC-4431B1167324</string> <string>9575E48B-8D5F-5654-ABAC-4431B1167324</string>
<key>pid</key> <key>pid</key>
<integer>485</integer> <integer>19275</integer>
<key>processname</key> <key>processname</key>
<string>photolibraryd</string> <string>photolibraryd</string>
<key>uid</key> <key>uid</key>

Binary file not shown.

After

Width:  |  Height:  |  Size: 550 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 171 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 500 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 550 KiB

View File

@@ -18,10 +18,10 @@ PHOTOS_DB = "tests/Test-10.15.7.photoslibrary/database/photos.db"
PHOTOS_DB_PATH = "/Test-10.15.7.photoslibrary/database/photos.db" PHOTOS_DB_PATH = "/Test-10.15.7.photoslibrary/database/photos.db"
PHOTOS_LIBRARY_PATH = "/Test-10.15.7.photoslibrary" PHOTOS_LIBRARY_PATH = "/Test-10.15.7.photoslibrary"
PHOTOS_DB_LEN = 18 PHOTOS_DB_LEN = 19
PHOTOS_NOT_IN_TRASH_LEN = 16 PHOTOS_NOT_IN_TRASH_LEN = 17
PHOTOS_IN_TRASH_LEN = 2 PHOTOS_IN_TRASH_LEN = 2
PHOTOS_DB_IMPORT_SESSIONS = 13 PHOTOS_DB_IMPORT_SESSIONS = 14
KEYWORDS = [ KEYWORDS = [
"Kids", "Kids",
@@ -971,7 +971,7 @@ def test_from_to_date(photosdb):
time.tzset() time.tzset()
photos = photosdb.photos(from_date=datetime.datetime(2018, 10, 28)) photos = photosdb.photos(from_date=datetime.datetime(2018, 10, 28))
assert len(photos) == 9 assert len(photos) == 10
photos = photosdb.photos(to_date=datetime.datetime(2018, 10, 28)) photos = photosdb.photos(to_date=datetime.datetime(2018, 10, 28))
assert len(photos) == 7 assert len(photos) == 7

View File

@@ -405,6 +405,8 @@ CLI_EXIFTOOL_IGNORE_DATE_MODIFIED = {
} }
} }
CLI_EXIFTOOL_ERROR = ["E2078879-A29C-4D6F-BACB-E3BBE6C3EB91"]
LABELS_JSON = { LABELS_JSON = {
"labels": { "labels": {
"Plant": 7, "Plant": 7,
@@ -1091,6 +1093,34 @@ def test_export_exiftool_quicktime():
for filename in files: for filename in files:
os.unlink(filename) os.unlink(filename)
@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
def test_export_exiftool_error():
"""" test --exiftool catching error """
import glob
import os
import os.path
from osxphotos.__main__ import export
from osxphotos.exiftool import ExifTool
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
for uuid in CLI_EXIFTOOL_ERROR:
result = runner.invoke(
export,
[
os.path.join(cwd, PHOTOS_DB_15_7),
".",
"-V",
"--exiftool",
"--uuid",
f"{uuid}",
],
)
assert result.exit_code == 0
assert "exiftool error" in result.output
def test_export_edited_suffix(): def test_export_edited_suffix():
""" test export with --edited-suffix """ """ test export with --edited-suffix """

View File

@@ -2,6 +2,7 @@ import pytest
from osxphotos.exiftool import get_exiftool_path from osxphotos.exiftool import get_exiftool_path
TEST_FILE_ONE_KEYWORD = "tests/test-images/wedding.jpg" TEST_FILE_ONE_KEYWORD = "tests/test-images/wedding.jpg"
TEST_FILE_BAD_IMAGE = "tests/test-images/badimage.jpeg"
TEST_FILE_MULTI_KEYWORD = "tests/test-images/Tulips.jpg" TEST_FILE_MULTI_KEYWORD = "tests/test-images/Tulips.jpg"
TEST_MULTI_KEYWORDS = [ TEST_MULTI_KEYWORDS = [
"Top Shot", "Top Shot",
@@ -109,8 +110,8 @@ def test_setvalue_1():
assert exif.data["IPTC:Keywords"] == "test" assert exif.data["IPTC:Keywords"] == "test"
def test_setvalue_error(): def test_setvalue_warning():
# test setting illegal tag value generates error # test setting illegal tag value generates warning
import os.path import os.path
import tempfile import tempfile
import osxphotos.exiftool import osxphotos.exiftool
@@ -122,6 +123,22 @@ def test_setvalue_error():
exif = osxphotos.exiftool.ExifTool(tempfile) exif = osxphotos.exiftool.ExifTool(tempfile)
exif.setvalue("IPTC:Foo", "test") exif.setvalue("IPTC:Foo", "test")
assert exif.warning
def test_setvalue_error():
# test setting tag on bad image generates error
import os.path
import tempfile
import osxphotos.exiftool
from osxphotos.fileutil import FileUtil
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_BAD_IMAGE))
FileUtil.copy(TEST_FILE_BAD_IMAGE, tempfile)
exif = osxphotos.exiftool.ExifTool(tempfile)
exif.setvalue("IPTC:Keywords", "test")
assert exif.error assert exif.error
@@ -142,7 +159,7 @@ def test_setvalue_context_manager():
exif.setvalue("XMP:Title", "title") exif.setvalue("XMP:Title", "title")
exif.setvalue("XMP:Subject", "subject") exif.setvalue("XMP:Subject", "subject")
assert exif.error is None assert not exif.error
exif2 = osxphotos.exiftool.ExifTool(tempfile) exif2 = osxphotos.exiftool.ExifTool(tempfile)
exif2._read_exif() exif2._read_exif()
@@ -151,8 +168,8 @@ def test_setvalue_context_manager():
assert exif2.data["XMP:Subject"] == "subject" assert exif2.data["XMP:Subject"] == "subject"
def test_setvalue_context_manager_error(): def test_setvalue_context_manager_warning():
# test setting a tag value as context manager when error generated # test setting a tag value as context manager when warning generated
import os.path import os.path
import tempfile import tempfile
import osxphotos.exiftool import osxphotos.exiftool
@@ -164,6 +181,22 @@ def test_setvalue_context_manager_error():
with osxphotos.exiftool.ExifTool(tempfile) as exif: with osxphotos.exiftool.ExifTool(tempfile) as exif:
exif.setvalue("Foo:Bar", "test1") exif.setvalue("Foo:Bar", "test1")
assert exif.warning
def test_setvalue_context_manager_error():
# test setting a tag value as context manager when error generated
import os.path
import tempfile
import osxphotos.exiftool
from osxphotos.fileutil import FileUtil
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_BAD_IMAGE))
FileUtil.copy(TEST_FILE_BAD_IMAGE, tempfile)
with osxphotos.exiftool.ExifTool(tempfile) as exif:
exif.setvalue("IPTC:Keywords", "test1")
assert exif.error assert exif.error

View File

@@ -716,7 +716,7 @@ def test_xmp_sidecar_is_valid(tmp_path):
xmp_file = tmp_path / XMP_FILENAME xmp_file = tmp_path / XMP_FILENAME
assert xmp_file.is_file() assert xmp_file.is_file()
exiftool = ExifTool(str(xmp_file)) exiftool = ExifTool(str(xmp_file))
output, _ = exiftool.run_commands("-validate", "-warning") output, _, _ = exiftool.run_commands("-validate", "-warning")
assert output == b"[ExifTool] Validate : 0 0 0" assert output == b"[ExifTool] Validate : 0 0 0"

View File

@@ -0,0 +1,95 @@
""" test ExportResults class """
import pytest
from osxphotos.photoinfo import ExportResults
EXPORT_RESULT_ATTRIBUTES = [
"exported",
"new",
"updated",
"skipped",
"exif_updated",
"touched",
"converted_to_jpeg",
"sidecar_json_written",
"sidecar_json_skipped",
"sidecar_xmp_written",
"sidecar_xmp_skipped",
"missing",
"error",
"exiftool_warning",
"exiftool_error",
]
def test_exportresults_init():
results = ExportResults()
assert results.exported == []
assert results.new == []
assert results.updated == []
assert results.skipped == []
assert results.exif_updated == []
assert results.touched == []
assert results.converted_to_jpeg == []
assert results.sidecar_json_written == []
assert results.sidecar_json_skipped == []
assert results.sidecar_xmp_written == []
assert results.sidecar_xmp_skipped == []
assert results.missing == []
assert results.error == []
assert results.exiftool_warning == []
assert results.exiftool_error == []
def test_exportresults_iadd():
results1 = ExportResults()
results2 = ExportResults()
for x in EXPORT_RESULT_ATTRIBUTES:
setattr(results1, x, [f"{x}1"])
setattr(results2, x, [f"{x}2"])
results1 += results2
for x in EXPORT_RESULT_ATTRIBUTES:
assert getattr(results1, x) == [f"{x}1", f"{x}2"]
# exiftool_warning and exiftool_error are lists of tuples
results1 = ExportResults()
results2 = ExportResults()
results1.exiftool_warning = [("exiftool_warning1", "foo")]
results2.exiftool_warning = [("exiftool_warning2", "bar")]
results1.exiftool_error = [("exiftool_error1", "foo")]
results2.exiftool_error = [("exiftool_error2", "bar")]
results1 += results2
assert results1.exiftool_warning == [
("exiftool_warning1", "foo"),
("exiftool_warning2", "bar"),
]
assert results1.exiftool_error == [
("exiftool_error1", "foo"),
("exiftool_error2", "bar"),
]
def test_all_files():
""" test ExportResults.all_files() """
results = ExportResults()
for x in EXPORT_RESULT_ATTRIBUTES:
setattr(results, x, [f"{x}1"])
results.exiftool_warning = [("exiftool_warning1", "foo")]
results.exiftool_error = [("exiftool_error1", "foo")]
assert sorted(results.all_files()) == sorted(
[f"{x}1" for x in EXPORT_RESULT_ATTRIBUTES]
)
def test_str():
""" test ExportResults.__str__ """
results = ExportResults()
assert (
str(results)
== "ExportResults(exported=[],new=[],updated=[],skipped=[],exif_updated=[],touched=[],converted_to_jpeg=[],sidecar_json_written=[],sidecar_json_skipped=[],sidecar_xmp_written=[],sidecar_xmp_skipped=[],missing=[],error=[],exiftool_warning=[],exiftool_error=[])"
)