Added --update to CLI export; reference issue #100

This commit is contained in:
Rhet Turnbull
2020-05-23 09:34:04 -07:00
parent 8c4fe40aa6
commit b1171e96cc
10 changed files with 2072 additions and 622 deletions

View File

@@ -201,7 +201,11 @@ Options:
Search by end item date, e.g.
2000-01-12T12:00:00 or 2000-12-31 (ISO 8601
w/o TZ).
--update Only export new or updated files. See notes
below on export and --update.
--export-as-hardlink Hardlink files instead of copying them.
Cannot be used with --exiftool which creates
copies of the files with embedded EXIF data.
--overwrite Overwrite existing files. Default behavior
is to add (1), (2), etc to filename if file
already exists. Use this with caution as it
@@ -270,7 +274,8 @@ Options:
exported photos. To use this option,
exiftool must be installed and in the path.
exiftool may be installed from
https://exiftool.org/
https://exiftool.org/. Cannot be used with
--export-as-hardlink.
--directory DIRECTORY Optional template for specifying name of
output directory in the form
'{name,DEFAULT}'. See below for additional
@@ -282,7 +287,35 @@ Options:
get an error while exporting.
-h, --help Show this message and exit.
**Templating System**
** Export **
When exporting photos, osxphotos creates a database in the top-level export
folder called '.osxphotos_export.db'. This database preserves state
information used for determining which files need to be updated when run with
--update. It is recommended that if you later move the export folder tree you
also move the database file.
The --update option will only copy new or updated files from the library to
the export folder. If a file is changed in the export folder (for example,
you edited the exported image), osxphotos will detect this as a difference and
re-export the original image from the library thus overwriting the changes.
If using --update, the exported library should be treated as a backup, not a
working copy where you intend to make changes.
Note: The number of files reported for export and the number actually exported
may differ due to live photos, associated RAW images, and edited photos which
are reported in the total photos exported.
Implementation note: To determine which files need to be updated, osxphotos
stores file signature information in the '.osxphotos_export.db' database. The
signature includes size, modification time, and filename. In order to
minimize run time, --update does not do a full comparison (diff) of the files
nor does it compare hashes of the files. In normal usage, this is sufficient
for updating the library. You can always run export without the --update
option to re-export the entire library thus rebuilding the
'.osxphotos_export.db' database.
** Templating System **
With the --directory option you may specify a template for the export
directory. This directory will be appended to the export path specified in
@@ -1098,7 +1131,7 @@ Export photo from the Photos library to another destination on disk.
- use_albums_as_keywords: (boolean, default = False); if True, will use album names as keywords when exporting metadata with exiftool or sidecar
- use_persons_as_keywords: (boolean, default = False); if True, will use person names as keywords when exporting metadata with exiftool or sidecar
Returns: list of paths to exported files. More than one file could be exported, for example if live_photo=True, both the original imaage and the associated .mov file will be exported
Returns: list of paths to exported files. More than one file could be exported, for example if live_photo=True, both the original image and the associated .mov file will be exported
The json sidecar file can be used by exiftool to apply the metadata from the json file to the image. For example:

View File

