--convert-to-jpeg initial version working

This commit is contained in:
Rhet Turnbull 2020-10-02 06:31:20 -07:00
parent ddc1e69b4a
commit 38f201d0fb
12 changed files with 600 additions and 121 deletions

View File

@ -1907,6 +1907,8 @@ For additional details about how osxphotos is implemented or if you would like t
- [Mako](https://www.makotemplates.org/)
- [bpylist2](https://pypi.org/project/bpylist2/)
- [pathvalidate](https://pypi.org/project/pathvalidate/)
- [wurlitzer](https://pypi.org/project/wurlitzer/)
## Acknowledgements
This project was originally inspired by [photo-export](https://github.com/patrikhson/photo-export) by Patrick Fältström, Copyright (c) 2015 Patrik Fältström paf@frobbit.se

View File

@ -29,7 +29,7 @@ from ._constants import (
_UNKNOWN_PLACE,
UNICODE_FORMAT,
)
from ._export_db import ExportDB, ExportDBInMemory
from .export_db import ExportDB, ExportDBInMemory
from ._version import __version__
from .datetime_formatter import DateTimeFormatter
from .exiftool import get_exiftool_path
@ -1230,6 +1230,12 @@ def query(
"Note: Starting with Photos 5, all photos are renamed upon import. By default, "
"photos are exported with the the original name they had before import.",
)
@click.option(
"--convert-to-jpeg",
is_flag=True,
help="Convert all non-jpeg images (e.g. RAW, HEIC, PNG, etc) "
"to JPEG upon export.",
)
@click.option(
"--sidecar",
default=None,
@ -1349,6 +1355,7 @@ def export(
keyword_template,
description_template,
current_name,
convert_to_jpeg,
sidecar,
only_photos,
only_movies,
@ -1398,7 +1405,7 @@ def export(
"""
global VERBOSE
VERBOSE = True if verbose_ else False
VERBOSE = bool(verbose_)
if not os.path.isdir(dest):
sys.exit(f"DEST {dest} must be valid path")
@ -1424,6 +1431,7 @@ def export(
(any(place), no_place),
(deleted, deleted_only),
(skip_edited, skip_original_if_edited),
(export_as_hardlink, convert_to_jpeg),
]
if any(all(bb) for bb in exclusive):
click.echo("Incompatible export options", err=True)
@ -1491,13 +1499,22 @@ def export(
if dry_run:
export_db = ExportDBInMemory(export_db_path)
# echo = functools.partial(click.echo, err=True)
# fileutil = FileUtilNoOp(verbose=echo)
fileutil = FileUtilNoOp
else:
export_db = ExportDB(export_db_path)
fileutil = FileUtil
if verbose_:
if export_db.was_created:
verbose(f"Created export database {export_db_path}")
else:
verbose(f"Using export database {export_db_path}")
upgraded = export_db.was_upgraded
if upgraded:
verbose(
f"Upgraded export database {export_db_path} from version {upgraded[0]} to {upgraded[1]}"
)
photos = _query(
db=db,
keyword=keyword,
@ -1610,6 +1627,7 @@ def export(
touch_file=touch_file,
edited_suffix=edited_suffix,
use_photos_export=use_photos_export,
convert_to_jpeg=convert_to_jpeg,
)
results_exported.extend(results.exported)
results_new.extend(results.new)
@ -1618,6 +1636,12 @@ def export(
results_exif_updated.extend(results.exif_updated)
results_touched.extend(results.touched)
# if convert_to_jpeg and p.isphoto and p.uti != "public.jpeg":
# for photo_file in set(
# results.exported + results.updated + results.exif_updated
# ):
# verbose(f"Converting {photo_file} to jpeg")
else:
# show progress bar
with click.progressbar(photos) as bar:
@ -1651,6 +1675,7 @@ def export(
touch_file=touch_file,
edited_suffix=edited_suffix,
use_photos_export=use_photos_export,
convert_to_jpeg=convert_to_jpeg,
)
results_exported.extend(results.exported)
results_new.extend(results.new)
@ -1658,6 +1683,7 @@ def export(
results_skipped.extend(results.skipped)
results_exif_updated.extend(results.exif_updated)
results_touched.extend(results.touched)
stop_time = time.perf_counter()
# print summary results
if update:
@ -2140,6 +2166,7 @@ def export_photo(
touch_file=None,
edited_suffix="_edited",
use_photos_export=False,
convert_to_jpeg=False,
):
""" Helper function for export that does the actual export
@ -2171,6 +2198,7 @@ def export_photo(
dry_run: boolean; if True, doesn't actually export or update any files
touch_file: boolean; sets file's modification time to match photo date
use_photos_export: boolean; if True forces the use of AppleScript to export even if photo not missing
convert_to_jpeg: boolean; if True, converts non-jpeg images to jpeg
Returns:
list of path(s) of exported photo or None if photo was missing
@ -2179,7 +2207,7 @@ def export_photo(
ValueError on invalid filename_template
"""
global VERBOSE
VERBOSE = True if verbose_ else False
VERBOSE = bool(verbose_)
if not download_missing:
if photo.ismissing:
@ -2257,6 +2285,7 @@ def export_photo(
fileutil=fileutil,
dry_run=dry_run,
touch_file=touch_file,
convert_to_jpeg=convert_to_jpeg,
)
results_exported.extend(export_results.exported)
@ -2316,6 +2345,7 @@ def export_photo(
fileutil=fileutil,
dry_run=dry_run,
touch_file=touch_file,
convert_to_jpeg=convert_to_jpeg,
)
results_exported.extend(export_results_edited.exported)

View File

@ -1,3 +1,3 @@
""" version info """
__version__ = "0.34.2"
__version__ = "0.34.4"

View File

@ -14,7 +14,7 @@ from sqlite3 import Error
from ._version import __version__
OSXPHOTOS_EXPORTDB_VERSION = "1.0"
OSXPHOTOS_EXPORTDB_VERSION = "2.0"
class ExportDB_ABC(ABC):
@ -36,6 +36,22 @@ class ExportDB_ABC(ABC):
def get_stat_orig_for_file(self, filename):
pass
@abstractmethod
def set_stat_edited_for_file(self, filename, stats):
pass
@abstractmethod
def get_stat_edited_for_file(self, filename):
pass
@abstractmethod
def set_stat_converted_for_file(self, filename, stats):
pass
@abstractmethod
def get_stat_converted_for_file(self, filename):
pass
@abstractmethod
def set_stat_exif_for_file(self, filename, stats):
pass
@ -61,7 +77,17 @@ class ExportDB_ABC(ABC):
pass
@abstractmethod
def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json):
def set_data(
self,
filename,
uuid,
orig_stat,
exif_stat,
converted_stat,
edited_stat,
info_json,
exif_json,
):
pass
@ -80,6 +106,18 @@ class ExportDBNoOp(ExportDB_ABC):
def get_stat_orig_for_file(self, filename):
pass
def set_stat_edited_for_file(self, filename, stats):
pass
def get_stat_edited_for_file(self, filename):
pass
def set_stat_converted_for_file(self, filename, stats):
pass
def get_stat_converted_for_file(self, filename):
pass
def set_stat_exif_for_file(self, filename, stats):
pass
@ -98,7 +136,17 @@ class ExportDBNoOp(ExportDB_ABC):
def set_exifdata_for_file(self, uuid, exifdata):
pass
def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json):
def set_data(
self,
filename,
uuid,
orig_stat,
exif_stat,
converted_stat,
edited_stat,
info_json,
exif_json,
):
pass
@ -122,7 +170,6 @@ class ExportDB(ExportDB_ABC):
returns None if filename not found in database
"""
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
logging.debug(f"get_uuid: {filename}")
conn = self._conn
try:
c = conn.cursor()
@ -135,14 +182,12 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
uuid = None
logging.debug(f"get_uuid: {uuid}")
return uuid
def set_uuid_for_file(self, filename, uuid):
""" set UUID of filename to uuid in the database """
filename = str(pathlib.Path(filename).relative_to(self._path))
filename_normalized = filename.lower()
logging.debug(f"set_uuid: {filename} {uuid}")
conn = self._conn
try:
c = conn.cursor()
@ -162,7 +207,6 @@ class ExportDB(ExportDB_ABC):
if len(stats) != 3:
raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
logging.debug(f"set_stat_orig_for_file: {filename} {stats}")
conn = self._conn
try:
c = conn.cursor()
@ -199,9 +243,20 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
stats = (None, None, None)
logging.debug(f"get_stat_orig_for_file: {stats}")
return stats
def set_stat_edited_for_file(self, filename, stats):
""" set stat info for edited version of image (in Photos' library)
filename: filename to set the stat info for
stat: a tuple of length 3: mode, size, mtime """
return self._set_stat_for_file("edited", filename, stats)
def get_stat_edited_for_file(self, filename):
""" get stat info for edited version of image (in Photos' library)
filename: filename to set the stat info for
stat: a tuple of length 3: mode, size, mtime """
return self._get_stat_for_file("edited", filename)
def set_stat_exif_for_file(self, filename, stats):
""" set stat info for filename (after exiftool has updated it)
filename: filename to set the stat info for
@ -210,7 +265,6 @@ class ExportDB(ExportDB_ABC):
if len(stats) != 3:
raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
logging.debug(f"set_stat_exif_for_file: {filename} {stats}")
conn = self._conn
try:
c = conn.cursor()
@ -247,9 +301,20 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
stats = (None, None, None)
logging.debug(f"get_stat_exif_for_file: {stats}")
return stats
def set_stat_converted_for_file(self, filename, stats):
""" set stat info for filename (after image converted to jpeg)
filename: filename to set the stat info for
stat: a tuple of length 3: mode, size, mtime """
return self._set_stat_for_file("converted", filename, stats)
def get_stat_converted_for_file(self, filename):
""" get stat info for filename (after jpeg conversion)
returns: tuple of (mode, size, mtime)
"""
return self._get_stat_for_file("converted", filename)
def get_info_for_uuid(self, uuid):
""" returns the info JSON struct for a UUID """
conn = self._conn
@ -262,7 +327,6 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
info = None
logging.debug(f"get_info: {uuid}, {info}")
return info
def set_info_for_uuid(self, uuid, info):
@ -278,8 +342,6 @@ class ExportDB(ExportDB_ABC):
except Error as e:
logging.warning(e)
logging.debug(f"set_info: {uuid}, {info}")
def get_exifdata_for_file(self, filename):
""" returns the exifdata JSON struct for a file """
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
@ -296,7 +358,6 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
exifdata = None
logging.debug(f"get_exifdata: {filename}, {exifdata}")
return exifdata
def set_exifdata_for_file(self, filename, exifdata):
@ -313,9 +374,17 @@ class ExportDB(ExportDB_ABC):
except Error as e:
logging.warning(e)
logging.debug(f"set_exifdata: {filename}, {exifdata}")
def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json):
def set_data(
self,
filename,
uuid,
orig_stat,
exif_stat,
converted_stat,
edited_stat,
info_json,
exif_json,
):
""" sets all the data for file and uuid at once
"""
filename = str(pathlib.Path(filename).relative_to(self._path))
@ -339,6 +408,14 @@ class ExportDB(ExportDB_ABC):
+ "WHERE filepath_normalized = ?;",
(*exif_stat, filename_normalized),
)
c.execute(
"INSERT OR REPLACE INTO converted(filepath_normalized, mode, size, mtime) VALUES (?, ?, ?, ?);",
(filename_normalized, *converted_stat),
)
c.execute(
"INSERT OR REPLACE INTO edited(filepath_normalized, mode, size, mtime) VALUES (?, ?, ?, ?);",
(filename_normalized, *edited_stat),
)
c.execute(
"INSERT OR REPLACE INTO info(uuid, json_info) VALUES (?, ?);",
(uuid, info_json),
@ -358,6 +435,46 @@ class ExportDB(ExportDB_ABC):
except Error as e:
logging.warning(e)
def _set_stat_for_file(self, table, filename, stats):
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
if len(stats) != 3:
raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
conn = self._conn
try:
c = conn.cursor()
c.execute(
f"UPDATE {table} "
+ "SET converted_mode = ?, converted_size = ?, converted_mtime = ? "
+ "WHERE filepath_normalized = ?;",
(*stats, filename),
)
conn.commit()
except Error as e:
logging.warning(e)
def _get_stat_for_file(self, table, filename):
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
conn = self._conn
try:
c = conn.cursor()
c.execute(
f"SELECT mode, size, mtime FROM {table} WHERE filepath_normalized = ?",
(filename,),
)
results = c.fetchone()
if results:
stats = results[0:3]
mtime = int(stats[2]) if stats[2] is not None else None
stats = (stats[0], stats[1], mtime)
else:
stats = (None, None, None)
except Error as e:
logging.warning(e)
stats = (None, None, None)
return stats
def _open_export_db(self, dbfile):
""" open export database and return a db connection
if dbfile does not exist, will create and initialize the database
@ -365,15 +482,24 @@ class ExportDB(ExportDB_ABC):
"""
if not os.path.isfile(dbfile):
logging.debug(f"dbfile {dbfile} doesn't exist, creating it")
conn = self._get_db_connection(dbfile)
if conn:
self._create_db_tables(conn)
self.was_created = True
self.was_upgraded = ()
self.version = OSXPHOTOS_EXPORTDB_VERSION
else:
raise Exception("Error getting connection to database {dbfile}")
else:
logging.debug(f"dbfile {dbfile} exists, opening it")
conn = self._get_db_connection(dbfile)
self.was_created = False
version_info = self._get_database_version(conn)
if version_info[1] < OSXPHOTOS_EXPORTDB_VERSION:
self._create_db_tables(conn)
self.was_upgraded = (version_info[1], OSXPHOTOS_EXPORTDB_VERSION)
else:
self.was_upgraded = ()
self.version = OSXPHOTOS_EXPORTDB_VERSION
return conn
@ -387,6 +513,13 @@ class ExportDB(ExportDB_ABC):
return conn
def _get_database_version(self, conn):
""" return tuple of (osxphotos, exportdb) versions for database connection conn """
version_info = conn.execute(
"SELECT osxphotos, exportdb, max(id) FROM version"
).fetchone()
return (version_info[0], version_info[1])
def _create_db_tables(self, conn):
""" create (if not already created) the necessary db tables for the export database
conn: sqlite3 db connection
@ -427,9 +560,25 @@ class ExportDB(ExportDB_ABC):
filepath_normalized TEXT NOT NULL,
json_exifdata JSON
); """,
"sql_files_idx": """ CREATE UNIQUE INDEX idx_files_filepath_normalized on files (filepath_normalized); """,
"sql_info_idx": """ CREATE UNIQUE INDEX idx_info_uuid on info (uuid); """,
"sql_exifdata_idx": """ CREATE UNIQUE INDEX idx_exifdata_filename on exifdata (filepath_normalized); """,
"sql_edited_table": """ CREATE TABLE IF NOT EXISTS edited (
id INTEGER PRIMARY KEY,
filepath_normalized TEXT NOT NULL,
mode INTEGER,
size INTEGER,
mtime REAL
); """,
"sql_converted_table": """ CREATE TABLE IF NOT EXISTS converted (
id INTEGER PRIMARY KEY,
filepath_normalized TEXT NOT NULL,
mode INTEGER,
size INTEGER,
mtime REAL
); """,
"sql_files_idx": """ CREATE UNIQUE INDEX IF NOT EXISTS idx_files_filepath_normalized on files (filepath_normalized); """,
"sql_info_idx": """ CREATE UNIQUE INDEX IF NOT EXISTS idx_info_uuid on info (uuid); """,
"sql_exifdata_idx": """ CREATE UNIQUE INDEX IF NOT EXISTS idx_exifdata_filename on exifdata (filepath_normalized); """,
"sql_edited_idx": """ CREATE UNIQUE INDEX IF NOT EXISTS idx_edited_filename on edited (filepath_normalized);""",
"sql_converted_idx": """ CREATE UNIQUE INDEX IF NOT EXISTS idx_converted_filename on converted (filepath_normalized);""",
}
try:
c = conn.cursor()
@ -445,11 +594,10 @@ class ExportDB(ExportDB_ABC):
def __del__(self):
""" ensure the database connection is closed """
if self._conn:
try:
self._conn.close()
except Error as e:
logging.warning(e)
try:
self._conn.close()
except:
pass
def _insert_run_info(self):
dt = datetime.datetime.utcnow().isoformat()
@ -492,14 +640,15 @@ class ExportDBInMemory(ExportDB):
returns: connection to the database
"""
if not os.path.isfile(dbfile):
logging.debug(f"dbfile {dbfile} doesn't exist, creating in memory version")
conn = self._get_db_connection()
if conn:
self._create_db_tables(conn)
self.was_created = True
self.was_upgraded = ()
self.version = OSXPHOTOS_EXPORTDB_VERSION
else:
raise Exception("Error getting connection to in-memory database")
else:
logging.debug(f"dbfile {dbfile} exists, opening it and copying to memory")
try:
conn = sqlite3.connect(dbfile)
except Error as e:
@ -516,6 +665,14 @@ class ExportDBInMemory(ExportDB):
conn = sqlite3.connect(":memory:")
conn.cursor().executescript(tempfile.read())
conn.commit()
self.was_created = False
_, exportdb_ver = self._get_database_version(conn)
if exportdb_ver < OSXPHOTOS_EXPORTDB_VERSION:
self._create_db_tables(conn)
self.was_upgraded = (exportdb_ver, OSXPHOTOS_EXPORTDB_VERSION)
else:
self.was_upgraded = ()
self.version = OSXPHOTOS_EXPORTDB_VERSION
return conn

View File

@ -8,6 +8,7 @@ import subprocess
import sys
from abc import ABC, abstractmethod
from .imageconverter import ImageConverter
class FileUtilABC(ABC):
""" Abstract base class for FileUtil """
@ -47,6 +48,11 @@ class FileUtilABC(ABC):
def file_sig(cls, file1):
pass
@classmethod
@abstractmethod
def convert_to_jpeg(cls, src_file, dest_file, compression_quality=1.0):
pass
class FileUtilMacOS(FileUtilABC):
""" Various file utilities """
@ -163,6 +169,21 @@ class FileUtilMacOS(FileUtilABC):
def file_sig(cls, f1):
""" return os.stat signature for file f1 """
return cls._sig(os.stat(f1))
@classmethod
def convert_to_jpeg(cls, src_file, dest_file, compression_quality=1.0):
""" converts image file src_file to jpeg format as dest_file
Args:
src_file: image file to convert
dest_file: destination path to write converted file to
compression quality: JPEG compression quality in range 0.0 <= compression_quality <= 1.0; default 1.0
Returns:
True if success, otherwise False
"""
converter = ImageConverter()
return converter.write_jpeg(src_file, dest_file, compression_quality=compression_quality)
@staticmethod
def _sig(st):
@ -173,7 +194,6 @@ class FileUtilMacOS(FileUtilABC):
# use int(st.st_mtime) because ditto does not copy fractional portion of mtime
return (stat.S_IFMT(st.st_mode), st.st_size, int(st.st_mtime))
class FileUtil(FileUtilMacOS):
""" Various file utilities """
@ -221,3 +241,7 @@ class FileUtilNoOp(FileUtil):
def file_sig(cls, file1):
cls.verbose(f"file_sig: {file1}")
return (42, 42, 42)
@classmethod
def convert_to_jpeg(cls, src_file, dest_file, compression_quality=1.0):
cls.verbose(f"convert_to_jpeg: {src_file}, {dest_file}, {compression_quality}")

116
osxphotos/imageconverter.py Normal file
View File

@ -0,0 +1,116 @@
""" ImageConverter class
Convert an image to JPEG using CoreImage --
for example, RAW to JPEG """
# reference: https://stackoverflow.com/questions/59330149/coreimage-ciimage-write-jpg-is-shifting-colors-macos/59334308#59334308
import logging
import pathlib
import Metal
import Quartz
from Cocoa import NSURL
from Foundation import NSDictionary
from py import path
# needed to capture system-level stderr
from wurlitzer import pipes
class ImageConverter:
""" Convert images to jpeg. This class is a singleton
which will re-use the Core Image CIContext to avoid
creating a new context for every conversion. """
def __new__(cls, *args, **kwargs):
""" create new object or return instance of already created singleton """
if not hasattr(cls, "instance") or not cls.instance:
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self):
""" return existing singleton or create a new one """
if hasattr(self, "context"):
return
""" initialize CIContext """
context_options = NSDictionary.dictionaryWithDictionary_(
{
"workingColorSpace": Quartz.CoreGraphics.kCGColorSpaceExtendedSRGB,
"workingFormat": Quartz.kCIFormatRGBAh,
}
)
mtldevice = Metal.MTLCreateSystemDefaultDevice()
self.context = Quartz.CIContext.contextWithMTLDevice_options_(
mtldevice, context_options
)
def write_jpeg(self, input_path, output_path, compression_quality=1.0):
""" convert image to jpeg and write image to output_path
Args:
input_path: path to input image (e.g. '/path/to/import/file.CR2') as str or pathlib.Path
output_path: path to exported jpeg (e.g. '/path/to/export/file.jpeg') as str or pathlib.Path
compression_quality: JPEG compression quality, float in range 0.0 to 1.0; default is 1.0 (best quality)
Return:
True if conversion successful, else False
Raises:
ValueError if compression quality not in range 0.0 to 1.0
FileNotFoundError if input_path doesn't exist
"""
# accept input_path or output_path as pathlib.Path
if not isinstance(input_path, str):
input_path = str(input_path)
if not isinstance(output_path, str):
output_path = str(output_path)
if not pathlib.Path(input_path).is_file():
raise FileNotFoundError(f"could not find {input_path}")
if not (0.0 <= compression_quality <= 1.0):
raise ValueError(
"illegal value for compression_quality: {compression_quality}"
)
input_url = NSURL.fileURLWithPath_(input_path)
output_url = NSURL.fileURLWithPath_(output_path)
with pipes() as (out, err):
# capture stdout and stderr from system calls
# otherwise, Quartz.CIImage.imageWithContentsOfURL_
# prints to stderr something like:
# 2020-09-20 20:55:25.538 python[73042:5650492] Creating client/daemon connection: B8FE995E-3F27-47F4-9FA8-559C615FD774
# 2020-09-20 20:55:25.652 python[73042:5650492] Got the query meta data reply for: com.apple.MobileAsset.RawCamera.Camera, response: 0
input_image = Quartz.CIImage.imageWithContentsOfURL_(input_url)
if input_image is None:
logging.debug(f"Could not create CIImage for {input_path}")
return False
output_colorspace = (
input_image.colorSpace()
if input_image.colorSpace()
else Quartz.CGColorSpaceCreateWithName(
Quartz.CoreGraphics.kCGColorSpaceSRGB
)
)
output_options = NSDictionary.dictionaryWithDictionary_(
{"kCGImageDestinationLossyCompressionQuality": compression_quality}
)
_, error = self.context.writeJPEGRepresentationOfImage_toURL_colorSpace_options_error_(
input_image, output_url, output_colorspace, output_options, None
)
if not error:
return True
else:
logging.debug(
"Error converting file {input_path} to jpeg at {output_path}: {error}"
)
return False

View File

@ -30,7 +30,7 @@ from .._constants import (
_UNKNOWN_PERSON,
_XMP_TEMPLATE_NAME,
)
from .._export_db import ExportDBNoOp
from ..export_db import ExportDBNoOp
from ..exiftool import ExifTool
from ..fileutil import FileUtil
from ..utils import dd_to_dms_str, findfiles
@ -306,6 +306,7 @@ def export2(
fileutil=FileUtil,
dry_run=False,
touch_file=False,
convert_to_jpeg=False,
):
""" export photo, like export but with update and dry_run options
dest: must be valid destination path or exception raised
@ -313,10 +314,8 @@ def export2(
**NOTE**: if provided, user must ensure file extension (suffix) is correct.
For example, if photo is .CR2 file, edited image may be .jpeg.
If you provide an extension different than what the actual file is,
export will print a warning but will export the photo using the
incorrect file extension (unless use_photos_export is true, in which case export will
use the extension provided by Photos upon export; in this case, an incorrect extension is
silently ignored).
will export the photo using the incorrect file extension (unless use_photos_export is true,
in which case export will use the extension provided by Photos upon export.
e.g. to get the extension of the edited photo,
reference PhotoInfo.path_edited
edited: (boolean, default=False); if True will export the edited version of the photo
@ -335,7 +334,6 @@ def export2(
timeout: (int, default=120) timeout in seconds used with use_photos_export
exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file
no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes
returns list of full paths to the exported files
use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords
when exporting metadata with exiftool or sidecar
use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords
@ -349,6 +347,7 @@ def export2(
fileutil: (FileUtilABC); class that conforms to FileUtilABC with various file utilities
dry_run: (boolean, default=False); set to True to run in "dry run" mode
touch_file: (boolean, default=False); if True, sets file's modification time upon photo date
convert_to_jpeg: boolean; if True, converts non-jpeg images to jpeg
Returns: ExportResults namedtuple with fields: exported, new, updated, skipped
where each field is a list of file paths
@ -357,6 +356,10 @@ def export2(
and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp)
"""
# NOTE: This function is very complex and does a lot of things.
# Don't modify this code if you don't fully understand everything it does.
# TODO: This is a good candidate for refactoring.
# when called from export(), won't get an export_db, so use no-op version
if export_db is None:
export_db = ExportDBNoOp()
@ -392,34 +395,40 @@ def export2(
raise TypeError(
"Too many positional arguments. Should be at most two: destination, filename."
)
else:
# verify destination is a valid path
if dest is None:
raise ValueError("Destination must not be None")
elif not dry_run and not os.path.isdir(dest):
raise FileNotFoundError("Invalid path passed to export")
if filename and len(filename) == 1:
# if filename passed, use it
fname = filename[0]
else:
# no filename provided so use the default
# if edited file requested, use filename but add _edited
# need to use file extension from edited file as Photos saves a jpeg once edited
if edited and not use_photos_export:
# verify we have a valid path_edited and use that to get filename
if not self.path_edited:
raise FileNotFoundError(
"edited=True but path_edited is none; hasadjustments: "
f" {self.hasadjustments}"
)
edited_name = pathlib.Path(self.path_edited).name
edited_suffix = pathlib.Path(edited_name).suffix
fname = (
pathlib.Path(self.filename).stem + edited_identifier + edited_suffix
# verify destination is a valid path
if dest is None:
raise ValueError("Destination must not be None")
elif not dry_run and not os.path.isdir(dest):
raise FileNotFoundError("Invalid path passed to export")
if filename and len(filename) == 1:
# if filename passed, use it
fname = filename[0]
else:
# no filename provided so use the default
# if edited file requested, use filename but add _edited
# need to use file extension from edited file as Photos saves a jpeg once edited
if edited and not use_photos_export:
# verify we have a valid path_edited and use that to get filename
if not self.path_edited:
raise FileNotFoundError(
"edited=True but path_edited is none; hasadjustments: "
f" {self.hasadjustments}"
)
else:
fname = self.filename
edited_name = pathlib.Path(self.path_edited).name
edited_suffix = pathlib.Path(edited_name).suffix
fname = pathlib.Path(self.filename).stem + edited_identifier + edited_suffix
else:
fname = self.filename
if convert_to_jpeg and self.isphoto and self.uti != "public.jpeg":
# not a jpeg but will convert to jpeg upon export so fix file extension
fname_new = pathlib.Path(fname)
fname = str(fname_new.parent / f"{fname_new.stem}.jpeg")
else:
# nothing to convert
convert_to_jpeg = False
# check destination path
dest = pathlib.Path(dest)
@ -473,16 +482,12 @@ def export2(
raise FileNotFoundError(f"{src} does not appear to exist")
if not _check_export_suffix(src, dest, edited):
logging.warning(
logging.debug(
f"Invalid destination suffix: {dest.suffix} for {self.path}, "
+ f"edited={edited}, path_edited={self.path_edited}, "
+ f"original_filename={self.original_filename}, filename={self.filename}"
)
logging.debug(
f"exporting {src} to {dest}, overwrite={overwrite}, increment={increment}, dest exists: {dest.exists()}"
)
# found source now try to find right destination
if update and dest.exists():
# destination exists, check to see if destination is the right UUID
@ -498,14 +503,13 @@ def export2(
self.uuid,
fileutil.file_sig(dest),
(None, None, None),
(None, None, None),
(None, None, None),
self.json(),
None,
)
if dest_uuid != self.uuid:
# not the right file, find the right one
logging.debug(
f"Need to find right photo: uuid={self.uuid}, dest={dest_uuid}, dest={dest}, path={self.path}"
)
count = 1
glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}")
dest_files = glob.glob(glob_str)
@ -513,17 +517,11 @@ def export2(
for file_ in dest_files:
dest_uuid = export_db.get_uuid_for_file(file_)
if dest_uuid == self.uuid:
logging.debug(
f"Found matching file for uuid: {dest_uuid}, {file_}"
)
dest = pathlib.Path(file_)
found_match = True
break
elif dest_uuid is None and fileutil.cmp(src, file_):
# files match, update the UUID
logging.debug(
f"Found matching file with blank uuid: {self.uuid}, {file_}"
)
dest = pathlib.Path(file_)
found_match = True
export_db.set_data(
@ -531,16 +529,14 @@ def export2(
self.uuid,
fileutil.file_sig(dest),
(None, None, None),
(None, None, None),
(None, None, None),
self.json(),
None,
)
break
if not found_match:
logging.debug(
f"Didn't find destination match for uuid {self.uuid} {dest}"
)
# increment the destination file
count = 1
glob_str = str(dest.parent / f"{dest.stem}*")
@ -551,7 +547,6 @@ def export2(
dest_new = f"{dest.stem} ({count})"
count += 1
dest = dest.parent / f"{dest_new}{dest.suffix}"
logging.debug(f"New destination = {dest}, uuid = {self.uuid}")
# export the dest file
results = self._export_photo(
@ -564,7 +559,9 @@ def export2(
export_as_hardlink,
exiftool,
touch_file,
fileutil,
convert_to_jpeg,
fileutil=fileutil,
edited=edited,
)
exported_files = results.exported
update_new_files = results.new
@ -591,7 +588,8 @@ def export2(
export_as_hardlink,
exiftool,
touch_file,
fileutil,
convert_to_jpeg,
fileutil=fileutil,
)
exported_files.extend(results.exported)
update_new_files.extend(results.new)
@ -618,7 +616,8 @@ def export2(
export_as_hardlink,
exiftool,
touch_file,
fileutil,
convert_to_jpeg,
fileutil=fileutil,
)
exported_files.extend(results.exported)
update_new_files.extend(results.new)
@ -684,8 +683,6 @@ def export2(
)
# export metadata
info = export_db.get_info_for_uuid(self.uuid)
if sidecar_json:
logging.debug("writing exiftool_json_sidecar")
sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}{dest.suffix}.json")
@ -747,7 +744,6 @@ def export2(
if old_data is None or files_are_different:
# didn't have old data, assume we need to write it
# or files were different
logging.debug(f"No exifdata for {exported_file}, writing it")
if not dry_run:
self._write_exif_data(
exported_file,
@ -771,7 +767,6 @@ def export2(
exif_files_updated.append(exported_file)
elif exiftool and exif_files:
for exported_file in exif_files:
logging.debug(f"Writing exif data to {exported_file}")
if not dry_run:
self._write_exif_data(
exported_file,
@ -797,9 +792,9 @@ def export2(
if touch_file:
for exif_file in exif_files_updated:
touched_files.append(exported_file)
touched_files.append(exif_file)
ts = int(self.date.timestamp())
fileutil.utime(exported_file, (ts, ts))
fileutil.utime(exif_file, (ts, ts))
touched_files = list(set(touched_files))
@ -825,7 +820,9 @@ def _export_photo(
export_as_hardlink,
exiftool,
touch_file,
convert_to_jpeg,
fileutil=FileUtil,
edited=False,
):
""" Helper function for export()
Does the actual copy or hardlink taking the appropriate
@ -843,12 +840,18 @@ def _export_photo(
export_as_hardlink: bool
exiftool: bool
touch_file: bool
convert_to_jpeg: bool; if True, convert file to jpeg on export
fileutil: FileUtil class that conforms to fileutil.FileUtilABC
Returns:
ExportResults
Raises:
ValueError if export_as_hardlink and convert_to_jpeg both True
"""
if export_as_hardlink and convert_to_jpeg:
raise ValueError("export_as_hardlink and convert_to_jpeg cannot both be True")
exported_files = []
update_updated_files = []
update_new_files = []
@ -857,40 +860,44 @@ def _export_photo(
dest_str = str(dest)
dest_exists = dest.exists()
if export_as_hardlink:
op_desc = "export_as_hardlink"
else:
op_desc = "export_by_copying"
op_desc = "export_as_hardlink" if export_as_hardlink else "export_by_copying"
if not update:
# not update, export the file
logging.debug(f"Exporting file with {op_desc} {src} {dest}")
exported_files.append(dest_str)
if touch_file:
sig = fileutil.file_sig(src)
sig = (sig[0], sig[1], int(self.date.timestamp()))
if not fileutil.cmp_file_sig(src, sig):
touched_files.append(dest_str)
else: # updating
if not dest_exists:
# update, destination doesn't exist (new file)
logging.debug(f"Update: exporting new file with {op_desc} {src} {dest}")
update_new_files.append(dest_str)
if touch_file:
touched_files.append(dest_str)
else:
if update: # updating
cmp_touch, cmp_orig = False, False
if dest_exists:
# update, destination exists, but we might not need to replace it...
if exiftool:
sig_exif = export_db.get_stat_exif_for_file(dest_str)
cmp_orig = fileutil.cmp_file_sig(dest_str, sig_exif)
sig_exif = (sig_exif[0], sig_exif[1], int(self.date.timestamp()))
cmp_touch = fileutil.cmp_file_sig(dest_str, sig_exif)
elif convert_to_jpeg:
sig_converted = export_db.get_stat_converted_for_file(dest_str)
cmp_orig = fileutil.cmp_file_sig(dest_str, sig_converted)
sig_converted = (
sig_converted[0],
sig_converted[1],
int(self.date.timestamp()),
)
cmp_touch = fileutil.cmp_file_sig(dest_str, sig_converted)
else:
cmp_orig = fileutil.cmp(src, dest)
cmp_touch = fileutil.cmp(src, dest, mtime1=int(self.date.timestamp()))
sig_cmp = cmp_touch if touch_file else cmp_orig
if edited:
# requested edited version of photo
# need to see if edited version in Photos library has changed
# (e.g. it's been edited again)
sig_edited = export_db.get_stat_edited_for_file(dest_str)
cmp_edited = (
fileutil.cmp_file_sig(src, sig_edited)
if sig_edited != (None, None, None)
else False
)
sig_cmp = sig_cmp and cmp_edited
if (export_as_hardlink and dest.samefile(src)) or (
not export_as_hardlink and not dest.samefile(src) and sig_cmp
):
@ -914,7 +921,24 @@ def _export_photo(
if touch_file:
touched_files.append(dest_str)
else:
# update, destination doesn't exist (new file)
logging.debug(f"Update: exporting new file with {op_desc} {src} {dest}")
update_new_files.append(dest_str)
if touch_file:
touched_files.append(dest_str)
else:
# not update, export the file
logging.debug(f"Exporting file with {op_desc} {src} {dest}")
exported_files.append(dest_str)
if touch_file:
sig = fileutil.file_sig(src)
sig = (sig[0], sig[1], int(self.date.timestamp()))
if not fileutil.cmp_file_sig(src, sig):
touched_files.append(dest_str)
if not update_skipped_files:
converted_stat = (None, None, None)
edited_stat = fileutil.file_sig(src) if edited else (None, None, None)
if dest_exists and (update or overwrite):
# need to remove the destination first
logging.debug(
@ -923,6 +947,10 @@ def _export_photo(
fileutil.unlink(dest)
if export_as_hardlink:
fileutil.hardlink(src, dest)
elif convert_to_jpeg:
# use convert_to_jpeg to export the file
fileutil.convert_to_jpeg(src, dest_str)
converted_stat = fileutil.file_sig(dest_str)
else:
fileutil.copy(src, dest_str, norsrc=no_xattr)
@ -931,6 +959,8 @@ def _export_photo(
self.uuid,
fileutil.file_sig(dest_str),
(None, None, None),
converted_stat,
edited_stat,
self.json(),
None,
)
@ -1121,7 +1151,7 @@ def _xmp_sidecar(
use_persons_as_keywords=False,
keyword_template=None,
description_template=None,
extension=None
extension=None,
):
""" returns string for XMP sidecar
use_albums_as_keywords: treat album names as keywords
@ -1134,7 +1164,7 @@ def _xmp_sidecar(
if extension is None:
extension = pathlib.Path(self.original_filename)
extension = extension.suffix[1:] if extension.suffix else None
if description_template is not None:
description = self.render_template(
description_template, expand_inplace=True, inplace_sep=", "

View File

@ -78,6 +78,7 @@ setup(
"bpylist2==3.0.2",
"pathvalidate==2.2.1",
"dataclasses==0.7;python_version<'3.7'",
"wurlitzer>=2.0.1",
],
entry_points={"console_scripts": ["osxphotos=osxphotos.__main__:cli"]},
include_package_data=True,

BIN
tests/export_db_version1.db Normal file

Binary file not shown.

View File

@ -12,7 +12,7 @@ def test_export_db():
""" test ExportDB """
import os
import tempfile
from osxphotos._export_db import ExportDB
from osxphotos.export_db import ExportDB
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")

View File

@ -2,6 +2,7 @@
import pytest
TEST_HEIC = "tests/test-images/IMG_3092.heic"
def test_copy_file_valid():
# copy file with valid src, dest
@ -67,3 +68,15 @@ def test_unlink_file():
assert os.path.isfile(dest)
FileUtil.unlink(dest)
assert not os.path.isfile(dest)
def test_convert_to_jpeg():
import pathlib
import tempfile
from osxphotos.fileutil import FileUtil
temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with temp_dir:
imgfile = pathlib.Path(TEST_HEIC)
outfile = pathlib.Path(temp_dir.name) / f"{imgfile.stem}.jpeg"
assert FileUtil.convert_to_jpeg(imgfile, outfile)
assert outfile.is_file()

View File

@ -0,0 +1,106 @@
""" test ImageConverter """
import pytest
TEST_HEIC = "tests/test-images/IMG_3092.heic"
TEST_RAW = "tests/test-images/IMG_0476_2.CR2"
TEST_JPEG = "tests/test-images/IMG_3984.jpeg"
TEST_IMAGES = [TEST_HEIC, TEST_RAW, TEST_JPEG]
TEST_NOT_AN_IMAGE = "tests/README.md"
TEST_IMAGE_DOES_NOT_EXIST = "tests/test-images/NOT-A-FILE.heic"
def test_image_converter_singleton():
""" test that ImageConverter is a singleton """
from osxphotos.imageconverter import ImageConverter
convert1 = ImageConverter()
convert2 = ImageConverter()
assert convert1 == convert2
def test_image_converter():
""" test conversion of different image types """
import pathlib
import tempfile
from osxphotos.imageconverter import ImageConverter
converter = ImageConverter()
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with tempdir:
for imgfile in TEST_IMAGES:
imgfile = pathlib.Path(imgfile)
outfile = pathlib.Path(tempdir.name) / f"{imgfile.stem}.jpeg"
outfile2 = pathlib.Path(tempdir.name) / f"{imgfile.stem}_2.jpeg"
# call write_jpeg with both pathlib.Path and str arguments
assert converter.write_jpeg(imgfile, outfile)
assert converter.write_jpeg(str(imgfile), str(outfile2))
assert outfile.is_file()
assert outfile2.is_file()
def test_image_converter_compression_quality():
""" test conversion of different image types with custom compression quality """
import pathlib
import tempfile
from osxphotos.imageconverter import ImageConverter
converter = ImageConverter()
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with tempdir:
for imgfile in TEST_IMAGES:
imgfile = pathlib.Path(imgfile)
outfile = pathlib.Path(tempdir.name) / f"{imgfile.stem}.jpeg"
# call write_jpeg with both pathlib.Path and str arguments
assert converter.write_jpeg(imgfile, outfile, compression_quality=0.5)
assert outfile.is_file()
def test_image_converter_bad_compression_quality():
""" test illegal compression quality """
import pathlib
import tempfile
from osxphotos.imageconverter import ImageConverter
converter = ImageConverter()
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with tempdir:
imgfile = pathlib.Path(TEST_HEIC)
outfile = pathlib.Path(tempdir.name) / f"{imgfile.stem}.jpeg"
with pytest.raises(ValueError):
converter.write_jpeg(imgfile, outfile, compression_quality=2.0)
with pytest.raises(ValueError):
converter.write_jpeg(imgfile, outfile, compression_quality=-1.0)
def test_image_converter_bad_file():
""" Try to convert a file that's not an image """
import pathlib
import tempfile
from osxphotos.imageconverter import ImageConverter
converter = ImageConverter()
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with tempdir:
imgfile = pathlib.Path(TEST_NOT_AN_IMAGE)
outfile = pathlib.Path(tempdir.name) / f"{imgfile.stem}.jpeg"
assert not converter.write_jpeg(imgfile, outfile)
def test_image_converter_missing_file():
""" Try to convert a file that's not an image """
import pathlib
import tempfile
from osxphotos.imageconverter import ImageConverter
converter = ImageConverter()
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
with tempdir:
imgfile = pathlib.Path(TEST_IMAGE_DOES_NOT_EXIST)
outfile = pathlib.Path(tempdir.name) / f"{imgfile.stem}.jpeg"
with pytest.raises(FileNotFoundError):
converter.write_jpeg(imgfile, outfile)