From b1171e96cc06362555725995bb311317eb163e49 Mon Sep 17 00:00:00 2001 From: Rhet Turnbull Date: Sat, 23 May 2020 09:34:04 -0700 Subject: [PATCH] Added --update to CLI export; reference issue #100 --- README.md | 39 +- osxphotos/__main__.py | 218 ++++- osxphotos/_export_db.py | 439 ++++++++++ osxphotos/_filecmp.py | 54 ++ osxphotos/photoinfo/__init__.py | 3 +- osxphotos/photoinfo/_photoinfo_export.py | 999 +++++++++++++++++++++++ osxphotos/photoinfo/photoinfo.py | 597 +------------- osxphotos/utils.py | 33 +- tests/conftest.py | 9 + tests/test_cli.py | 303 ++++++- 10 files changed, 2072 insertions(+), 622 deletions(-) create mode 100644 osxphotos/_export_db.py create mode 100644 osxphotos/_filecmp.py create mode 100644 osxphotos/photoinfo/_photoinfo_export.py create mode 100644 tests/conftest.py diff --git a/README.md b/README.md index 66f7dc46..162cb752 100644 --- a/README.md +++ b/README.md @@ -201,7 +201,11 @@ Options: Search by end item date, e.g. 2000-01-12T12:00:00 or 2000-12-31 (ISO 8601 w/o TZ). + --update Only export new or updated files. See notes + below on export and --update. --export-as-hardlink Hardlink files instead of copying them. + Cannot be used with --exiftool which creates + copies of the files with embedded EXIF data. --overwrite Overwrite existing files. Default behavior is to add (1), (2), etc to filename if file already exists. Use this with caution as it @@ -270,7 +274,8 @@ Options: exported photos. To use this option, exiftool must be installed and in the path. exiftool may be installed from - https://exiftool.org/ + https://exiftool.org/. Cannot be used with + --export-as-hardlink. --directory DIRECTORY Optional template for specifying name of output directory in the form '{name,DEFAULT}'. See below for additional @@ -282,7 +287,35 @@ Options: get an error while exporting. -h, --help Show this message and exit. -**Templating System** +** Export ** +When exporting photos, osxphotos creates a database in the top-level export +folder called '.osxphotos_export.db'. This database preserves state +information used for determining which files need to be updated when run with +--update. It is recommended that if you later move the export folder tree you +also move the database file. + +The --update option will only copy new or updated files from the library to +the export folder. If a file is changed in the export folder (for example, +you edited the exported image), osxphotos will detect this as a difference and +re-export the original image from the library thus overwriting the changes. +If using --update, the exported library should be treated as a backup, not a +working copy where you intend to make changes. + +Note: The number of files reported for export and the number actually exported +may differ due to live photos, associated RAW images, and edited photos which +are reported in the total photos exported. + +Implementation note: To determine which files need to be updated, osxphotos +stores file signature information in the '.osxphotos_export.db' database. The +signature includes size, modification time, and filename. In order to +minimize run time, --update does not do a full comparison (diff) of the files +nor does it compare hashes of the files. In normal usage, this is sufficient +for updating the library. You can always run export without the --update +option to re-export the entire library thus rebuilding the +'.osxphotos_export.db' database. + + +** Templating System ** With the --directory option you may specify a template for the export directory. This directory will be appended to the export path specified in @@ -1098,7 +1131,7 @@ Export photo from the Photos library to another destination on disk. - use_albums_as_keywords: (boolean, default = False); if True, will use album names as keywords when exporting metadata with exiftool or sidecar - use_persons_as_keywords: (boolean, default = False); if True, will use person names as keywords when exporting metadata with exiftool or sidecar -Returns: list of paths to exported files. More than one file could be exported, for example if live_photo=True, both the original imaage and the associated .mov file will be exported +Returns: list of paths to exported files. More than one file could be exported, for example if live_photo=True, both the original image and the associated .mov file will be exported The json sidecar file can be used by exiftool to apply the metadata from the json file to the image. For example: diff --git a/osxphotos/__main__.py b/osxphotos/__main__.py index 4a9d2530..25ccc2ec 100644 --- a/osxphotos/__main__.py +++ b/osxphotos/__main__.py @@ -6,6 +6,7 @@ import os import os.path import pathlib import sys +import time import click import yaml @@ -21,11 +22,25 @@ import osxphotos from ._constants import _EXIF_TOOL_URL, _PHOTOS_4_VERSION, _UNKNOWN_PLACE from ._version import __version__ from .exiftool import get_exiftool_path +from .photoinfo import ExportResults from .photoinfo.template import ( TEMPLATE_SUBSTITUTIONS, TEMPLATE_SUBSTITUTIONS_MULTI_VALUED, ) from .utils import _copy_file, create_path_by_date +from ._export_db import ExportDB + +# global variable to control verbose output +# set via --verbose/-V +VERBOSE = False + +# name of export DB +OSXPHOTOS_EXPORT_DB = ".osxphotos_export.db" + + +def verbose(*args, **kwargs): + if VERBOSE: + click.echo(*args, **kwargs) def get_photos_db(*db_options): @@ -77,9 +92,41 @@ class ExportCommand(click.Command): help_text = super().get_help(ctx) formatter = click.HelpFormatter() - formatter.write("\n\n") # passed to click.HelpFormatter.write_dl for formatting - formatter.write_text("**Templating System**") + + formatter.write("\n\n") + formatter.write_text("** Export **") + formatter.write_text( + "When exporting photos, osxphotos creates a database in the top-level " + + f"export folder called '{OSXPHOTOS_EXPORT_DB}'. This database preserves state information " + + "used for determining which files need to be updated when run with --update. It is recommended " + + "that if you later move the export folder tree you also move the database file." + ) + formatter.write("\n") + formatter.write_text( + "The --update option will only copy new or updated files from the library " + + "to the export folder. If a file is changed in the export folder (for example, you edited the " + + "exported image), osxphotos will detect this as a difference and re-export the original image " + + "from the library thus overwriting the changes. If using --update, the exported library " + + "should be treated as a backup, not a working copy where you intend to make changes. " + ) + formatter.write("\n") + formatter.write_text("Note: The number of files reported for export and the number actually exported " + +"may differ due to live photos, associated RAW images, and edited photos which are reported " + +"in the total photos exported.") + formatter.write("\n") + formatter.write_text( + "Implementation note: To determine which files need to be updated, " + + f"osxphotos stores file signature information in the '{OSXPHOTOS_EXPORT_DB}' database. " + + "The signature includes size, modification time, and filename. In order to minimize " + + "run time, --update does not do a full comparison (diff) of the files nor does it compare " + + "hashes of the files. In normal usage, this is sufficient for updating the library. " + + "You can always run export without the --update option to re-export the entire library thus " + + f"rebuilding the '{OSXPHOTOS_EXPORT_DB}' database." + ) + + formatter.write("\n\n") + formatter.write_text("** Templating System **") formatter.write("\n") formatter.write_text( "With the --directory option you may specify a template for the " @@ -862,8 +909,13 @@ def query( @cli.command(cls=ExportCommand) @DB_OPTION -@click.option("--verbose", "-V", is_flag=True, help="Print verbose output.") +@click.option("--verbose", "-V", "verbose_", is_flag=True, help="Print verbose output.") @query_options +@click.option( + "--update", + is_flag=True, + help="Only export new or updated files. See notes below on export and --update.", +) @click.option( "--export-as-hardlink", is_flag=True, @@ -1014,7 +1066,8 @@ def export( not_shared, from_date, to_date, - verbose, + verbose_, + update, export_as_hardlink, overwrite, export_by_date, @@ -1068,6 +1121,9 @@ def export( to modify this behavior. """ + global VERBOSE + VERBOSE = True if verbose_ else False + if not os.path.isdir(dest): sys.exit("DEST must be valid path") @@ -1092,7 +1148,7 @@ def export( (any(place), no_place), ] if any([all(bb) for bb in exclusive]): - click.echo("Incompatible export options",err=True) + click.echo("Incompatible export options", err=True) click.echo(cli.commands["export"].get_help(ctx), err=True) return @@ -1134,6 +1190,9 @@ def export( _list_libraries() return + # open export database + export_db = ExportDB(os.path.join(dest, OSXPHOTOS_EXPORT_DB)) + photos = _query( db=db, keyword=keyword, @@ -1188,6 +1247,11 @@ def export( no_place=no_place, ) + results_exported = [] + results_new = [] + results_updated = [] + results_skipped = [] + results_exif_updated = [] if photos: if export_bursts: # add the burst_photos to the export set @@ -1199,16 +1263,18 @@ def export( num_photos = len(photos) photo_str = "photos" if num_photos > 1 else "photo" click.echo(f"Exporting {num_photos} {photo_str} to {dest}...") - if not verbose: + start_time = time.perf_counter() + if not verbose_: # show progress bar with click.progressbar(photos) as bar: for p in bar: - export_photo( + results = export_photo( p, dest, - verbose, + verbose_, export_by_date, sidecar, + update, export_as_hardlink, overwrite, export_edited, @@ -1222,15 +1288,22 @@ def export( album_keyword, person_keyword, keyword_template, + export_db, ) + results_exported.extend(results.exported) + results_new.extend(results.new) + results_updated.extend(results.updated) + results_skipped.extend(results.skipped) + results_exif_updated.extend(results.exif_updated) else: for p in photos: - export_paths = export_photo( + results = export_photo( p, dest, - verbose, + verbose_, export_by_date, sidecar, + update, export_as_hardlink, overwrite, export_edited, @@ -1244,14 +1317,40 @@ def export( album_keyword, person_keyword, keyword_template, + export_db, ) - if export_paths: - click.echo(f"Exported {p.filename} to {export_paths}") - else: - click.echo(f"Did not export missing file {p.filename}") + results_exported.extend(results.exported) + results_new.extend(results.new) + results_updated.extend(results.updated) + results_skipped.extend(results.skipped) + results_exif_updated.extend(results.exif_updated) + + stop_time = time.perf_counter() + # print summary results + if not update: + photo_str = "photos" if len(results_exported) != 1 else "photo" + click.echo(f"Exported: {len(results_exported)} {photo_str}") + click.echo(f"Elapsed time: {stop_time-start_time} seconds") + else: + photo_str_new = "photos" if len(results_new) != 1 else "photo" + photo_str_updated = "photos" if len(results_new) != 1 else "photo" + photo_str_skipped = "photos" if len(results_skipped) != 1 else "photo" + photo_str_exif_updated = ( + "photos" if len(results_exif_updated) != 1 else "photo" + ) + click.echo( + f"Exported: {len(results_new)} {photo_str_new}, " + + f"updated: {len(results_updated)} {photo_str_updated}, " + + f"skipped: {len(results_skipped)} {photo_str_skipped}, " + + f"updated EXIF data: {len(results_exif_updated)} {photo_str_exif_updated}" + ) + click.echo(f"Elapsed time: {stop_time-start_time} seconds") + else: click.echo("Did not find any photos to export") + export_db.close() + @cli.command() @click.argument("topic", default=None, required=False, nargs=1) @@ -1618,9 +1717,10 @@ def _query( def export_photo( photo, dest, - verbose, + verbose_, export_by_date, sidecar, + update, export_as_hardlink, overwrite, export_edited, @@ -1634,11 +1734,12 @@ def export_photo( album_keyword, person_keyword, keyword_template, + export_db, ): """ Helper function for export that does the actual export photo: PhotoInfo object dest: destination path as string - verbose: boolean; print verbose output + verbose_: boolean; print verbose output export_by_date: boolean; create export folder in form dest/YYYY/MM/DD sidecar: list zero, 1 or 2 of ["json","xmp"] of sidecar variety to export export_as_hardlink: boolean; hardlink files instead of copying them @@ -1656,6 +1757,8 @@ def export_photo( keyword_template: list of strings; if provided use rendered template strings as keywords returns list of path(s) of exported photo or None if photo was missing """ + global VERBOSE + VERBOSE = True if verbose_ else False # Can export to multiple paths # Start with single path [dest] but direcotry and export_by_date will modify dest_paths @@ -1663,21 +1766,21 @@ def export_photo( if not download_missing: if photo.ismissing: - space = " " if not verbose else "" - click.echo(f"{space}Skipping missing photo {photo.filename}") - return None + space = " " if not verbose_ else "" + verbose(f"{space}Skipping missing photo {photo.filename}") + return ExportResults([], [], [], [], []) elif not os.path.exists(photo.path): - space = " " if not verbose else "" - click.echo( + space = " " if not verbose_ else "" + verbose( f"{space}WARNING: file {photo.path} is missing but ismissing=False, " f"skipping {photo.filename}" ) - return None + return ExportResults([], [], [], [], []) elif photo.ismissing and not photo.iscloudasset or not photo.incloud: - click.echo( + verbose( f"Skipping missing {photo.filename}: not iCloud asset or missing from cloud" ) - return None + return ExportResults([], [], [], [], []) filename = None if original_name: @@ -1685,8 +1788,7 @@ def export_photo( else: filename = photo.filename - if verbose: - click.echo(f"Exporting {photo.filename} as {filename}") + verbose(f"Exporting {photo.filename} as {filename}") if export_by_date: date_created = photo.date.timetuple() @@ -1724,9 +1826,13 @@ def export_photo( ) # export the photo to each path in dest_paths - photo_paths = [] + results_exported = [] + results_new = [] + results_updated = [] + results_skipped = [] + results_exif_updated = [] for dest_path in dest_paths: - photo_path = photo.export( + export_results = photo.export2( dest_path, filename, sidecar_json=sidecar_json, @@ -1741,8 +1847,25 @@ def export_photo( use_albums_as_keywords=album_keyword, use_persons_as_keywords=person_keyword, keyword_template=keyword_template, - )[0] - photo_paths.append(photo_path) + update=update, + export_db=export_db, + ) + + results_exported.extend(export_results.exported) + results_new.extend(export_results.new) + results_updated.extend(export_results.updated) + results_skipped.extend(export_results.skipped) + results_exif_updated.extend(export_results.exif_updated) + + if verbose_: + for exported in export_results.exported: + verbose(f"Exported {exported}") + for new in export_results.new: + verbose(f"Exported new file {new}") + for updated in export_results.updated: + verbose(f"Exported updated file {updated}") + for skipped in export_results.skipped: + verbose(f"Skipped up to date file {skipped}") # if export-edited, also export the edited version # verify the photo has adjustments and valid path to avoid raising an exception @@ -1751,7 +1874,7 @@ def export_photo( # try to download with Photos use_photos_export = download_missing and photo.path_edited is None if not download_missing and photo.path_edited is None: - click.echo(f"Skipping missing edited photo for {filename}") + verbose(f"Skipping missing edited photo for {filename}") else: edited_name = pathlib.Path(filename) # check for correct edited suffix @@ -1762,11 +1885,8 @@ def export_photo( # will be corrected by use_photos_export edited_suffix = pathlib.Path(photo.filename).suffix edited_name = f"{edited_name.stem}_edited{edited_suffix}" - if verbose: - click.echo( - f"Exporting edited version of {filename} as {edited_name}" - ) - photo.export( + verbose(f"Exporting edited version of {filename} as {edited_name}") + export_results_edited = photo.export2( dest_path, edited_name, sidecar_json=sidecar_json, @@ -1780,9 +1900,33 @@ def export_photo( use_albums_as_keywords=album_keyword, use_persons_as_keywords=person_keyword, keyword_template=keyword_template, + update=update, + export_db=export_db, ) - return photo_paths + results_exported.extend(export_results_edited.exported) + results_new.extend(export_results_edited.new) + results_updated.extend(export_results_edited.updated) + results_skipped.extend(export_results_edited.skipped) + results_exif_updated.extend(export_results_edited.exif_updated) + + if verbose_: + for exported in export_results_edited.exported: + verbose(f"Exported {exported}") + for new in export_results_edited.new: + verbose(f"Exported new file {new}") + for updated in export_results_edited.updated: + verbose(f"Exported updated file {updated}") + for skipped in export_results_edited.skipped: + verbose(f"Skipped up to date file {skipped}") + + return ExportResults( + results_exported, + results_new, + results_updated, + results_skipped, + results_exif_updated, + ) if __name__ == "__main__": diff --git a/osxphotos/_export_db.py b/osxphotos/_export_db.py new file mode 100644 index 00000000..8a587ea2 --- /dev/null +++ b/osxphotos/_export_db.py @@ -0,0 +1,439 @@ +""" Helper class for managing a database used by + PhotoInfo.export for tracking state of exports and updates +""" + +import datetime +import logging +import os +import pathlib +import sqlite3 +import sys +from abc import ABC, abstractmethod +from sqlite3 import Error + +from ._version import __version__ + +OSXPHOTOS_EXPORTDB_VERSION = "1.0" + + +class ExportDB_ABC(ABC): + @abstractmethod + def get_uuid_for_file(self, filename): + pass + + @abstractmethod + def set_uuid_for_file(self, filename, uuid): + pass + + @abstractmethod + def set_stat_orig_for_file(self, filename, stats): + pass + + @abstractmethod + def get_stat_orig_for_file(self, filename): + pass + + @abstractmethod + def set_stat_exif_for_file(self, filename, stats): + pass + + @abstractmethod + def get_stat_exif_for_file(self, filename): + pass + + @abstractmethod + def get_info_for_uuid(self, uuid): + pass + + @abstractmethod + def set_info_for_uuid(self, uuid, info): + pass + + @abstractmethod + def get_exifdata_for_file(self, uuid): + pass + + @abstractmethod + def set_exifdata_for_file(self, uuid, exifdata): + pass + + @abstractmethod + def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + pass + + +class ExportDBNoOp(ExportDB_ABC): + """ An ExportDB with NoOp methods """ + + def get_uuid_for_file(self, filename): + pass + + def set_uuid_for_file(self, filename, uuid): + pass + + def set_stat_orig_for_file(self, filename, stats): + pass + + def get_stat_orig_for_file(self, filename): + pass + + def set_stat_exif_for_file(self, filename, stats): + pass + + def get_stat_exif_for_file(self, filename): + pass + + def get_info_for_uuid(self, uuid): + pass + + def set_info_for_uuid(self, uuid, info): + pass + + def get_exifdata_for_file(self, uuid): + pass + + def set_exifdata_for_file(self, uuid, exifdata): + pass + + def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + pass + + +class ExportDB(ExportDB_ABC): + """ Interface to sqlite3 database used to store state information for osxphotos export command """ + + def __init__(self, dbfile): + """ dbfile: path to osxphotos export database file """ + self._dbfile = dbfile + # _path is parent of the database + # all files referenced by get_/set_uuid_for_file will be converted to + # relative paths to this parent _path + # this allows the entire export tree to be moved to a new disk/location + # whilst preserving the UUID to filename mappping + self._path = pathlib.Path(dbfile).parent + self._conn = self._open_export_db(dbfile) + self._insert_run_info() + + def get_uuid_for_file(self, filename): + """ query database for filename and return UUID + returns None if filename not found in database + """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + logging.debug(f"get_uuid: {filename}") + conn = self._conn + try: + c = conn.cursor() + c.execute( + f"SELECT uuid FROM files WHERE filepath_normalized = ?", (filename,) + ) + results = c.fetchone() + uuid = results[0] if results else None + except Error as e: + logging.warning(e) + uuid = None + + logging.debug(f"get_uuid: {uuid}") + return uuid + + def set_uuid_for_file(self, filename, uuid): + """ set UUID of filename to uuid in the database """ + filename = str(pathlib.Path(filename).relative_to(self._path)) + filename_normalized = filename.lower() + logging.debug(f"set_uuid: {filename} {uuid}") + conn = self._conn + try: + c = conn.cursor() + c.execute( + f"INSERT OR REPLACE INTO files(filepath, filepath_normalized, uuid) VALUES (?, ?, ?);", + (filename, filename_normalized, uuid), + ) + conn.commit() + except Error as e: + logging.warning(e) + + def set_stat_orig_for_file(self, filename, stats): + """ set stat info for filename + filename: filename to set the stat info for + stat: a tuple of length 3: mode, size, mtime """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + if len(stats) != 3: + raise ValueError(f"expected 3 elements for stat, got {len(stats)}") + + logging.debug(f"set_stat_orig_for_file: {filename} {stats}") + conn = self._conn + try: + c = conn.cursor() + c.execute( + "UPDATE files " + + "SET orig_mode = ?, orig_size = ?, orig_mtime = ? " + + "WHERE filepath_normalized = ?;", + (*stats, filename), + ) + conn.commit() + except Error as e: + logging.warning(e) + + def get_stat_orig_for_file(self, filename): + """ get stat info for filename + returns: tuple of (mode, size, mtime) + """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + conn = self._conn + try: + c = conn.cursor() + c.execute( + "SELECT orig_mode, orig_size, orig_mtime FROM files WHERE filepath_normalized = ?", + (filename,), + ) + results = c.fetchone() + stats = results[0:3] if results else None + except Error as e: + logging.warning(e) + stats = (None, None, None) + + logging.debug(f"get_stat_orig_for_file: {stats}") + return stats + + def set_stat_exif_for_file(self, filename, stats): + """ set stat info for filename (after exiftool has updated it) + filename: filename to set the stat info for + stat: a tuple of length 3: mode, size, mtime """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + if len(stats) != 3: + raise ValueError(f"expected 3 elements for stat, got {len(stats)}") + + logging.debug(f"set_stat_exif_for_file: {filename} {stats}") + conn = self._conn + try: + c = conn.cursor() + c.execute( + "UPDATE files " + + "SET exif_mode = ?, exif_size = ?, exif_mtime = ? " + + "WHERE filepath_normalized = ?;", + (*stats, filename), + ) + conn.commit() + except Error as e: + logging.warning(e) + + def get_stat_exif_for_file(self, filename): + """ get stat info for filename (after exiftool has updated it) + returns: tuple of (mode, size, mtime) + """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + conn = self._conn + try: + c = conn.cursor() + c.execute( + "SELECT exif_mode, exif_size, exif_mtime FROM files WHERE filepath_normalized = ?", + (filename,), + ) + results = c.fetchone() + stats = results[0:3] if results else None + except Error as e: + logging.warning(e) + stats = (None, None, None) + + logging.debug(f"get_stat_exif_for_file: {stats}") + return stats + + def get_info_for_uuid(self, uuid): + """ returns the info JSON struct for a UUID """ + conn = self._conn + try: + c = conn.cursor() + c.execute("SELECT json_info FROM info WHERE uuid = ?", (uuid,)) + results = c.fetchone() + info = results[0] if results else None + except Error as e: + logging.warning(e) + info = None + + logging.debug(f"get_info: {uuid}, {info}") + return info + + def set_info_for_uuid(self, uuid, info): + """ sets the info JSON struct for a UUID """ + conn = self._conn + try: + c = conn.cursor() + c.execute( + "INSERT OR REPLACE INTO info(uuid, json_info) VALUES (?, ?);", + (uuid, info), + ) + conn.commit() + except Error as e: + logging.warning(e) + + logging.debug(f"set_info: {uuid}, {info}") + + def get_exifdata_for_file(self, filename): + """ returns the exifdata JSON struct for a file """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + conn = self._conn + try: + c = conn.cursor() + c.execute( + "SELECT json_exifdata FROM exifdata WHERE filepath_normalized = ?", + (filename,), + ) + results = c.fetchone() + exifdata = results[0] if results else None + except Error as e: + logging.warning(e) + exifdata = None + + logging.debug(f"get_exifdata: {filename}, {exifdata}") + return exifdata + + def set_exifdata_for_file(self, filename, exifdata): + """ sets the exifdata JSON struct for a file """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + conn = self._conn + try: + c = conn.cursor() + c.execute( + "INSERT OR REPLACE INTO exifdata(filepath_normalized, json_exifdata) VALUES (?, ?);", + (filename, exifdata), + ) + conn.commit() + except Error as e: + logging.warning(e) + + logging.debug(f"set_exifdata: {filename}, {exifdata}") + + def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + """ sets all the data for file and uuid at once + calls set_uuid_for_file + set_info_for_uuid + set_stat_orig_for_file + set_stat_exif_for_file + set_exifdata_for_file + """ + filename = str(pathlib.Path(filename).relative_to(self._path)).lower() + + self.set_uuid_for_file(filename, uuid) + self.set_info_for_uuid(uuid, info_json) + self.set_stat_orig_for_file(filename, orig_stat) + self.set_stat_exif_for_file(filename, exif_stat) + self.set_exifdata_for_file(filename, exif_json) + + def close(self): + """ close the database connection """ + try: + self._conn.close() + except Error as e: + logging.warning(e) + + def _open_export_db(self, dbfile): + """ open export database and return a db connection + if dbfile does not exist, will create and initialize the database + returns: connection to the database + """ + + if not os.path.isfile(dbfile): + logging.debug(f"dbfile {dbfile} doesn't exist, creating it") + conn = self._get_db_connection(dbfile) + if conn: + self._create_db_tables(conn) + else: + raise Exception("Error getting connection to database {dbfile}") + else: + logging.debug(f"dbfile {dbfile} exists, opening it") + conn = self._get_db_connection(dbfile) + + return conn + + def _get_db_connection(self, dbfile): + """ return db connection to dbname """ + try: + conn = sqlite3.connect(dbfile) + except Error as e: + logging.warning(e) + conn = None + + return conn + + def _create_db_tables(self, conn): + """ create (if not already created) the necessary db tables for the export database + conn: sqlite3 db connection + """ + sql_commands = { + "sql_version_table": """ CREATE TABLE IF NOT EXISTS version ( + id INTEGER PRIMARY KEY, + osxphotos TEXT, + exportdb TEXT + ); """, + "sql_files_table": """ CREATE TABLE IF NOT EXISTS files ( + id INTEGER PRIMARY KEY, + filepath TEXT NOT NULL, + filepath_normalized TEXT NOT NULL, + uuid TEXT, + orig_mode INTEGER, + orig_size INTEGER, + orig_mtime REAL, + exif_mode INTEGER, + exif_size INTEGER, + exif_mtime REAL + ); """, + "sql_runs_table": """ CREATE TABLE IF NOT EXISTS runs ( + id INTEGER PRIMARY KEY, + datetime TEXT, + python_path TEXT, + script_name TEXT, + args TEXT, + cwd TEXT + ); """, + "sql_info_table": """ CREATE TABLE IF NOT EXISTS info ( + id INTEGER PRIMARY KEY, + uuid text NOT NULL, + json_info JSON + ); """, + "sql_exifdata_table": """ CREATE TABLE IF NOT EXISTS exifdata ( + id INTEGER PRIMARY KEY, + filepath_normalized TEXT NOT NULL, + json_exifdata JSON + ); """, + "sql_files_idx": """ CREATE UNIQUE INDEX idx_files_filepath_normalized on files (filepath_normalized); """, + "sql_info_idx": """ CREATE UNIQUE INDEX idx_info_uuid on info (uuid); """, + "sql_exifdata_idx": """ CREATE UNIQUE INDEX idx_exifdata_filename on exifdata (filepath_normalized); """, + } + try: + c = conn.cursor() + for cmd in sql_commands.values(): + c.execute(cmd) + c.execute( + "INSERT INTO version(osxphotos, exportdb) VALUES (?, ?);", + (__version__, OSXPHOTOS_EXPORTDB_VERSION), + ) + conn.commit() + except Error as e: + logging.warning(e) + + def __del__(self): + """ ensure the database connection is closed """ + if self._conn: + try: + self._conn.close() + except Error as e: + logging.warning(e) + + def _insert_run_info(self): + dt = datetime.datetime.utcnow().isoformat() + python_path = sys.executable + cmd = sys.argv[0] + if len(sys.argv) > 1: + args = " ".join(sys.argv[1:]) + else: + args = "" + cwd = os.getcwd() + conn = self._conn + try: + c = conn.cursor() + c.execute( + f"INSERT INTO runs (datetime, python_path, script_name, args, cwd) VALUES (?, ?, ?, ?, ?)", + (dt, python_path, cmd, args, cwd), + ) + conn.commit() + except Error as e: + logging.warning(e) diff --git a/osxphotos/_filecmp.py b/osxphotos/_filecmp.py new file mode 100644 index 00000000..03e8f24b --- /dev/null +++ b/osxphotos/_filecmp.py @@ -0,0 +1,54 @@ +"""Utilities for comparing files + +Modified from CPython/Lib/filecmp.py + +Functions: + cmp_file(f1, s2) -> int + file_sig(f1) -> Tuple[int, int, float] + +""" + +import os +import stat + +__all__ = ["cmp", "sig"] + + +def cmp_file(f1, s2): + """Compare file f1 to signature s2. + + Arguments: + + f1 -- File name + + s2 -- stats as returned by sig + + Return value: + + True if the files are the same, False otherwise. + + This function uses a cache for past comparisons and the results, + with cache entries invalidated if their stat information + changes. The cache may be cleared by calling clear_cache(). + + """ + + if not s2: + return False + + s1 = _sig(os.stat(f1)) + + if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: + return False + if s1 == s2: + return True + return False + + +def _sig(st): + return (stat.S_IFMT(st.st_mode), st.st_size, st.st_mtime) + + +def file_sig(f1): + """ return os.stat signature for file f1 """ + return _sig(os.stat(f1)) diff --git a/osxphotos/photoinfo/__init__.py b/osxphotos/photoinfo/__init__.py index d3f62fdb..73b31bc1 100644 --- a/osxphotos/photoinfo/__init__.py +++ b/osxphotos/photoinfo/__init__.py @@ -4,5 +4,6 @@ Represents a single photo in the Photos library and provides access to the photo PhotosDB.photos() returns a list of PhotoInfo objects """ -from .photoinfo import PhotoInfo from ._photoinfo_exifinfo import ExifInfo +from ._photoinfo_export import ExportResults +from .photoinfo import PhotoInfo diff --git a/osxphotos/photoinfo/_photoinfo_export.py b/osxphotos/photoinfo/_photoinfo_export.py new file mode 100644 index 00000000..4123da6d --- /dev/null +++ b/osxphotos/photoinfo/_photoinfo_export.py @@ -0,0 +1,999 @@ +""" export methods for PhotoInfo """ + +# TODO: should this be its own PhotoExporter class? + +import filecmp +import glob +import json +import logging +import os +import pathlib +import re +from collections import namedtuple # pylint: disable=syntax-error + +from mako.template import Template + +from .._constants import ( + _MAX_IPTC_KEYWORD_LEN, + _OSXPHOTOS_NONE_SENTINEL, + _TEMPLATE_DIR, + _UNKNOWN_PERSON, + _XMP_TEMPLATE_NAME, +) +from ..exiftool import ExifTool +from .._export_db import ExportDBNoOp +from .._filecmp import cmp_file, file_sig +from ..utils import ( + _copy_file, + _export_photo_uuid_applescript, + _hardlink_file, + dd_to_dms_str, +) + +ExportResults = namedtuple( + "ExportResults", ["exported", "new", "updated", "skipped", "exif_updated"] +) + + +def export( + self, + dest, + *filename, + edited=False, + live_photo=False, + raw_photo=False, + export_as_hardlink=False, + overwrite=False, + increment=True, + sidecar_json=False, + sidecar_xmp=False, + use_photos_export=False, + timeout=120, + exiftool=False, + no_xattr=False, + use_albums_as_keywords=False, + use_persons_as_keywords=False, + keyword_template=None, +): + """ export photo + dest: must be valid destination path (or exception raised) + filename: (optional): name of exported picture; if not provided, will use current filename + **NOTE**: if provided, user must ensure file extension (suffix) is correct. + For example, if photo is .CR2 file, edited image may be .jpeg. + If you provide an extension different than what the actual file is, + export will print a warning but will happily export the photo using the + incorrect file extension. e.g. to get the extension of the edited photo, + reference PhotoInfo.path_edited + edited: (boolean, default=False); if True will export the edited version of the photo + (or raise exception if no edited version) + live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos + raw_photo: (boolean, default=False); if True, will also export the associted RAW photo + export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them + overwrite: (boolean, default=False); if True will overwrite files if they alreay exist + increment: (boolean, default=True); if True, will increment file name until a non-existant name is found + if overwrite=False and increment=False, export will fail if destination file already exists + sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool + sidecar filename will be dest/filename.json + sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data + sidecar filename will be dest/filename.xmp + use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos + timeout: (int, default=120) timeout in seconds used with use_photos_export + exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file + no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes + returns list of full paths to the exported files + use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords + when exporting metadata with exiftool or sidecar + use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords + when exporting metadata with exiftool or sidecar + keyword_template: (list of strings); list of template strings that will be rendered as used as keywords + returns: list of photos exported + """ + + # Implementation note: calls export2 to actually do the work + + results = self.export2( + dest, + *filename, + edited=edited, + live_photo=live_photo, + raw_photo=raw_photo, + export_as_hardlink=export_as_hardlink, + overwrite=overwrite, + increment=increment, + sidecar_json=sidecar_json, + sidecar_xmp=sidecar_xmp, + use_photos_export=use_photos_export, + timeout=timeout, + exiftool=exiftool, + no_xattr=no_xattr, + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + + return results.exported + + +def export2( + self, + dest, + *filename, + edited=False, + live_photo=False, + raw_photo=False, + export_as_hardlink=False, + overwrite=False, + increment=True, + sidecar_json=False, + sidecar_xmp=False, + use_photos_export=False, + timeout=120, + exiftool=False, + no_xattr=False, + use_albums_as_keywords=False, + use_persons_as_keywords=False, + keyword_template=None, + update=False, + export_db=None, +): + """ export photo + dest: must be valid destination path (or exception raised) + filename: (optional): name of exported picture; if not provided, will use current filename + **NOTE**: if provided, user must ensure file extension (suffix) is correct. + For example, if photo is .CR2 file, edited image may be .jpeg. + If you provide an extension different than what the actual file is, + export will print a warning but will happily export the photo using the + incorrect file extension. e.g. to get the extension of the edited photo, + reference PhotoInfo.path_edited + edited: (boolean, default=False); if True will export the edited version of the photo + (or raise exception if no edited version) + live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos + raw_photo: (boolean, default=False); if True, will also export the associted RAW photo + export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them + overwrite: (boolean, default=False); if True will overwrite files if they alreay exist + increment: (boolean, default=True); if True, will increment file name until a non-existant name is found + if overwrite=False and increment=False, export will fail if destination file already exists + sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool + sidecar filename will be dest/filename.json + sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data + sidecar filename will be dest/filename.xmp + use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos + timeout: (int, default=120) timeout in seconds used with use_photos_export + exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file + no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes + returns list of full paths to the exported files + use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords + when exporting metadata with exiftool or sidecar + use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords + when exporting metadata with exiftool or sidecar + keyword_template: (list of strings); list of template strings that will be rendered as used as keywords + update: (boolean, default=False); if True export will run in update mode, that is, it will + not export the photo if the current version already exists in the destination + export_db: (ExportDB_ABC); instance of a class that conforms to ExportDB_ABC with methods + for getting/setting data related to exported files to compare update state + Returns: ExportResults namedtuple with fields: exported, new, updated, skipped + where each field is a list of file paths + """ + + # if update, caller may pass function refs to get/set uuid for file being exported + # and for setting/getting the PhotoInfo json info for an exported file + if export_db is None: + export_db = ExportDBNoOp() + + # suffix to add to edited files + # e.g. name will be filename_edited.jpg + edited_identifier = "_edited" + + # list of all files exported during this call to export + exported_files = [] + + # list of new files during update + update_new_files = [] + + # list of files that were updated + update_updated_files = [] + + # list of all files skipped because they do not need to be updated (for use with update=True) + update_skipped_files = [] + + # check edited and raise exception trying to export edited version of + # photo that hasn't been edited + if edited and not self.hasadjustments: + raise ValueError( + "Photo does not have adjustments, cannot export edited version" + ) + + # check arguments and get destination path and filename (if provided) + if filename and len(filename) > 2: + raise TypeError( + "Too many positional arguments. Should be at most two: destination, filename." + ) + else: + # verify destination is a valid path + if dest is None: + raise ValueError("Destination must not be None") + elif not os.path.isdir(dest): + raise FileNotFoundError("Invalid path passed to export") + + if filename and len(filename) == 1: + # if filename passed, use it + fname = filename[0] + else: + # no filename provided so use the default + # if edited file requested, use filename but add _edited + # need to use file extension from edited file as Photos saves a jpeg once edited + if edited and not use_photos_export: + # verify we have a valid path_edited and use that to get filename + if not self.path_edited: + raise FileNotFoundError( + "edited=True but path_edited is none; hasadjustments: " + f" {self.hasadjustments}" + ) + edited_name = pathlib.Path(self.path_edited).name + edited_suffix = pathlib.Path(edited_name).suffix + fname = ( + pathlib.Path(self.filename).stem + edited_identifier + edited_suffix + ) + else: + fname = self.filename + + # check destination path + dest = pathlib.Path(dest) + fname = pathlib.Path(fname) + dest = dest / fname + + # check extension of destination + if edited and self.path_edited is not None: + # use suffix from edited file + actual_suffix = pathlib.Path(self.path_edited).suffix + elif edited: + # use .jpeg as that's probably correct + # if edited and path_edited is None, will raise FileNotFoundError below + # unless use_photos_export is True + actual_suffix = ".jpeg" + else: + # use suffix from the non-edited file + actual_suffix = pathlib.Path(self.filename).suffix + + # warn if suffixes don't match but ignore .JPG / .jpeg as + # Photo's often converts .JPG to .jpeg + suffixes = sorted([x.lower() for x in [dest.suffix, actual_suffix]]) + if dest.suffix.lower() != actual_suffix.lower() and suffixes != [".jpeg", ".jpg"]: + logging.warning( + f"Invalid destination suffix: {dest.suffix}, should be {actual_suffix}" + ) + + # check to see if file exists and if so, add (1), (2), etc until we find one that works + # Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars + # e.g. exporting sidecar for file1.png and file1.jpeg + # if file1.png exists and exporting file1.jpeg, + # dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision + if not update and increment and not overwrite: + count = 1 + glob_str = str(dest.parent / f"{dest.stem}*") + dest_files = glob.glob(glob_str) + dest_files = [pathlib.Path(f).stem for f in dest_files] + dest_new = dest.stem + while dest_new in dest_files: + dest_new = f"{dest.stem} ({count})" + count += 1 + dest = dest.parent / f"{dest_new}{dest.suffix}" + + # TODO: need way to check if DB is missing, try to find the right photo anyway by seeing if they're the same and then updating + # move the checks into "if not use_photos_export" block below + # if use_photos_export is True then we'll export wether destination exists or not + + # if overwrite==False and #increment==False, export should fail if file exists + if dest.exists() and not update and not overwrite and not increment: + raise FileExistsError( + f"destination exists ({dest}); overwrite={overwrite}, increment={increment}" + ) + + if not use_photos_export: + # find the source file on disk and export + # get path to source file and verify it's not None and is valid file + # TODO: how to handle ismissing or not hasadjustments and edited=True cases? + if edited: + if self.path_edited is not None: + src = self.path_edited + else: + raise FileNotFoundError( + f"Cannot export edited photo if path_edited is None" + ) + else: + if self.ismissing: + logging.debug( + f"Attempting to export photo with ismissing=True: path = {self.path}" + ) + + if self.path is not None: + src = self.path + else: + raise FileNotFoundError("Cannot export photo if path is None") + + if not os.path.isfile(src): + raise FileNotFoundError(f"{src} does not appear to exist") + + logging.debug( + f"exporting {src} to {dest}, overwrite={overwrite}, increment={increment}, dest exists: {dest.exists()}" + ) + + # found source now try to find right destination + if update and dest.exists(): + # destination exists, check to see if destination is the right UUID + dest_uuid = export_db.get_uuid_for_file(dest) + if dest_uuid is None and filecmp.cmp(src, dest): + # might be exporting into a pre-ExportDB folder or the DB got deleted + logging.debug( + f"Found matching file with blank uuid: {self.uuid}, {dest}" + ) + dest_uuid = self.uuid + export_db.set_uuid_for_file(dest, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest, file_sig(dest)) + export_db.set_stat_exif_for_file(dest, (None, None, None)) + export_db.set_exifdata_for_file(dest, None) + if dest_uuid != self.uuid: + # not the right file, find the right one + logging.debug( + f"Need to find right photo: uuid={self.uuid}, dest={dest_uuid}, dest={dest}, path={self.path}" + ) + count = 1 + glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}") + dest_files = glob.glob(glob_str) + found_match = False + for file_ in dest_files: + dest_uuid = export_db.get_uuid_for_file(file_) + if dest_uuid == self.uuid: + logging.debug( + f"Found matching file for uuid: {dest_uuid}, {file_}" + ) + dest = pathlib.Path(file_) + found_match = True + break + elif dest_uuid is None and filecmp.cmp(src, file_): + # files match, update the UUID + logging.debug( + f"Found matching file with blank uuid: {self.uuid}, {file_}" + ) + dest = pathlib.Path(file_) + found_match = True + export_db.set_uuid_for_file(file_, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest, file_sig(dest)) + export_db.set_stat_exif_for_file(dest, (None, None, None)) + export_db.set_exifdata_for_file(dest, None) + break + + if not found_match: + logging.debug( + f"Didn't find destination match for uuid {self.uuid} {dest}" + ) + + # increment the destination file + count = 1 + glob_str = str(dest.parent / f"{dest.stem}*") + dest_files = glob.glob(glob_str) + dest_files = [pathlib.Path(f).stem for f in dest_files] + dest_new = dest.stem + while dest_new in dest_files: + dest_new = f"{dest.stem} ({count})" + count += 1 + dest = dest.parent / f"{dest_new}{dest.suffix}" + logging.debug(f"New destination = {dest}, uuid = {self.uuid}") + + # export the dest file + results = self._export_photo( + src, + dest, + update, + export_db, + overwrite, + no_xattr, + export_as_hardlink, + exiftool, + ) + exported_files = results.exported + update_new_files = results.new + update_updated_files = results.updated + update_skipped_files = results.skipped + + # copy live photo associated .mov if requested + if live_photo and self.live_photo: + live_name = dest.parent / f"{dest.stem}.mov" + src_live = self.path_live_photo + + if src_live is not None: + logging.debug( + f"Exporting live photo video of {filename} as {live_name.name}" + ) + results = self._export_photo( + src_live, + live_name, + update, + export_db, + overwrite, + no_xattr, + export_as_hardlink, + exiftool, + ) + exported_files.extend(results.exported) + update_new_files.extend(results.new) + update_updated_files.extend(results.updated) + update_skipped_files.extend(results.skipped) + else: + logging.debug(f"Skipping missing live movie for {filename}") + + # copy associated RAW image if requested + if raw_photo and self.has_raw: + raw_path = pathlib.Path(self.path_raw) + raw_ext = raw_path.suffix + raw_name = dest.parent / f"{dest.stem}{raw_ext}" + if raw_path is not None: + logging.debug(f"Exporting RAW photo of {filename} as {raw_name.name}") + results = self._export_photo( + raw_path, + raw_name, + update, + export_db, + overwrite, + no_xattr, + export_as_hardlink, + exiftool, + ) + exported_files.extend(results.exported) + update_new_files.extend(results.new) + update_updated_files.extend(results.updated) + update_skipped_files.extend(results.skipped) + else: + logging.debug(f"Skipping missing RAW photo for {filename}") + else: + # use_photo_export + exported = None + # export live_photo .mov file? + live_photo = True if live_photo and self.live_photo else False + if edited: + # exported edited version and not original + if filename: + # use filename stem provided + filestem = dest.stem + else: + # didn't get passed a filename, add _edited + filestem = f"{dest.stem}_edited" + dest = dest.parent / f"{filestem}.jpeg" + + exported = _export_photo_uuid_applescript( + self.uuid, + dest.parent, + filestem=filestem, + original=False, + edited=True, + live_photo=live_photo, + timeout=timeout, + burst=self.burst, + ) + else: + # export original version and not edited + filestem = dest.stem + exported = _export_photo_uuid_applescript( + self.uuid, + dest.parent, + filestem=filestem, + original=True, + edited=False, + live_photo=live_photo, + timeout=timeout, + burst=self.burst, + ) + + if exported is not None: + exported_files.extend(exported) + else: + logging.warning( + f"Error exporting photo {self.uuid} to {dest} with use_photos_export" + ) + + # export metadata + info = export_db.get_info_for_uuid(self.uuid) + + if sidecar_json: + logging.debug("writing exiftool_json_sidecar") + sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.json") + sidecar_str = self._exiftool_json_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + try: + self._write_sidecar(sidecar_filename, sidecar_str) + except Exception as e: + logging.warning(f"Error writing json sidecar to {sidecar_filename}") + raise e + + if sidecar_xmp: + logging.debug("writing xmp_sidecar") + sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.xmp") + sidecar_str = self._xmp_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + try: + self._write_sidecar(sidecar_filename, sidecar_str) + except Exception as e: + logging.warning(f"Error writing xmp sidecar to {sidecar_filename}") + raise e + + # if exiftool, write the metadata + if update: + exif_files = update_new_files + update_updated_files + update_skipped_files + else: + exif_files = exported_files + + exif_files_updated = [] + if exiftool and update and exif_files: + for exported_file in exif_files: + logging.debug(f"checking exif for {exported_file}") + files_are_different = False + old_data = export_db.get_exifdata_for_file(exported_file) + if old_data is not None: + old_data = json.loads(old_data)[0] + current_data = json.loads( + self._exiftool_json_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + )[0] + if old_data != current_data: + files_are_different = True + + if old_data is None or files_are_different: + # didn't have old data, assume we need to write it + # or files were different + logging.debug(f"No exifdata for {exported_file}, writing it") + self._write_exif_data( + exported_file, + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + export_db.set_exifdata_for_file( + exported_file, + self._exiftool_json_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ), + ) + export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file)) + exif_files_updated.append(exported_file) + elif exiftool and exif_files: + for exported_file in exif_files: + logging.debug(f"Writing exif data to {exported_file}") + self._write_exif_data( + exported_file, + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + export_db.set_exifdata_for_file( + exported_file, + self._exiftool_json_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ), + ) + export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file)) + exif_files_updated.append(exported_file) + + return ExportResults( + exported_files, + update_new_files, + update_updated_files, + update_skipped_files, + exif_files_updated, + ) + + +def _export_photo( + self, + src, + dest, + update, + export_db, + overwrite, + no_xattr, + export_as_hardlink, + exiftool, +): + """ Helper function for export() + Does the actual copy or hardlink taking the appropriate + action depending on update, overwrite + Assumes destination is the right destination (e.g. UUID matches) + sets UUID and JSON info foo exported file using set_uuid_for_file, set_inf_for_uuido + src: src path (string) + dest: dest path (pathlib.Path) + update: bool + export_db: instance of ExportDB that conforms to ExportDB_ABC interface + overwrite: bool + no_xattr: don't copy extended attributes + export_as_hardlink: bool + exiftool: bool + Returns: ExportResults + """ + + exported_files = [] + update_updated_files = [] + update_new_files = [] + update_skipped_files = [] + + dest_str = str(dest) + dest_exists = dest.exists() + if export_as_hardlink: + # use hardlink instead of copy + if not update: + # not update, do the the hardlink + if overwrite and dest.exists(): + # need to remove the destination first + dest.unlink() + logging.debug(f"Not update: export_as_hardlink linking file {src} {dest}") + _hardlink_file(src, dest) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + exported_files.append(dest_str) + elif dest_exists and dest.samefile(src): + # update, hardlink and it already points to the right file, do nothing + logging.debug( + f"Update: skipping samefile with export_as_hardlink {src} {dest}" + ) + update_skipped_files.append(dest_str) + elif dest_exists: + # update, not the same file (e.g. user may not have used export_as_hardlink last time it was run + logging.debug( + f"Update: removing existing file prior to export_as_hardlink {src} {dest}" + ) + dest.unlink() + _hardlink_file(src, dest) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + update_updated_files.append(dest_str) + exported_files.append(dest_str) + else: + # update, hardlink, destination doesn't exist (new file) + logging.debug( + f"Update: exporting new file with export_as_hardlink {src} {dest}" + ) + _hardlink_file(src, dest) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + exported_files.append(dest_str) + update_new_files.append(dest_str) + else: + if not update: + # not update, do the the copy + if overwrite and dest.exists(): + # need to remove the destination first + dest.unlink() + logging.debug(f"Not update: copying file {src} {dest}") + _copy_file(src, dest_str, norsrc=no_xattr) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + exported_files.append(dest_str) + # elif dest_exists and not exiftool and cmp_file(dest_str, export_db.get_stat_orig_for_file(dest_str)): + elif ( + dest_exists + and not exiftool + and filecmp.cmp(src, dest) + and not dest.samefile(src) + ): + # destination exists but is identical + logging.debug(f"Update: skipping identifical original files {src} {dest}") + # call set_stat because code can reach this spot if no export DB but exporting a RAW or live photo + # potentially re-writes the data in the database but ensures database is complete + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + update_skipped_files.append(dest_str) + elif ( + dest_exists + and exiftool + and cmp_file(dest_str, export_db.get_stat_exif_for_file(dest_str)) + and not dest.samefile(src) + ): + # destination exists but is identical + logging.debug(f"Update: skipping identifical exiftool files {src} {dest}") + update_skipped_files.append(dest_str) + elif dest_exists: + # destination exists but is different or is a hardlink + logging.debug(f"Update: removing existing file prior to copy {src} {dest}") + stat_src = os.stat(src) + stat_dest = os.stat(dest) + dest.unlink() + _copy_file(src, dest_str, norsrc=no_xattr) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + exported_files.append(dest_str) + update_updated_files.append(dest_str) + else: + # destination doesn't exist, copy the file + logging.debug(f"Update: copying new file {src} {dest}") + _copy_file(src, dest_str, norsrc=no_xattr) + export_db.set_uuid_for_file(dest_str, self.uuid) + export_db.set_info_for_uuid(self.uuid, self.json()) + export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_exif_for_file(dest_str, (None, None, None)) + export_db.set_exifdata_for_file(dest_str, None) + exported_files.append(dest_str) + update_new_files.append(dest_str) + + return ExportResults( + exported_files, update_new_files, update_updated_files, update_skipped_files, [] + ) + + +def _write_exif_data( + self, + filepath, + use_albums_as_keywords=False, + use_persons_as_keywords=False, + keyword_template=None, +): + """ write exif data to image file at filepath + filepath: full path to the image file """ + if not os.path.exists(filepath): + raise FileNotFoundError(f"Could not find file {filepath}") + exiftool = ExifTool(filepath) + exif_info = json.loads( + self._exiftool_json_sidecar( + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) + )[0] + for exiftag, val in exif_info.items(): + if type(val) == list: + # more than one, set first value the add additional values + exiftool.setvalue(exiftag, val.pop(0)) + if val: + # add any remaining items + exiftool.addvalues(exiftag, *val) + else: + exiftool.setvalue(exiftag, val) + + +def _exiftool_json_sidecar( + self, + use_albums_as_keywords=False, + use_persons_as_keywords=False, + keyword_template=None, +): + """ return json string of EXIF details in exiftool sidecar format + Does not include all the EXIF fields as those are likely already in the image + use_albums_as_keywords: treat album names as keywords + use_persons_as_keywords: treat person names as keywords + keyword_template: (list of strings); list of template strings to render as keywords + Exports the following: + FileName + ImageDescription + Description + Title + TagsList + Keywords (may include album name, person name, or template) + Subject + PersonInImage + GPSLatitude, GPSLongitude + GPSPosition + GPSLatitudeRef, GPSLongitudeRef + DateTimeOriginal + OffsetTimeOriginal + ModifyDate """ + + exif = {} + exif["_CreatedBy"] = "osxphotos, https://github.com/RhetTbull/osxphotos" + + if self.description: + exif["EXIF:ImageDescription"] = self.description + exif["XMP:Description"] = self.description + + if self.title: + exif["XMP:Title"] = self.title + + keyword_list = [] + if self.keywords: + keyword_list.extend(self.keywords) + + person_list = [] + if self.persons: + # filter out _UNKNOWN_PERSON + person_list = sorted([p for p in self.persons if p != _UNKNOWN_PERSON]) + + if use_persons_as_keywords and person_list: + keyword_list.extend(sorted(person_list)) + + if use_albums_as_keywords and self.albums: + keyword_list.extend(sorted(self.albums)) + + if keyword_template: + rendered_keywords = [] + for template_str in keyword_template: + rendered, unmatched = self.render_template( + template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/" + ) + if unmatched: + logging.warning( + f"Unmatched template substitution for template: {template_str} {unmatched}" + ) + rendered_keywords.extend(rendered) + + # filter out any template values that didn't match by looking for sentinel + rendered_keywords = [ + keyword + for keyword in sorted(rendered_keywords) + if _OSXPHOTOS_NONE_SENTINEL not in keyword + ] + + # check to see if any keywords too long + long_keywords = [ + long_str + for long_str in rendered_keywords + if len(long_str) > _MAX_IPTC_KEYWORD_LEN + ] + if long_keywords: + logging.warning( + f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}" + ) + + keyword_list.extend(rendered_keywords) + + if keyword_list: + exif["XMP:TagsList"] = exif["IPTC:Keywords"] = keyword_list + + if person_list: + exif["XMP:PersonInImage"] = person_list + + if self.keywords or person_list: + # Photos puts both keywords and persons in Subject when using "Export IPTC as XMP" + # only use Photos' keywords for subject + exif["XMP:Subject"] = list(self.keywords) + person_list + + # if self.favorite(): + # exif["Rating"] = 5 + + (lat, lon) = self.location + if lat is not None and lon is not None: + lat_str, lon_str = dd_to_dms_str(lat, lon) + exif["EXIF:GPSLatitude"] = lat_str + exif["EXIF:GPSLongitude"] = lon_str + exif["Composite:GPSPosition"] = f"{lat_str}, {lon_str}" + lat_ref = "North" if lat >= 0 else "South" + lon_ref = "East" if lon >= 0 else "West" + exif["EXIF:GPSLatitudeRef"] = lat_ref + exif["EXIF:GPSLongitudeRef"] = lon_ref + + # process date/time and timezone offset + date = self.date + # exiftool expects format to "2015:01:18 12:00:00" + datetimeoriginal = date.strftime("%Y:%m:%d %H:%M:%S") + offsettime = date.strftime("%z") + # find timezone offset in format "-04:00" + offset = re.findall(r"([+-]?)([\d]{2})([\d]{2})", offsettime) + offset = offset[0] # findall returns list of tuples + offsettime = f"{offset[0]}{offset[1]}:{offset[2]}" + exif["EXIF:DateTimeOriginal"] = datetimeoriginal + exif["EXIF:OffsetTimeOriginal"] = offsettime + + if self.date_modified is not None: + exif["EXIF:ModifyDate"] = self.date_modified.strftime("%Y:%m:%d %H:%M:%S") + + json_str = json.dumps([exif]) + return json_str + + +def _xmp_sidecar( + self, + use_albums_as_keywords=False, + use_persons_as_keywords=False, + keyword_template=None, +): + """ returns string for XMP sidecar + use_albums_as_keywords: treat album names as keywords + use_persons_as_keywords: treat person names as keywords + keyword_template: (list of strings); list of template strings to render as keywords """ + + # TODO: add additional fields to XMP file? + + xmp_template = Template(filename=os.path.join(_TEMPLATE_DIR, _XMP_TEMPLATE_NAME)) + + keyword_list = [] + if self.keywords: + keyword_list.extend(self.keywords) + + # TODO: keyword handling in this and _exiftool_json_sidecar is + # good candidate for pulling out in a function + + person_list = [] + if self.persons: + # filter out _UNKNOWN_PERSON + person_list = [p for p in self.persons if p != _UNKNOWN_PERSON] + + if use_persons_as_keywords and person_list: + keyword_list.extend(person_list) + + if use_albums_as_keywords and self.albums: + keyword_list.extend(self.albums) + + if keyword_template: + rendered_keywords = [] + for template_str in keyword_template: + rendered, unmatched = self.render_template( + template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/" + ) + if unmatched: + logging.warning( + f"Unmatched template substitution for template: {template_str} {unmatched}" + ) + rendered_keywords.extend(rendered) + + # filter out any template values that didn't match by looking for sentinel + rendered_keywords = [ + keyword + for keyword in rendered_keywords + if _OSXPHOTOS_NONE_SENTINEL not in keyword + ] + + # check to see if any keywords too long + long_keywords = [ + long_str + for long_str in rendered_keywords + if len(long_str) > _MAX_IPTC_KEYWORD_LEN + ] + if long_keywords: + logging.warning( + f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}" + ) + + keyword_list.extend(rendered_keywords) + + subject_list = [] + if self.keywords or person_list: + # Photos puts both keywords and persons in Subject when using "Export IPTC as XMP" + subject_list = list(self.keywords) + person_list + + xmp_str = xmp_template.render( + photo=self, keywords=keyword_list, persons=person_list, subjects=subject_list + ) + + # remove extra lines that mako inserts from template + xmp_str = "\n".join([line for line in xmp_str.split("\n") if line.strip() != ""]) + return xmp_str + + +def _write_sidecar(self, filename, sidecar_str): + """ write sidecar_str to filename + used for exporting sidecar info """ + if not filename and not sidecar_str: + raise ( + ValueError( + f"filename {filename} and sidecar_str {sidecar_str} must not be None" + ) + ) + + # TODO: catch exception? + f = open(filename, "w") + f.write(sidecar_str) + f.close() diff --git a/osxphotos/photoinfo/photoinfo.py b/osxphotos/photoinfo/photoinfo.py index aaa94edd..3c1bc001 100644 --- a/osxphotos/photoinfo/photoinfo.py +++ b/osxphotos/photoinfo/photoinfo.py @@ -4,6 +4,7 @@ Represents a single photo in the Photos library and provides access to the photo PhotosDB.photos() returns a list of PhotoInfo objects """ +import dataclasses import glob import json import logging @@ -17,31 +18,18 @@ from datetime import timedelta, timezone from pprint import pformat import yaml -from mako.template import Template + from .._constants import ( - _MAX_IPTC_KEYWORD_LEN, _MOVIE_TYPE, - _OSXPHOTOS_NONE_SENTINEL, _PHOTO_TYPE, _PHOTOS_4_VERSION, _PHOTOS_5_SHARED_PHOTO_PATH, - _TEMPLATE_DIR, _UNKNOWN_PERSON, - _XMP_TEMPLATE_NAME, ) from ..albuminfo import AlbumInfo from ..datetime_formatter import DateTimeFormatter -from ..exiftool import ExifTool from ..placeinfo import PlaceInfo4, PlaceInfo5 -from ..utils import ( - _copy_file, - _export_photo_uuid_applescript, - _get_resource_loc, - _hardlink_file, - dd_to_dms_str, - findfiles, - get_preferred_uti_extension, -) +from ..utils import _debug, _get_resource_loc, findfiles, get_preferred_uti_extension from .template import ( MULTI_VALUE_SUBSTITUTIONS, TEMPLATE_SUBSTITUTIONS, @@ -64,6 +52,16 @@ class PhotoInfo: ) from ._photoinfo_exifinfo import exif_info, ExifInfo from ._photoinfo_exiftool import exiftool + from ._photoinfo_export import ( + export, + export2, + _export_photo, + _exiftool_json_sidecar, + _write_exif_data, + _write_sidecar, + _xmp_sidecar, + ExportResults, + ) def __init__(self, db=None, uuid=None, info=None): self._uuid = uuid @@ -262,7 +260,7 @@ class PhotoInfo: # if self._info["isMissing"] == 1: # photopath = None # path would be meaningless until downloaded - logging.debug(photopath) + # logging.debug(photopath) return photopath @@ -638,307 +636,6 @@ class PhotoInfo: otherwise returns False """ return self._info["raw_is_original"] - def export( - self, - dest, - *filename, - edited=False, - live_photo=False, - raw_photo=False, - export_as_hardlink=False, - overwrite=False, - increment=True, - sidecar_json=False, - sidecar_xmp=False, - use_photos_export=False, - timeout=120, - exiftool=False, - no_xattr=False, - use_albums_as_keywords=False, - use_persons_as_keywords=False, - keyword_template=None, - ): - """ export photo - dest: must be valid destination path (or exception raised) - filename: (optional): name of exported picture; if not provided, will use current filename - **NOTE**: if provided, user must ensure file extension (suffix) is correct. - For example, if photo is .CR2 file, edited image may be .jpeg. - If you provide an extension different than what the actual file is, - export will print a warning but will happily export the photo using the - incorrect file extension. e.g. to get the extension of the edited photo, - reference PhotoInfo.path_edited - edited: (boolean, default=False); if True will export the edited version of the photo - (or raise exception if no edited version) - live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos - raw_photo: (boolean, default=False); if True, will also export the associted RAW photo - export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them - overwrite: (boolean, default=False); if True will overwrite files if they alreay exist - increment: (boolean, default=True); if True, will increment file name until a non-existant name is found - if overwrite=False and increment=False, export will fail if destination file already exists - sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool - sidecar filename will be dest/filename.json - sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data - sidecar filename will be dest/filename.xmp - use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos - timeout: (int, default=120) timeout in seconds used with use_photos_export - exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file - no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes - returns list of full paths to the exported files - use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords - when exporting metadata with exiftool or sidecar - use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords - when exporting metadata with exiftool or sidecar - keyword_template: (list of strings); list of template strings that will be rendered as used as keywords - """ - - # list of all files exported during this call to export - exported_files = [] - - # check edited and raise exception trying to export edited version of - # photo that hasn't been edited - if edited and not self.hasadjustments: - raise ValueError( - "Photo does not have adjustments, cannot export edited version" - ) - - # check arguments and get destination path and filename (if provided) - if filename and len(filename) > 2: - raise TypeError( - "Too many positional arguments. Should be at most two: destination, filename." - ) - else: - # verify destination is a valid path - if dest is None: - raise ValueError("Destination must not be None") - elif not os.path.isdir(dest): - raise FileNotFoundError("Invalid path passed to export") - - if filename and len(filename) == 1: - # if filename passed, use it - fname = filename[0] - else: - # no filename provided so use the default - # if edited file requested, use filename but add _edited - # need to use file extension from edited file as Photos saves a jpeg once edited - if edited and not use_photos_export: - # verify we have a valid path_edited and use that to get filename - if not self.path_edited: - raise FileNotFoundError( - "edited=True but path_edited is none; hasadjustments: " - f" {self.hasadjustments}" - ) - edited_name = pathlib.Path(self.path_edited).name - edited_suffix = pathlib.Path(edited_name).suffix - fname = pathlib.Path(self.filename).stem + "_edited" + edited_suffix - else: - fname = self.filename - - # check destination path - dest = pathlib.Path(dest) - fname = pathlib.Path(fname) - dest = dest / fname - - # check extension of destination - if edited and self.path_edited is not None: - # use suffix from edited file - actual_suffix = pathlib.Path(self.path_edited).suffix - elif edited: - # use .jpeg as that's probably correct - # if edited and path_edited is None, will raise FileNotFoundError below - # unless use_photos_export is True - actual_suffix = ".jpeg" - else: - # use suffix from the non-edited file - actual_suffix = pathlib.Path(self.filename).suffix - - # warn if suffixes don't match but ignore .JPG / .jpeg as - # Photo's often converts .JPG to .jpeg - suffixes = sorted([x.lower() for x in [dest.suffix, actual_suffix]]) - if dest.suffix.lower() != actual_suffix.lower() and suffixes != [ - ".jpeg", - ".jpg", - ]: - logging.warning( - f"Invalid destination suffix: {dest.suffix}, should be {actual_suffix}" - ) - - # check to see if file exists and if so, add (1), (2), etc until we find one that works - # Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars - # e.g. exporting sidecar for file1.png and file1.jpeg - # if file1.png exists and exporting file1.jpeg, - # dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision - if increment and not overwrite: - count = 1 - glob_str = str(dest.parent / f"{dest.stem}*") - dest_files = glob.glob(glob_str) - dest_files = [pathlib.Path(f).stem for f in dest_files] - dest_new = dest.stem - while dest_new in dest_files: - dest_new = f"{dest.stem} ({count})" - count += 1 - dest = dest.parent / f"{dest_new}{dest.suffix}" - - # if overwrite==False and #increment==False, export should fail if file exists - if dest.exists() and not overwrite and not increment: - raise FileExistsError( - f"destination exists ({dest}); overwrite={overwrite}, increment={increment}" - ) - - if not use_photos_export: - # find the source file on disk and export - # get path to source file and verify it's not None and is valid file - # TODO: how to handle ismissing or not hasadjustments and edited=True cases? - if edited: - if self.path_edited is not None: - src = self.path_edited - else: - raise FileNotFoundError( - f"Cannot export edited photo if path_edited is None" - ) - else: - if self.ismissing: - logging.warning( - f"Attempting to export photo with ismissing=True: path = {self.path}" - ) - - if self.path is not None: - src = self.path - else: - raise FileNotFoundError("Cannot export photo if path is None") - - if not os.path.isfile(src): - raise FileNotFoundError(f"{src} does not appear to exist") - - logging.debug( - f"exporting {src} to {dest}, overwrite={overwrite}, increment={increment}, dest exists: {dest.exists()}" - ) - - # copy the file, _copy_file uses ditto to preserve Mac extended attributes - if export_as_hardlink: - _hardlink_file(src, dest) - else: - _copy_file(src, dest, norsrc=no_xattr) - exported_files.append(str(dest)) - - # copy live photo associated .mov if requested - if live_photo and self.live_photo: - live_name = dest.parent / f"{dest.stem}.mov" - src_live = self.path_live_photo - - if src_live is not None: - logging.debug( - f"Exporting live photo video of {filename} as {live_name.name}" - ) - if export_as_hardlink: - _hardlink_file(src_live, str(live_name)) - else: - _copy_file(src_live, str(live_name), norsrc=no_xattr) - exported_files.append(str(live_name)) - else: - logging.warning(f"Skipping missing live movie for {filename}") - - # copy associated RAW image if requested - if raw_photo and self.has_raw: - raw_path = pathlib.Path(self.path_raw) - raw_ext = raw_path.suffix - raw_name = dest.parent / f"{dest.stem}{raw_ext}" - if raw_path is not None: - logging.debug( - f"Exporting RAW photo of {filename} as {raw_name.name}" - ) - if export_as_hardlink: - _hardlink_file(str(raw_path), str(raw_name)) - else: - _copy_file(str(raw_path), str(raw_name), norsrc=no_xattr) - exported_files.append(str(raw_name)) - else: - logging.warning(f"Skipping missing RAW photo for {filename}") - else: - # use_photo_export - exported = None - # export live_photo .mov file? - live_photo = True if live_photo and self.live_photo else False - if edited: - # exported edited version and not original - if filename: - # use filename stem provided - filestem = dest.stem - else: - # didn't get passed a filename, add _edited - filestem = f"{dest.stem}_edited" - dest = dest.parent / f"{filestem}.jpeg" - - exported = _export_photo_uuid_applescript( - self.uuid, - dest.parent, - filestem=filestem, - original=False, - edited=True, - live_photo=live_photo, - timeout=timeout, - burst=self.burst, - ) - else: - # export original version and not edited - filestem = dest.stem - exported = _export_photo_uuid_applescript( - self.uuid, - dest.parent, - filestem=filestem, - original=True, - edited=False, - live_photo=live_photo, - timeout=timeout, - burst=self.burst, - ) - - if exported is not None: - exported_files.extend(exported) - else: - logging.warning( - f"Error exporting photo {self.uuid} to {dest} with use_photos_export" - ) - - if sidecar_json: - logging.debug("writing exiftool_json_sidecar") - sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.json") - sidecar_str = self._exiftool_json_sidecar( - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) - try: - self._write_sidecar(sidecar_filename, sidecar_str) - except Exception as e: - logging.warning(f"Error writing json sidecar to {sidecar_filename}") - raise e - - if sidecar_xmp: - logging.debug("writing xmp_sidecar") - sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.xmp") - sidecar_str = self._xmp_sidecar( - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) - try: - self._write_sidecar(sidecar_filename, sidecar_str) - except Exception as e: - logging.warning(f"Error writing xmp sidecar to {sidecar_filename}") - raise e - - # if exiftool, write the metadata - if exiftool and exported_files: - for exported_file in exported_files: - self._write_exif_data( - exported_file, - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) - - return exported_files - def render_template(self, template, none_str="_", path_sep=None): """ render a filename or directory template template: str template @@ -1273,264 +970,6 @@ class PhotoInfo: # if here, didn't get a match raise KeyError(f"No rule for processing {lookup}") - def _write_exif_data( - self, - filepath, - use_albums_as_keywords=False, - use_persons_as_keywords=False, - keyword_template=None, - ): - """ write exif data to image file at filepath - filepath: full path to the image file """ - if not os.path.exists(filepath): - raise FileNotFoundError(f"Could not find file {filepath}") - exiftool = ExifTool(filepath) - exif_info = json.loads( - self._exiftool_json_sidecar( - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) - )[0] - for exiftag, val in exif_info.items(): - if type(val) == list: - # more than one, set first value the add additional values - exiftool.setvalue(exiftag, val.pop(0)) - if val: - # add any remaining items - exiftool.addvalues(exiftag, *val) - else: - exiftool.setvalue(exiftag, val) - - def _exiftool_json_sidecar( - self, - use_albums_as_keywords=False, - use_persons_as_keywords=False, - keyword_template=None, - ): - """ return json string of EXIF details in exiftool sidecar format - Does not include all the EXIF fields as those are likely already in the image - use_albums_as_keywords: treat album names as keywords - use_persons_as_keywords: treat person names as keywords - keyword_template: (list of strings); list of template strings to render as keywords - Exports the following: - FileName - ImageDescription - Description - Title - TagsList - Keywords (may include album name, person name, or template) - Subject - PersonInImage - GPSLatitude, GPSLongitude - GPSPosition - GPSLatitudeRef, GPSLongitudeRef - DateTimeOriginal - OffsetTimeOriginal - ModifyDate """ - - exif = {} - exif["_CreatedBy"] = "osxphotos, https://github.com/RhetTbull/osxphotos" - - if self.description: - exif["EXIF:ImageDescription"] = self.description - exif["XMP:Description"] = self.description - - if self.title: - exif["XMP:Title"] = self.title - - keyword_list = [] - if self.keywords: - keyword_list.extend(self.keywords) - - person_list = [] - if self.persons: - # filter out _UNKNOWN_PERSON - person_list = [p for p in self.persons if p != _UNKNOWN_PERSON] - - if use_persons_as_keywords and person_list: - keyword_list.extend(person_list) - - if use_albums_as_keywords and self.albums: - keyword_list.extend(self.albums) - - if keyword_template: - rendered_keywords = [] - for template_str in keyword_template: - rendered, unmatched = self.render_template( - template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/" - ) - if unmatched: - logging.warning( - f"Unmatched template substitution for template: {template_str} {unmatched}" - ) - rendered_keywords.extend(rendered) - - # filter out any template values that didn't match by looking for sentinel - rendered_keywords = [ - keyword - for keyword in rendered_keywords - if _OSXPHOTOS_NONE_SENTINEL not in keyword - ] - - # check to see if any keywords too long - long_keywords = [ - long_str - for long_str in rendered_keywords - if len(long_str) > _MAX_IPTC_KEYWORD_LEN - ] - if long_keywords: - logging.warning( - f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}" - ) - - logging.debug(f"rendered_keywords: {rendered_keywords}") - keyword_list.extend(rendered_keywords) - - if keyword_list: - exif["XMP:TagsList"] = exif["IPTC:Keywords"] = keyword_list - - if person_list: - exif["XMP:PersonInImage"] = person_list - - if self.keywords or person_list: - # Photos puts both keywords and persons in Subject when using "Export IPTC as XMP" - # only use Photos' keywords for subject - exif["XMP:Subject"] = list(self.keywords) + person_list - - # if self.favorite(): - # exif["Rating"] = 5 - - (lat, lon) = self.location - if lat is not None and lon is not None: - lat_str, lon_str = dd_to_dms_str(lat, lon) - exif["EXIF:GPSLatitude"] = lat_str - exif["EXIF:GPSLongitude"] = lon_str - exif["Composite:GPSPosition"] = f"{lat_str}, {lon_str}" - lat_ref = "North" if lat >= 0 else "South" - lon_ref = "East" if lon >= 0 else "West" - exif["EXIF:GPSLatitudeRef"] = lat_ref - exif["EXIF:GPSLongitudeRef"] = lon_ref - - # process date/time and timezone offset - date = self.date - # exiftool expects format to "2015:01:18 12:00:00" - datetimeoriginal = date.strftime("%Y:%m:%d %H:%M:%S") - offsettime = date.strftime("%z") - # find timezone offset in format "-04:00" - offset = re.findall(r"([+-]?)([\d]{2})([\d]{2})", offsettime) - offset = offset[0] # findall returns list of tuples - offsettime = f"{offset[0]}{offset[1]}:{offset[2]}" - exif["EXIF:DateTimeOriginal"] = datetimeoriginal - exif["EXIF:OffsetTimeOriginal"] = offsettime - - if self.date_modified is not None: - exif["EXIF:ModifyDate"] = self.date_modified.strftime("%Y:%m:%d %H:%M:%S") - - json_str = json.dumps([exif]) - return json_str - - def _xmp_sidecar( - self, - use_albums_as_keywords=False, - use_persons_as_keywords=False, - keyword_template=None, - ): - """ returns string for XMP sidecar - use_albums_as_keywords: treat album names as keywords - use_persons_as_keywords: treat person names as keywords - keyword_template: (list of strings); list of template strings to render as keywords """ - - # TODO: add additional fields to XMP file? - - xmp_template = Template( - filename=os.path.join(_TEMPLATE_DIR, _XMP_TEMPLATE_NAME) - ) - - keyword_list = [] - if self.keywords: - keyword_list.extend(self.keywords) - - # TODO: keyword handling in this and _exiftool_json_sidecar is - # good candidate for pulling out in a function - - person_list = [] - if self.persons: - # filter out _UNKNOWN_PERSON - person_list = [p for p in self.persons if p != _UNKNOWN_PERSON] - - if use_persons_as_keywords and person_list: - keyword_list.extend(person_list) - - if use_albums_as_keywords and self.albums: - keyword_list.extend(self.albums) - - if keyword_template: - rendered_keywords = [] - for template_str in keyword_template: - rendered, unmatched = self.render_template( - template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/" - ) - if unmatched: - logging.warning( - f"Unmatched template substitution for template: {template_str} {unmatched}" - ) - rendered_keywords.extend(rendered) - - # filter out any template values that didn't match by looking for sentinel - rendered_keywords = [ - keyword - for keyword in rendered_keywords - if _OSXPHOTOS_NONE_SENTINEL not in keyword - ] - - # check to see if any keywords too long - long_keywords = [ - long_str - for long_str in rendered_keywords - if len(long_str) > _MAX_IPTC_KEYWORD_LEN - ] - if long_keywords: - logging.warning( - f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}" - ) - - logging.debug(f"rendered_keywords: {rendered_keywords}") - keyword_list.extend(rendered_keywords) - - subject_list = [] - if self.keywords or person_list: - # Photos puts both keywords and persons in Subject when using "Export IPTC as XMP" - subject_list = list(self.keywords) + person_list - - xmp_str = xmp_template.render( - photo=self, - keywords=keyword_list, - persons=person_list, - subjects=subject_list, - ) - - # remove extra lines that mako inserts from template - xmp_str = "\n".join( - [line for line in xmp_str.split("\n") if line.strip() != ""] - ) - return xmp_str - - def _write_sidecar(self, filename, sidecar_str): - """ write sidecar_str to filename - used for exporting sidecar info """ - if not filename and not sidecar_str: - raise ( - ValueError( - f"filename {filename} and sidecar_str {sidecar_str} must not be None" - ) - ) - - # TODO: catch exception? - f = open(filename, "w") - f.write(sidecar_str) - f.close() - @property def _longitude(self): """ Returns longitude, in degrees """ @@ -1600,6 +1039,9 @@ class PhotoInfo: date_modified_iso = ( self.date_modified.isoformat() if self.date_modified else None ) + folders = {album.title: album.folder_names for album in self.album_info} + exif = dataclasses.asdict(self.exif_info) if self.exif_info else {} + place = self.place.as_dict() if self.place else {} pic = { "uuid": self.uuid, @@ -1609,7 +1051,10 @@ class PhotoInfo: "description": self.description, "title": self.title, "keywords": self.keywords, + "labels": self.labels, + "keywords": self.keywords, "albums": self.albums, + "folders": folders, "persons": self.persons, "path": self.path, "ismissing": self.ismissing, @@ -1640,6 +1085,8 @@ class PhotoInfo: "has_raw": self.has_raw, "uti_raw": self.uti_raw, "path_raw": self.path_raw, + "place": place, + "exif": exif, } return json.dumps(pic) diff --git a/osxphotos/utils.py b/osxphotos/utils.py index 37c30c43..bc8aa7ec 100644 --- a/osxphotos/utils.py +++ b/osxphotos/utils.py @@ -130,17 +130,13 @@ def _hardlink_file(src, dest): if not os.path.isfile(src): raise FileNotFoundError("src file does not appear to exist", src) - # if error on copy, subprocess will raise CalledProcessError try: os.link(src, dest) except Exception as e: - logging.critical( - f"ln returned error: {e.returncode} {e.stderr.decode(sys.getfilesystemencoding()).rstrip()}" - ) + logging.critical(f"os.link returned error: {e}") raise e - def _copy_file(src, dest, norsrc=False): """ Copies a file from src path to dest path @@ -516,3 +512,30 @@ def _db_is_locked(dbname): locked = True return locked + + +# OSXPHOTOS_XATTR_UUID = "com.osxphotos.uuid" + +# def get_uuid_for_file(filepath): +# """ returns UUID associated with an exported file +# filepath: path to exported photo +# """ +# attr = xattr.xattr(filepath) +# try: +# uuid_bytes = attr[OSXPHOTOS_XATTR_UUID] +# uuid_str = uuid_bytes.decode('utf-8') +# except KeyError: +# uuid_str = None +# return uuid_str + +# def set_uuid_for_file(filepath, uuid): +# """ sets the UUID associated with an exported file +# filepath: path to exported photo +# uuid: uuid string for photo +# """ +# if not os.path.exists(filepath): +# raise FileNotFoundError(f"Missing file: {filepath}") + +# attr = xattr.xattr(filepath) +# uuid_bytes = bytes(uuid, 'utf-8') +# attr.set(OSXPHOTOS_XATTR_UUID, uuid_bytes) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..6df4903b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,9 @@ +""" pytest test configuration """ +import pytest + +from osxphotos.exiftool import _ExifToolProc + +@pytest.fixture(autouse=True) +def reset_singletons(): + """ Need to clean up any ExifTool singletons between tests """ + _ExifToolProc.instance = None \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index 5cc08ce1..1b6b9aa1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,6 +2,8 @@ import os import pytest from click.testing import CliRunner +from osxphotos.exiftool import get_exiftool_path + CLI_PHOTOS_DB = "tests/Test-10.15.1.photoslibrary" LIVE_PHOTOS_DB = "tests/Test-Cloud-10.15.1.photoslibrary" RAW_PHOTOS_DB = "tests/Test-RAW-10.15.1.photoslibrary" @@ -120,6 +122,11 @@ CLI_EXPORT_UUID = "D79B8D77-BFFC-460B-9312-034F2877D35B" CLI_EXPORT_UUID_FILENAME = "Pumkins2.jpg" +CLI_EXPORT_BY_DATE = [ + "2018/09/28/Pumpkins3.jpg", + "2018/09/28/Pumkins1.jpg", +] + CLI_EXPORT_SIDECAR_FILENAMES = ["Pumkins2.jpg", "Pumkins2.json", "Pumkins2.xmp"] CLI_EXPORT_LIVE = [ @@ -139,6 +146,24 @@ CLI_EXPORT_RAW_EDITED_ORIGINAL = ["IMG_0476_2.CR2", "IMG_0476_2_edited.jpeg"] CLI_PLACES_JSON = """{"places": {"_UNKNOWN_": 1, "Maui, Wailea, Hawai'i, United States": 1, "Washington, District of Columbia, United States": 1}}""" +CLI_EXIFTOOL = { + "D79B8D77-BFFC-460B-9312-034F2877D35B": { + "File:FileName": "Pumkins2.jpg", + "IPTC:Keywords": "Kids", + "XMP:TagsList": "Kids", + "XMP:Title": "I found one!", + "EXIF:ImageDescription": "Girl holding pumpkin", + "XMP:Description": "Girl holding pumpkin", + "XMP:PersonInImage": "Katie", + "XMP:Subject": ["Kids", "Katie"], + } +} +# determine if exiftool installed so exiftool tests can be skipped +try: + exiftool = get_exiftool_path() +except: + exiftool = None + def test_osxphotos(): import osxphotos @@ -291,6 +316,7 @@ def test_export_as_hardlink_samefile(): assert os.path.exists(CLI_EXPORT_UUID_FILENAME) assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + def test_export_using_hardlinks_incompat_options(): # test that error shown if --export-as-hardlink used with --exiftool import os @@ -317,7 +343,8 @@ def test_export_using_hardlinks_incompat_options(): ) assert result.exit_code == 0 assert "Incompatible export options" in result.output - + + def test_export_current_name(): import glob import os @@ -356,6 +383,40 @@ def test_export_skip_edited(): assert "St James Park_edited.jpeg" not in files +@pytest.mark.skipif(exiftool is None, reason="exiftool not installed") +def test_export_exiftool(): + import glob + import os + import os.path + import osxphotos + from osxphotos.__main__ import export + from osxphotos.exiftool import ExifTool + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + for uuid in CLI_EXIFTOOL: + result = runner.invoke( + export, + [ + os.path.join(cwd, PHOTOS_DB_15_4), + ".", + "-V", + "--exiftool", + "--uuid", + f"{uuid}", + ], + ) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted([CLI_EXIFTOOL[uuid]["File:FileName"]]) + + exif = ExifTool(CLI_EXIFTOOL[uuid]["File:FileName"]).as_dict() + for key in CLI_EXIFTOOL[uuid]: + assert exif[key] == CLI_EXIFTOOL[uuid][key] + + def test_query_date(): import json import osxphotos @@ -1003,3 +1064,243 @@ def test_export_sidecar_keyword_template(): assert sorted(json_got[k]) == sorted(v) else: assert json_got[k] == v + + +def test_export_update_basic(): + """ test export then update """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export, OSXPHOTOS_EXPORT_DB + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"]) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + assert os.path.isfile(OSXPHOTOS_EXPORT_DB) + + # update + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"] + ) + assert result.exit_code == 0 + assert ( + "Exported: 0 photos, updated: 0 photos, skipped: 8 photos, updated EXIF data: 0 photos" + in result.output + ) + + +@pytest.mark.skipif(exiftool is None, reason="exiftool not installed") +def test_export_update_exiftool(): + """ test export then update with exiftool """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"]) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + + # update with exiftool + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--exiftool"] + ) + assert result.exit_code == 0 + assert ( + "Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 8 photos" + in result.output + ) + + +def test_export_update_hardlink(): + """ test export with hardlink then update """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export + + photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB) + photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0] + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke( + export, + [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-as-hardlink"], + ) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + # update, should replace the hardlink files with new copies + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"] + ) + assert result.exit_code == 0 + assert ( + "Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 0 photos" + in result.output + ) + assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + +@pytest.mark.skipif(exiftool is None, reason="exiftool not installed") +def test_export_update_hardlink_exiftool(): + """ test export with hardlink then update with exiftool """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export + + photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB) + photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0] + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke( + export, + [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-as-hardlink"], + ) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + # update, should replace the hardlink files with new copies + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--exiftool"] + ) + assert result.exit_code == 0 + assert ( + "Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 8 photos" + in result.output + ) + assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + +def test_export_update_edits(): + """ test export then update after removing and editing files """ + import glob + import os + import os.path + import shutil + + import osxphotos + from osxphotos.__main__ import export + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-by-date"] + ) + assert result.exit_code == 0 + + # change a couple of destination photos + os.unlink(CLI_EXPORT_BY_DATE[1]) + shutil.copyfile(CLI_EXPORT_BY_DATE[0], CLI_EXPORT_BY_DATE[1]) + os.unlink(CLI_EXPORT_BY_DATE[0]) + + # update + result = runner.invoke( + export, + [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--export-by-date"], + ) + assert result.exit_code == 0 + assert ( + "Exported: 1 photo, updated: 1 photo, skipped: 6 photos, updated EXIF data: 0 photos" + in result.output + ) + + +def test_export_update_no_db(): + """ test export then update after db has been deleted """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export, OSXPHOTOS_EXPORT_DB + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"]) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + assert os.path.isfile(OSXPHOTOS_EXPORT_DB) + os.unlink(OSXPHOTOS_EXPORT_DB) + + # update + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"] + ) + assert result.exit_code == 0 + assert ( + "Exported: 0 photos, updated: 0 photos, skipped: 8 photos, updated EXIF data: 0 photos" + in result.output + ) + assert os.path.isfile(OSXPHOTOS_EXPORT_DB) + + +def test_export_then_hardlink(): + """ test export then hardlink """ + import glob + import os + import os.path + + import osxphotos + from osxphotos.__main__ import export + + photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB) + photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0] + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V",],) + assert result.exit_code == 0 + files = glob.glob("*") + assert sorted(files) == sorted(CLI_EXPORT_FILENAMES) + assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--export-as-hardlink", "--overwrite"] + ) + assert result.exit_code == 0 + assert "Exported: 8 photos" in result.output + assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)