@@ -6,6 +6,7 @@ import os
import os.path
import pathlib
import sys
import time
import click
import yaml
@@ -21,11 +22,25 @@ import osxphotos
from ._constants import _EXIF_TOOL_URL, _PHOTOS_4_VERSION, _UNKNOWN_PLACE
from ._version import __version__
from .exiftool import get_exiftool_path
from .photoinfo import ExportResults
from .photoinfo.template import (
TEMPLATE_SUBSTITUTIONS,
TEMPLATE_SUBSTITUTIONS_MULTI_VALUED,
)
from .utils import _copy_file, create_path_by_date
from ._export_db import ExportDB
# global variable to control verbose output
# set via --verbose/-V
VERBOSE = False
# name of export DB
OSXPHOTOS_EXPORT_DB = ".osxphotos_export.db"
def verbose(*args, **kwargs):
if VERBOSE:
click.echo(*args, **kwargs)
def get_photos_db(*db_options):
@@ -77,9 +92,41 @@ class ExportCommand(click.Command):
help_text = super().get_help(ctx)
formatter = click.HelpFormatter()
formatter.write("\n\n")
# passed to click.HelpFormatter.write_dl for formatting
formatter.write_text("**Templating System**")
formatter.write("\n\n")
formatter.write_text("** Export **")
formatter.write_text(
"When exporting photos, osxphotos creates a database in the top-level "
+ f"export folder called '{OSXPHOTOS_EXPORT_DB}'. This database preserves state information "
+ "used for determining which files need to be updated when run with --update. It is recommended "
+ "that if you later move the export folder tree you also move the database file."
)
formatter.write("\n")
formatter.write_text(
"The --update option will only copy new or updated files from the library "
+ "to the export folder. If a file is changed in the export folder (for example, you edited the "
+ "exported image), osxphotos will detect this as a difference and re-export the original image "
+ "from the library thus overwriting the changes. If using --update, the exported library "
+ "should be treated as a backup, not a working copy where you intend to make changes. "
)
formatter.write("\n")
formatter.write_text("Note: The number of files reported for export and the number actually exported "
+"may differ due to live photos, associated RAW images, and edited photos which are reported "
+"in the total photos exported.")
formatter.write("\n")
formatter.write_text(
"Implementation note: To determine which files need to be updated, "
+ f"osxphotos stores file signature information in the '{OSXPHOTOS_EXPORT_DB}' database. "
+ "The signature includes size, modification time, and filename. In order to minimize "
+ "run time, --update does not do a full comparison (diff) of the files nor does it compare "
+ "hashes of the files. In normal usage, this is sufficient for updating the library. "
+ "You can always run export without the --update option to re-export the entire library thus "
+ f"rebuilding the '{OSXPHOTOS_EXPORT_DB}' database."
)
formatter.write("\n\n")
formatter.write_text("** Templating System **")
formatter.write("\n")
formatter.write_text(
"With the --directory option you may specify a template for the "
@@ -862,8 +909,13 @@ def query(
@cli.command(cls=ExportCommand)
@DB_OPTION
@click.option("--verbose", "-V", is_flag=True, help="Print verbose output.")
@click.option("--verbose", "-V", "verbose_", is_flag=True, help="Print verbose output.")
@query_options
@click.option(
"--update",
is_flag=True,
help="Only export new or updated files. See notes below on export and --update.",
)
@click.option(
"--export-as-hardlink",
is_flag=True,
@@ -1014,7 +1066,8 @@ def export(
not_shared,
from_date,
to_date,
verbose,
verbose_,
update,
export_as_hardlink,
overwrite,
export_by_date,
@@ -1068,6 +1121,9 @@ def export(
to modify this behavior.
"""
global VERBOSE
VERBOSE = True if verbose_ else False
if not os.path.isdir(dest):
sys.exit("DEST must be valid path")
@@ -1092,7 +1148,7 @@ def export(
(any(place), no_place),
]
if any([all(bb) for bb in exclusive]):
click.echo("Incompatible export options",err=True)
click.echo("Incompatible export options", err=True)
click.echo(cli.commands["export"].get_help(ctx), err=True)
return
@@ -1134,6 +1190,9 @@ def export(
_list_libraries()
return
# open export database
export_db = ExportDB(os.path.join(dest, OSXPHOTOS_EXPORT_DB))
photos = _query(
db=db,
keyword=keyword,
@@ -1188,6 +1247,11 @@ def export(
no_place=no_place,
)
results_exported = []
results_new = []
results_updated = []
results_skipped = []
results_exif_updated = []
if photos:
if export_bursts:
# add the burst_photos to the export set
@@ -1199,16 +1263,18 @@ def export(
num_photos = len(photos)
photo_str = "photos" if num_photos > 1 else "photo"
click.echo(f"Exporting {num_photos} {photo_str} to {dest}...")
if not verbose:
start_time = time.perf_counter()
if not verbose_:
# show progress bar
with click.progressbar(photos) as bar:
for p in bar:
export_photo(
results = export_photo(
p,
dest,
verbose,
verbose_,
export_by_date,
sidecar,
update,
export_as_hardlink,
overwrite,
export_edited,
@@ -1222,15 +1288,22 @@ def export(
album_keyword,
person_keyword,
keyword_template,
export_db,
)
results_exported.extend(results.exported)
results_new.extend(results.new)
results_updated.extend(results.updated)
results_skipped.extend(results.skipped)
results_exif_updated.extend(results.exif_updated)
else:
for p in photos:
export_paths = export_photo(
results = export_photo(
p,
dest,
verbose,
verbose_,
export_by_date,
sidecar,
update,
export_as_hardlink,
overwrite,
export_edited,
@@ -1244,14 +1317,40 @@ def export(
album_keyword,
person_keyword,
keyword_template,
export_db,
)
if export_paths:
click.echo(f"Exported {p.filename} to {export_paths}")
else:
click.echo(f"Did not export missing file {p.filename}")
results_exported.extend(results.exported)
results_new.extend(results.new)
results_updated.extend(results.updated)
results_skipped.extend(results.skipped)
results_exif_updated.extend(results.exif_updated)
stop_time = time.perf_counter()
# print summary results
if not update:
photo_str = "photos" if len(results_exported) != 1 else "photo"
click.echo(f"Exported: {len(results_exported)} {photo_str}")
click.echo(f"Elapsed time: {stop_time-start_time} seconds")
else:
photo_str_new = "photos" if len(results_new) != 1 else "photo"
photo_str_updated = "photos" if len(results_new) != 1 else "photo"
photo_str_skipped = "photos" if len(results_skipped) != 1 else "photo"
photo_str_exif_updated = (
"photos" if len(results_exif_updated) != 1 else "photo"
)
click.echo(
f"Exported: {len(results_new)} {photo_str_new}, "
+ f"updated: {len(results_updated)} {photo_str_updated}, "
+ f"skipped: {len(results_skipped)} {photo_str_skipped}, "
+ f"updated EXIF data: {len(results_exif_updated)} {photo_str_exif_updated}"
)
click.echo(f"Elapsed time: {stop_time-start_time} seconds")
else:
click.echo("Did not find any photos to export")
export_db.close()
@cli.command()
@click.argument("topic", default=None, required=False, nargs=1)
@@ -1618,9 +1717,10 @@ def _query(
def export_photo(
photo,
dest,
verbose,
verbose_,
export_by_date,
sidecar,
update,
export_as_hardlink,
overwrite,
export_edited,
@@ -1634,11 +1734,12 @@ def export_photo(
album_keyword,
person_keyword,
keyword_template,
export_db,
):
""" Helper function for export that does the actual export
photo: PhotoInfo object
dest: destination path as string
verbose: boolean; print verbose output
verbose_: boolean; print verbose output
export_by_date: boolean; create export folder in form dest/YYYY/MM/DD
sidecar: list zero, 1 or 2 of ["json","xmp"] of sidecar variety to export
export_as_hardlink: boolean; hardlink files instead of copying them
@@ -1656,6 +1757,8 @@ def export_photo(
keyword_template: list of strings; if provided use rendered template strings as keywords
returns list of path(s) of exported photo or None if photo was missing
"""
global VERBOSE
VERBOSE = True if verbose_ else False
# Can export to multiple paths
# Start with single path [dest] but direcotry and export_by_date will modify dest_paths
@@ -1663,21 +1766,21 @@ def export_photo(
if not download_missing:
if photo.ismissing:
space = " " if not verbose else ""
click.echo(f"{space}Skipping missing photo {photo.filename}")
return None
space = " " if not verbose_ else ""
verbose(f"{space}Skipping missing photo {photo.filename}")
return ExportResults([], [], [], [], [])
elif not os.path.exists(photo.path):
space = " " if not verbose else ""
click.echo(
space = " " if not verbose_ else ""
verbose(
f"{space}WARNING: file {photo.path} is missing but ismissing=False, "
f"skipping {photo.filename}"
)
return None
return ExportResults([], [], [], [], [])
elif photo.ismissing and not photo.iscloudasset or not photo.incloud:
click.echo(
verbose(
f"Skipping missing {photo.filename}: not iCloud asset or missing from cloud"
)
return None
return ExportResults([], [], [], [], [])
filename = None
if original_name:
@@ -1685,8 +1788,7 @@ def export_photo(
else:
filename = photo.filename
if verbose:
click.echo(f"Exporting {photo.filename} as {filename}")
verbose(f"Exporting {photo.filename} as {filename}")
if export_by_date:
date_created = photo.date.timetuple()
@@ -1724,9 +1826,13 @@ def export_photo(
)
# export the photo to each path in dest_paths
photo_paths = []
results_exported = []
results_new = []
results_updated = []
results_skipped = []
results_exif_updated = []
for dest_path in dest_paths:
photo_path = photo.export(
export_results = photo.export2(
dest_path,
filename,
sidecar_json=sidecar_json,
@@ -1741,8 +1847,25 @@ def export_photo(
use_albums_as_keywords=album_keyword,
use_persons_as_keywords=person_keyword,
keyword_template=keyword_template,
)[0]
photo_paths.append(photo_path)
update=update,
export_db=export_db,
)
results_exported.extend(export_results.exported)
results_new.extend(export_results.new)
results_updated.extend(export_results.updated)
results_skipped.extend(export_results.skipped)
results_exif_updated.extend(export_results.exif_updated)
if verbose_:
for exported in export_results.exported:
verbose(f"Exported {exported}")
for new in export_results.new:
verbose(f"Exported new file {new}")
for updated in export_results.updated:
verbose(f"Exported updated file {updated}")
for skipped in export_results.skipped:
verbose(f"Skipped up to date file {skipped}")
# if export-edited, also export the edited version
# verify the photo has adjustments and valid path to avoid raising an exception
@@ -1751,7 +1874,7 @@ def export_photo(
# try to download with Photos
use_photos_export = download_missing and photo.path_edited is None
if not download_missing and photo.path_edited is None:
click.echo(f"Skipping missing edited photo for {filename}")
verbose(f"Skipping missing edited photo for {filename}")
else:
edited_name = pathlib.Path(filename)
# check for correct edited suffix
@@ -1762,11 +1885,8 @@ def export_photo(
# will be corrected by use_photos_export
edited_suffix = pathlib.Path(photo.filename).suffix
edited_name = f"{edited_name.stem}_edited{edited_suffix}"
if verbose:
click.echo(
f"Exporting edited version of {filename} as {edited_name}"
)
photo.export(
verbose(f"Exporting edited version of {filename} as {edited_name}")
export_results_edited = photo.export2(
dest_path,
edited_name,
sidecar_json=sidecar_json,
@@ -1780,9 +1900,33 @@ def export_photo(
use_albums_as_keywords=album_keyword,
use_persons_as_keywords=person_keyword,
keyword_template=keyword_template,
update=update,
export_db=export_db,
)
return photo_paths
results_exported.extend(export_results_edited.exported)
results_new.extend(export_results_edited.new)
results_updated.extend(export_results_edited.updated)
results_skipped.extend(export_results_edited.skipped)
results_exif_updated.extend(export_results_edited.exif_updated)
if verbose_:
for exported in export_results_edited.exported:
verbose(f"Exported {exported}")
for new in export_results_edited.new:
verbose(f"Exported new file {new}")
for updated in export_results_edited.updated:
verbose(f"Exported updated file {updated}")
for skipped in export_results_edited.skipped:
verbose(f"Skipped up to date file {skipped}")
return ExportResults(
results_exported,
results_new,
results_updated,
results_skipped,
results_exif_updated,
)
if __name__ == "__main__":

439
osxphotos/_export_db.py Normal file
View File

@@ -0,0 +1,439 @@
""" Helper class for managing a database used by
PhotoInfo.export for tracking state of exports and updates
"""
import datetime
import logging
import os
import pathlib
import sqlite3
import sys
from abc import ABC, abstractmethod
from sqlite3 import Error
from ._version import __version__
OSXPHOTOS_EXPORTDB_VERSION = "1.0"
class ExportDB_ABC(ABC):
@abstractmethod
def get_uuid_for_file(self, filename):
pass
@abstractmethod
def set_uuid_for_file(self, filename, uuid):
pass
@abstractmethod
def set_stat_orig_for_file(self, filename, stats):
pass
@abstractmethod
def get_stat_orig_for_file(self, filename):
pass
@abstractmethod
def set_stat_exif_for_file(self, filename, stats):
pass
@abstractmethod
def get_stat_exif_for_file(self, filename):
pass
@abstractmethod
def get_info_for_uuid(self, uuid):
pass
@abstractmethod
def set_info_for_uuid(self, uuid, info):
pass
@abstractmethod
def get_exifdata_for_file(self, uuid):
pass
@abstractmethod
def set_exifdata_for_file(self, uuid, exifdata):
pass
@abstractmethod
def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json):
pass
class ExportDBNoOp(ExportDB_ABC):
""" An ExportDB with NoOp methods """
def get_uuid_for_file(self, filename):
pass
def set_uuid_for_file(self, filename, uuid):
pass
def set_stat_orig_for_file(self, filename, stats):
pass
def get_stat_orig_for_file(self, filename):
pass
def set_stat_exif_for_file(self, filename, stats):
pass
def get_stat_exif_for_file(self, filename):
pass
def get_info_for_uuid(self, uuid):
pass
def set_info_for_uuid(self, uuid, info):
pass
def get_exifdata_for_file(self, uuid):
pass
def set_exifdata_for_file(self, uuid, exifdata):
pass
def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json):
pass
class ExportDB(ExportDB_ABC):
""" Interface to sqlite3 database used to store state information for osxphotos export command """
def __init__(self, dbfile):
""" dbfile: path to osxphotos export database file """
self._dbfile = dbfile
# _path is parent of the database
# all files referenced by get_/set_uuid_for_file will be converted to
# relative paths to this parent _path
# this allows the entire export tree to be moved to a new disk/location
# whilst preserving the UUID to filename mappping
self._path = pathlib.Path(dbfile).parent
self._conn = self._open_export_db(dbfile)
self._insert_run_info()
def get_uuid_for_file(self, filename):
""" query database for filename and return UUID
returns None if filename not found in database
"""
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
logging.debug(f"get_uuid: {filename}")
conn = self._conn
try:
c = conn.cursor()
c.execute(
f"SELECT uuid FROM files WHERE filepath_normalized = ?", (filename,)
)
results = c.fetchone()
uuid = results[0] if results else None
except Error as e:
logging.warning(e)
uuid = None
logging.debug(f"get_uuid: {uuid}")
return uuid
def set_uuid_for_file(self, filename, uuid):
""" set UUID of filename to uuid in the database """
filename = str(pathlib.Path(filename).relative_to(self._path))
filename_normalized = filename.lower()
logging.debug(f"set_uuid: {filename} {uuid}")
conn = self._conn
try:
c = conn.cursor()
c.execute(
f"INSERT OR REPLACE INTO files(filepath, filepath_normalized, uuid) VALUES (?, ?, ?);",
(filename, filename_normalized, uuid),
)
conn.commit()
except Error as e:
logging.warning(e)
def set_stat_orig_for_file(self, filename, stats):
""" set stat info for filename
filename: filename to set the stat info for
stat: a tuple of length 3: mode, size, mtime """
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
if len(stats) != 3:
raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
logging.debug(f"set_stat_orig_for_file: {filename} {stats}")
conn = self._conn
try:
c = conn.cursor()
c.execute(
"UPDATE files "
+ "SET orig_mode = ?, orig_size = ?, orig_mtime = ? "
+ "WHERE filepath_normalized = ?;",
(*stats, filename),
)
conn.commit()
except Error as e:
logging.warning(e)
def get_stat_orig_for_file(self, filename):
""" get stat info for filename
returns: tuple of (mode, size, mtime)
"""
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
conn = self._conn
try:
c = conn.cursor()
c.execute(
"SELECT orig_mode, orig_size, orig_mtime FROM files WHERE filepath_normalized = ?",
(filename,),
)
results = c.fetchone()
stats = results[0:3] if results else None
except Error as e:
logging.warning(e)
stats = (None, None, None)
logging.debug(f"get_stat_orig_for_file: {stats}")
return stats
def set_stat_exif_for_file(self, filename, stats):
""" set stat info for filename (after exiftool has updated it)
filename: filename to set the stat info for
stat: a tuple of length 3: mode, size, mtime """
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
if len(stats) != 3:
raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
logging.debug(f"set_stat_exif_for_file: {filename} {stats}")
conn = self._conn
try:
c = conn.cursor()
c.execute(
"UPDATE files "
+ "SET exif_mode = ?, exif_size = ?, exif_mtime = ? "
+ "WHERE filepath_normalized = ?;",
(*stats, filename),
)
conn.commit()
except Error as e:
logging.warning(e)
def get_stat_exif_for_file(self, filename):
""" get stat info for filename (after exiftool has updated it)
returns: tuple of (mode, size, mtime)
"""
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
conn = self._conn
try:
c = conn.cursor()
c.execute(
"SELECT exif_mode, exif_size, exif_mtime FROM files WHERE filepath_normalized = ?",
(filename,),
)
results = c.fetchone()
stats = results[0:3] if results else None
except Error as e:
logging.warning(e)
stats = (None, None, None)
logging.debug(f"get_stat_exif_for_file: {stats}")
return stats
def get_info_for_uuid(self, uuid):
""" returns the info JSON struct for a UUID """
conn = self._conn
try:
c = conn.cursor()
c.execute("SELECT json_info FROM info WHERE uuid = ?", (uuid,))
results = c.fetchone()
info = results[0] if results else None
except Error as e:
logging.warning(e)
info = None
logging.debug(f"get_info: {uuid}, {info}")
return info
def set_info_for_uuid(self, uuid, info):
""" sets the info JSON struct for a UUID """
conn = self._conn
try:
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO info(uuid, json_info) VALUES (?, ?);",
(uuid, info),
)
conn.commit()
except Error as e:
logging.warning(e)
logging.debug(f"set_info: {uuid}, {info}")
def get_exifdata_for_file(self, filename):
""" returns the exifdata JSON struct for a file """
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
conn = self._conn
try:
c = conn.cursor()
c.execute(
"SELECT json_exifdata FROM exifdata WHERE filepath_normalized = ?",
(filename,),
)
results = c.fetchone()
exifdata = results[0] if results else None
except Error as e:
logging.warning(e)
exifdata = None
logging.debug(f"get_exifdata: {filename}, {exifdata}")
return exifdata
def set_exifdata_for_file(self, filename, exifdata):
""" sets the exifdata JSON struct for a file """
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
conn = self._conn
try:
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO exifdata(filepath_normalized, json_exifdata) VALUES (?, ?);",
(filename, exifdata),
)
conn.commit()
except Error as e:
logging.warning(e)
logging.debug(f"set_exifdata: {filename}, {exifdata}")
def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json):
""" sets all the data for file and uuid at once
calls set_uuid_for_file
set_info_for_uuid
set_stat_orig_for_file
set_stat_exif_for_file
set_exifdata_for_file
"""
filename = str(pathlib.Path(filename).relative_to(self._path)).lower()
self.set_uuid_for_file(filename, uuid)
self.set_info_for_uuid(uuid, info_json)
self.set_stat_orig_for_file(filename, orig_stat)
self.set_stat_exif_for_file(filename, exif_stat)
self.set_exifdata_for_file(filename, exif_json)
def close(self):
""" close the database connection """
try:
self._conn.close()
except Error as e:
logging.warning(e)
def _open_export_db(self, dbfile):
""" open export database and return a db connection
if dbfile does not exist, will create and initialize the database
returns: connection to the database
"""
if not os.path.isfile(dbfile):
logging.debug(f"dbfile {dbfile} doesn't exist, creating it")
conn = self._get_db_connection(dbfile)
if conn:
self._create_db_tables(conn)
else:
raise Exception("Error getting connection to database {dbfile}")
else:
logging.debug(f"dbfile {dbfile} exists, opening it")
conn = self._get_db_connection(dbfile)
return conn
def _get_db_connection(self, dbfile):
""" return db connection to dbname """
try:
conn = sqlite3.connect(dbfile)
except Error as e:
logging.warning(e)
conn = None
return conn
def _create_db_tables(self, conn):
""" create (if not already created) the necessary db tables for the export database
conn: sqlite3 db connection
"""
sql_commands = {
"sql_version_table": """ CREATE TABLE IF NOT EXISTS version (
id INTEGER PRIMARY KEY,
osxphotos TEXT,
exportdb TEXT
); """,
"sql_files_table": """ CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY,
filepath TEXT NOT NULL,
filepath_normalized TEXT NOT NULL,
uuid TEXT,
orig_mode INTEGER,
orig_size INTEGER,
orig_mtime REAL,
exif_mode INTEGER,
exif_size INTEGER,
exif_mtime REAL
); """,
"sql_runs_table": """ CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY,
datetime TEXT,
python_path TEXT,
script_name TEXT,
args TEXT,
cwd TEXT
); """,
"sql_info_table": """ CREATE TABLE IF NOT EXISTS info (
id INTEGER PRIMARY KEY,
uuid text NOT NULL,
json_info JSON
); """,
"sql_exifdata_table": """ CREATE TABLE IF NOT EXISTS exifdata (
id INTEGER PRIMARY KEY,
filepath_normalized TEXT NOT NULL,
json_exifdata JSON
); """,
"sql_files_idx": """ CREATE UNIQUE INDEX idx_files_filepath_normalized on files (filepath_normalized); """,
"sql_info_idx": """ CREATE UNIQUE INDEX idx_info_uuid on info (uuid); """,
"sql_exifdata_idx": """ CREATE UNIQUE INDEX idx_exifdata_filename on exifdata (filepath_normalized); """,
}
try:
c = conn.cursor()
for cmd in sql_commands.values():
c.execute(cmd)
c.execute(
"INSERT INTO version(osxphotos, exportdb) VALUES (?, ?);",
(__version__, OSXPHOTOS_EXPORTDB_VERSION),
)
conn.commit()
except Error as e:
logging.warning(e)
def __del__(self):
""" ensure the database connection is closed """
if self._conn:
try:
self._conn.close()
except Error as e:
logging.warning(e)
def _insert_run_info(self):
dt = datetime.datetime.utcnow().isoformat()
python_path = sys.executable
cmd = sys.argv[0]
if len(sys.argv) > 1:
args = " ".join(sys.argv[1:])
else:
args = ""
cwd = os.getcwd()
conn = self._conn
try:
c = conn.cursor()
c.execute(
f"INSERT INTO runs (datetime, python_path, script_name, args, cwd) VALUES (?, ?, ?, ?, ?)",
(dt, python_path, cmd, args, cwd),
)
conn.commit()
except Error as e:
logging.warning(e)

54
osxphotos/_filecmp.py Normal file
View File

@@ -0,0 +1,54 @@
"""Utilities for comparing files
Modified from CPython/Lib/filecmp.py
Functions:
cmp_file(f1, s2) -> int
file_sig(f1) -> Tuple[int, int, float]
"""
import os
import stat
__all__ = ["cmp", "sig"]
def cmp_file(f1, s2):
"""Compare file f1 to signature s2.
Arguments:
f1 -- File name
s2 -- stats as returned by sig
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with cache entries invalidated if their stat information
changes. The cache may be cleared by calling clear_cache().
"""
if not s2:
return False
s1 = _sig(os.stat(f1))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if s1 == s2:
return True
return False
def _sig(st):
return (stat.S_IFMT(st.st_mode), st.st_size, st.st_mtime)
def file_sig(f1):
""" return os.stat signature for file f1 """
return _sig(os.stat(f1))

View File

@@ -4,5 +4,6 @@ Represents a single photo in the Photos library and provides access to the photo
PhotosDB.photos() returns a list of PhotoInfo objects
"""
from .photoinfo import PhotoInfo
from ._photoinfo_exifinfo import ExifInfo
from ._photoinfo_export import ExportResults
from .photoinfo import PhotoInfo

View File

@@ -0,0 +1,999 @@
""" export methods for PhotoInfo """
# TODO: should this be its own PhotoExporter class?
import filecmp
import glob
import json
import logging
import os
import pathlib
import re
from collections import namedtuple # pylint: disable=syntax-error
from mako.template import Template
from .._constants import (
_MAX_IPTC_KEYWORD_LEN,
_OSXPHOTOS_NONE_SENTINEL,
_TEMPLATE_DIR,
_UNKNOWN_PERSON,
_XMP_TEMPLATE_NAME,
)
from ..exiftool import ExifTool
from .._export_db import ExportDBNoOp
from .._filecmp import cmp_file, file_sig
from ..utils import (
_copy_file,
_export_photo_uuid_applescript,
_hardlink_file,
dd_to_dms_str,
)
ExportResults = namedtuple(
"ExportResults", ["exported", "new", "updated", "skipped", "exif_updated"]
)
def export(
self,
dest,
*filename,
edited=False,
live_photo=False,
raw_photo=False,
export_as_hardlink=False,
overwrite=False,
increment=True,
sidecar_json=False,
sidecar_xmp=False,
use_photos_export=False,
timeout=120,
exiftool=False,
no_xattr=False,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" export photo
dest: must be valid destination path (or exception raised)
filename: (optional): name of exported picture; if not provided, will use current filename
**NOTE**: if provided, user must ensure file extension (suffix) is correct.
For example, if photo is .CR2 file, edited image may be .jpeg.
If you provide an extension different than what the actual file is,
export will print a warning but will happily export the photo using the
incorrect file extension. e.g. to get the extension of the edited photo,
reference PhotoInfo.path_edited
edited: (boolean, default=False); if True will export the edited version of the photo
(or raise exception if no edited version)
live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos
raw_photo: (boolean, default=False); if True, will also export the associted RAW photo
export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them
overwrite: (boolean, default=False); if True will overwrite files if they alreay exist
increment: (boolean, default=True); if True, will increment file name until a non-existant name is found
if overwrite=False and increment=False, export will fail if destination file already exists
sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool
sidecar filename will be dest/filename.json
sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data
sidecar filename will be dest/filename.xmp
use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos
timeout: (int, default=120) timeout in seconds used with use_photos_export
exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file
no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes
returns list of full paths to the exported files
use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords
when exporting metadata with exiftool or sidecar
use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords
when exporting metadata with exiftool or sidecar
keyword_template: (list of strings); list of template strings that will be rendered as used as keywords
returns: list of photos exported
"""
# Implementation note: calls export2 to actually do the work
results = self.export2(
dest,
*filename,
edited=edited,
live_photo=live_photo,
raw_photo=raw_photo,
export_as_hardlink=export_as_hardlink,
overwrite=overwrite,
increment=increment,
sidecar_json=sidecar_json,
sidecar_xmp=sidecar_xmp,
use_photos_export=use_photos_export,
timeout=timeout,
exiftool=exiftool,
no_xattr=no_xattr,
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
return results.exported
def export2(
self,
dest,
*filename,
edited=False,
live_photo=False,
raw_photo=False,
export_as_hardlink=False,
overwrite=False,
increment=True,
sidecar_json=False,
sidecar_xmp=False,
use_photos_export=False,
timeout=120,
exiftool=False,
no_xattr=False,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
update=False,
export_db=None,
):
""" export photo
dest: must be valid destination path (or exception raised)
filename: (optional): name of exported picture; if not provided, will use current filename
**NOTE**: if provided, user must ensure file extension (suffix) is correct.
For example, if photo is .CR2 file, edited image may be .jpeg.
If you provide an extension different than what the actual file is,
export will print a warning but will happily export the photo using the
incorrect file extension. e.g. to get the extension of the edited photo,
reference PhotoInfo.path_edited
edited: (boolean, default=False); if True will export the edited version of the photo
(or raise exception if no edited version)
live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos
raw_photo: (boolean, default=False); if True, will also export the associted RAW photo
export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them
overwrite: (boolean, default=False); if True will overwrite files if they alreay exist
increment: (boolean, default=True); if True, will increment file name until a non-existant name is found
if overwrite=False and increment=False, export will fail if destination file already exists
sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool
sidecar filename will be dest/filename.json
sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data
sidecar filename will be dest/filename.xmp
use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos
timeout: (int, default=120) timeout in seconds used with use_photos_export
exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file
no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes
returns list of full paths to the exported files
use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords
when exporting metadata with exiftool or sidecar
use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords
when exporting metadata with exiftool or sidecar
keyword_template: (list of strings); list of template strings that will be rendered as used as keywords
update: (boolean, default=False); if True export will run in update mode, that is, it will
not export the photo if the current version already exists in the destination
export_db: (ExportDB_ABC); instance of a class that conforms to ExportDB_ABC with methods
for getting/setting data related to exported files to compare update state
Returns: ExportResults namedtuple with fields: exported, new, updated, skipped
where each field is a list of file paths
"""
# if update, caller may pass function refs to get/set uuid for file being exported
# and for setting/getting the PhotoInfo json info for an exported file
if export_db is None:
export_db = ExportDBNoOp()
# suffix to add to edited files
# e.g. name will be filename_edited.jpg
edited_identifier = "_edited"
# list of all files exported during this call to export
exported_files = []
# list of new files during update
update_new_files = []
# list of files that were updated
update_updated_files = []
# list of all files skipped because they do not need to be updated (for use with update=True)
update_skipped_files = []
# check edited and raise exception trying to export edited version of
# photo that hasn't been edited
if edited and not self.hasadjustments:
raise ValueError(
"Photo does not have adjustments, cannot export edited version"
)
# check arguments and get destination path and filename (if provided)
if filename and len(filename) > 2:
raise TypeError(
"Too many positional arguments. Should be at most two: destination, filename."
)
else:
# verify destination is a valid path
if dest is None:
raise ValueError("Destination must not be None")
elif not os.path.isdir(dest):
raise FileNotFoundError("Invalid path passed to export")
if filename and len(filename) == 1:
# if filename passed, use it
fname = filename[0]
else:
# no filename provided so use the default
# if edited file requested, use filename but add _edited
# need to use file extension from edited file as Photos saves a jpeg once edited
if edited and not use_photos_export:
# verify we have a valid path_edited and use that to get filename
if not self.path_edited:
raise FileNotFoundError(
"edited=True but path_edited is none; hasadjustments: "
f" {self.hasadjustments}"
)
edited_name = pathlib.Path(self.path_edited).name
edited_suffix = pathlib.Path(edited_name).suffix
fname = (
pathlib.Path(self.filename).stem + edited_identifier + edited_suffix
)
else:
fname = self.filename
# check destination path
dest = pathlib.Path(dest)
fname = pathlib.Path(fname)
dest = dest / fname
# check extension of destination
if edited and self.path_edited is not None:
# use suffix from edited file
actual_suffix = pathlib.Path(self.path_edited).suffix
elif edited:
# use .jpeg as that's probably correct
# if edited and path_edited is None, will raise FileNotFoundError below
# unless use_photos_export is True
actual_suffix = ".jpeg"
else:
# use suffix from the non-edited file
actual_suffix = pathlib.Path(self.filename).suffix
# warn if suffixes don't match but ignore .JPG / .jpeg as
# Photo's often converts .JPG to .jpeg
suffixes = sorted([x.lower() for x in [dest.suffix, actual_suffix]])
if dest.suffix.lower() != actual_suffix.lower() and suffixes != [".jpeg", ".jpg"]:
logging.warning(
f"Invalid destination suffix: {dest.suffix}, should be {actual_suffix}"
)
# check to see if file exists and if so, add (1), (2), etc until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
# e.g. exporting sidecar for file1.png and file1.jpeg
# if file1.png exists and exporting file1.jpeg,
# dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision
if not update and increment and not overwrite:
count = 1
glob_str = str(dest.parent / f"{dest.stem}*")
dest_files = glob.glob(glob_str)
dest_files = [pathlib.Path(f).stem for f in dest_files]
dest_new = dest.stem
while dest_new in dest_files:
dest_new = f"{dest.stem} ({count})"
count += 1
dest = dest.parent / f"{dest_new}{dest.suffix}"
# TODO: need way to check if DB is missing, try to find the right photo anyway by seeing if they're the same and then updating
# move the checks into "if not use_photos_export" block below
# if use_photos_export is True then we'll export wether destination exists or not
# if overwrite==False and #increment==False, export should fail if file exists
if dest.exists() and not update and not overwrite and not increment:
raise FileExistsError(
f"destination exists ({dest}); overwrite={overwrite}, increment={increment}"
)
if not use_photos_export:
# find the source file on disk and export
# get path to source file and verify it's not None and is valid file
# TODO: how to handle ismissing or not hasadjustments and edited=True cases?
if edited:
if self.path_edited is not None:
src = self.path_edited
else:
raise FileNotFoundError(
f"Cannot export edited photo if path_edited is None"
)
else:
if self.ismissing:
logging.debug(
f"Attempting to export photo with ismissing=True: path = {self.path}"
)
if self.path is not None:
src = self.path
else:
raise FileNotFoundError("Cannot export photo if path is None")
if not os.path.isfile(src):
raise FileNotFoundError(f"{src} does not appear to exist")
logging.debug(
f"exporting {src} to {dest}, overwrite={overwrite}, increment={increment}, dest exists: {dest.exists()}"
)
# found source now try to find right destination
if update and dest.exists():
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
if dest_uuid is None and filecmp.cmp(src, dest):
# might be exporting into a pre-ExportDB folder or the DB got deleted
logging.debug(
f"Found matching file with blank uuid: {self.uuid}, {dest}"
)
dest_uuid = self.uuid
export_db.set_uuid_for_file(dest, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest, file_sig(dest))
export_db.set_stat_exif_for_file(dest, (None, None, None))
export_db.set_exifdata_for_file(dest, None)
if dest_uuid != self.uuid:
# not the right file, find the right one
logging.debug(
f"Need to find right photo: uuid={self.uuid}, dest={dest_uuid}, dest={dest}, path={self.path}"
)
count = 1
glob_str = str(dest.parent / f"{dest.stem} (*{dest.suffix}")
dest_files = glob.glob(glob_str)
found_match = False
for file_ in dest_files:
dest_uuid = export_db.get_uuid_for_file(file_)
if dest_uuid == self.uuid:
logging.debug(
f"Found matching file for uuid: {dest_uuid}, {file_}"
)
dest = pathlib.Path(file_)
found_match = True
break
elif dest_uuid is None and filecmp.cmp(src, file_):
# files match, update the UUID
logging.debug(
f"Found matching file with blank uuid: {self.uuid}, {file_}"
)
dest = pathlib.Path(file_)
found_match = True
export_db.set_uuid_for_file(file_, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest, file_sig(dest))
export_db.set_stat_exif_for_file(dest, (None, None, None))
export_db.set_exifdata_for_file(dest, None)
break
if not found_match:
logging.debug(
f"Didn't find destination match for uuid {self.uuid} {dest}"
)
# increment the destination file
count = 1
glob_str = str(dest.parent / f"{dest.stem}*")
dest_files = glob.glob(glob_str)
dest_files = [pathlib.Path(f).stem for f in dest_files]
dest_new = dest.stem
while dest_new in dest_files:
dest_new = f"{dest.stem} ({count})"
count += 1
dest = dest.parent / f"{dest_new}{dest.suffix}"
logging.debug(f"New destination = {dest}, uuid = {self.uuid}")
# export the dest file
results = self._export_photo(
src,
dest,
update,
export_db,
overwrite,
no_xattr,
export_as_hardlink,
exiftool,
)
exported_files = results.exported
update_new_files = results.new
update_updated_files = results.updated
update_skipped_files = results.skipped
# copy live photo associated .mov if requested
if live_photo and self.live_photo:
live_name = dest.parent / f"{dest.stem}.mov"
src_live = self.path_live_photo
if src_live is not None:
logging.debug(
f"Exporting live photo video of {filename} as {live_name.name}"
)
results = self._export_photo(
src_live,
live_name,
update,
export_db,
overwrite,
no_xattr,
export_as_hardlink,
exiftool,
)
exported_files.extend(results.exported)
update_new_files.extend(results.new)
update_updated_files.extend(results.updated)
update_skipped_files.extend(results.skipped)
else:
logging.debug(f"Skipping missing live movie for {filename}")
# copy associated RAW image if requested
if raw_photo and self.has_raw:
raw_path = pathlib.Path(self.path_raw)
raw_ext = raw_path.suffix
raw_name = dest.parent / f"{dest.stem}{raw_ext}"
if raw_path is not None:
logging.debug(f"Exporting RAW photo of {filename} as {raw_name.name}")
results = self._export_photo(
raw_path,
raw_name,
update,
export_db,
overwrite,
no_xattr,
export_as_hardlink,
exiftool,
)
exported_files.extend(results.exported)
update_new_files.extend(results.new)
update_updated_files.extend(results.updated)
update_skipped_files.extend(results.skipped)
else:
logging.debug(f"Skipping missing RAW photo for {filename}")
else:
# use_photo_export
exported = None
# export live_photo .mov file?
live_photo = True if live_photo and self.live_photo else False
if edited:
# exported edited version and not original
if filename:
# use filename stem provided
filestem = dest.stem
else:
# didn't get passed a filename, add _edited
filestem = f"{dest.stem}_edited"
dest = dest.parent / f"{filestem}.jpeg"
exported = _export_photo_uuid_applescript(
self.uuid,
dest.parent,
filestem=filestem,
original=False,
edited=True,
live_photo=live_photo,
timeout=timeout,
burst=self.burst,
)
else:
# export original version and not edited
filestem = dest.stem
exported = _export_photo_uuid_applescript(
self.uuid,
dest.parent,
filestem=filestem,
original=True,
edited=False,
live_photo=live_photo,
timeout=timeout,
burst=self.burst,
)
if exported is not None:
exported_files.extend(exported)
else:
logging.warning(
f"Error exporting photo {self.uuid} to {dest} with use_photos_export"
)
# export metadata
info = export_db.get_info_for_uuid(self.uuid)
if sidecar_json:
logging.debug("writing exiftool_json_sidecar")
sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.json")
sidecar_str = self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
try:
self._write_sidecar(sidecar_filename, sidecar_str)
except Exception as e:
logging.warning(f"Error writing json sidecar to {sidecar_filename}")
raise e
if sidecar_xmp:
logging.debug("writing xmp_sidecar")
sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.xmp")
sidecar_str = self._xmp_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
try:
self._write_sidecar(sidecar_filename, sidecar_str)
except Exception as e:
logging.warning(f"Error writing xmp sidecar to {sidecar_filename}")
raise e
# if exiftool, write the metadata
if update:
exif_files = update_new_files + update_updated_files + update_skipped_files
else:
exif_files = exported_files
exif_files_updated = []
if exiftool and update and exif_files:
for exported_file in exif_files:
logging.debug(f"checking exif for {exported_file}")
files_are_different = False
old_data = export_db.get_exifdata_for_file(exported_file)
if old_data is not None:
old_data = json.loads(old_data)[0]
current_data = json.loads(
self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
)[0]
if old_data != current_data:
files_are_different = True
if old_data is None or files_are_different:
# didn't have old data, assume we need to write it
# or files were different
logging.debug(f"No exifdata for {exported_file}, writing it")
self._write_exif_data(
exported_file,
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
export_db.set_exifdata_for_file(
exported_file,
self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
),
)
export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file))
exif_files_updated.append(exported_file)
elif exiftool and exif_files:
for exported_file in exif_files:
logging.debug(f"Writing exif data to {exported_file}")
self._write_exif_data(
exported_file,
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
export_db.set_exifdata_for_file(
exported_file,
self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
),
)
export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file))
exif_files_updated.append(exported_file)
return ExportResults(
exported_files,
update_new_files,
update_updated_files,
update_skipped_files,
exif_files_updated,
)
def _export_photo(
self,
src,
dest,
update,
export_db,
overwrite,
no_xattr,
export_as_hardlink,
exiftool,
):
""" Helper function for export()
Does the actual copy or hardlink taking the appropriate
action depending on update, overwrite
Assumes destination is the right destination (e.g. UUID matches)
sets UUID and JSON info foo exported file using set_uuid_for_file, set_inf_for_uuido
src: src path (string)
dest: dest path (pathlib.Path)
update: bool
export_db: instance of ExportDB that conforms to ExportDB_ABC interface
overwrite: bool
no_xattr: don't copy extended attributes
export_as_hardlink: bool
exiftool: bool
Returns: ExportResults
"""
exported_files = []
update_updated_files = []
update_new_files = []
update_skipped_files = []
dest_str = str(dest)
dest_exists = dest.exists()
if export_as_hardlink:
# use hardlink instead of copy
if not update:
# not update, do the the hardlink
if overwrite and dest.exists():
# need to remove the destination first
dest.unlink()
logging.debug(f"Not update: export_as_hardlink linking file {src} {dest}")
_hardlink_file(src, dest)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
exported_files.append(dest_str)
elif dest_exists and dest.samefile(src):
# update, hardlink and it already points to the right file, do nothing
logging.debug(
f"Update: skipping samefile with export_as_hardlink {src} {dest}"
)
update_skipped_files.append(dest_str)
elif dest_exists:
# update, not the same file (e.g. user may not have used export_as_hardlink last time it was run
logging.debug(
f"Update: removing existing file prior to export_as_hardlink {src} {dest}"
)
dest.unlink()
_hardlink_file(src, dest)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
update_updated_files.append(dest_str)
exported_files.append(dest_str)
else:
# update, hardlink, destination doesn't exist (new file)
logging.debug(
f"Update: exporting new file with export_as_hardlink {src} {dest}"
)
_hardlink_file(src, dest)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
exported_files.append(dest_str)
update_new_files.append(dest_str)
else:
if not update:
# not update, do the the copy
if overwrite and dest.exists():
# need to remove the destination first
dest.unlink()
logging.debug(f"Not update: copying file {src} {dest}")
_copy_file(src, dest_str, norsrc=no_xattr)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
exported_files.append(dest_str)
# elif dest_exists and not exiftool and cmp_file(dest_str, export_db.get_stat_orig_for_file(dest_str)):
elif (
dest_exists
and not exiftool
and filecmp.cmp(src, dest)
and not dest.samefile(src)
):
# destination exists but is identical
logging.debug(f"Update: skipping identifical original files {src} {dest}")
# call set_stat because code can reach this spot if no export DB but exporting a RAW or live photo
# potentially re-writes the data in the database but ensures database is complete
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
update_skipped_files.append(dest_str)
elif (
dest_exists
and exiftool
and cmp_file(dest_str, export_db.get_stat_exif_for_file(dest_str))
and not dest.samefile(src)
):
# destination exists but is identical
logging.debug(f"Update: skipping identifical exiftool files {src} {dest}")
update_skipped_files.append(dest_str)
elif dest_exists:
# destination exists but is different or is a hardlink
logging.debug(f"Update: removing existing file prior to copy {src} {dest}")
stat_src = os.stat(src)
stat_dest = os.stat(dest)
dest.unlink()
_copy_file(src, dest_str, norsrc=no_xattr)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
exported_files.append(dest_str)
update_updated_files.append(dest_str)
else:
# destination doesn't exist, copy the file
logging.debug(f"Update: copying new file {src} {dest}")
_copy_file(src, dest_str, norsrc=no_xattr)
export_db.set_uuid_for_file(dest_str, self.uuid)
export_db.set_info_for_uuid(self.uuid, self.json())
export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str))
export_db.set_stat_exif_for_file(dest_str, (None, None, None))
export_db.set_exifdata_for_file(dest_str, None)
exported_files.append(dest_str)
update_new_files.append(dest_str)
return ExportResults(
exported_files, update_new_files, update_updated_files, update_skipped_files, []
)
def _write_exif_data(
self,
filepath,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" write exif data to image file at filepath
filepath: full path to the image file """
if not os.path.exists(filepath):
raise FileNotFoundError(f"Could not find file {filepath}")
exiftool = ExifTool(filepath)
exif_info = json.loads(
self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
)[0]
for exiftag, val in exif_info.items():
if type(val) == list:
# more than one, set first value the add additional values
exiftool.setvalue(exiftag, val.pop(0))
if val:
# add any remaining items
exiftool.addvalues(exiftag, *val)
else:
exiftool.setvalue(exiftag, val)
def _exiftool_json_sidecar(
self,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" return json string of EXIF details in exiftool sidecar format
Does not include all the EXIF fields as those are likely already in the image
use_albums_as_keywords: treat album names as keywords
use_persons_as_keywords: treat person names as keywords
keyword_template: (list of strings); list of template strings to render as keywords
Exports the following:
FileName
ImageDescription
Description
Title
TagsList
Keywords (may include album name, person name, or template)
Subject
PersonInImage
GPSLatitude, GPSLongitude
GPSPosition
GPSLatitudeRef, GPSLongitudeRef
DateTimeOriginal
OffsetTimeOriginal
ModifyDate """
exif = {}
exif["_CreatedBy"] = "osxphotos, https://github.com/RhetTbull/osxphotos"
if self.description:
exif["EXIF:ImageDescription"] = self.description
exif["XMP:Description"] = self.description
if self.title:
exif["XMP:Title"] = self.title
keyword_list = []
if self.keywords:
keyword_list.extend(self.keywords)
person_list = []
if self.persons:
# filter out _UNKNOWN_PERSON
person_list = sorted([p for p in self.persons if p != _UNKNOWN_PERSON])
if use_persons_as_keywords and person_list:
keyword_list.extend(sorted(person_list))
if use_albums_as_keywords and self.albums:
keyword_list.extend(sorted(self.albums))
if keyword_template:
rendered_keywords = []
for template_str in keyword_template:
rendered, unmatched = self.render_template(
template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/"
)
if unmatched:
logging.warning(
f"Unmatched template substitution for template: {template_str} {unmatched}"
)
rendered_keywords.extend(rendered)
# filter out any template values that didn't match by looking for sentinel
rendered_keywords = [
keyword
for keyword in sorted(rendered_keywords)
if _OSXPHOTOS_NONE_SENTINEL not in keyword
]
# check to see if any keywords too long
long_keywords = [
long_str
for long_str in rendered_keywords
if len(long_str) > _MAX_IPTC_KEYWORD_LEN
]
if long_keywords:
logging.warning(
f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}"
)
keyword_list.extend(rendered_keywords)
if keyword_list:
exif["XMP:TagsList"] = exif["IPTC:Keywords"] = keyword_list
if person_list:
exif["XMP:PersonInImage"] = person_list
if self.keywords or person_list:
# Photos puts both keywords and persons in Subject when using "Export IPTC as XMP"
# only use Photos' keywords for subject
exif["XMP:Subject"] = list(self.keywords) + person_list
# if self.favorite():
# exif["Rating"] = 5
(lat, lon) = self.location
if lat is not None and lon is not None:
lat_str, lon_str = dd_to_dms_str(lat, lon)
exif["EXIF:GPSLatitude"] = lat_str
exif["EXIF:GPSLongitude"] = lon_str
exif["Composite:GPSPosition"] = f"{lat_str}, {lon_str}"
lat_ref = "North" if lat >= 0 else "South"
lon_ref = "East" if lon >= 0 else "West"
exif["EXIF:GPSLatitudeRef"] = lat_ref
exif["EXIF:GPSLongitudeRef"] = lon_ref
# process date/time and timezone offset
date = self.date
# exiftool expects format to "2015:01:18 12:00:00"
datetimeoriginal = date.strftime("%Y:%m:%d %H:%M:%S")
offsettime = date.strftime("%z")
# find timezone offset in format "-04:00"
offset = re.findall(r"([+-]?)([\d]{2})([\d]{2})", offsettime)
offset = offset[0] # findall returns list of tuples
offsettime = f"{offset[0]}{offset[1]}:{offset[2]}"
exif["EXIF:DateTimeOriginal"] = datetimeoriginal
exif["EXIF:OffsetTimeOriginal"] = offsettime
if self.date_modified is not None:
exif["EXIF:ModifyDate"] = self.date_modified.strftime("%Y:%m:%d %H:%M:%S")
json_str = json.dumps([exif])
return json_str
def _xmp_sidecar(
self,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" returns string for XMP sidecar
use_albums_as_keywords: treat album names as keywords
use_persons_as_keywords: treat person names as keywords
keyword_template: (list of strings); list of template strings to render as keywords """
# TODO: add additional fields to XMP file?
xmp_template = Template(filename=os.path.join(_TEMPLATE_DIR, _XMP_TEMPLATE_NAME))
keyword_list = []
if self.keywords:
keyword_list.extend(self.keywords)
# TODO: keyword handling in this and _exiftool_json_sidecar is
# good candidate for pulling out in a function
person_list = []
if self.persons:
# filter out _UNKNOWN_PERSON
person_list = [p for p in self.persons if p != _UNKNOWN_PERSON]
if use_persons_as_keywords and person_list:
keyword_list.extend(person_list)
if use_albums_as_keywords and self.albums:
keyword_list.extend(self.albums)
if keyword_template:
rendered_keywords = []
for template_str in keyword_template:
rendered, unmatched = self.render_template(
template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/"
)
if unmatched:
logging.warning(
f"Unmatched template substitution for template: {template_str} {unmatched}"
)
rendered_keywords.extend(rendered)
# filter out any template values that didn't match by looking for sentinel
rendered_keywords = [
keyword
for keyword in rendered_keywords
if _OSXPHOTOS_NONE_SENTINEL not in keyword
]
# check to see if any keywords too long
long_keywords = [
long_str
for long_str in rendered_keywords
if len(long_str) > _MAX_IPTC_KEYWORD_LEN
]
if long_keywords:
logging.warning(
f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}"
)
keyword_list.extend(rendered_keywords)
subject_list = []
if self.keywords or person_list:
# Photos puts both keywords and persons in Subject when using "Export IPTC as XMP"
subject_list = list(self.keywords) + person_list
xmp_str = xmp_template.render(
photo=self, keywords=keyword_list, persons=person_list, subjects=subject_list
)
# remove extra lines that mako inserts from template
xmp_str = "\n".join([line for line in xmp_str.split("\n") if line.strip() != ""])
return xmp_str
def _write_sidecar(self, filename, sidecar_str):
""" write sidecar_str to filename
used for exporting sidecar info """
if not filename and not sidecar_str:
raise (
ValueError(
f"filename {filename} and sidecar_str {sidecar_str} must not be None"
)
)
# TODO: catch exception?
f = open(filename, "w")
f.write(sidecar_str)
f.close()

View File

@@ -4,6 +4,7 @@ Represents a single photo in the Photos library and provides access to the photo
PhotosDB.photos() returns a list of PhotoInfo objects
"""
import dataclasses
import glob
import json
import logging
@@ -17,31 +18,18 @@ from datetime import timedelta, timezone
from pprint import pformat
import yaml
from mako.template import Template
from .._constants import (
_MAX_IPTC_KEYWORD_LEN,
_MOVIE_TYPE,
_OSXPHOTOS_NONE_SENTINEL,
_PHOTO_TYPE,
_PHOTOS_4_VERSION,
_PHOTOS_5_SHARED_PHOTO_PATH,
_TEMPLATE_DIR,
_UNKNOWN_PERSON,
_XMP_TEMPLATE_NAME,
)
from ..albuminfo import AlbumInfo
from ..datetime_formatter import DateTimeFormatter
from ..exiftool import ExifTool
from ..placeinfo import PlaceInfo4, PlaceInfo5
from ..utils import (
_copy_file,
_export_photo_uuid_applescript,
_get_resource_loc,
_hardlink_file,
dd_to_dms_str,
findfiles,
get_preferred_uti_extension,
)
from ..utils import _debug, _get_resource_loc, findfiles, get_preferred_uti_extension
from .template import (
MULTI_VALUE_SUBSTITUTIONS,
TEMPLATE_SUBSTITUTIONS,
@@ -64,6 +52,16 @@ class PhotoInfo:
)
from ._photoinfo_exifinfo import exif_info, ExifInfo
from ._photoinfo_exiftool import exiftool
from ._photoinfo_export import (
export,
export2,
_export_photo,
_exiftool_json_sidecar,
_write_exif_data,
_write_sidecar,
_xmp_sidecar,
ExportResults,
)
def __init__(self, db=None, uuid=None, info=None):
self._uuid = uuid
@@ -262,7 +260,7 @@ class PhotoInfo:
# if self._info["isMissing"] == 1:
# photopath = None # path would be meaningless until downloaded
logging.debug(photopath)
# logging.debug(photopath)
return photopath
@@ -638,307 +636,6 @@ class PhotoInfo:
otherwise returns False """
return self._info["raw_is_original"]
def export(
self,
dest,
*filename,
edited=False,
live_photo=False,
raw_photo=False,
export_as_hardlink=False,
overwrite=False,
increment=True,
sidecar_json=False,
sidecar_xmp=False,
use_photos_export=False,
timeout=120,
exiftool=False,
no_xattr=False,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" export photo
dest: must be valid destination path (or exception raised)
filename: (optional): name of exported picture; if not provided, will use current filename
**NOTE**: if provided, user must ensure file extension (suffix) is correct.
For example, if photo is .CR2 file, edited image may be .jpeg.
If you provide an extension different than what the actual file is,
export will print a warning but will happily export the photo using the
incorrect file extension. e.g. to get the extension of the edited photo,
reference PhotoInfo.path_edited
edited: (boolean, default=False); if True will export the edited version of the photo
(or raise exception if no edited version)
live_photo: (boolean, default=False); if True, will also export the associted .mov for live photos
raw_photo: (boolean, default=False); if True, will also export the associted RAW photo
export_as_hardlink: (boolean, default=False); if True, will hardlink files instead of copying them
overwrite: (boolean, default=False); if True will overwrite files if they alreay exist
increment: (boolean, default=True); if True, will increment file name until a non-existant name is found
if overwrite=False and increment=False, export will fail if destination file already exists
sidecar_json: (boolean, default = False); if True will also write a json sidecar with IPTC data in format readable by exiftool
sidecar filename will be dest/filename.json
sidecar_xmp: (boolean, default = False); if True will also write a XMP sidecar with IPTC data
sidecar filename will be dest/filename.xmp
use_photos_export: (boolean, default=False); if True will attempt to export photo via applescript interaction with Photos
timeout: (int, default=120) timeout in seconds used with use_photos_export
exiftool: (boolean, default = False); if True, will use exiftool to write metadata to export file
no_xattr: (boolean, default = False); if True, exports file without preserving extended attributes
returns list of full paths to the exported files
use_albums_as_keywords: (boolean, default = False); if True, will include album names in keywords
when exporting metadata with exiftool or sidecar
use_persons_as_keywords: (boolean, default = False); if True, will include person names in keywords
when exporting metadata with exiftool or sidecar
keyword_template: (list of strings); list of template strings that will be rendered as used as keywords
"""
# list of all files exported during this call to export
exported_files = []
# check edited and raise exception trying to export edited version of
# photo that hasn't been edited
if edited and not self.hasadjustments:
raise ValueError(
"Photo does not have adjustments, cannot export edited version"
)
# check arguments and get destination path and filename (if provided)
if filename and len(filename) > 2:
raise TypeError(
"Too many positional arguments. Should be at most two: destination, filename."
)
else:
# verify destination is a valid path
if dest is None:
raise ValueError("Destination must not be None")
elif not os.path.isdir(dest):
raise FileNotFoundError("Invalid path passed to export")
if filename and len(filename) == 1:
# if filename passed, use it
fname = filename[0]
else:
# no filename provided so use the default
# if edited file requested, use filename but add _edited
# need to use file extension from edited file as Photos saves a jpeg once edited
if edited and not use_photos_export:
# verify we have a valid path_edited and use that to get filename
if not self.path_edited:
raise FileNotFoundError(
"edited=True but path_edited is none; hasadjustments: "
f" {self.hasadjustments}"
)
edited_name = pathlib.Path(self.path_edited).name
edited_suffix = pathlib.Path(edited_name).suffix
fname = pathlib.Path(self.filename).stem + "_edited" + edited_suffix
else:
fname = self.filename
# check destination path
dest = pathlib.Path(dest)
fname = pathlib.Path(fname)
dest = dest / fname
# check extension of destination
if edited and self.path_edited is not None:
# use suffix from edited file
actual_suffix = pathlib.Path(self.path_edited).suffix
elif edited:
# use .jpeg as that's probably correct
# if edited and path_edited is None, will raise FileNotFoundError below
# unless use_photos_export is True
actual_suffix = ".jpeg"
else:
# use suffix from the non-edited file
actual_suffix = pathlib.Path(self.filename).suffix
# warn if suffixes don't match but ignore .JPG / .jpeg as
# Photo's often converts .JPG to .jpeg
suffixes = sorted([x.lower() for x in [dest.suffix, actual_suffix]])
if dest.suffix.lower() != actual_suffix.lower() and suffixes != [
".jpeg",
".jpg",
]:
logging.warning(
f"Invalid destination suffix: {dest.suffix}, should be {actual_suffix}"
)
# check to see if file exists and if so, add (1), (2), etc until we find one that works
# Photos checks the stem and adds (1), (2), etc which avoids collision with sidecars
# e.g. exporting sidecar for file1.png and file1.jpeg
# if file1.png exists and exporting file1.jpeg,
# dest will be file1 (1).jpeg even though file1.jpeg doesn't exist to prevent sidecar collision
if increment and not overwrite:
count = 1
glob_str = str(dest.parent / f"{dest.stem}*")
dest_files = glob.glob(glob_str)
dest_files = [pathlib.Path(f).stem for f in dest_files]
dest_new = dest.stem
while dest_new in dest_files:
dest_new = f"{dest.stem} ({count})"
count += 1
dest = dest.parent / f"{dest_new}{dest.suffix}"
# if overwrite==False and #increment==False, export should fail if file exists
if dest.exists() and not overwrite and not increment:
raise FileExistsError(
f"destination exists ({dest}); overwrite={overwrite}, increment={increment}"
)
if not use_photos_export:
# find the source file on disk and export
# get path to source file and verify it's not None and is valid file
# TODO: how to handle ismissing or not hasadjustments and edited=True cases?
if edited:
if self.path_edited is not None:
src = self.path_edited
else:
raise FileNotFoundError(
f"Cannot export edited photo if path_edited is None"
)
else:
if self.ismissing:
logging.warning(
f"Attempting to export photo with ismissing=True: path = {self.path}"
)
if self.path is not None:
src = self.path
else:
raise FileNotFoundError("Cannot export photo if path is None")
if not os.path.isfile(src):
raise FileNotFoundError(f"{src} does not appear to exist")
logging.debug(
f"exporting {src} to {dest}, overwrite={overwrite}, increment={increment}, dest exists: {dest.exists()}"
)
# copy the file, _copy_file uses ditto to preserve Mac extended attributes
if export_as_hardlink:
_hardlink_file(src, dest)
else:
_copy_file(src, dest, norsrc=no_xattr)
exported_files.append(str(dest))
# copy live photo associated .mov if requested
if live_photo and self.live_photo:
live_name = dest.parent / f"{dest.stem}.mov"
src_live = self.path_live_photo
if src_live is not None:
logging.debug(
f"Exporting live photo video of {filename} as {live_name.name}"
)
if export_as_hardlink:
_hardlink_file(src_live, str(live_name))
else:
_copy_file(src_live, str(live_name), norsrc=no_xattr)
exported_files.append(str(live_name))
else:
logging.warning(f"Skipping missing live movie for {filename}")
# copy associated RAW image if requested
if raw_photo and self.has_raw:
raw_path = pathlib.Path(self.path_raw)
raw_ext = raw_path.suffix
raw_name = dest.parent / f"{dest.stem}{raw_ext}"
if raw_path is not None:
logging.debug(
f"Exporting RAW photo of {filename} as {raw_name.name}"
)
if export_as_hardlink:
_hardlink_file(str(raw_path), str(raw_name))
else:
_copy_file(str(raw_path), str(raw_name), norsrc=no_xattr)
exported_files.append(str(raw_name))
else:
logging.warning(f"Skipping missing RAW photo for {filename}")
else:
# use_photo_export
exported = None
# export live_photo .mov file?
live_photo = True if live_photo and self.live_photo else False
if edited:
# exported edited version and not original
if filename:
# use filename stem provided
filestem = dest.stem
else:
# didn't get passed a filename, add _edited
filestem = f"{dest.stem}_edited"
dest = dest.parent / f"{filestem}.jpeg"
exported = _export_photo_uuid_applescript(
self.uuid,
dest.parent,
filestem=filestem,
original=False,
edited=True,
live_photo=live_photo,
timeout=timeout,
burst=self.burst,
)
else:
# export original version and not edited
filestem = dest.stem
exported = _export_photo_uuid_applescript(
self.uuid,
dest.parent,
filestem=filestem,
original=True,
edited=False,
live_photo=live_photo,
timeout=timeout,
burst=self.burst,
)
if exported is not None:
exported_files.extend(exported)
else:
logging.warning(
f"Error exporting photo {self.uuid} to {dest} with use_photos_export"
)
if sidecar_json:
logging.debug("writing exiftool_json_sidecar")
sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.json")
sidecar_str = self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
try:
self._write_sidecar(sidecar_filename, sidecar_str)
except Exception as e:
logging.warning(f"Error writing json sidecar to {sidecar_filename}")
raise e
if sidecar_xmp:
logging.debug("writing xmp_sidecar")
sidecar_filename = dest.parent / pathlib.Path(f"{dest.stem}.xmp")
sidecar_str = self._xmp_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
try:
self._write_sidecar(sidecar_filename, sidecar_str)
except Exception as e:
logging.warning(f"Error writing xmp sidecar to {sidecar_filename}")
raise e
# if exiftool, write the metadata
if exiftool and exported_files:
for exported_file in exported_files:
self._write_exif_data(
exported_file,
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
return exported_files
def render_template(self, template, none_str="_", path_sep=None):
""" render a filename or directory template
template: str template
@@ -1273,264 +970,6 @@ class PhotoInfo:
# if here, didn't get a match
raise KeyError(f"No rule for processing {lookup}")
def _write_exif_data(
self,
filepath,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" write exif data to image file at filepath
filepath: full path to the image file """
if not os.path.exists(filepath):
raise FileNotFoundError(f"Could not find file {filepath}")
exiftool = ExifTool(filepath)
exif_info = json.loads(
self._exiftool_json_sidecar(
use_albums_as_keywords=use_albums_as_keywords,
use_persons_as_keywords=use_persons_as_keywords,
keyword_template=keyword_template,
)
)[0]
for exiftag, val in exif_info.items():
if type(val) == list:
# more than one, set first value the add additional values
exiftool.setvalue(exiftag, val.pop(0))
if val:
# add any remaining items
exiftool.addvalues(exiftag, *val)
else:
exiftool.setvalue(exiftag, val)
def _exiftool_json_sidecar(
self,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" return json string of EXIF details in exiftool sidecar format
Does not include all the EXIF fields as those are likely already in the image
use_albums_as_keywords: treat album names as keywords
use_persons_as_keywords: treat person names as keywords
keyword_template: (list of strings); list of template strings to render as keywords
Exports the following:
FileName
ImageDescription
Description
Title
TagsList
Keywords (may include album name, person name, or template)
Subject
PersonInImage
GPSLatitude, GPSLongitude
GPSPosition
GPSLatitudeRef, GPSLongitudeRef
DateTimeOriginal
OffsetTimeOriginal
ModifyDate """
exif = {}
exif["_CreatedBy"] = "osxphotos, https://github.com/RhetTbull/osxphotos"
if self.description:
exif["EXIF:ImageDescription"] = self.description
exif["XMP:Description"] = self.description
if self.title:
exif["XMP:Title"] = self.title
keyword_list = []
if self.keywords:
keyword_list.extend(self.keywords)
person_list = []
if self.persons:
# filter out _UNKNOWN_PERSON
person_list = [p for p in self.persons if p != _UNKNOWN_PERSON]
if use_persons_as_keywords and person_list:
keyword_list.extend(person_list)
if use_albums_as_keywords and self.albums:
keyword_list.extend(self.albums)
if keyword_template:
rendered_keywords = []
for template_str in keyword_template:
rendered, unmatched = self.render_template(
template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/"
)
if unmatched:
logging.warning(
f"Unmatched template substitution for template: {template_str} {unmatched}"
)
rendered_keywords.extend(rendered)
# filter out any template values that didn't match by looking for sentinel
rendered_keywords = [
keyword
for keyword in rendered_keywords
if _OSXPHOTOS_NONE_SENTINEL not in keyword
]
# check to see if any keywords too long
long_keywords = [
long_str
for long_str in rendered_keywords
if len(long_str) > _MAX_IPTC_KEYWORD_LEN
]
if long_keywords:
logging.warning(
f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}"
)
logging.debug(f"rendered_keywords: {rendered_keywords}")
keyword_list.extend(rendered_keywords)
if keyword_list:
exif["XMP:TagsList"] = exif["IPTC:Keywords"] = keyword_list
if person_list:
exif["XMP:PersonInImage"] = person_list
if self.keywords or person_list:
# Photos puts both keywords and persons in Subject when using "Export IPTC as XMP"
# only use Photos' keywords for subject
exif["XMP:Subject"] = list(self.keywords) + person_list
# if self.favorite():
# exif["Rating"] = 5
(lat, lon) = self.location
if lat is not None and lon is not None:
lat_str, lon_str = dd_to_dms_str(lat, lon)
exif["EXIF:GPSLatitude"] = lat_str
exif["EXIF:GPSLongitude"] = lon_str
exif["Composite:GPSPosition"] = f"{lat_str}, {lon_str}"
lat_ref = "North" if lat >= 0 else "South"
lon_ref = "East" if lon >= 0 else "West"
exif["EXIF:GPSLatitudeRef"] = lat_ref
exif["EXIF:GPSLongitudeRef"] = lon_ref
# process date/time and timezone offset
date = self.date
# exiftool expects format to "2015:01:18 12:00:00"
datetimeoriginal = date.strftime("%Y:%m:%d %H:%M:%S")
offsettime = date.strftime("%z")
# find timezone offset in format "-04:00"
offset = re.findall(r"([+-]?)([\d]{2})([\d]{2})", offsettime)
offset = offset[0] # findall returns list of tuples
offsettime = f"{offset[0]}{offset[1]}:{offset[2]}"
exif["EXIF:DateTimeOriginal"] = datetimeoriginal
exif["EXIF:OffsetTimeOriginal"] = offsettime
if self.date_modified is not None:
exif["EXIF:ModifyDate"] = self.date_modified.strftime("%Y:%m:%d %H:%M:%S")
json_str = json.dumps([exif])
return json_str
def _xmp_sidecar(
self,
use_albums_as_keywords=False,
use_persons_as_keywords=False,
keyword_template=None,
):
""" returns string for XMP sidecar
use_albums_as_keywords: treat album names as keywords
use_persons_as_keywords: treat person names as keywords
keyword_template: (list of strings); list of template strings to render as keywords """
# TODO: add additional fields to XMP file?
xmp_template = Template(
filename=os.path.join(_TEMPLATE_DIR, _XMP_TEMPLATE_NAME)
)
keyword_list = []
if self.keywords:
keyword_list.extend(self.keywords)
# TODO: keyword handling in this and _exiftool_json_sidecar is
# good candidate for pulling out in a function
person_list = []
if self.persons:
# filter out _UNKNOWN_PERSON
person_list = [p for p in self.persons if p != _UNKNOWN_PERSON]
if use_persons_as_keywords and person_list:
keyword_list.extend(person_list)
if use_albums_as_keywords and self.albums:
keyword_list.extend(self.albums)
if keyword_template:
rendered_keywords = []
for template_str in keyword_template:
rendered, unmatched = self.render_template(
template_str, none_str=_OSXPHOTOS_NONE_SENTINEL, path_sep="/"
)
if unmatched:
logging.warning(
f"Unmatched template substitution for template: {template_str} {unmatched}"
)
rendered_keywords.extend(rendered)
# filter out any template values that didn't match by looking for sentinel
rendered_keywords = [
keyword
for keyword in rendered_keywords
if _OSXPHOTOS_NONE_SENTINEL not in keyword
]
# check to see if any keywords too long
long_keywords = [
long_str
for long_str in rendered_keywords
if len(long_str) > _MAX_IPTC_KEYWORD_LEN
]
if long_keywords:
logging.warning(
f"Some keywords exceed max IPTC Keyword length of {_MAX_IPTC_KEYWORD_LEN}: {long_keywords}"
)
logging.debug(f"rendered_keywords: {rendered_keywords}")
keyword_list.extend(rendered_keywords)
subject_list = []
if self.keywords or person_list:
# Photos puts both keywords and persons in Subject when using "Export IPTC as XMP"
subject_list = list(self.keywords) + person_list
xmp_str = xmp_template.render(
photo=self,
keywords=keyword_list,
persons=person_list,
subjects=subject_list,
)
# remove extra lines that mako inserts from template
xmp_str = "\n".join(
[line for line in xmp_str.split("\n") if line.strip() != ""]
)
return xmp_str
def _write_sidecar(self, filename, sidecar_str):
""" write sidecar_str to filename
used for exporting sidecar info """
if not filename and not sidecar_str:
raise (
ValueError(
f"filename {filename} and sidecar_str {sidecar_str} must not be None"
)
)
# TODO: catch exception?
f = open(filename, "w")
f.write(sidecar_str)
f.close()
@property
def _longitude(self):
""" Returns longitude, in degrees """
@@ -1600,6 +1039,9 @@ class PhotoInfo:
date_modified_iso = (
self.date_modified.isoformat() if self.date_modified else None
)
folders = {album.title: album.folder_names for album in self.album_info}
exif = dataclasses.asdict(self.exif_info) if self.exif_info else {}
place = self.place.as_dict() if self.place else {}
pic = {
"uuid": self.uuid,
@@ -1609,7 +1051,10 @@ class PhotoInfo:
"description": self.description,
"title": self.title,
"keywords": self.keywords,
"labels": self.labels,
"keywords": self.keywords,
"albums": self.albums,
"folders": folders,
"persons": self.persons,
"path": self.path,
"ismissing": self.ismissing,
@@ -1640,6 +1085,8 @@ class PhotoInfo:
"has_raw": self.has_raw,
"uti_raw": self.uti_raw,
"path_raw": self.path_raw,
"place": place,
"exif": exif,
}
return json.dumps(pic)

View File

@@ -130,17 +130,13 @@ def _hardlink_file(src, dest):
if not os.path.isfile(src):
raise FileNotFoundError("src file does not appear to exist", src)
# if error on copy, subprocess will raise CalledProcessError
try:
os.link(src, dest)
except Exception as e:
logging.critical(
f"ln returned error: {e.returncode} {e.stderr.decode(sys.getfilesystemencoding()).rstrip()}"
)
logging.critical(f"os.link returned error: {e}")
raise e
def _copy_file(src, dest, norsrc=False):
""" Copies a file from src path to dest path
@@ -516,3 +512,30 @@ def _db_is_locked(dbname):
locked = True
return locked
# OSXPHOTOS_XATTR_UUID = "com.osxphotos.uuid"
# def get_uuid_for_file(filepath):
# """ returns UUID associated with an exported file
# filepath: path to exported photo
# """
# attr = xattr.xattr(filepath)
# try:
# uuid_bytes = attr[OSXPHOTOS_XATTR_UUID]
# uuid_str = uuid_bytes.decode('utf-8')
# except KeyError:
# uuid_str = None
# return uuid_str
# def set_uuid_for_file(filepath, uuid):
# """ sets the UUID associated with an exported file
# filepath: path to exported photo
# uuid: uuid string for photo
# """
# if not os.path.exists(filepath):
# raise FileNotFoundError(f"Missing file: {filepath}")
# attr = xattr.xattr(filepath)
# uuid_bytes = bytes(uuid, 'utf-8')
# attr.set(OSXPHOTOS_XATTR_UUID, uuid_bytes)

9
tests/conftest.py Normal file
View File

@@ -0,0 +1,9 @@
""" pytest test configuration """
import pytest
from osxphotos.exiftool import _ExifToolProc
@pytest.fixture(autouse=True)
def reset_singletons():
""" Need to clean up any ExifTool singletons between tests """
_ExifToolProc.instance = None

View File

@@ -2,6 +2,8 @@ import os
import pytest
from click.testing import CliRunner
from osxphotos.exiftool import get_exiftool_path
CLI_PHOTOS_DB = "tests/Test-10.15.1.photoslibrary"
LIVE_PHOTOS_DB = "tests/Test-Cloud-10.15.1.photoslibrary"
RAW_PHOTOS_DB = "tests/Test-RAW-10.15.1.photoslibrary"
@@ -120,6 +122,11 @@ CLI_EXPORT_UUID = "D79B8D77-BFFC-460B-9312-034F2877D35B"
CLI_EXPORT_UUID_FILENAME = "Pumkins2.jpg"
CLI_EXPORT_BY_DATE = [
"2018/09/28/Pumpkins3.jpg",
"2018/09/28/Pumkins1.jpg",
]
CLI_EXPORT_SIDECAR_FILENAMES = ["Pumkins2.jpg", "Pumkins2.json", "Pumkins2.xmp"]
CLI_EXPORT_LIVE = [
@@ -139,6 +146,24 @@ CLI_EXPORT_RAW_EDITED_ORIGINAL = ["IMG_0476_2.CR2", "IMG_0476_2_edited.jpeg"]
CLI_PLACES_JSON = """{"places": {"_UNKNOWN_": 1, "Maui, Wailea, Hawai'i, United States": 1, "Washington, District of Columbia, United States": 1}}"""
CLI_EXIFTOOL = {
"D79B8D77-BFFC-460B-9312-034F2877D35B": {
"File:FileName": "Pumkins2.jpg",
"IPTC:Keywords": "Kids",
"XMP:TagsList": "Kids",
"XMP:Title": "I found one!",
"EXIF:ImageDescription": "Girl holding pumpkin",
"XMP:Description": "Girl holding pumpkin",
"XMP:PersonInImage": "Katie",
"XMP:Subject": ["Kids", "Katie"],
}
}
# determine if exiftool installed so exiftool tests can be skipped
try:
exiftool = get_exiftool_path()
except:
exiftool = None
def test_osxphotos():
import osxphotos
@@ -291,6 +316,7 @@ def test_export_as_hardlink_samefile():
assert os.path.exists(CLI_EXPORT_UUID_FILENAME)
assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
def test_export_using_hardlinks_incompat_options():
# test that error shown if --export-as-hardlink used with --exiftool
import os
@@ -317,7 +343,8 @@ def test_export_using_hardlinks_incompat_options():
)
assert result.exit_code == 0
assert "Incompatible export options" in result.output
def test_export_current_name():
import glob
import os
@@ -356,6 +383,40 @@ def test_export_skip_edited():
assert "St James Park_edited.jpeg" not in files
@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
def test_export_exiftool():
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
from osxphotos.exiftool import ExifTool
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
for uuid in CLI_EXIFTOOL:
result = runner.invoke(
export,
[
os.path.join(cwd, PHOTOS_DB_15_4),
".",
"-V",
"--exiftool",
"--uuid",
f"{uuid}",
],
)
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted([CLI_EXIFTOOL[uuid]["File:FileName"]])
exif = ExifTool(CLI_EXIFTOOL[uuid]["File:FileName"]).as_dict()
for key in CLI_EXIFTOOL[uuid]:
assert exif[key] == CLI_EXIFTOOL[uuid][key]
def test_query_date():
import json
import osxphotos
@@ -1003,3 +1064,243 @@ def test_export_sidecar_keyword_template():
assert sorted(json_got[k]) == sorted(v)
else:
assert json_got[k] == v
def test_export_update_basic():
""" test export then update """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export, OSXPHOTOS_EXPORT_DB
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"])
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
# update
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"]
)
assert result.exit_code == 0
assert (
"Exported: 0 photos, updated: 0 photos, skipped: 8 photos, updated EXIF data: 0 photos"
in result.output
)
@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
def test_export_update_exiftool():
""" test export then update with exiftool """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"])
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
# update with exiftool
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--exiftool"]
)
assert result.exit_code == 0
assert (
"Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 8 photos"
in result.output
)
def test_export_update_hardlink():
""" test export with hardlink then update """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB)
photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0]
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(
export,
[os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-as-hardlink"],
)
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
# update, should replace the hardlink files with new copies
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"]
)
assert result.exit_code == 0
assert (
"Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 0 photos"
in result.output
)
assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
def test_export_update_hardlink_exiftool():
""" test export with hardlink then update with exiftool """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB)
photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0]
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(
export,
[os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-as-hardlink"],
)
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
# update, should replace the hardlink files with new copies
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--exiftool"]
)
assert result.exit_code == 0
assert (
"Exported: 0 photos, updated: 8 photos, skipped: 0 photos, updated EXIF data: 8 photos"
in result.output
)
assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
def test_export_update_edits():
""" test export then update after removing and editing files """
import glob
import os
import os.path
import shutil
import osxphotos
from osxphotos.__main__ import export
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-by-date"]
)
assert result.exit_code == 0
# change a couple of destination photos
os.unlink(CLI_EXPORT_BY_DATE[1])
shutil.copyfile(CLI_EXPORT_BY_DATE[0], CLI_EXPORT_BY_DATE[1])
os.unlink(CLI_EXPORT_BY_DATE[0])
# update
result = runner.invoke(
export,
[os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update", "--export-by-date"],
)
assert result.exit_code == 0
assert (
"Exported: 1 photo, updated: 1 photo, skipped: 6 photos, updated EXIF data: 0 photos"
in result.output
)
def test_export_update_no_db():
""" test export then update after db has been deleted """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export, OSXPHOTOS_EXPORT_DB
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"])
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
os.unlink(OSXPHOTOS_EXPORT_DB)
# update
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"]
)
assert result.exit_code == 0
assert (
"Exported: 0 photos, updated: 0 photos, skipped: 8 photos, updated EXIF data: 0 photos"
in result.output
)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
def test_export_then_hardlink():
""" test export then hardlink """
import glob
import os
import os.path
import osxphotos
from osxphotos.__main__ import export
photosdb = osxphotos.PhotosDB(dbfile=CLI_PHOTOS_DB)
photo = photosdb.photos(uuid=[CLI_EXPORT_UUID])[0]
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V",],)
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--export-as-hardlink", "--overwrite"]
)
assert result.exit_code == 0
assert "Exported: 8 photos" in result.output
assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path)