Compare commits

...

3 Commits

Author SHA1 Message Date
Rhet Turnbull
030191be96 Working on making export CLI threadsafe 2023-04-02 12:36:51 -07:00
Rhet Turnbull
81127b6d89 Added locking to exiftool 2023-04-01 19:07:36 -07:00
Rhet Turnbull
d8c7d45056 Updated ExifTool to use multiple processes 2023-04-01 19:00:01 -07:00
3 changed files with 296 additions and 267 deletions

View File

@ -1,5 +1,7 @@
"""export command for osxphotos CLI""" """export command for osxphotos CLI"""
from __future__ import annotations
import atexit import atexit
import inspect import inspect
import os import os
@ -9,7 +11,8 @@ import shlex
import subprocess import subprocess
import sys import sys
import time import time
from typing import Iterable, List, Optional, Tuple from typing import Iterable, List, Optional, Tuple, Any, Callable
import concurrent.futures
import click import click
from osxmetadata import ( from osxmetadata import (
@ -1426,173 +1429,156 @@ def export(
photo_num = 0 photo_num = 0
num_exported = 0 num_exported = 0
# hack to avoid passing all the options to export_photo
kwargs = locals().copy()
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
limit_str = f" (limit = [num]{limit}[/num])" if limit else "" limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
task = progress.add_task( task = progress.add_task(
f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos
) )
for p in photos: futures = []
photo_num += 1 with concurrent.futures.ThreadPoolExecutor(
# hack to avoid passing all the options to export_photo # max_workers=os.cpu_count()
kwargs = { max_workers=1,
k: v ) as executor:
for k, v in locals().items() for p in photos:
if k in inspect.getfullargspec(export_photo).args photo_num += 1
} kwargs["photo_num"] = photo_num
kwargs["photo"] = p futures.append(executor.submit(export_worker, p, **kwargs))
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview for future in concurrent.futures.as_completed(futures):
export_results = export_photo(**kwargs) p, export_results = future.result()
if post_function: if album_export and export_results.exported:
for function in post_function: try:
# post function is tuple of (function, filename.py::function_name) album_export.add(p)
verbose(f"Calling post-function [bold]{function[1]}") export_results.exported_album = [
if not dry_run: (filename, album_export.name)
try: for filename in export_results.exported
function[0](p, export_results, verbose) ]
except Exception as e: except Exception as e:
rich_echo_error( click.secho(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}" f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_export.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_skipped and export_results.skipped:
try:
album_skipped.add(p)
export_results.skipped_album = [
(filename, album_skipped.name)
for filename in export_results.skipped
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_skipped.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_missing and export_results.missing:
try:
album_missing.add(p)
export_results.missing_album = [
(filename, album_missing.name)
for filename in export_results.missing
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_missing.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
results += export_results
# all photo files (not including sidecars) that are part of this export set
# used below for applying Finder tags, etc.
photo_files = set(
export_results.exported
+ export_results.new
+ export_results.updated
+ export_results.exif_updated
+ export_results.converted_to_jpeg
+ export_results.skipped
)
if finder_tag_keywords or finder_tag_template:
if dry_run:
for filepath in photo_files:
verbose(
f"Writing Finder tags to [filepath]{filepath}[/]"
) )
else:
run_post_command( tags_written, tags_skipped = write_finder_tags(
photo=p, p,
post_command=post_command, photo_files,
export_results=export_results, keywords=finder_tag_keywords,
export_dir=dest, keyword_template=keyword_template,
dry_run=dry_run, album_keyword=album_keyword,
exiftool_path=exiftool_path, person_keyword=person_keyword,
export_db=export_db, exiftool_merge_keywords=exiftool_merge_keywords,
verbose=verbose, finder_tag_template=finder_tag_template,
) strip=strip,
export_dir=dest,
if album_export and export_results.exported: verbose=verbose,
try:
album_export.add(p)
export_results.exported_album = [
(filename, album_export.name)
for filename in export_results.exported
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_export.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_skipped and export_results.skipped:
try:
album_skipped.add(p)
export_results.skipped_album = [
(filename, album_skipped.name)
for filename in export_results.skipped
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_skipped.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
if album_missing and export_results.missing:
try:
album_missing.add(p)
export_results.missing_album = [
(filename, album_missing.name)
for filename in export_results.missing
]
except Exception as e:
click.secho(
f"Error adding photo {p.original_filename} ({p.uuid}) to album {album_missing.name}: {e}",
fg=CLI_COLOR_ERROR,
err=True,
)
results += export_results
# all photo files (not including sidecars) that are part of this export set
# used below for applying Finder tags, etc.
photo_files = set(
export_results.exported
+ export_results.new
+ export_results.updated
+ export_results.exif_updated
+ export_results.converted_to_jpeg
+ export_results.skipped
)
if finder_tag_keywords or finder_tag_template:
if dry_run:
for filepath in photo_files:
verbose(f"Writing Finder tags to [filepath]{filepath}[/]")
else:
tags_written, tags_skipped = write_finder_tags(
p,
photo_files,
keywords=finder_tag_keywords,
keyword_template=keyword_template,
album_keyword=album_keyword,
person_keyword=person_keyword,
exiftool_merge_keywords=exiftool_merge_keywords,
finder_tag_template=finder_tag_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
export_results.xattr_written.extend(tags_written)
export_results.xattr_skipped.extend(tags_skipped)
results.xattr_written.extend(tags_written)
results.xattr_skipped.extend(tags_skipped)
if xattr_template:
if dry_run:
for filepath in photo_files:
verbose(
f"Writing extended attributes to [filepath]{filepath}[/]"
) )
else: export_results.xattr_written.extend(tags_written)
xattr_written, xattr_skipped = write_extended_attributes( export_results.xattr_skipped.extend(tags_skipped)
p, results.xattr_written.extend(tags_written)
photo_files, results.xattr_skipped.extend(tags_skipped)
xattr_template,
strip=strip,
export_dir=dest,
verbose=verbose,
)
export_results.xattr_written.extend(xattr_written)
export_results.xattr_skipped.extend(xattr_skipped)
results.xattr_written.extend(xattr_written)
results.xattr_skipped.extend(xattr_skipped)
report_writer.write(export_results) if xattr_template:
if dry_run:
if print_template: for filepath in photo_files:
options = RenderOptions(export_dir=dest) verbose(
for template in print_template: f"Writing extended attributes to [filepath]{filepath}[/]"
rendered_templates, unmatched = p.render_template( )
template, else:
options, xattr_written, xattr_skipped = write_extended_attributes(
) p,
if unmatched: photo_files,
rich_click_echo( xattr_template,
f"[warning]Unmatched template field: {unmatched}[/]" strip=strip,
export_dir=dest,
verbose=verbose,
) )
for rendered_template in rendered_templates: export_results.xattr_written.extend(xattr_written)
if not rendered_template: export_results.xattr_skipped.extend(xattr_skipped)
continue results.xattr_written.extend(xattr_written)
rich_click_echo(rendered_template) results.xattr_skipped.extend(xattr_skipped)
progress.advance(task) report_writer.write(export_results)
# handle limit if print_template:
if export_results.exported: options = RenderOptions(export_dir=dest)
# if any photos were exported, increment num_exported used by limit for template in print_template:
# limit considers each PhotoInfo object as a single photo even if multiple files are exported rendered_templates, unmatched = p.render_template(
num_exported += 1 template,
if limit and num_exported >= limit: options,
# advance progress to end )
progress.advance(task, num_photos - photo_num) if unmatched:
break rich_click_echo(
f"[warning]Unmatched template field: {unmatched}[/]"
)
for rendered_template in rendered_templates:
if not rendered_template:
continue
rich_click_echo(rendered_template)
progress.advance(task)
# handle limit
if export_results.exported:
# if any photos were exported, increment num_exported used by limit
# limit considers each PhotoInfo object as a single photo even if multiple files are exported
num_exported += 1
if limit and num_exported >= limit:
# advance progress to end
progress.advance(task, num_photos - photo_num)
break
photo_str_total = pluralize(len(photos), "photo", "photos") photo_str_total = pluralize(len(photos), "photo", "photos")
if update or force_update: if update or force_update:
@ -1682,6 +1668,45 @@ def export(
export_db.close() export_db.close()
def export_worker(
    photo: osxphotos.PhotoInfo, **kwargs
) -> tuple[osxphotos.PhotoInfo, ExportResults]:
    """Export a single photo, then run its post-functions and post-command.

    Written to be submitted to an executor by ``export`` so that photos can be
    exported from worker threads.

    Args:
        photo: the photo to export; passed to ``export_photo`` as ``photo``.
        **kwargs: options collected by the caller. Every entry whose name
            matches a parameter of ``export_photo`` is forwarded to it. The keys
            ``dry_run``, ``verbose``, ``post_function``, ``post_command``,
            ``dest``, ``exiftool_path`` and ``export_db`` are read directly and
            must be present.

    Returns:
        Tuple of ``(photo, export_results)`` where ``export_results`` is the
        value returned by ``export_photo``.

    Raises:
        KeyError: if one of the directly-read keys is missing from ``kwargs``.

    Exceptions raised by a post-function are caught and reported with
    ``rich_echo_error``; exceptions from ``export_photo`` or
    ``run_post_command`` propagate to the caller.
    """
    dry_run = kwargs["dry_run"]
    verbose: Callable[[str], Any] = kwargs["verbose"]

    # Resolve export_photo's parameter names once instead of once per kwargs
    # entry; kwargs carries far more options than export_photo accepts.
    export_photo_params = frozenset(inspect.getfullargspec(export_photo).args)
    export_args = {k: v for k, v in kwargs.items() if k in export_photo_params}
    export_args["photo"] = photo
    export_results = export_photo(**export_args)

    if post_function := kwargs["post_function"]:
        for function in post_function:
            # post function is tuple of (function, filename.py::function_name)
            verbose(f"Calling post-function [bold]{function[1]}")
            if not dry_run:
                try:
                    function[0](photo, export_results, verbose)
                except Exception as e:
                    # a failing user-supplied hook is reported, not fatal
                    rich_echo_error(
                        f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
                    )

    run_post_command(
        photo=photo,
        post_command=kwargs["post_command"],
        export_results=export_results,
        export_dir=kwargs["dest"],
        dry_run=dry_run,
        exiftool_path=kwargs["exiftool_path"],
        export_db=kwargs["export_db"],
        verbose=verbose,
    )
    return photo, export_results
def export_photo( def export_photo(
photo=None, photo=None,
dest=None, dest=None,

View File

@ -1,11 +1,11 @@
""" Yet another simple exiftool wrapper """ Yet another simple exiftool wrapper
I rolled my own for following reasons: I rolled my own for following reasons:
1. I wanted something under MIT license (best alternative was licensed under GPL/BSD) 1. I wanted something under MIT license (best alternative was licensed under GPL/BSD)
2. I wanted singleton behavior so only a single exiftool process was ever running 2. I wanted exiftool processes to stay resident between calls (improved performance)
3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance) 3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance)
If these aren't important to you, I highly recommend you use Sven Marnach's excellent """
pyexiftool: https://github.com/smarnach/pyexiftool which provides more functionality """
from __future__ import annotations
import atexit import atexit
import contextlib import contextlib
@ -17,6 +17,7 @@ import pathlib
import re import re
import shutil import shutil
import subprocess import subprocess
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from functools import lru_cache # pylint: disable=syntax-error from functools import lru_cache # pylint: disable=syntax-error
@ -30,6 +31,8 @@ __all__ = [
"unescape_str", "unescape_str",
] ]
logger = logging.getLogger("osxphotos")
# exiftool -stay_open commands outputs this EOF marker after command is run # exiftool -stay_open commands outputs this EOF marker after command is run
EXIFTOOL_STAYOPEN_EOF = "{ready}" EXIFTOOL_STAYOPEN_EOF = "{ready}"
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF) EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
@ -42,6 +45,8 @@ EXIFTOOL_FILETYPES_JSON = "exiftool_filetypes.json"
with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f: with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f:
EXIFTOOL_SUPPORTED_FILETYPES = json.load(f) EXIFTOOL_SUPPORTED_FILETYPES = json.load(f)
NUM_PROCESSES = os.cpu_count() or 1
def exiftool_can_write(suffix: str) -> bool: def exiftool_can_write(suffix: str) -> bool:
"""Return True if exiftool supports writing to a file with the given suffix, otherwise False""" """Return True if exiftool supports writing to a file with the given suffix, otherwise False"""
@ -96,8 +101,11 @@ def get_exiftool_path():
class _ExifToolProc: class _ExifToolProc:
"""Runs exiftool in a subprocess via Popen """
Creates a singleton object""" Runs exiftool in a subprocess via Popen
Creates a singleton object that dispatches commands to one or
more exiftool subprocesses.
"""
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
"""create new object or return instance of already created singleton""" """create new object or return instance of already created singleton"""
@ -106,7 +114,11 @@ class _ExifToolProc:
return cls.instance return cls.instance
def __init__(self, exiftool=None, large_file_support=True): def __init__(
self,
exiftool: str | None = None,
large_file_support: bool = True,
):
"""construct _ExifToolProc singleton object or return instance of already created object """construct _ExifToolProc singleton object or return instance of already created object
Args: Args:
@ -117,7 +129,7 @@ class _ExifToolProc:
if hasattr(self, "_process_running") and self._process_running: if hasattr(self, "_process_running") and self._process_running:
# already running # already running
if exiftool is not None and exiftool != self._exiftool: if exiftool is not None and exiftool != self._exiftool:
logging.warning( logger.warning(
f"exiftool subprocess already running, " f"exiftool subprocess already running, "
f"ignoring exiftool={exiftool}" f"ignoring exiftool={exiftool}"
) )
@ -125,6 +137,9 @@ class _ExifToolProc:
self._process_running = False self._process_running = False
self._large_file_support = large_file_support self._large_file_support = large_file_support
self._exiftool = exiftool or get_exiftool_path() self._exiftool = exiftool or get_exiftool_path()
self._num_processes = NUM_PROCESSES
self._process = []
self._process_counter = 0
self._start_proc(large_file_support=large_file_support) self._start_proc(large_file_support=large_file_support)
@property @property
@ -132,23 +147,20 @@ class _ExifToolProc:
"""return the exiftool subprocess""" """return the exiftool subprocess"""
if not self._process_running: if not self._process_running:
self._start_proc(large_file_support=self._large_file_support) self._start_proc(large_file_support=self._large_file_support)
return self._process process_idx = self._process_counter % self._num_processes
self._process_counter += 1
@property return self._process[process_idx]
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
@property @property
def exiftool(self): def exiftool(self):
"""return path to exiftool process""" """return path to exiftool process"""
return self._exiftool return self._exiftool
def _start_proc(self, large_file_support): def _start_proc(self, large_file_support: bool):
"""start exiftool in batch mode""" """start exiftool in batch mode"""
if self._process_running: if self._process_running:
logging.warning("exiftool already running: {self._process}") logger.debug(f"exiftool already running: {self._process}")
return return
# open exiftool process # open exiftool process
@ -156,25 +168,28 @@ class _ExifToolProc:
env = os.environ.copy() env = os.environ.copy()
env["PATH"] = f'/usr/bin/:{env["PATH"]}' env["PATH"] = f'/usr/bin/:{env["PATH"]}'
large_file_args = ["-api", "largefilesupport=1"] if large_file_support else [] large_file_args = ["-api", "largefilesupport=1"] if large_file_support else []
self._process = subprocess.Popen( for _ in range(self._num_processes):
[ self._process.append(
self._exiftool, subprocess.Popen(
"-stay_open", # keep process open in batch mode [
"True", # -stay_open=True, keep process open in batch mode self._exiftool,
*large_file_args, "-stay_open", # keep process open in batch mode
"-@", # read command-line arguments from file "True", # -stay_open=True, keep process open in batch mode
"-", # read from stdin *large_file_args,
"-common_args", # specifies args common to all commands subsequently run "-@", # read command-line arguments from file
"-n", # no print conversion (e.g. print tag values in machine readable format) "-", # read from stdin
"-P", # Preserve file modification date/time "-common_args", # specifies args common to all commands subsequently run
"-G", # print group name for each tag "-n", # no print conversion (e.g. print tag values in machine readable format)
"-E", # escape tag values for HTML (allows use of HTML &#xa; for newlines) "-P", # Preserve file modification date/time
], "-G", # print group name for each tag
stdin=subprocess.PIPE, "-E", # escape tag values for HTML (allows use of HTML &#xa; for newlines)
stdout=subprocess.PIPE, ],
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
env=env, stdout=subprocess.PIPE,
) stderr=subprocess.STDOUT,
env=env,
)
)
self._process_running = True self._process_running = True
EXIFTOOL_PROCESSES.append(self) EXIFTOOL_PROCESSES.append(self)
@ -185,17 +200,19 @@ class _ExifToolProc:
if not self._process_running: if not self._process_running:
return return
with contextlib.suppress(Exception): for i in range(self._num_processes):
self._process.stdin.write(b"-stay_open\n") process = self._process[i]
self._process.stdin.write(b"False\n") with contextlib.suppress(Exception):
self._process.stdin.flush() process.stdin.write(b"-stay_open\n")
try: process.stdin.write(b"False\n")
self._process.communicate(timeout=5) process.stdin.flush()
except subprocess.TimeoutExpired: try:
self._process.kill() process.communicate(timeout=5)
self._process.communicate() except subprocess.TimeoutExpired:
process.kill()
process.communicate()
del self._process self._process = []
self._process_running = False self._process_running = False
@ -233,6 +250,7 @@ class ExifTool:
self._exiftoolproc = _ExifToolProc( self._exiftoolproc = _ExifToolProc(
exiftool=exiftool, large_file_support=large_file_support exiftool=exiftool, large_file_support=large_file_support
) )
self._lock = threading.Lock()
self._read_exif() self._read_exif()
@property @property
@ -336,57 +354,54 @@ class ExifTool:
if not commands: if not commands:
raise TypeError("must provide one or more command to run") raise TypeError("must provide one or more command to run")
if self._context_mgr and self.overwrite: with self._lock:
commands = list(commands) if self._context_mgr and self.overwrite:
commands.append("-overwrite_original") commands = list(commands)
commands.append("-overwrite_original")
filename = b"" if no_file else os.fsencode(self.file) filename = b"" if no_file else os.fsencode(self.file)
if self.flags: if self.flags:
# need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"] # need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"]
flags = [] flags = []
for f in self.flags: for f in self.flags:
flags.extend(f.split()) flags.extend(f.split())
command_str = b"\n".join([f.encode("utf-8") for f in flags]) command_str = b"\n".join([f.encode("utf-8") for f in flags])
command_str += b"\n" command_str += b"\n"
else:
command_str = b""
command_str += (
b"\n".join([c.encode("utf-8") for c in commands])
+ b"\n"
+ filename
+ b"\n"
+ b"-execute\n"
)
# send the command
self._process.stdin.write(command_str)
self._process.stdin.flush()
# read the output
output = b""
warning = b""
error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = self._process.stdout.readline()
if line.startswith(b"Warning"):
warning += line.strip()
elif line.startswith(b"Error"):
error += line.strip()
else: else:
output += line.strip() command_str = b""
warning = "" if warning == b"" else warning.decode("utf-8")
error = "" if error == b"" else error.decode("utf-8")
self.warning = warning
self.error = error
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error command_str += (
b"\n".join([c.encode("utf-8") for c in commands])
+ b"\n"
+ filename
+ b"\n"
+ b"-execute\n"
)
@property # send the command
def pid(self): process = self._process
"""return process id (PID) of the exiftool process""" process.stdin.write(command_str)
return self._process.pid process.stdin.flush()
# read the output
output = b""
warning = b""
error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = process.stdout.readline()
if line.startswith(b"Warning"):
warning += line.strip()
elif line.startswith(b"Error"):
error += line.strip()
else:
output += line.strip()
warning = "" if warning == b"" else warning.decode("utf-8")
error = "" if error == b"" else error.decode("utf-8")
self.warning = warning
self.error = error
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
@property @property
def version(self): def version(self):
@ -404,7 +419,7 @@ class ExifTool:
""" """
json_str, _, _ = self.run_commands("-json") json_str, _, _ = self.run_commands("-json")
if not json_str: if not json_str:
return dict() return {}
json_str = unescape_str(json_str.decode("utf-8")) json_str = unescape_str(json_str.decode("utf-8"))
try: try:
@ -412,8 +427,8 @@ class ExifTool:
except Exception as e: except Exception as e:
# will fail with some commands, e.g --ext AVI which produces # will fail with some commands, e.g --ext AVI which produces
# 'No file with specified extension' instead of json # 'No file with specified extension' instead of json
logging.warning(f"error loading json returned by exiftool: {e} {json_str}") logger.warning(f"error loading json returned by exiftool: {e} {json_str}")
return dict() return {}
exifdict = exifdict[0] exifdict = exifdict[0]
if not tag_groups: if not tag_groups:
# strip tag groups # strip tag groups
@ -482,7 +497,12 @@ class _ExifToolCaching(ExifTool):
""" """
self._json_cache = None self._json_cache = None
self._asdict_cache = {} self._asdict_cache = {}
super().__init__(filepath, exiftool=exiftool, overwrite=False, flags=None) super().__init__(
filepath,
exiftool=exiftool,
overwrite=False,
flags=None,
)
def run_commands(self, *commands, no_file=False): def run_commands(self, *commands, no_file=False):
if commands[0] not in ["-json", "-ver"]: if commands[0] not in ["-json", "-ver"]:

View File

@ -419,22 +419,6 @@ def test_addvalues_unicode():
assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"]) assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"])
def test_singleton():
    """ExifTool objects created for two different files report the same process PID."""
    import osxphotos.exiftool

    make_exiftool = osxphotos.exiftool.ExifTool
    first = make_exiftool(TEST_FILE_ONE_KEYWORD)
    second = make_exiftool(TEST_FILE_MULTI_KEYWORD)
    assert first._process.pid == second._process.pid
def test_pid():
    """ExifTool.pid equals the PID of the object's underlying process."""
    import osxphotos.exiftool

    tool = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert tool.pid == tool._process.pid
def test_exiftoolproc_process(): def test_exiftoolproc_process():
import osxphotos.exiftool import osxphotos.exiftool