Compare commits

...

3 Commits

Author SHA1 Message Date
Rhet Turnbull
030191be96 Working on making export CLI threadsafe 2023-04-02 12:36:51 -07:00
Rhet Turnbull
81127b6d89 Added locking to exiftool 2023-04-01 19:07:36 -07:00
Rhet Turnbull
d8c7d45056 Updated ExifTool to use multiple processes 2023-04-01 19:00:01 -07:00
3 changed files with 296 additions and 267 deletions

View File

@@ -1,5 +1,7 @@
"""export command for osxphotos CLI""" """export command for osxphotos CLI"""
from __future__ import annotations
import atexit import atexit
import inspect import inspect
import os import os
@@ -9,7 +11,8 @@ import shlex
import subprocess import subprocess
import sys import sys
import time import time
from typing import Iterable, List, Optional, Tuple from typing import Iterable, List, Optional, Tuple, Any, Callable
import concurrent.futures
import click import click
from osxmetadata import ( from osxmetadata import (
@@ -1426,46 +1429,27 @@ def export(
photo_num = 0 photo_num = 0
num_exported = 0 num_exported = 0
# hack to avoid passing all the options to export_photo
kwargs = locals().copy()
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
limit_str = f" (limit = [num]{limit}[/num])" if limit else "" limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
task = progress.add_task( task = progress.add_task(
f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos
) )
futures = []
with concurrent.futures.ThreadPoolExecutor(
# max_workers=os.cpu_count()
max_workers=1,
) as executor:
for p in photos: for p in photos:
photo_num += 1 photo_num += 1
# hack to avoid passing all the options to export_photo kwargs["photo_num"] = photo_num
kwargs = { futures.append(executor.submit(export_worker, p, **kwargs))
k: v
for k, v in locals().items()
if k in inspect.getfullargspec(export_photo).args
}
kwargs["photo"] = p
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
export_results = export_photo(**kwargs)
if post_function:
for function in post_function:
# post function is tuple of (function, filename.py::function_name)
verbose(f"Calling post-function [bold]{function[1]}")
if not dry_run:
try:
function[0](p, export_results, verbose)
except Exception as e:
rich_echo_error(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
)
run_post_command(
photo=p,
post_command=post_command,
export_results=export_results,
export_dir=dest,
dry_run=dry_run,
exiftool_path=exiftool_path,
export_db=export_db,
verbose=verbose,
)
for future in concurrent.futures.as_completed(futures):
p, export_results = future.result()
if album_export and export_results.exported: if album_export and export_results.exported:
try: try:
album_export.add(p) album_export.add(p)
@@ -1524,7 +1508,9 @@ def export(
if finder_tag_keywords or finder_tag_template: if finder_tag_keywords or finder_tag_template:
if dry_run: if dry_run:
for filepath in photo_files: for filepath in photo_files:
verbose(f"Writing Finder tags to [filepath]{filepath}[/]") verbose(
f"Writing Finder tags to [filepath]{filepath}[/]"
)
else: else:
tags_written, tags_skipped = write_finder_tags( tags_written, tags_skipped = write_finder_tags(
p, p,
@@ -1682,6 +1668,45 @@ def export(
export_db.close() export_db.close()
def export_worker(
    photo: osxphotos.PhotoInfo, **kwargs
) -> tuple[osxphotos.PhotoInfo, ExportResults]:
    """Export a single photo, then run its post-functions and post-command.

    Intended to be submitted to an executor once per photo; returns the photo
    alongside its results so the caller can match completed work to its input.

    Args:
        photo: the photo to export.
        **kwargs: the export options. The keys "dry_run", "verbose",
            "post_function", "post_command", "dest", "exiftool_path" and
            "export_db" are read directly and must be present. Any key whose
            name matches a named parameter of export_photo is forwarded to it.

    Returns:
        Tuple of (photo, export_results) where export_results is the value
        returned by export_photo.

    Raises:
        KeyError: if one of the directly-read keys is missing from kwargs.
    """
    dry_run = kwargs["dry_run"]
    verbose: Callable[[str], Any] = kwargs["verbose"]

    # Look up export_photo's parameter names once instead of once per key.
    export_photo_params = set(inspect.getfullargspec(export_photo).args)
    export_args = {k: v for k, v in kwargs.items() if k in export_photo_params}
    export_args["photo"] = photo
    export_results = export_photo(**export_args)

    if post_function := kwargs["post_function"]:
        for function in post_function:
            # post function is tuple of (function, filename.py::function_name)
            verbose(f"Calling post-function [bold]{function[1]}")
            if not dry_run:
                try:
                    function[0](photo, export_results, verbose)
                except Exception as e:
                    # A failing post-function is reported but does not abort
                    # the export of this photo.
                    rich_echo_error(
                        f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
                    )

    # The post-command step is invoked whether or not post-functions are
    # configured; dry_run is passed through to it.
    run_post_command(
        photo=photo,
        post_command=kwargs["post_command"],
        export_results=export_results,
        export_dir=kwargs["dest"],
        dry_run=dry_run,
        exiftool_path=kwargs["exiftool_path"],
        export_db=kwargs["export_db"],
        verbose=verbose,
    )

    return photo, export_results
def export_photo( def export_photo(
photo=None, photo=None,
dest=None, dest=None,

View File

@@ -1,11 +1,11 @@
""" Yet another simple exiftool wrapper """ Yet another simple exiftool wrapper
I rolled my own for following reasons: I rolled my own for following reasons:
1. I wanted something under MIT license (best alternative was licensed under GPL/BSD) 1. I wanted something under MIT license (best alternative was licensed under GPL/BSD)
2. I wanted singleton behavior so only a single exiftool process was ever running 2. I wanted exiftool processes to stay resident between calls (improved performance)
3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance) 3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance)
If these aren't important to you, I highly recommend you use Sven Marnach's excellent """
pyexiftool: https://github.com/smarnach/pyexiftool which provides more functionality """
from __future__ import annotations
import atexit import atexit
import contextlib import contextlib
@@ -17,6 +17,7 @@ import pathlib
import re import re
import shutil import shutil
import subprocess import subprocess
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from functools import lru_cache # pylint: disable=syntax-error from functools import lru_cache # pylint: disable=syntax-error
@@ -30,6 +31,8 @@ __all__ = [
"unescape_str", "unescape_str",
] ]
logger = logging.getLogger("osxphotos")
# exiftool -stay_open commands outputs this EOF marker after command is run # exiftool -stay_open commands outputs this EOF marker after command is run
EXIFTOOL_STAYOPEN_EOF = "{ready}" EXIFTOOL_STAYOPEN_EOF = "{ready}"
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF) EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
@@ -42,6 +45,8 @@ EXIFTOOL_FILETYPES_JSON = "exiftool_filetypes.json"
with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f: with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f:
EXIFTOOL_SUPPORTED_FILETYPES = json.load(f) EXIFTOOL_SUPPORTED_FILETYPES = json.load(f)
NUM_PROCESSES = os.cpu_count() or 1
def exiftool_can_write(suffix: str) -> bool: def exiftool_can_write(suffix: str) -> bool:
"""Return True if exiftool supports writing to a file with the given suffix, otherwise False""" """Return True if exiftool supports writing to a file with the given suffix, otherwise False"""
@@ -96,8 +101,11 @@ def get_exiftool_path():
class _ExifToolProc: class _ExifToolProc:
"""Runs exiftool in a subprocess via Popen """
Creates a singleton object""" Runs exiftool in a subprocess via Popen
Creates a singleton object that dispatches commands to one or
more exiftool subprocesses.
"""
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
"""create new object or return instance of already created singleton""" """create new object or return instance of already created singleton"""
@@ -106,7 +114,11 @@ class _ExifToolProc:
return cls.instance return cls.instance
def __init__(self, exiftool=None, large_file_support=True): def __init__(
self,
exiftool: str | None = None,
large_file_support: bool = True,
):
"""construct _ExifToolProc singleton object or return instance of already created object """construct _ExifToolProc singleton object or return instance of already created object
Args: Args:
@@ -117,7 +129,7 @@ class _ExifToolProc:
if hasattr(self, "_process_running") and self._process_running: if hasattr(self, "_process_running") and self._process_running:
# already running # already running
if exiftool is not None and exiftool != self._exiftool: if exiftool is not None and exiftool != self._exiftool:
logging.warning( logger.warning(
f"exiftool subprocess already running, " f"exiftool subprocess already running, "
f"ignoring exiftool={exiftool}" f"ignoring exiftool={exiftool}"
) )
@@ -125,6 +137,9 @@ class _ExifToolProc:
self._process_running = False self._process_running = False
self._large_file_support = large_file_support self._large_file_support = large_file_support
self._exiftool = exiftool or get_exiftool_path() self._exiftool = exiftool or get_exiftool_path()
self._num_processes = NUM_PROCESSES
self._process = []
self._process_counter = 0
self._start_proc(large_file_support=large_file_support) self._start_proc(large_file_support=large_file_support)
@property @property
@@ -132,23 +147,20 @@ class _ExifToolProc:
"""return the exiftool subprocess""" """return the exiftool subprocess"""
if not self._process_running: if not self._process_running:
self._start_proc(large_file_support=self._large_file_support) self._start_proc(large_file_support=self._large_file_support)
return self._process process_idx = self._process_counter % self._num_processes
self._process_counter += 1
@property return self._process[process_idx]
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
@property @property
def exiftool(self): def exiftool(self):
"""return path to exiftool process""" """return path to exiftool process"""
return self._exiftool return self._exiftool
def _start_proc(self, large_file_support): def _start_proc(self, large_file_support: bool):
"""start exiftool in batch mode""" """start exiftool in batch mode"""
if self._process_running: if self._process_running:
logging.warning("exiftool already running: {self._process}") logger.debug(f"exiftool already running: {self._process}")
return return
# open exiftool process # open exiftool process
@@ -156,7 +168,9 @@ class _ExifToolProc:
env = os.environ.copy() env = os.environ.copy()
env["PATH"] = f'/usr/bin/:{env["PATH"]}' env["PATH"] = f'/usr/bin/:{env["PATH"]}'
large_file_args = ["-api", "largefilesupport=1"] if large_file_support else [] large_file_args = ["-api", "largefilesupport=1"] if large_file_support else []
self._process = subprocess.Popen( for _ in range(self._num_processes):
self._process.append(
subprocess.Popen(
[ [
self._exiftool, self._exiftool,
"-stay_open", # keep process open in batch mode "-stay_open", # keep process open in batch mode
@@ -175,6 +189,7 @@ class _ExifToolProc:
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
env=env, env=env,
) )
)
self._process_running = True self._process_running = True
EXIFTOOL_PROCESSES.append(self) EXIFTOOL_PROCESSES.append(self)
@@ -185,17 +200,19 @@ class _ExifToolProc:
if not self._process_running: if not self._process_running:
return return
for i in range(self._num_processes):
process = self._process[i]
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
self._process.stdin.write(b"-stay_open\n") process.stdin.write(b"-stay_open\n")
self._process.stdin.write(b"False\n") process.stdin.write(b"False\n")
self._process.stdin.flush() process.stdin.flush()
try: try:
self._process.communicate(timeout=5) process.communicate(timeout=5)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
self._process.kill() process.kill()
self._process.communicate() process.communicate()
del self._process self._process = []
self._process_running = False self._process_running = False
@@ -233,6 +250,7 @@ class ExifTool:
self._exiftoolproc = _ExifToolProc( self._exiftoolproc = _ExifToolProc(
exiftool=exiftool, large_file_support=large_file_support exiftool=exiftool, large_file_support=large_file_support
) )
self._lock = threading.Lock()
self._read_exif() self._read_exif()
@property @property
@@ -336,6 +354,7 @@ class ExifTool:
if not commands: if not commands:
raise TypeError("must provide one or more command to run") raise TypeError("must provide one or more command to run")
with self._lock:
if self._context_mgr and self.overwrite: if self._context_mgr and self.overwrite:
commands = list(commands) commands = list(commands)
commands.append("-overwrite_original") commands.append("-overwrite_original")
@@ -361,15 +380,16 @@ class ExifTool:
) )
# send the command # send the command
self._process.stdin.write(command_str) process = self._process
self._process.stdin.flush() process.stdin.write(command_str)
process.stdin.flush()
# read the output # read the output
output = b"" output = b""
warning = b"" warning = b""
error = b"" error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output): while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = self._process.stdout.readline() line = process.stdout.readline()
if line.startswith(b"Warning"): if line.startswith(b"Warning"):
warning += line.strip() warning += line.strip()
elif line.startswith(b"Error"): elif line.startswith(b"Error"):
@@ -383,11 +403,6 @@ class ExifTool:
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
@property
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
@property @property
def version(self): def version(self):
"""returns exiftool version""" """returns exiftool version"""
@@ -404,7 +419,7 @@ class ExifTool:
""" """
json_str, _, _ = self.run_commands("-json") json_str, _, _ = self.run_commands("-json")
if not json_str: if not json_str:
return dict() return {}
json_str = unescape_str(json_str.decode("utf-8")) json_str = unescape_str(json_str.decode("utf-8"))
try: try:
@@ -412,8 +427,8 @@ class ExifTool:
except Exception as e: except Exception as e:
# will fail with some commands, e.g --ext AVI which produces # will fail with some commands, e.g --ext AVI which produces
# 'No file with specified extension' instead of json # 'No file with specified extension' instead of json
logging.warning(f"error loading json returned by exiftool: {e} {json_str}") logger.warning(f"error loading json returned by exiftool: {e} {json_str}")
return dict() return {}
exifdict = exifdict[0] exifdict = exifdict[0]
if not tag_groups: if not tag_groups:
# strip tag groups # strip tag groups
@@ -482,7 +497,12 @@ class _ExifToolCaching(ExifTool):
""" """
self._json_cache = None self._json_cache = None
self._asdict_cache = {} self._asdict_cache = {}
super().__init__(filepath, exiftool=exiftool, overwrite=False, flags=None) super().__init__(
filepath,
exiftool=exiftool,
overwrite=False,
flags=None,
)
def run_commands(self, *commands, no_file=False): def run_commands(self, *commands, no_file=False):
if commands[0] not in ["-json", "-ver"]: if commands[0] not in ["-json", "-ver"]:

View File

@@ -419,22 +419,6 @@ def test_addvalues_unicode():
assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"]) assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"])
def test_singleton():
import osxphotos.exiftool
exif1 = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif2 = osxphotos.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)
assert exif1._process.pid == exif2._process.pid
def test_pid():
import osxphotos.exiftool
exif1 = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1.pid == exif1._process.pid
def test_exiftoolproc_process(): def test_exiftoolproc_process():
import osxphotos.exiftool import osxphotos.exiftool