Compare commits

...

3 Commits

Author SHA1 Message Date
Rhet Turnbull
030191be96 Working on making export CLI threadsafe 2023-04-02 12:36:51 -07:00
Rhet Turnbull
81127b6d89 Added locking to exiftool 2023-04-01 19:07:36 -07:00
Rhet Turnbull
d8c7d45056 Updated ExifTool to use multiple processes 2023-04-01 19:00:01 -07:00
3 changed files with 296 additions and 267 deletions

View File

@ -1,5 +1,7 @@
"""export command for osxphotos CLI"""
from __future__ import annotations
import atexit
import inspect
import os
@ -9,7 +11,8 @@ import shlex
import subprocess
import sys
import time
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple, Any, Callable
import concurrent.futures
import click
from osxmetadata import (
@ -1426,173 +1429,156 @@ def export(
photo_num = 0
num_exported = 0
# hack to avoid passing all the options to export_photo
kwargs = locals().copy()
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
task = progress.add_task(
f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos
)
for p in photos:
photo_num += 1
# hack to avoid passing all the options to export_photo
kwargs = {
k: v
for k, v in locals().items()
if k in inspect.getfullargspec(export_photo).args
}
kwargs["photo"] = p
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
export_results = export_photo(**kwargs)
if post_function:
for function in post_function:
# post function is tuple of (function, filename.py::function_name)
verbose(f"Calling post-function [bold]{function[1]}")
if not dry_run:
try:
function[0](p, export_results, verbose)
except Exception as e:
rich_echo_error(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
futures = []
with concurrent.futures.ThreadPoolExecutor(
# max_workers=os.cpu_count()
max_workers=1,
) as executor:
for p in photos:
photo_num += 1
kwargs["photo_num"] = photo_num
futures.append(executor.submit(export_worker, p, **kwargs))
for future in concurrent.futures.as_completed(futures):
p, export_results = future.result()
if album_export and export_results.exported:
try:
album_export.add(p)
export_results.exported_album = [
(filename, album_export.name)
for filename in export_results.exported
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_export.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_skipped and export_results.skipped:
try:
album_skipped.add(p)
export_results.skipped_album = [
(filename, album_skipped.name)
for filename in export_results.skipped
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_skipped.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_missing and export_results.missing:
try:
album_missing.add(p)
export_results.missing_album = [
(filename, album_missing.name)
for filename in export_results.missing
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_missing.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
results += export_results
# all photo files (not including sidecars) that are part of this export set
# used below for applying Finder tags, etc.
photo_files = set(
export_results.exported
+ export_results.new
+ export_results.updated
+ export_results.exif_updated
+ export_results.converted_to_jpeg
+ export_results.skipped
)
if finder_tag_keywords or finder_tag_template:
if dry_run:
for filepath in photo_files:
verbose(
f"Writing Finder tags to [filepath]{filepath}[/]"
)
run_post_command(
photo=p,
post_command=post_command,
export_results=export_results,
export_dir=dest,
dry_run=dry_run,
exiftool_path=exiftool_path,
export_db=export_db,
verbose=verbose,
)
if album_export and export_results.exported:
try:
album_export.add(p)
export_results.exported_album = [
(filename, album_export.name)
for filename in export_results.exported
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_export.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_skipped and export_results.skipped:
try:
album_skipped.add(p)
export_results.skipped_album = [
(filename, album_skipped.name)
for filename in export_results.skipped
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_skipped.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_missing and export_results.missing:
try:
album_missing.add(p)
export_results.missing_album = [
(filename, album_missing.name)
for filename in export_results.missing
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_missing.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
results += export_results
# all photo files (not including sidecars) that are part of this export set
# used below for applying Finder tags, etc.
photo_files = set(
export_results.exported
+ export_results.new
+ export_results.updated
+ export_results.exif_updated
+ export_results.converted_to_jpeg
+ export_results.skipped
)
if finder_tag_keywords or finder_tag_template:
if dry_run:
for filepath in photo_files:
verbose(f"Writing Finder tags to [filepath]{filepath}[/]")
else:
tags_written, tags_skipped = write_finder_tags(
p,
photo_files,
keywords=finder_tag_keywords,
keyword_template=keyword_template,
album_keyword=album_keyword,
person_keyword=person_keyword,
exiftool_merge_keywords=exiftool_merge_keywords,
finder_tag_template=finder_tag_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
export_results.xattr_written.extend(tags_written)
export_results.xattr_skipped.extend(tags_skipped)
results.xattr_written.extend(tags_written)
results.xattr_skipped.extend(tags_skipped)
if xattr_template:
if dry_run:
for filepath in photo_files:
verbose(
f"Writing extended attributes to [filepath]{filepath}[/]"
else:
tags_written, tags_skipped = write_finder_tags(
p,
photo_files,
keywords=finder_tag_keywords,
keyword_template=keyword_template,
album_keyword=album_keyword,
person_keyword=person_keyword,
exiftool_merge_keywords=exiftool_merge_keywords,
finder_tag_template=finder_tag_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
else:
xattr_written, xattr_skipped = write_extended_attributes(
p,
photo_files,
xattr_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
export_results.xattr_written.extend(xattr_written)
export_results.xattr_skipped.extend(xattr_skipped)
results.xattr_written.extend(xattr_written)
results.xattr_skipped.extend(xattr_skipped)
export_results.xattr_written.extend(tags_written)
export_results.xattr_skipped.extend(tags_skipped)
results.xattr_written.extend(tags_written)
results.xattr_skipped.extend(tags_skipped)
report_writer.write(export_results)
if print_template:
options = RenderOptions(export_dir=dest)
for template in print_template:
rendered_templates, unmatched = p.render_template(
template,
options,
)
if unmatched:
rich_click_echo(
f"[warning]Unmatched template field: {unmatched}[/]"
if xattr_template:
if dry_run:
for filepath in photo_files:
verbose(
f"Writing extended attributes to [filepath]{filepath}[/]"
)
else:
xattr_written, xattr_skipped = write_extended_attributes(
p,
photo_files,
xattr_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
for rendered_template in rendered_templates:
if not rendered_template:
continue
rich_click_echo(rendered_template)
export_results.xattr_written.extend(xattr_written)
export_results.xattr_skipped.extend(xattr_skipped)
results.xattr_written.extend(xattr_written)
results.xattr_skipped.extend(xattr_skipped)
progress.advance(task)
report_writer.write(export_results)
# handle limit
if export_results.exported:
# if any photos were exported, increment num_exported used by limit
# limit considers each PhotoInfo object as a single photo even if multiple files are exported
num_exported += 1
if limit and num_exported >= limit:
# advance progress to end
progress.advance(task, num_photos - photo_num)
break
if print_template:
options = RenderOptions(export_dir=dest)
for template in print_template:
rendered_templates, unmatched = p.render_template(
template,
options,
)
if unmatched:
rich_click_echo(
f"[warning]Unmatched template field: {unmatched}[/]"
)
for rendered_template in rendered_templates:
if not rendered_template:
continue
rich_click_echo(rendered_template)
progress.advance(task)
# handle limit
if export_results.exported:
# if any photos were exported, increment num_exported used by limit
# limit considers each PhotoInfo object as a single photo even if multiple files are exported
num_exported += 1
if limit and num_exported >= limit:
# advance progress to end
progress.advance(task, num_photos - photo_num)
break
photo_str_total = pluralize(len(photos), "photo", "photos")
if update or force_update:
@ -1682,6 +1668,45 @@ def export(
export_db.close()
def export_worker(
photo: osxphotos.PhotoInfo, **kwargs
) -> tuple[osxphotos.PhotoInfo, ExportResults]:
"""Export worker function for multi-threaded export of photos"""
dry_run = kwargs["dry_run"]
verbose: Callable[[str], Any] = kwargs["verbose"]
export_args = {
k: v
for k, v in kwargs.items()
if k in inspect.getfullargspec(export_photo).args
}
export_args["photo"] = photo
export_results = export_photo(**export_args)
if post_function := kwargs["post_function"]:
for function in post_function:
# post function is tuple of (function, filename.py::function_name)
verbose(f"Calling post-function [bold]{function[1]}")
if not dry_run:
try:
function[0](photo, export_results, verbose)
except Exception as e:
rich_echo_error(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
)
run_post_command(
photo=photo,
post_command=kwargs["post_command"],
export_results=export_results,
export_dir=kwargs["dest"],
dry_run=dry_run,
exiftool_path=kwargs["exiftool_path"],
export_db=kwargs["export_db"],
verbose=verbose,
)
return photo, export_results
def export_photo(
photo=None,
dest=None,

View File

@ -1,11 +1,11 @@
""" Yet another simple exiftool wrapper
I rolled my own for following reasons:
1. I wanted something under MIT license (best alternative was licensed under GPL/BSD)
2. I wanted singleton behavior so only a single exiftool process was ever running
2. I wanted exiftool processes to stay resident between calls (improved performance)
3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance)
If these aren't important to you, I highly recommend you use Sven Marnach's excellent
pyexiftool: https://github.com/smarnach/pyexiftool which provides more functionality """
"""
from __future__ import annotations
import atexit
import contextlib
@ -17,6 +17,7 @@ import pathlib
import re
import shutil
import subprocess
import threading
from abc import ABC, abstractmethod
from functools import lru_cache # pylint: disable=syntax-error
@ -30,6 +31,8 @@ __all__ = [
"unescape_str",
]
logger = logging.getLogger("osxphotos")
# exiftool -stay_open commands outputs this EOF marker after command is run
EXIFTOOL_STAYOPEN_EOF = "{ready}"
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
@ -42,6 +45,8 @@ EXIFTOOL_FILETYPES_JSON = "exiftool_filetypes.json"
with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f:
EXIFTOOL_SUPPORTED_FILETYPES = json.load(f)
NUM_PROCESSES = os.cpu_count() or 1
def exiftool_can_write(suffix: str) -> bool:
"""Return True if exiftool supports writing to a file with the given suffix, otherwise False"""
@ -96,8 +101,11 @@ def get_exiftool_path():
class _ExifToolProc:
"""Runs exiftool in a subprocess via Popen
Creates a singleton object"""
"""
Runs exiftool in a subprocess via Popen
Creates a singleton object that dispatches commands to one or
more exiftool subprocesses.
"""
def __new__(cls, *args, **kwargs):
"""create new object or return instance of already created singleton"""
@ -106,7 +114,11 @@ class _ExifToolProc:
return cls.instance
def __init__(self, exiftool=None, large_file_support=True):
def __init__(
self,
exiftool: str | None = None,
large_file_support: bool = True,
):
"""construct _ExifToolProc singleton object or return instance of already created object
Args:
@ -117,7 +129,7 @@ class _ExifToolProc:
if hasattr(self, "_process_running") and self._process_running:
# already running
if exiftool is not None and exiftool != self._exiftool:
logging.warning(
logger.warning(
f"exiftool subprocess already running, "
f"ignoring exiftool={exiftool}"
)
@ -125,6 +137,9 @@ class _ExifToolProc:
self._process_running = False
self._large_file_support = large_file_support
self._exiftool = exiftool or get_exiftool_path()
self._num_processes = NUM_PROCESSES
self._process = []
self._process_counter = 0
self._start_proc(large_file_support=large_file_support)
@property
@ -132,23 +147,20 @@ class _ExifToolProc:
"""return the exiftool subprocess"""
if not self._process_running:
self._start_proc(large_file_support=self._large_file_support)
return self._process
@property
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
process_idx = self._process_counter % self._num_processes
self._process_counter += 1
return self._process[process_idx]
@property
def exiftool(self):
"""return path to exiftool process"""
return self._exiftool
def _start_proc(self, large_file_support):
def _start_proc(self, large_file_support: bool):
"""start exiftool in batch mode"""
if self._process_running:
logging.warning("exiftool already running: {self._process}")
logger.debug(f"exiftool already running: {self._process}")
return
# open exiftool process
@ -156,25 +168,28 @@ class _ExifToolProc:
env = os.environ.copy()
env["PATH"] = f'/usr/bin/:{env["PATH"]}'
large_file_args = ["-api", "largefilesupport=1"] if large_file_support else []
self._process = subprocess.Popen(
[
self._exiftool,
"-stay_open", # keep process open in batch mode
"True", # -stay_open=True, keep process open in batch mode
*large_file_args,
"-@", # read command-line arguments from file
"-", # read from stdin
"-common_args", # specifies args common to all commands subsequently run
"-n", # no print conversion (e.g. print tag values in machine readable format)
"-P", # Preserve file modification date/time
"-G", # print group name for each tag
"-E", # escape tag values for HTML (allows use of HTML 
 for newlines)
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
)
for _ in range(self._num_processes):
self._process.append(
subprocess.Popen(
[
self._exiftool,
"-stay_open", # keep process open in batch mode
"True", # -stay_open=True, keep process open in batch mode
*large_file_args,
"-@", # read command-line arguments from file
"-", # read from stdin
"-common_args", # specifies args common to all commands subsequently run
"-n", # no print conversion (e.g. print tag values in machine readable format)
"-P", # Preserve file modification date/time
"-G", # print group name for each tag
"-E", # escape tag values for HTML (allows use of HTML 
 for newlines)
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
)
)
self._process_running = True
EXIFTOOL_PROCESSES.append(self)
@ -185,17 +200,19 @@ class _ExifToolProc:
if not self._process_running:
return
with contextlib.suppress(Exception):
self._process.stdin.write(b"-stay_open\n")
self._process.stdin.write(b"False\n")
self._process.stdin.flush()
try:
self._process.communicate(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.communicate()
for i in range(self._num_processes):
process = self._process[i]
with contextlib.suppress(Exception):
process.stdin.write(b"-stay_open\n")
process.stdin.write(b"False\n")
process.stdin.flush()
try:
process.communicate(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.communicate()
del self._process
self._process = []
self._process_running = False
@ -233,6 +250,7 @@ class ExifTool:
self._exiftoolproc = _ExifToolProc(
exiftool=exiftool, large_file_support=large_file_support
)
self._lock = threading.Lock()
self._read_exif()
@property
@ -336,57 +354,54 @@ class ExifTool:
if not commands:
raise TypeError("must provide one or more command to run")
if self._context_mgr and self.overwrite:
commands = list(commands)
commands.append("-overwrite_original")
with self._lock:
if self._context_mgr and self.overwrite:
commands = list(commands)
commands.append("-overwrite_original")
filename = b"" if no_file else os.fsencode(self.file)
filename = b"" if no_file else os.fsencode(self.file)
if self.flags:
# need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"]
flags = []
for f in self.flags:
flags.extend(f.split())
command_str = b"\n".join([f.encode("utf-8") for f in flags])
command_str += b"\n"
else:
command_str = b""
command_str += (
b"\n".join([c.encode("utf-8") for c in commands])
+ b"\n"
+ filename
+ b"\n"
+ b"-execute\n"
)
# send the command
self._process.stdin.write(command_str)
self._process.stdin.flush()
# read the output
output = b""
warning = b""
error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = self._process.stdout.readline()
if line.startswith(b"Warning"):
warning += line.strip()
elif line.startswith(b"Error"):
error += line.strip()
if self.flags:
# need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"]
flags = []
for f in self.flags:
flags.extend(f.split())
command_str = b"\n".join([f.encode("utf-8") for f in flags])
command_str += b"\n"
else:
output += line.strip()
warning = "" if warning == b"" else warning.decode("utf-8")
error = "" if error == b"" else error.decode("utf-8")
self.warning = warning
self.error = error
command_str = b""
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
command_str += (
b"\n".join([c.encode("utf-8") for c in commands])
+ b"\n"
+ filename
+ b"\n"
+ b"-execute\n"
)
@property
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
# send the command
process = self._process
process.stdin.write(command_str)
process.stdin.flush()
# read the output
output = b""
warning = b""
error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = process.stdout.readline()
if line.startswith(b"Warning"):
warning += line.strip()
elif line.startswith(b"Error"):
error += line.strip()
else:
output += line.strip()
warning = "" if warning == b"" else warning.decode("utf-8")
error = "" if error == b"" else error.decode("utf-8")
self.warning = warning
self.error = error
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
@property
def version(self):
@ -404,7 +419,7 @@ class ExifTool:
"""
json_str, _, _ = self.run_commands("-json")
if not json_str:
return dict()
return {}
json_str = unescape_str(json_str.decode("utf-8"))
try:
@ -412,8 +427,8 @@ class ExifTool:
except Exception as e:
# will fail with some commands, e.g --ext AVI which produces
# 'No file with specified extension' instead of json
logging.warning(f"error loading json returned by exiftool: {e} {json_str}")
return dict()
logger.warning(f"error loading json returned by exiftool: {e} {json_str}")
return {}
exifdict = exifdict[0]
if not tag_groups:
# strip tag groups
@ -482,7 +497,12 @@ class _ExifToolCaching(ExifTool):
"""
self._json_cache = None
self._asdict_cache = {}
super().__init__(filepath, exiftool=exiftool, overwrite=False, flags=None)
super().__init__(
filepath,
exiftool=exiftool,
overwrite=False,
flags=None,
)
def run_commands(self, *commands, no_file=False):
if commands[0] not in ["-json", "-ver"]:

View File

@ -419,22 +419,6 @@ def test_addvalues_unicode():
assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"])
def test_singleton():
import osxphotos.exiftool
exif1 = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif2 = osxphotos.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)
assert exif1._process.pid == exif2._process.pid
def test_pid():
import osxphotos.exiftool
exif1 = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1.pid == exif1._process.pid
def test_exiftoolproc_process():
import osxphotos.exiftool