diff --git a/README.md b/README.md index 4f7655ba..fd1236a8 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,8 @@ Options: w/o TZ). --update Only export new or updated files. See notes below on export and --update. + --dry-run Dry run (test) the export but don't actually + export any files; most useful with --verbose --export-as-hardlink Hardlink files instead of copying them. Cannot be used with --exiftool which creates copies of the files with embedded EXIF data. @@ -1427,11 +1429,6 @@ Convert latitude, longitude in degrees to degrees, minutes, seconds as string. returns: string tuple in format ("51 deg 30' 12.86\\" N", "0 deg 7' 54.50\\" W") This is the same format used by exiftool's json format. -#### `create_path_by_date(dest, dt)` -Creates a path in dest folder in form dest/YYYY/MM/DD/ -- `dest`: valid path as str -- `dt`: datetime.timetuple() object -Checks to see if path exists, if it does, do nothing and return path. If path does not exist, creates it and returns path. Useful for exporting photos to a date-based folder structure. ## Examples diff --git a/osxphotos/__main__.py b/osxphotos/__main__.py index 25ccc2ec..a345b8ba 100644 --- a/osxphotos/__main__.py +++ b/osxphotos/__main__.py @@ -1,5 +1,7 @@ +""" command line interface for osxphotos """ import csv import datetime +import functools import json import logging import os @@ -20,15 +22,16 @@ from pathvalidate import ( import osxphotos from ._constants import _EXIF_TOOL_URL, _PHOTOS_4_VERSION, _UNKNOWN_PLACE +from .datetime_formatter import DateTimeFormatter from ._version import __version__ from .exiftool import get_exiftool_path +from .fileutil import FileUtil, FileUtilNoOp from .photoinfo import ExportResults from .photoinfo.template import ( TEMPLATE_SUBSTITUTIONS, TEMPLATE_SUBSTITUTIONS_MULTI_VALUED, ) -from .utils import _copy_file, create_path_by_date -from ._export_db import ExportDB +from ._export_db import ExportDB, ExportDBInMemory # global variable to control verbose output # set via --verbose/-V @@ -111,9 +114,11 @@ class ExportCommand(click.Command): + "should be treated as a backup, not a working copy where you intend to make changes. " ) formatter.write("\n") - formatter.write_text("Note: The number of files reported for export and the number actually exported " - +"may differ due to live photos, associated RAW images, and edited photos which are reported " - +"in the total photos exported.") + formatter.write_text( + "Note: The number of files reported for export and the number actually exported " + + "may differ due to live photos, associated RAW images, and edited photos which are reported " + + "in the total photos exported." + ) formatter.write("\n") formatter.write_text( "Implementation note: To determine which files need to be updated, " @@ -916,6 +921,11 @@ def query( is_flag=True, help="Only export new or updated files. See notes below on export and --update.", ) +@click.option( + "--dry-run", + is_flag=True, + help="Dry run (test) the export but don't actually export any files; most useful with --verbose", +) @click.option( "--export-as-hardlink", is_flag=True, @@ -1068,6 +1078,7 @@ def export( to_date, verbose_, update, + dry_run, export_as_hardlink, overwrite, export_by_date, @@ -1190,8 +1201,15 @@ def export( _list_libraries() return - # open export database - export_db = ExportDB(os.path.join(dest, OSXPHOTOS_EXPORT_DB)) + # open export database and assign copy/link/unlink functions + if dry_run: + export_db = ExportDBInMemory(os.path.join(dest, OSXPHOTOS_EXPORT_DB)) + # echo = functools.partial(click.echo, err=True) + # fileutil = FileUtilNoOp(verbose=echo) + fileutil = FileUtilNoOp + else: + export_db = ExportDB(os.path.join(dest, OSXPHOTOS_EXPORT_DB)) + fileutil = FileUtil photos = _query( db=db, @@ -1269,26 +1287,28 @@ def export( with click.progressbar(photos) as bar: for p in bar: results = export_photo( - p, - dest, - verbose_, - export_by_date, - sidecar, - update, - export_as_hardlink, - overwrite, - export_edited, - original_name, - export_live, - download_missing, - exiftool, - directory, - no_extended_attributes, - export_raw, - album_keyword, - person_keyword, - keyword_template, - export_db, + photo=p, + dest=dest, + verbose_=verbose_, + export_by_date=export_by_date, + sidecar=sidecar, + update=update, + export_as_hardlink=export_as_hardlink, + overwrite=overwrite, + export_edited=export_edited, + original_name=original_name, + export_live=export_live, + download_missing=download_missing, + exiftool=exiftool, + directory=directory, + no_extended_attributes=no_extended_attributes, + export_raw=export_raw, + album_keyword=album_keyword, + person_keyword=person_keyword, + keyword_template=keyword_template, + export_db=export_db, + fileutil=fileutil, + dry_run = dry_run, ) results_exported.extend(results.exported) results_new.extend(results.new) @@ -1298,26 +1318,28 @@ def export( else: for p in photos: results = export_photo( - p, - dest, - verbose_, - export_by_date, - sidecar, - update, - export_as_hardlink, - overwrite, - export_edited, - original_name, - export_live, - download_missing, - exiftool, - directory, - no_extended_attributes, - export_raw, - album_keyword, - person_keyword, - keyword_template, - export_db, + photo=p, + dest=dest, + verbose_=verbose_, + export_by_date=export_by_date, + sidecar=sidecar, + update=update, + export_as_hardlink=export_as_hardlink, + overwrite=overwrite, + export_edited=export_edited, + original_name=original_name, + export_live=export_live, + download_missing=download_missing, + exiftool=exiftool, + directory=directory, + no_extended_attributes=no_extended_attributes, + export_raw=export_raw, + album_keyword=album_keyword, + person_keyword=person_keyword, + keyword_template=keyword_template, + export_db=export_db, + fileutil=fileutil, + dry_run=dry_run, ) results_exported.extend(results.exported) results_new.extend(results.new) @@ -1715,26 +1737,28 @@ def _query( def export_photo( - photo, - dest, - verbose_, - export_by_date, - sidecar, - update, - export_as_hardlink, - overwrite, - export_edited, - original_name, - export_live, - download_missing, - exiftool, - directory, - no_extended_attributes, - export_raw, - album_keyword, - person_keyword, - keyword_template, - export_db, + photo=None, + dest=None, + verbose_=None, + export_by_date=None, + sidecar=None, + update=None, + export_as_hardlink=None, + overwrite=None, + export_edited=None, + original_name=None, + export_live=None, + download_missing=None, + exiftool=None, + directory=None, + no_extended_attributes=None, + export_raw=None, + album_keyword=None, + person_keyword=None, + keyword_template=None, + export_db=None, + fileutil=FileUtil, + dry_run=None, ): """ Helper function for export that does the actual export photo: PhotoInfo object @@ -1755,6 +1779,9 @@ def export_photo( album_keyword: boolean; if True, exports album names as keywords in metadata person_keyword: boolean; if True, exports person names as keywords in metadata keyword_template: list of strings; if provided use rendered template strings as keywords + export_db: export database instance compatible with ExportDB_ABC + fileutil: file util class compatible with FileUtilABC + dry_run: boolean; if True, doesn't actually export or update any files returns list of path(s) of exported photo or None if photo was missing """ global VERBOSE @@ -1791,8 +1818,10 @@ def export_photo( verbose(f"Exporting {photo.filename} as {filename}") if export_by_date: - date_created = photo.date.timetuple() - dest_path = create_path_by_date(dest, date_created) + date_created = DateTimeFormatter(photo.date) + dest_path = os.path.join(dest, date_created.year, date_created.mm, date_created.dd) + if not dry_run and not os.path.isdir(dest_path): + os.makedirs(dest_path) dest_paths = [dest_path] elif directory: # got a directory template, render it and check results are valid @@ -1808,7 +1837,7 @@ def export_photo( dest_path = os.path.join(dest, dirname) if not is_valid_filepath(dest_path, platform="auto"): raise ValueError(f"Invalid file path: '{dest_path}'") - if not os.path.isdir(dest_path): + if not dry_run and not os.path.isdir(dest_path): os.makedirs(dest_path) dest_paths.append(dest_path) @@ -1849,6 +1878,8 @@ def export_photo( keyword_template=keyword_template, update=update, export_db=export_db, + fileutil=fileutil, + dry_run = dry_run, ) results_exported.extend(export_results.exported) @@ -1902,6 +1933,8 @@ def export_photo( keyword_template=keyword_template, update=update, export_db=export_db, + fileutil=fileutil, + dry_run = dry_run, ) results_exported.extend(export_results_edited.exported) diff --git a/osxphotos/_export_db.py b/osxphotos/_export_db.py index 8a587ea2..3a4608bb 100644 --- a/osxphotos/_export_db.py +++ b/osxphotos/_export_db.py @@ -9,6 +9,7 @@ import pathlib import sqlite3 import sys from abc import ABC, abstractmethod +from io import StringIO from sqlite3 import Error from ._version import __version__ @@ -17,6 +18,7 @@ OSXPHOTOS_EXPORTDB_VERSION = "1.0" class ExportDB_ABC(ABC): + """ abstract base class for ExportDB """ @abstractmethod def get_uuid_for_file(self, filename): pass @@ -58,7 +60,7 @@ class ExportDB_ABC(ABC): pass @abstractmethod - def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json): pass @@ -95,7 +97,7 @@ class ExportDBNoOp(ExportDB_ABC): def set_exifdata_for_file(self, uuid, exifdata): pass - def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json): pass @@ -302,7 +304,7 @@ class ExportDB(ExportDB_ABC): logging.debug(f"set_exifdata: {filename}, {exifdata}") - def set_data(self, file, uuid, orig_stat, exif_stat, info_json, exif_json): + def set_data(self, filename, uuid, orig_stat, exif_stat, info_json, exif_json): """ sets all the data for file and uuid at once calls set_uuid_for_file set_info_for_uuid @@ -310,8 +312,6 @@ class ExportDB(ExportDB_ABC): set_stat_exif_for_file set_exifdata_for_file """ - filename = str(pathlib.Path(filename).relative_to(self._path)).lower() - self.set_uuid_for_file(filename, uuid) self.set_info_for_uuid(uuid, info_json) self.set_stat_orig_for_file(filename, orig_stat) @@ -437,3 +437,64 @@ class ExportDB(ExportDB_ABC): conn.commit() except Error as e: logging.warning(e) + + +class ExportDBInMemory(ExportDB): + """ In memory version of ExportDB + Copies the on-disk database into memory so it may be operated on without + modifying the on-disk verison + """ + + def init(self, dbfile): + self._dbfile = dbfile + # _path is parent of the database + # all files referenced by get_/set_uuid_for_file will be converted to + # relative paths to this parent _path + # this allows the entire export tree to be moved to a new disk/location + # whilst preserving the UUID to filename mappping + self._path = pathlib.Path(dbfile).parent + self._conn = self._open_export_db(dbfile) + self._insert_run_info() + + def _open_export_db(self, dbfile): + """ open export database and return a db connection + if dbfile does not exist, will create and initialize the database + returns: connection to the database + """ + if not os.path.isfile(dbfile): + logging.debug(f"dbfile {dbfile} doesn't exist, creating in memory version") + conn = self._get_db_connection() + if conn: + self._create_db_tables(conn) + else: + raise Exception("Error getting connection to in-memory database") + else: + logging.debug(f"dbfile {dbfile} exists, opening it and copying to memory") + try: + conn = sqlite3.connect(dbfile) + except Error as e: + logging.warning(e) + raise e + + tempfile = StringIO() + for line in conn.iterdump(): + tempfile.write("%s\n" % line) + conn.close() + tempfile.seek(0) + + # Create a database in memory and import from tempfile + conn = sqlite3.connect(":memory:") + conn.cursor().executescript(tempfile.read()) + conn.commit() + + return conn + + def _get_db_connection(self): + """ return db connection to in memory database """ + try: + conn = sqlite3.connect(":memory:") + except Error as e: + logging.warning(e) + conn = None + + return conn diff --git a/osxphotos/_filecmp.py b/osxphotos/_filecmp.py deleted file mode 100644 index 03e8f24b..00000000 --- a/osxphotos/_filecmp.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Utilities for comparing files - -Modified from CPython/Lib/filecmp.py - -Functions: - cmp_file(f1, s2) -> int - file_sig(f1) -> Tuple[int, int, float] - -""" - -import os -import stat - -__all__ = ["cmp", "sig"] - - -def cmp_file(f1, s2): - """Compare file f1 to signature s2. - - Arguments: - - f1 -- File name - - s2 -- stats as returned by sig - - Return value: - - True if the files are the same, False otherwise. - - This function uses a cache for past comparisons and the results, - with cache entries invalidated if their stat information - changes. The cache may be cleared by calling clear_cache(). - - """ - - if not s2: - return False - - s1 = _sig(os.stat(f1)) - - if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: - return False - if s1 == s2: - return True - return False - - -def _sig(st): - return (stat.S_IFMT(st.st_mode), st.st_size, st.st_mtime) - - -def file_sig(f1): - """ return os.stat signature for file f1 """ - return _sig(os.stat(f1)) diff --git a/osxphotos/_version.py b/osxphotos/_version.py index 2ff7ad30..5899a8b2 100644 --- a/osxphotos/_version.py +++ b/osxphotos/_version.py @@ -1,3 +1,3 @@ """ version info """ -__version__ = "0.29.4" +__version__ = "0.29.5" diff --git a/osxphotos/fileutil.py b/osxphotos/fileutil.py new file mode 100644 index 00000000..c07c9570 --- /dev/null +++ b/osxphotos/fileutil.py @@ -0,0 +1,175 @@ +""" FileUtil class with methods for copy, hardlink, unlink, etc. """ + +import logging +import os +import pathlib +import stat +import subprocess +import sys +from abc import ABC, abstractmethod + + +class FileUtilABC(ABC): + """ Abstract base class for FileUtil """ + @classmethod + @abstractmethod + def hardlink(cls, src, dest): + pass + + @classmethod + @abstractmethod + def copy(cls, src, dest, norsrc=False): + pass + + @classmethod + @abstractmethod + def unlink(cls, dest): + pass + + @classmethod + @abstractmethod + def cmp_sig(cls, file1, file2): + pass + + @classmethod + @abstractmethod + def file_sig(cls, file1): + pass + + +class FileUtilMacOS(FileUtilABC): + """ Various file utilities """ + @classmethod + def hardlink(cls, src, dest): + """ Hardlinks a file from src path to dest path + src: source path as string + dest: destination path as string + Raises exception if linking fails or either path is None """ + + if src is None or dest is None: + raise ValueError("src and dest must not be None", src, dest) + + if not os.path.isfile(src): + raise FileNotFoundError("src file does not appear to exist", src) + + # if error on copy, subprocess will raise CalledProcessError + try: + os.link(src, dest) + except Exception as e: + logging.critical(f"os.link returned error: {e}") + raise e + + @classmethod + def copy(cls, src, dest, norsrc=False): + """ Copies a file from src path to dest path + src: source path as string + dest: destination path as string + norsrc: (bool) if True, uses --norsrc flag with ditto so it will not copy + resource fork or extended attributes. May be useful on volumes that + don't work with extended attributes (likely only certain SMB mounts) + default is False + Uses ditto to perform copy; will silently overwrite dest if it exists + Raises exception if copy fails or either path is None """ + + if src is None or dest is None: + raise ValueError("src and dest must not be None", src, dest) + + if not os.path.isfile(src): + raise FileNotFoundError("src file does not appear to exist", src) + + if norsrc: + command = ["/usr/bin/ditto", "--norsrc", src, dest] + else: + command = ["/usr/bin/ditto", src, dest] + + # if error on copy, subprocess will raise CalledProcessError + try: + result = subprocess.run(command, check=True, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as e: + logging.critical( + f"ditto returned error: {e.returncode} {e.stderr.decode(sys.getfilesystemencoding()).rstrip()}" + ) + raise e + + return result.returncode + + @classmethod + def unlink(cls, filepath): + """ unlink filepath; if it's pathlib.Path, use Path.unlink, otherwise use os.unlink """ + if isinstance(filepath, pathlib.Path): + filepath.unlink() + else: + os.unlink(filepath) + + @classmethod + def cmp_sig(cls, f1, s2): + """Compare file f1 to signature s2. + Arguments: + f1 -- File name + s2 -- stats as returned by sig + + Return value: + True if the files are the same, False otherwise. + """ + + if not s2: + return False + + s1 = cls._sig(os.stat(f1)) + + if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG: + return False + if s1 == s2: + return True + return False + + @classmethod + def file_sig(cls, f1): + """ return os.stat signature for file f1 """ + return cls._sig(os.stat(f1)) + + @staticmethod + def _sig(st): + return (stat.S_IFMT(st.st_mode), st.st_size, st.st_mtime) + + +class FileUtil(FileUtilMacOS): + """ Various file utilities """ + pass + +class FileUtilNoOp(FileUtil): + """ No-Op implementation of FileUtil for testing / dry-run mode + all methods with exception of cmp_sig and file_cmp are no-op + cmp_sig functions as FileUtil.cmp_sig does + file_cmp returns mock data + """ + @staticmethod + def noop(*args): + pass + + verbose = noop + + def __new__(cls, verbose=None): + if verbose: + if callable(verbose): + cls.verbose = verbose + else: + raise ValueError(f"verbose {verbose} not callable") + return super(FileUtilNoOp, cls).__new__(cls) + + @classmethod + def hardlink(cls, src, dest): + cls.verbose(f"hardlink: {src} {dest}") + + @classmethod + def copy(cls, src, dest, norsrc=False): + cls.verbose(f"copy: {src} {dest}") + + @classmethod + def unlink(cls, dest): + cls.verbose(f"unlink: {dest}") + + @classmethod + def file_sig(cls, file1): + cls.verbose(f"file_sig: {file1}") + return (42, 42, 42) diff --git a/osxphotos/photoinfo/_photoinfo_export.py b/osxphotos/photoinfo/_photoinfo_export.py index 4123da6d..ee4a675d 100644 --- a/osxphotos/photoinfo/_photoinfo_export.py +++ b/osxphotos/photoinfo/_photoinfo_export.py @@ -1,4 +1,13 @@ -""" export methods for PhotoInfo """ +""" Export methods for PhotoInfo + The following methods are defined and must be imported into PhotoInfo as instance methods: + export + export2 + _export_photo + _write_exif_data + _exiftool_json_sidecar + _xmp_sidecar + _write_sidecar + """ # TODO: should this be its own PhotoExporter class? @@ -9,10 +18,12 @@ import logging import os import pathlib import re +import tempfile from collections import namedtuple # pylint: disable=syntax-error from mako.template import Template +from .._applescript import AppleScript from .._constants import ( _MAX_IPTC_KEYWORD_LEN, _OSXPHOTOS_NONE_SENTINEL, @@ -20,21 +31,131 @@ from .._constants import ( _UNKNOWN_PERSON, _XMP_TEMPLATE_NAME, ) -from ..exiftool import ExifTool from .._export_db import ExportDBNoOp -from .._filecmp import cmp_file, file_sig -from ..utils import ( - _copy_file, - _export_photo_uuid_applescript, - _hardlink_file, - dd_to_dms_str, -) +from ..exiftool import ExifTool +from ..fileutil import FileUtil +from ..utils import dd_to_dms_str ExportResults = namedtuple( "ExportResults", ["exported", "new", "updated", "skipped", "exif_updated"] ) +# _export_photo_uuid_applescript is not a class method, don't import this into PhotoInfo +def _export_photo_uuid_applescript( + uuid, + dest, + filestem=None, + original=True, + edited=False, + live_photo=False, + timeout=120, + burst=False, + dry_run=False, +): + """ Export photo to dest path using applescript to control Photos + If photo is a live photo, exports both the photo and associated .mov file + uuid: UUID of photo to export + dest: destination path to export to + filestem: (string) if provided, exported filename will be named stem.ext + where ext is extension of the file exported by photos (e.g. .jpeg, .mov, etc) + If not provided, file will be named with whatever name Photos uses + If filestem.ext exists, it wil be overwritten + original: (boolean) if True, export original image; default = True + edited: (boolean) if True, export edited photo; default = False + If photo not edited and edited=True, will still export the original image + caller must verify image has been edited + *Note*: must be called with either edited or original but not both, + will raise error if called with both edited and original = True + live_photo: (boolean) if True, export associated .mov live photo; default = False + timeout: timeout value in seconds; export will fail if applescript run time exceeds timeout + burst: (boolean) set to True if file is a burst image to avoid Photos export error + dry_run: (boolean) set to True to run in "dry run" mode which will download file but not actually copy to destination + Returns: list of paths to exported file(s) or None if export failed + Note: For Live Photos, if edited=True, will export a jpeg but not the movie, even if photo + has not been edited. This is due to how Photos Applescript interface works. + """ + + # setup the applescript to do the export + export_scpt = AppleScript( + """ + on export_by_uuid(theUUID, thePath, original, edited, theTimeOut) + tell application "Photos" + set thePath to thePath + set theItem to media item id theUUID + set theFilename to filename of theItem + set itemList to {theItem} + + if original then + with timeout of theTimeOut seconds + export itemList to POSIX file thePath with using originals + end timeout + end if + + if edited then + with timeout of theTimeOut seconds + export itemList to POSIX file thePath + end timeout + end if + + return theFilename + end tell + + end export_by_uuid + """ + ) + + dest = pathlib.Path(dest) + if not dest.is_dir: + raise ValueError(f"dest {dest} must be a directory") + + if not original ^ edited: + raise ValueError(f"edited or original must be True but not both") + + tmpdir = tempfile.TemporaryDirectory(prefix="osxphotos_") + + # export original + filename = None + try: + filename = export_scpt.call( + "export_by_uuid", uuid, tmpdir.name, original, edited, timeout + ) + except Exception as e: + logging.warning(f"Error exporting uuid {uuid}: {e}") + return None + + if filename is not None: + # need to find actual filename as sometimes Photos renames JPG to jpeg on export + # may be more than one file exported (e.g. if Live Photo, Photos exports both .jpeg and .mov) + # TemporaryDirectory will cleanup on return + filename_stem = pathlib.Path(filename).stem + files = glob.glob(os.path.join(tmpdir.name, "*")) + exported_paths = [] + for fname in files: + path = pathlib.Path(fname) + if len(files) > 1 and not live_photo and path.suffix.lower() == ".mov": + # it's the .mov part of live photo but not requested, so don't export + logging.debug(f"Skipping live photo file {path}") + continue + if len(files) > 1 and burst and path.stem != filename_stem: + # skip any burst photo that's not the one we asked for + logging.debug(f"Skipping burst photo file {path}") + continue + if filestem: + # rename the file based on filestem, keeping original extension + dest_new = dest / f"{filestem}{path.suffix}" + else: + # use the name Photos provided + dest_new = dest / path.name + logging.debug(f"exporting {path} to dest_new: {dest_new}") + if not dry_run: + FileUtil.copy(str(path), str(dest_new)) + exported_paths.append(str(dest_new)) + return exported_paths + else: + return None + + def export( self, dest, @@ -135,8 +256,10 @@ def export2( keyword_template=None, update=False, export_db=None, + fileutil=FileUtil, + dry_run=False, ): - """ export photo + """ export photo, like export but with update and dry_run options dest: must be valid destination path (or exception raised) filename: (optional): name of exported picture; if not provided, will use current filename **NOTE**: if provided, user must ensure file extension (suffix) is correct. @@ -171,12 +294,17 @@ def export2( not export the photo if the current version already exists in the destination export_db: (ExportDB_ABC); instance of a class that conforms to ExportDB_ABC with methods for getting/setting data related to exported files to compare update state + fileutil: (FileUtilABC); class that conforms to FileUtilABC with various file utilities + dry_run: (boolean, default=False); set to True to run in "dry run" mode + Returns: ExportResults namedtuple with fields: exported, new, updated, skipped where each field is a list of file paths + + Note: to use dry run mode, you must set dry_run=True and also pass in memory version of export_db, + and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) """ - # if update, caller may pass function refs to get/set uuid for file being exported - # and for setting/getting the PhotoInfo json info for an exported file + # when called from export(), won't get an export_db, so use no-op version if export_db is None: export_db = ExportDBNoOp() @@ -212,7 +340,7 @@ def export2( # verify destination is a valid path if dest is None: raise ValueError("Destination must not be None") - elif not os.path.isdir(dest): + elif not dry_run and not os.path.isdir(dest): raise FileNotFoundError("Invalid path passed to export") if filename and len(filename) == 1: @@ -279,10 +407,6 @@ def export2( count += 1 dest = dest.parent / f"{dest_new}{dest.suffix}" - # TODO: need way to check if DB is missing, try to find the right photo anyway by seeing if they're the same and then updating - # move the checks into "if not use_photos_export" block below - # if use_photos_export is True then we'll export wether destination exists or not - # if overwrite==False and #increment==False, export should fail if file exists if dest.exists() and not update and not overwrite and not increment: raise FileExistsError( @@ -330,7 +454,7 @@ def export2( dest_uuid = self.uuid export_db.set_uuid_for_file(dest, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest, file_sig(dest)) + export_db.set_stat_orig_for_file(dest, fileutil.file_sig(dest)) export_db.set_stat_exif_for_file(dest, (None, None, None)) export_db.set_exifdata_for_file(dest, None) if dest_uuid != self.uuid: @@ -360,7 +484,7 @@ def export2( found_match = True export_db.set_uuid_for_file(file_, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest, file_sig(dest)) + export_db.set_stat_orig_for_file(dest, fileutil.file_sig(dest)) export_db.set_stat_exif_for_file(dest, (None, None, None)) export_db.set_exifdata_for_file(dest, None) break @@ -392,6 +516,7 @@ def export2( no_xattr, export_as_hardlink, exiftool, + fileutil, ) exported_files = results.exported update_new_files = results.new @@ -416,6 +541,7 @@ def export2( no_xattr, export_as_hardlink, exiftool, + fileutil, ) exported_files.extend(results.exported) update_new_files.extend(results.new) @@ -440,6 +566,7 @@ def export2( no_xattr, export_as_hardlink, exiftool, + fileutil, ) exported_files.extend(results.exported) update_new_files.extend(results.new) @@ -471,6 +598,7 @@ def export2( live_photo=live_photo, timeout=timeout, burst=self.burst, + dry_run=dry_run, ) else: # export original version and not edited @@ -484,6 +612,7 @@ def export2( live_photo=live_photo, timeout=timeout, burst=self.burst, + dry_run=dry_run, ) if exported is not None: @@ -504,11 +633,12 @@ def export2( use_persons_as_keywords=use_persons_as_keywords, keyword_template=keyword_template, ) - try: - self._write_sidecar(sidecar_filename, sidecar_str) - except Exception as e: - logging.warning(f"Error writing json sidecar to {sidecar_filename}") - raise e + if not dry_run: + try: + self._write_sidecar(sidecar_filename, sidecar_str) + except Exception as e: + logging.warning(f"Error writing json sidecar to {sidecar_filename}") + raise e if sidecar_xmp: logging.debug("writing xmp_sidecar") @@ -518,11 +648,12 @@ def export2( use_persons_as_keywords=use_persons_as_keywords, keyword_template=keyword_template, ) - try: - self._write_sidecar(sidecar_filename, sidecar_str) - except Exception as e: - logging.warning(f"Error writing xmp sidecar to {sidecar_filename}") - raise e + if not dry_run: + try: + self._write_sidecar(sidecar_filename, sidecar_str) + except Exception as e: + logging.warning(f"Error writing xmp sidecar to {sidecar_filename}") + raise e # if exiftool, write the metadata if update: @@ -552,12 +683,13 @@ def export2( # didn't have old data, assume we need to write it # or files were different logging.debug(f"No exifdata for {exported_file}, writing it") - self._write_exif_data( - exported_file, - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) + if not dry_run: + self._write_exif_data( + exported_file, + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) export_db.set_exifdata_for_file( exported_file, self._exiftool_json_sidecar( @@ -566,17 +698,20 @@ def export2( keyword_template=keyword_template, ), ) - export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file)) + export_db.set_stat_exif_for_file( + exported_file, fileutil.file_sig(exported_file) + ) exif_files_updated.append(exported_file) elif exiftool and exif_files: for exported_file in exif_files: logging.debug(f"Writing exif data to {exported_file}") - self._write_exif_data( - exported_file, - use_albums_as_keywords=use_albums_as_keywords, - use_persons_as_keywords=use_persons_as_keywords, - keyword_template=keyword_template, - ) + if not dry_run: + self._write_exif_data( + exported_file, + use_albums_as_keywords=use_albums_as_keywords, + use_persons_as_keywords=use_persons_as_keywords, + keyword_template=keyword_template, + ) export_db.set_exifdata_for_file( exported_file, self._exiftool_json_sidecar( @@ -585,7 +720,9 @@ def export2( keyword_template=keyword_template, ), ) - export_db.set_stat_exif_for_file(exported_file, file_sig(exported_file)) + export_db.set_stat_exif_for_file( + exported_file, fileutil.file_sig(exported_file) + ) exif_files_updated.append(exported_file) return ExportResults( @@ -607,6 +744,7 @@ def _export_photo( no_xattr, export_as_hardlink, exiftool, + fileutil=FileUtil, ): """ Helper function for export() Does the actual copy or hardlink taking the appropriate @@ -621,6 +759,7 @@ def _export_photo( no_xattr: don't copy extended attributes export_as_hardlink: bool exiftool: bool + fileutil: FileUtil class that conforms to fileutil.FileUtilABC Returns: ExportResults """ @@ -637,12 +776,13 @@ def _export_photo( # not update, do the the hardlink if overwrite and dest.exists(): # need to remove the destination first - dest.unlink() + # dest.unlink() + fileutil.unlink(dest) logging.debug(f"Not update: export_as_hardlink linking file {src} {dest}") - _hardlink_file(src, dest) + fileutil.hardlink(src, dest) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) exported_files.append(dest_str) @@ -657,11 +797,12 @@ def _export_photo( logging.debug( f"Update: removing existing file prior to export_as_hardlink {src} {dest}" ) - dest.unlink() - _hardlink_file(src, dest) + # dest.unlink() + fileutil.unlink(dest) + fileutil.hardlink(src, dest) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) update_updated_files.append(dest_str) @@ -671,10 +812,10 @@ def _export_photo( logging.debug( f"Update: exporting new file with export_as_hardlink {src} {dest}" ) - _hardlink_file(src, dest) + fileutil.hardlink(src, dest) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) exported_files.append(dest_str) @@ -684,12 +825,13 @@ def _export_photo( # not update, do the the copy if overwrite and dest.exists(): # need to remove the destination first - dest.unlink() + # dest.unlink() + fileutil.unlink(dest) logging.debug(f"Not update: copying file {src} {dest}") - _copy_file(src, dest_str, norsrc=no_xattr) + fileutil.copy(src, dest_str, norsrc=no_xattr) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) exported_files.append(dest_str) @@ -704,12 +846,12 @@ def _export_photo( logging.debug(f"Update: skipping identifical original files {src} {dest}") # call set_stat because code can reach this spot if no export DB but exporting a RAW or live photo # potentially re-writes the data in the database but ensures database is complete - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) update_skipped_files.append(dest_str) elif ( dest_exists and exiftool - and cmp_file(dest_str, export_db.get_stat_exif_for_file(dest_str)) + and fileutil.cmp_sig(dest_str, export_db.get_stat_exif_for_file(dest_str)) and not dest.samefile(src) ): # destination exists but is identical @@ -720,11 +862,12 @@ def _export_photo( logging.debug(f"Update: removing existing file prior to copy {src} {dest}") stat_src = os.stat(src) stat_dest = os.stat(dest) - dest.unlink() - _copy_file(src, dest_str, norsrc=no_xattr) + # dest.unlink() + fileutil.unlink(dest) + fileutil.copy(src, dest_str, norsrc=no_xattr) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) exported_files.append(dest_str) @@ -732,10 +875,10 @@ def _export_photo( else: # destination doesn't exist, copy the file logging.debug(f"Update: copying new file {src} {dest}") - _copy_file(src, dest_str, norsrc=no_xattr) + fileutil.copy(src, dest_str, norsrc=no_xattr) export_db.set_uuid_for_file(dest_str, self.uuid) export_db.set_info_for_uuid(self.uuid, self.json()) - export_db.set_stat_orig_for_file(dest_str, file_sig(dest_str)) + export_db.set_stat_orig_for_file(dest_str, fileutil.file_sig(dest_str)) export_db.set_stat_exif_for_file(dest_str, (None, None, None)) export_db.set_exifdata_for_file(dest_str, None) exported_files.append(dest_str) diff --git a/osxphotos/utils.py b/osxphotos/utils.py index bc8aa7ec..131cfcf5 100644 --- a/osxphotos/utils.py +++ b/osxphotos/utils.py @@ -18,7 +18,7 @@ import CoreServices import objc from Foundation import * -from osxphotos._applescript import AppleScript +from .fileutil import FileUtil _DEBUG = False @@ -118,60 +118,6 @@ def _dd_to_dms(dd): return int(deg_), int(min_), sec_ -def _hardlink_file(src, dest): - """ Hardlinks a file from src path to dest path - src: source path as string - dest: destination path as string - Raises exception if linking fails or either path is None """ - - if src is None or dest is None: - raise ValueError("src and dest must not be None", src, dest) - - if not os.path.isfile(src): - raise FileNotFoundError("src file does not appear to exist", src) - - # if error on copy, subprocess will raise CalledProcessError - try: - os.link(src, dest) - except Exception as e: - logging.critical(f"os.link returned error: {e}") - raise e - - -def _copy_file(src, dest, norsrc=False): - """ Copies a file from src path to dest path - src: source path as string - dest: destination path as string - norsrc: (bool) if True, uses --norsrc flag with ditto so it will not copy - resource fork or extended attributes. May be useful on volumes that - don't work with extended attributes (likely only certain SMB mounts) - default is False - Uses ditto to perform copy; will silently overwrite dest if it exists - Raises exception if copy fails or either path is None """ - - if src is None or dest is None: - raise ValueError("src and dest must not be None", src, dest) - - if not os.path.isfile(src): - raise FileNotFoundError("src file does not appear to exist", src) - - if norsrc: - command = ["/usr/bin/ditto", "--norsrc", src, dest] - else: - command = ["/usr/bin/ditto", src, dest] - - # if error on copy, subprocess will raise CalledProcessError - try: - result = subprocess.run(command, check=True, stderr=subprocess.PIPE) - except subprocess.CalledProcessError as e: - logging.critical( - f"ditto returned error: {e.returncode} {e.stderr.decode(sys.getfilesystemencoding()).rstrip()}" - ) - raise e - - return result.returncode - - def dd_to_dms_str(lat, lon): """ convert latitude, longitude in degrees to degrees, minutes, seconds as string """ """ lat: latitude in degrees """ @@ -308,24 +254,6 @@ def list_photo_libraries(): return lib_list -def create_path_by_date(dest, dt): - """ Creates a path in dest folder in form dest/YYYY/MM/DD/ - dest: valid path as str - dt: datetime.timetuple() object - Checks to see if path exists, if it does, do nothing and return path - If path does not exist, creates it and returns path""" - if not os.path.isdir(dest): - raise FileNotFoundError(f"dest {dest} must be valid path") - yyyy, mm, dd = dt[0:3] - yyyy = str(yyyy).zfill(4) - mm = str(mm).zfill(2) - dd = str(dd).zfill(2) - new_dest = os.path.join(dest, yyyy, mm, dd) - if not os.path.isdir(new_dest): - os.makedirs(new_dest) - return new_dest - - def get_preferred_uti_extension(uti): """ get preferred extension for a UTI type uti: UTI str, e.g. 'public.jpeg' @@ -365,117 +293,6 @@ def findfiles(pattern, path_): # open_scpt.run() -def _export_photo_uuid_applescript( - uuid, - dest, - filestem=None, - original=True, - edited=False, - live_photo=False, - timeout=120, - burst=False, -): - """ Export photo to dest path using applescript to control Photos - If photo is a live photo, exports both the photo and associated .mov file - uuid: UUID of photo to export - dest: destination path to export to - filestem: (string) if provided, exported filename will be named stem.ext - where ext is extension of the file exported by photos (e.g. .jpeg, .mov, etc) - If not provided, file will be named with whatever name Photos uses - If filestem.ext exists, it wil be overwritten - original: (boolean) if True, export original image; default = True - edited: (boolean) if True, export edited photo; default = False - If photo not edited and edited=True, will still export the original image - caller must verify image has been edited - *Note*: must be called with either edited or original but not both, - will raise error if called with both edited and original = True - live_photo: (boolean) if True, export associated .mov live photo; default = False - timeout: timeout value in seconds; export will fail if applescript run time exceeds timeout - burst: (boolean) set to True if file is a burst image to avoid Photos export error - Returns: list of paths to exported file(s) or None if export failed - Note: For Live Photos, if edited=True, will export a jpeg but not the movie, even if photo - has not been edited. This is due to how Photos Applescript interface works. - """ - - # setup the applescript to do the export - export_scpt = AppleScript( - """ - on export_by_uuid(theUUID, thePath, original, edited, theTimeOut) - tell application "Photos" - set thePath to thePath - set theItem to media item id theUUID - set theFilename to filename of theItem - set itemList to {theItem} - - if original then - with timeout of theTimeOut seconds - export itemList to POSIX file thePath with using originals - end timeout - end if - - if edited then - with timeout of theTimeOut seconds - export itemList to POSIX file thePath - end timeout - end if - - return theFilename - end tell - - end export_by_uuid - """ - ) - - dest = pathlib.Path(dest) - if not dest.is_dir: - raise ValueError(f"dest {dest} must be a directory") - - if not original ^ edited: - raise ValueError(f"edited or original must be True but not both") - - tmpdir = tempfile.TemporaryDirectory(prefix="osxphotos_") - - # export original - filename = None - try: - filename = export_scpt.call( - "export_by_uuid", uuid, tmpdir.name, original, edited, timeout - ) - except Exception as e: - logging.warning("Error exporting uuid %s: %s" % (uuid, str(e))) - return None - - if filename is not None: - # need to find actual filename as sometimes Photos renames JPG to jpeg on export - # may be more than one file exported (e.g. if Live Photo, Photos exports both .jpeg and .mov) - # TemporaryDirectory will cleanup on return - filename_stem = pathlib.Path(filename).stem - files = glob.glob(os.path.join(tmpdir.name, "*")) - exported_paths = [] - for fname in files: - path = pathlib.Path(fname) - if len(files) > 1 and not live_photo and path.suffix.lower() == ".mov": - # it's the .mov part of live photo but not requested, so don't export - logging.debug(f"Skipping live photo file {path}") - continue - if len(files) > 1 and burst and path.stem != filename_stem: - # skip any burst photo that's not the one we asked for - logging.debug(f"Skipping burst photo file {path}") - continue - if filestem: - # rename the file based on filestem, keeping original extension - dest_new = dest / f"{filestem}{path.suffix}" - else: - # use the name Photos provided - dest_new = dest / path.name - logging.debug(f"exporting {path} to dest_new: {dest_new}") - _copy_file(str(path), str(dest_new)) - exported_paths.append(str(dest_new)) - return exported_paths - else: - return None - - def _open_sql_file(dbname): """ opens sqlite file dbname in read-only mode returns tuple of (connection, cursor) """ @@ -527,7 +344,7 @@ def _db_is_locked(dbname): # except KeyError: # uuid_str = None # return uuid_str - + # def set_uuid_for_file(filepath, uuid): # """ sets the UUID associated with an exported file # filepath: path to exported photo diff --git a/tests/test_cli.py b/tests/test_cli.py index 1b6b9aa1..edc7e394 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1299,8 +1299,116 @@ def test_export_then_hardlink(): assert not os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) result = runner.invoke( - export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--export-as-hardlink", "--overwrite"] + export, + [ + os.path.join(cwd, CLI_PHOTOS_DB), + ".", + "--export-as-hardlink", + "--overwrite", + ], ) assert result.exit_code == 0 assert "Exported: 8 photos" in result.output assert os.path.samefile(CLI_EXPORT_UUID_FILENAME, photo.path) + + +def test_export_dry_run(): + """ test export with dry-run flag """ + import glob + import os + import os.path + import osxphotos + from osxphotos.__main__ import export + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--dry-run"] + ) + assert result.exit_code == 0 + assert "Exported: 8 photos" in result.output + for filepath in CLI_EXPORT_FILENAMES: + assert f"Exported {filepath}" in result.output + assert not os.path.isfile(filepath) + + +def test_export_update_edits_dry_run(): + """ test export then update after removing and editing files with dry-run flag """ + import glob + import os + import os.path + import shutil + + import osxphotos + from osxphotos.__main__ import export + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + # basic export + result = runner.invoke( + export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--export-by-date"] + ) + assert result.exit_code == 0 + + # change a couple of destination photos + os.unlink(CLI_EXPORT_BY_DATE[1]) + shutil.copyfile(CLI_EXPORT_BY_DATE[0], CLI_EXPORT_BY_DATE[1]) + os.unlink(CLI_EXPORT_BY_DATE[0]) + + # update dry-run + result = runner.invoke( + export, + [ + os.path.join(cwd, CLI_PHOTOS_DB), + ".", + "--update", + "--export-by-date", + "--dry-run", + ], + ) + assert result.exit_code == 0 + assert ( + "Exported: 1 photo, updated: 1 photo, skipped: 6 photos, updated EXIF data: 0 photos" + in result.output + ) + + # make sure file didn't really get copied + assert not os.path.isfile(CLI_EXPORT_BY_DATE[0]) + + +def test_export_directory_template_1_dry_run(): + """ test export using directory template with dry-run flag """ + import glob + import locale + import os + import os.path + import osxphotos + from osxphotos.__main__ import export + + locale.setlocale(locale.LC_ALL, "en_US") + + runner = CliRunner() + cwd = os.getcwd() + # pylint: disable=not-context-manager + with runner.isolated_filesystem(): + result = runner.invoke( + export, + [ + os.path.join(cwd, CLI_PHOTOS_DB), + ".", + "-V", + "--directory", + "{created.year}/{created.month}", + "--dry-run", + ], + ) + assert result.exit_code == 0 + assert "Exported: 8 photos" in result.output + workdir = os.getcwd() + for filepath in CLI_EXPORTED_DIRECTORY_TEMPLATE_FILENAMES1: + assert f"Exported {filepath}" in result.output + assert not os.path.isfile(os.path.join(workdir, filepath)) diff --git a/tests/test_exiftool.py b/tests/test_exiftool.py index 9ad01815..9044acf7 100644 --- a/tests/test_exiftool.py +++ b/tests/test_exiftool.py @@ -94,12 +94,12 @@ def test_setvalue_1(): # test setting a tag value import os.path import tempfile - from osxphotos.utils import _copy_file import osxphotos.exiftool + from osxphotos.fileutil import FileUtil tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_ONE_KEYWORD)) - _copy_file(TEST_FILE_ONE_KEYWORD, tempfile) + FileUtil.copy(TEST_FILE_ONE_KEYWORD, tempfile) exif = osxphotos.exiftool.ExifTool(tempfile) exif.setvalue("IPTC:Keywords", "test") @@ -111,12 +111,12 @@ def test_clear_value(): # test clearing a tag value import os.path import tempfile - from osxphotos.utils import _copy_file import osxphotos.exiftool + from osxphotos.fileutil import FileUtil tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_ONE_KEYWORD)) - _copy_file(TEST_FILE_ONE_KEYWORD, tempfile) + FileUtil.copy(TEST_FILE_ONE_KEYWORD, tempfile) exif = osxphotos.exiftool.ExifTool(tempfile) assert "IPTC:Keywords" in exif.data @@ -130,12 +130,12 @@ def test_addvalues_1(): # test setting a tag value import os.path import tempfile - from osxphotos.utils import _copy_file import osxphotos.exiftool + from osxphotos.fileutil import FileUtil tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_ONE_KEYWORD)) - _copy_file(TEST_FILE_ONE_KEYWORD, tempfile) + FileUtil.copy(TEST_FILE_ONE_KEYWORD, tempfile) exif = osxphotos.exiftool.ExifTool(tempfile) exif.addvalues("IPTC:Keywords", "test") @@ -147,12 +147,12 @@ def test_addvalues_2(): # test setting a tag value where multiple values already exist import os.path import tempfile - from osxphotos.utils import _copy_file import osxphotos.exiftool + from osxphotos.fileutil import FileUtil tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") tempfile = os.path.join(tempdir.name, os.path.basename(TEST_FILE_MULTI_KEYWORD)) - _copy_file(TEST_FILE_MULTI_KEYWORD, tempfile) + FileUtil.copy(TEST_FILE_MULTI_KEYWORD, tempfile) exif = osxphotos.exiftool.ExifTool(tempfile) assert sorted(exif.data["IPTC:Keywords"]) == sorted(TEST_MULTI_KEYWORDS) diff --git a/tests/test_export_db.py b/tests/test_export_db.py new file mode 100644 index 00000000..df32608b --- /dev/null +++ b/tests/test_export_db.py @@ -0,0 +1,149 @@ +""" Test ExportDB """ + +import pytest + +EXIF_DATA = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "EXIF:ImageDescription": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Title": "Elder Park", "EXIF:GPSLatitude": "34 deg 55' 8.01\" S", "EXIF:GPSLongitude": "138 deg 35' 48.70\" E", "Composite:GPSPosition": "34 deg 55' 8.01\" S, 138 deg 35' 48.70\" E", "EXIF:GPSLatitudeRef": "South", "EXIF:GPSLongitudeRef": "East", "EXIF:DateTimeOriginal": "2017:06:20 17:18:56", "EXIF:OffsetTimeOriginal": "+09:30", "EXIF:ModifyDate": "2020:05:18 14:42:04"}]""" +INFO_DATA = """{"uuid": "3DD2C897-F19E-4CA6-8C22-B027D5A71907", "filename": "3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "original_filename": "IMG_4547.jpg", "date": "2017-06-20T17:18:56.518000+09:30", "description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "title": "Elder Park", "keywords": [], "labels": ["Statue", "Art"], "albums": ["AlbumInFolder"], "folders": {"AlbumInFolder": ["Folder1", "SubFolder2"]}, "persons": [], "path": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/originals/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "ismissing": false, "hasadjustments": true, "external_edit": false, "favorite": false, "hidden": false, "latitude": -34.91889167000001, "longitude": 138.59686167, "path_edited": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/resources/renders/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907_1_201_a.jpeg", "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": false, "incloud": null, "date_modified": "2020-05-18T14:42:04.608664+09:30", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Elder Park, Adelaide, South Australia, Australia, River Torrens", "names": {"field0": [], "country": ["Australia"], "state_province": ["South Australia"], "sub_administrative_area": ["Adelaide"], "city": ["Adelaide", "Adelaide"], "field5": [], "additional_city_info": ["Adelaide CBD", "Tarndanya"], "ocean": [], "area_of_interest": ["Elder Park", ""], "inland_water": ["River Torrens", "River Torrens"], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": ["River Torrens", "River Torrens"]}, "country_code": "AU", "ishome": false, "address_str": "River Torrens, Adelaide SA, Australia", "address": {"street": null, "sub_locality": "Tarndanya", "city": "Adelaide", "sub_administrative_area": "Adelaide", "state_province": "SA", "postal_code": null, "country": "Australia", "iso_country_code": "AU"}}, "exif": {"flash_fired": false, "iso": 320, "metering_mode": 3, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.058823529411764705, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}""" +EXIF_DATA2 = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "XMP:Title": "St. James's Park", "XMP:TagsList": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "IPTC:Keywords": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "XMP:Subject": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "EXIF:GPSLatitude": "51 deg 30' 12.86\" N", "EXIF:GPSLongitude": "0 deg 7' 54.50\" W", "Composite:GPSPosition": "51 deg 30' 12.86\" N, 0 deg 7' 54.50\" W", "EXIF:GPSLatitudeRef": "North", "EXIF:GPSLongitudeRef": "West", "EXIF:DateTimeOriginal": "2018:10:13 09:18:12", "EXIF:OffsetTimeOriginal": "-04:00", "EXIF:ModifyDate": "2019:12:08 14:06:44"}]""" +INFO_DATA2 = """{"uuid": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529", "filename": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "original_filename": "IMG_8440.JPG", "date": "2019-06-11T11:42:06.711805-07:00", "description": null, "title": null, "keywords": [], "labels": ["Sky", "Cloudy", "Fence", "Land", "Outdoor", "Park", "Amusement Park", "Roller Coaster"], "albums": [], "folders": {}, "persons": [], "path": "/Volumes/MacBook Catalina - Data/Users/rhet/Pictures/Photos Library.photoslibrary/originals/F/F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "ismissing": false, "hasadjustments": false, "external_edit": false, "favorite": false, "hidden": false, "latitude": 33.81558666666667, "longitude": -117.99298, "path_edited": null, "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": true, "incloud": true, "date_modified": "2019-10-14T00:51:47.141950-07:00", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Adventure City, Stanton, California, United States", "names": {"field0": [], "country": ["United States"], "state_province": ["California"], "sub_administrative_area": ["Orange"], "city": ["Stanton", "Anaheim", "Anaheim"], "field5": [], "additional_city_info": ["West Anaheim"], "ocean": [], "area_of_interest": ["Adventure City", "Adventure City"], "inland_water": [], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": []}, "country_code": "US", "ishome": false, "address_str": "Adventure City, 1240 S Beach Blvd, Anaheim, CA 92804, United States", "address": {"street": "1240 S Beach Blvd", "sub_locality": "West Anaheim", "city": "Stanton", "sub_administrative_area": "Orange", "state_province": "CA", "postal_code": "92804", "country": "United States", "iso_country_code": "US"}}, "exif": {"flash_fired": false, "iso": 25, "metering_mode": 5, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.0004940711462450593, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}""" + +def test_export_db(): + """ test ExportDB """ + import os + import tempfile + from osxphotos._export_db import ExportDB + + tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") + dbname = os.path.join(tempdir.name, ".osxphotos_export.db") + db = ExportDB(dbname) + assert os.path.isfile(dbname) + + filepath = os.path.join(tempdir.name,"test.JPG") + filepath_lower = os.path.join(tempdir.name,"test.jpg") + + db.set_uuid_for_file(filepath, "FOO-BAR") + # filename should be case-insensitive + assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR" + db.set_info_for_uuid("FOO-BAR", INFO_DATA) + assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA + db.set_exifdata_for_file(filepath, EXIF_DATA) + assert db.get_exifdata_for_file(filepath) == EXIF_DATA + db.set_stat_orig_for_file(filepath, (1, 2, 3)) + assert db.get_stat_orig_for_file(filepath) == (1, 2, 3) + db.set_stat_exif_for_file(filepath, (4, 5, 6)) + assert db.get_stat_exif_for_file(filepath) == (4, 5, 6) + + # test set_data which sets all at the same time + filepath2 = os.path.join(tempdir.name,"test2.jpg") + db.set_data(filepath2, "BAR-FOO", (1, 2, 3), (4, 5, 6), INFO_DATA, EXIF_DATA) + assert db.get_uuid_for_file(filepath2) == "BAR-FOO" + assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA + assert db.get_exifdata_for_file(filepath2) == EXIF_DATA + assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3) + assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6) + + # close and re-open + db.close() + db = ExportDB(dbname) + assert db.get_uuid_for_file(filepath2) == "BAR-FOO" + assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA + assert db.get_exifdata_for_file(filepath2) == EXIF_DATA + assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3) + assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6) + + # update data + db.set_uuid_for_file(filepath, "FUBAR") + assert db.get_uuid_for_file(filepath) == "FUBAR" + +def test_export_db_no_op(): + """ test ExportDBNoOp """ + import os + import tempfile + from osxphotos._export_db import ExportDBNoOp + + tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") + db = ExportDBNoOp() + + filepath = os.path.join(tempdir.name,"test.JPG") + filepath_lower = os.path.join(tempdir.name,"test.jpg") + + db.set_uuid_for_file(filepath, "FOO-BAR") + # filename should be case-insensitive + assert db.get_uuid_for_file(filepath_lower) is None + db.set_info_for_uuid("FOO-BAR", INFO_DATA) + assert db.get_info_for_uuid("FOO-BAR") is None + db.set_exifdata_for_file(filepath, EXIF_DATA) + assert db.get_exifdata_for_file(filepath) is None + db.set_stat_orig_for_file(filepath, (1, 2, 3)) + assert db.get_stat_orig_for_file(filepath) is None + db.set_stat_exif_for_file(filepath, (4, 5, 6)) + assert db.get_stat_exif_for_file(filepath) is None + + # test set_data which sets all at the same time + filepath2 = os.path.join(tempdir.name,"test2.jpg") + db.set_data(filepath2, "BAR-FOO", (1, 2, 3), (4, 5, 6), INFO_DATA, EXIF_DATA) + assert db.get_uuid_for_file(filepath2) is None + assert db.get_info_for_uuid("BAR-FOO") is None + assert db.get_exifdata_for_file(filepath2) is None + assert db.get_stat_orig_for_file(filepath2) is None + assert db.get_stat_exif_for_file(filepath2) is None + + # update data + db.set_uuid_for_file(filepath, "FUBAR") + assert db.get_uuid_for_file(filepath) is None + +def test_export_db_in_memory(): + """ test ExportDBInMemory """ + import os + import tempfile + from osxphotos._export_db import ExportDB, ExportDBInMemory + + tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_") + dbname = os.path.join(tempdir.name, ".osxphotos_export.db") + db = ExportDB(dbname) + assert os.path.isfile(dbname) + + filepath = os.path.join(tempdir.name,"test.JPG") + filepath_lower = os.path.join(tempdir.name,"test.jpg") + + db.set_uuid_for_file(filepath, "FOO-BAR") + db.set_info_for_uuid("FOO-BAR", INFO_DATA) + db.set_exifdata_for_file(filepath, EXIF_DATA) + db.set_stat_orig_for_file(filepath, (1, 2, 3)) + db.set_stat_exif_for_file(filepath, (4, 5, 6)) + + db.close() + + dbram = ExportDBInMemory(dbname) + + # verify values as expected + assert dbram.get_uuid_for_file(filepath_lower) == "FOO-BAR" + assert dbram.get_info_for_uuid("FOO-BAR") == INFO_DATA + assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA + assert dbram.get_stat_orig_for_file(filepath) == (1, 2, 3) + assert dbram.get_stat_exif_for_file(filepath) == (4, 5, 6) + + # change a value + dbram.set_uuid_for_file(filepath, "FUBAR") + dbram.set_info_for_uuid("FUBAR", INFO_DATA2) + dbram.set_exifdata_for_file(filepath, EXIF_DATA2) + dbram.set_stat_orig_for_file(filepath, (7,8,9)) + dbram.set_stat_exif_for_file(filepath, (10,11,12)) + + assert dbram.get_uuid_for_file(filepath_lower) == "FUBAR" + assert dbram.get_info_for_uuid("FUBAR") == INFO_DATA2 + assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA2 + assert dbram.get_stat_orig_for_file(filepath) == (7,8,9) + assert dbram.get_stat_exif_for_file(filepath) == (10,11,12) + + dbram.close() + + # re-open on disk and verify no changes + db = ExportDB(dbname) + assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR" + assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA + assert db.get_exifdata_for_file(filepath) == EXIF_DATA + assert db.get_stat_orig_for_file(filepath) == (1, 2, 3) + assert db.get_stat_exif_for_file(filepath) == (4, 5, 6) + + assert db.get_info_for_uuid("FUBAR") is None \ No newline at end of file diff --git a/tests/test_fileutil.py b/tests/test_fileutil.py new file mode 100644 index 00000000..c50db763 --- /dev/null +++ b/tests/test_fileutil.py @@ -0,0 +1,69 @@ +""" test FileUtil """ + +import pytest + + +def test_copy_file_valid(): + # copy file with valid src, dest + import os.path + import tempfile + from osxphotos.fileutil import FileUtil + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + src = "tests/test-images/wedding.jpg" + result = FileUtil.copy(src, temp_dir.name) + assert result == 0 + assert os.path.isfile(os.path.join(temp_dir.name, "wedding.jpg")) + + +def test_copy_file_invalid(): + # copy file with invalid src + import tempfile + from osxphotos.fileutil import FileUtil + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + src = "tests/test-images/wedding_DOES_NOT_EXIST.jpg" + with pytest.raises(Exception) as e: + assert FileUtil.copy(src, temp_dir.name) + assert e.type == FileNotFoundError + + +def test_copy_file_norsrc(): + # copy file with --norsrc + import os.path + import tempfile + from osxphotos.fileutil import FileUtil + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + src = "tests/test-images/wedding.jpg" + result = FileUtil.copy(src, temp_dir.name, norsrc=True) + assert result == 0 + assert os.path.isfile(os.path.join(temp_dir.name, "wedding.jpg")) + + +def test_hardlink_file_valid(): + # hardlink file with valid src, dest + import os.path + import tempfile + from osxphotos.fileutil import FileUtil + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + src = "tests/test-images/wedding.jpg" + dest = os.path.join(temp_dir.name, "wedding.jpg") + FileUtil.hardlink(src, dest) + assert os.path.isfile(dest) + assert os.path.samefile(src, dest) + + +def test_unlink_file(): + import os.path + import tempfile + from osxphotos.fileutil import FileUtil + + temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") + src = "tests/test-images/wedding.jpg" + dest = os.path.join(temp_dir.name, "wedding.jpg") + result = FileUtil.copy(src, temp_dir.name) + assert os.path.isfile(dest) + FileUtil.unlink(dest) + assert not os.path.isfile(dest) diff --git a/tests/test_utils.py b/tests/test_utils.py index fe3872e4..2986e22c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -55,44 +55,6 @@ def test_db_is_locked_unlocked(): assert not osxphotos.utils._db_is_locked(DB_UNLOCKED_10_15) -def test_copy_file_valid(): - # _copy_file with valid src, dest - import os.path - import tempfile - from osxphotos.utils import _copy_file - - temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") - src = "tests/test-images/wedding.jpg" - result = _copy_file(src, temp_dir.name) - assert result == 0 - assert os.path.isfile(os.path.join(temp_dir.name, "wedding.jpg")) - - -def test_copy_file_invalid(): - # _copy_file with invalid src - import tempfile - from osxphotos.utils import _copy_file - - temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") - src = "tests/test-images/wedding_DOES_NOT_EXIST.jpg" - with pytest.raises(Exception) as e: - assert _copy_file(src, temp_dir.name) - assert e.type == FileNotFoundError - - -def test_copy_file_norsrc(): - # _copy_file with --norsrc - import os.path - import tempfile - from osxphotos.utils import _copy_file - - temp_dir = tempfile.TemporaryDirectory(prefix="osxphotos_") - src = "tests/test-images/wedding.jpg" - result = _copy_file(src, temp_dir.name, norsrc=True) - assert result == 0 - assert os.path.isfile(os.path.join(temp_dir.name, "wedding.jpg")) - - def test_get_preferred_uti_extension(): from osxphotos.utils import get_preferred_uti_extension