Compare commits

...

3 Commits

Author SHA1 Message Date
Rhet Turnbull
030191be96 Working on making export CLI threadsafe 2023-04-02 12:36:51 -07:00
Rhet Turnbull
81127b6d89 Added locking to exiftool 2023-04-01 19:07:36 -07:00
Rhet Turnbull
d8c7d45056 Updated ExifTool to use multiple processes 2023-04-01 19:00:01 -07:00
3 changed files with 296 additions and 267 deletions

View File

@@ -1,5 +1,7 @@
"""export command for osxphotos CLI"""
from __future__ import annotations
import atexit
import inspect
import os
@@ -9,7 +11,8 @@ import shlex
import subprocess
import sys
import time
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple, Any, Callable
import concurrent.futures
import click
from osxmetadata import (
@@ -1426,46 +1429,27 @@ def export(
photo_num = 0
num_exported = 0
# hack to avoid passing all the options to export_photo
kwargs = locals().copy()
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
limit_str = f" (limit = [num]{limit}[/num])" if limit else ""
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
task = progress.add_task(
f"Exporting [num]{num_photos}[/] photos{limit_str}", total=num_photos
)
futures = []
with concurrent.futures.ThreadPoolExecutor(
# max_workers=os.cpu_count()
max_workers=1,
) as executor:
for p in photos:
photo_num += 1
# hack to avoid passing all the options to export_photo
kwargs = {
k: v
for k, v in locals().items()
if k in inspect.getfullargspec(export_photo).args
}
kwargs["photo"] = p
kwargs["export_dir"] = dest
kwargs["export_preview"] = preview
export_results = export_photo(**kwargs)
if post_function:
for function in post_function:
# post function is tuple of (function, filename.py::function_name)
verbose(f"Calling post-function [bold]{function[1]}")
if not dry_run:
try:
function[0](p, export_results, verbose)
except Exception as e:
rich_echo_error(
f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
)
run_post_command(
photo=p,
post_command=post_command,
export_results=export_results,
export_dir=dest,
dry_run=dry_run,
exiftool_path=exiftool_path,
export_db=export_db,
verbose=verbose,
)
kwargs["photo_num"] = photo_num
futures.append(executor.submit(export_worker, p, **kwargs))
for future in concurrent.futures.as_completed(futures):
p, export_results = future.result()
if album_export and export_results.exported:
try:
album_export.add(p)
@@ -1524,7 +1508,9 @@ def export(
if finder_tag_keywords or finder_tag_template:
if dry_run:
for filepath in photo_files:
verbose(f"Writing Finder tags to [filepath]{filepath}[/]")
verbose(
f"Writing Finder tags to [filepath]{filepath}[/]"
)
else:
tags_written, tags_skipped = write_finder_tags(
p,
@@ -1682,6 +1668,45 @@ def export(
export_db.close()
def export_worker(
    photo: osxphotos.PhotoInfo, **kwargs
) -> tuple[osxphotos.PhotoInfo, ExportResults]:
    """Export worker function for multi-threaded export of photos

    Exports a single photo with export_photo(), then runs any post-functions
    and the post-command for that photo.

    Args:
        photo: the photo to export
        **kwargs: export options collected by the caller. Must contain the keys
            "dry_run", "verbose", "post_function", "post_command", "dest",
            "exiftool_path" and "export_db" (a missing key raises KeyError).
            Every key that matches a named argument of export_photo() is
            forwarded to it; all other keys are ignored for the export call.

    Returns:
        tuple of (photo, export_results) so the caller can match a completed
        job back to the photo it was submitted for.
    """
    dry_run = kwargs["dry_run"]
    verbose: Callable[[str], Any] = kwargs["verbose"]

    # Resolve export_photo's argument names once; doing this inside the
    # comprehension's filter would re-inspect the signature for every key.
    export_arg_names = frozenset(inspect.getfullargspec(export_photo).args)
    export_args = {k: v for k, v in kwargs.items() if k in export_arg_names}
    export_args["photo"] = photo
    export_results = export_photo(**export_args)

    if post_function := kwargs["post_function"]:
        for function in post_function:
            # post function is tuple of (function, filename.py::function_name)
            verbose(f"Calling post-function [bold]{function[1]}")
            if not dry_run:
                try:
                    function[0](photo, export_results, verbose)
                except Exception as e:
                    # a failing post-function is reported but must not abort
                    # the export of this photo
                    rich_echo_error(
                        f"[error]Error running post-function [italic]{function[1]}[/italic]: {e}"
                    )

    run_post_command(
        photo=photo,
        post_command=kwargs["post_command"],
        export_results=export_results,
        export_dir=kwargs["dest"],
        dry_run=dry_run,
        exiftool_path=kwargs["exiftool_path"],
        export_db=kwargs["export_db"],
        verbose=verbose,
    )

    return photo, export_results
def export_photo(
photo=None,
dest=None,

View File

@@ -1,11 +1,11 @@
""" Yet another simple exiftool wrapper
I rolled my own for following reasons:
1. I wanted something under MIT license (best alternative was licensed under GPL/BSD)
2. I wanted singleton behavior so only a single exiftool process was ever running
2. I wanted exiftool processes to stay resident between calls (improved performance)
3. When used as a context manager, I wanted the operations to batch until exiting the context (improved performance)
If these aren't important to you, I highly recommend you use Sven Marnach's excellent
pyexiftool: https://github.com/smarnach/pyexiftool which provides more functionality """
"""
from __future__ import annotations
import atexit
import contextlib
@@ -17,6 +17,7 @@ import pathlib
import re
import shutil
import subprocess
import threading
from abc import ABC, abstractmethod
from functools import lru_cache # pylint: disable=syntax-error
@@ -30,6 +31,8 @@ __all__ = [
"unescape_str",
]
logger = logging.getLogger("osxphotos")
# exiftool in -stay_open mode outputs this EOF marker after each command is run
EXIFTOOL_STAYOPEN_EOF = "{ready}"
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
@@ -42,6 +45,8 @@ EXIFTOOL_FILETYPES_JSON = "exiftool_filetypes.json"
with (pathlib.Path(__file__).parent / EXIFTOOL_FILETYPES_JSON).open("r") as f:
EXIFTOOL_SUPPORTED_FILETYPES = json.load(f)
NUM_PROCESSES = os.cpu_count() or 1
def exiftool_can_write(suffix: str) -> bool:
"""Return True if exiftool supports writing to a file with the given suffix, otherwise False"""
@@ -96,8 +101,11 @@ def get_exiftool_path():
class _ExifToolProc:
"""Runs exiftool in a subprocess via Popen
Creates a singleton object"""
"""
Runs exiftool in a subprocess via Popen
Creates a singleton object that dispatches commands to one or
more exiftool subprocesses.
"""
def __new__(cls, *args, **kwargs):
"""create new object or return instance of already created singleton"""
@@ -106,7 +114,11 @@ class _ExifToolProc:
return cls.instance
def __init__(self, exiftool=None, large_file_support=True):
def __init__(
self,
exiftool: str | None = None,
large_file_support: bool = True,
):
"""construct _ExifToolProc singleton object or return instance of already created object
Args:
@@ -117,7 +129,7 @@ class _ExifToolProc:
if hasattr(self, "_process_running") and self._process_running:
# already running
if exiftool is not None and exiftool != self._exiftool:
logging.warning(
logger.warning(
f"exiftool subprocess already running, "
f"ignoring exiftool={exiftool}"
)
@@ -125,6 +137,9 @@ class _ExifToolProc:
self._process_running = False
self._large_file_support = large_file_support
self._exiftool = exiftool or get_exiftool_path()
self._num_processes = NUM_PROCESSES
self._process = []
self._process_counter = 0
self._start_proc(large_file_support=large_file_support)
@property
@@ -132,23 +147,20 @@ class _ExifToolProc:
"""return the exiftool subprocess"""
if not self._process_running:
self._start_proc(large_file_support=self._large_file_support)
return self._process
@property
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
process_idx = self._process_counter % self._num_processes
self._process_counter += 1
return self._process[process_idx]
@property
def exiftool(self):
"""return path to exiftool process"""
return self._exiftool
def _start_proc(self, large_file_support):
def _start_proc(self, large_file_support: bool):
"""start exiftool in batch mode"""
if self._process_running:
logging.warning("exiftool already running: {self._process}")
logger.debug(f"exiftool already running: {self._process}")
return
# open exiftool process
@@ -156,7 +168,9 @@ class _ExifToolProc:
env = os.environ.copy()
env["PATH"] = f'/usr/bin/:{env["PATH"]}'
large_file_args = ["-api", "largefilesupport=1"] if large_file_support else []
self._process = subprocess.Popen(
for _ in range(self._num_processes):
self._process.append(
subprocess.Popen(
[
self._exiftool,
"-stay_open", # keep process open in batch mode
@@ -175,6 +189,7 @@ class _ExifToolProc:
stderr=subprocess.STDOUT,
env=env,
)
)
self._process_running = True
EXIFTOOL_PROCESSES.append(self)
@@ -185,17 +200,19 @@ class _ExifToolProc:
if not self._process_running:
return
for i in range(self._num_processes):
process = self._process[i]
with contextlib.suppress(Exception):
self._process.stdin.write(b"-stay_open\n")
self._process.stdin.write(b"False\n")
self._process.stdin.flush()
process.stdin.write(b"-stay_open\n")
process.stdin.write(b"False\n")
process.stdin.flush()
try:
self._process.communicate(timeout=5)
process.communicate(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.communicate()
process.kill()
process.communicate()
del self._process
self._process = []
self._process_running = False
@@ -233,6 +250,7 @@ class ExifTool:
self._exiftoolproc = _ExifToolProc(
exiftool=exiftool, large_file_support=large_file_support
)
self._lock = threading.Lock()
self._read_exif()
@property
@@ -336,6 +354,7 @@ class ExifTool:
if not commands:
raise TypeError("must provide one or more command to run")
with self._lock:
if self._context_mgr and self.overwrite:
commands = list(commands)
commands.append("-overwrite_original")
@@ -361,15 +380,16 @@ class ExifTool:
)
# send the command
self._process.stdin.write(command_str)
self._process.stdin.flush()
process = self._process
process.stdin.write(command_str)
process.stdin.flush()
# read the output
output = b""
warning = b""
error = b""
while EXIFTOOL_STAYOPEN_EOF not in str(output):
line = self._process.stdout.readline()
line = process.stdout.readline()
if line.startswith(b"Warning"):
warning += line.strip()
elif line.startswith(b"Error"):
@@ -383,11 +403,6 @@ class ExifTool:
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
@property
def pid(self):
"""return process id (PID) of the exiftool process"""
return self._process.pid
@property
def version(self):
"""returns exiftool version"""
@@ -404,7 +419,7 @@ class ExifTool:
"""
json_str, _, _ = self.run_commands("-json")
if not json_str:
return dict()
return {}
json_str = unescape_str(json_str.decode("utf-8"))
try:
@@ -412,8 +427,8 @@ class ExifTool:
except Exception as e:
# will fail with some commands, e.g --ext AVI which produces
# 'No file with specified extension' instead of json
logging.warning(f"error loading json returned by exiftool: {e} {json_str}")
return dict()
logger.warning(f"error loading json returned by exiftool: {e} {json_str}")
return {}
exifdict = exifdict[0]
if not tag_groups:
# strip tag groups
@@ -482,7 +497,12 @@ class _ExifToolCaching(ExifTool):
"""
self._json_cache = None
self._asdict_cache = {}
super().__init__(filepath, exiftool=exiftool, overwrite=False, flags=None)
super().__init__(
filepath,
exiftool=exiftool,
overwrite=False,
flags=None,
)
def run_commands(self, *commands, no_file=False):
if commands[0] not in ["-json", "-ver"]:

View File

@@ -419,22 +419,6 @@ def test_addvalues_unicode():
assert sorted(exif.data["IPTC:Keywords"]) == sorted(["ǂ", "Ƕ"])
def test_singleton():
    """Two ExifTool objects created for different files report the same process PID"""
    import osxphotos.exiftool

    first = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    second = osxphotos.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)

    first_pid = first._process.pid
    second_pid = second._process.pid
    assert first_pid == second_pid
def test_pid():
    """ExifTool.pid matches the PID of the object's underlying process"""
    import osxphotos.exiftool

    exif = osxphotos.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    reported_pid = exif.pid
    assert reported_pid == exif._process.pid
def test_exiftoolproc_process():
import osxphotos.exiftool