diff --git a/README.md b/README.md
index 01167376..606ffbe1 100644
--- a/README.md
+++ b/README.md
@@ -1732,7 +1732,7 @@ Substitution Description
{lf} A line feed: '\n', alias for {newline}
{cr} A carriage return: '\r'
{crlf} a carriage return + line feed: '\r\n'
-{osxphotos_version} The osxphotos version, e.g. '0.45.12'
+{osxphotos_version} The osxphotos version, e.g. '0.46.0'
{osxphotos_cmd_line} The full command line used to run osxphotos
The following substitutions may result in multiple values. Thus if specified for
@@ -3636,7 +3636,7 @@ The following template field substitutions are availabe for use the templating s
|{lf}|A line feed: '\n', alias for {newline}|
|{cr}|A carriage return: '\r'|
|{crlf}|a carriage return + line feed: '\r\n'|
-|{osxphotos_version}|The osxphotos version, e.g. '0.45.12'|
+|{osxphotos_version}|The osxphotos version, e.g. '0.46.0'|
|{osxphotos_cmd_line}|The full command line used to run osxphotos|
|{album}|Album(s) photo is contained in|
|{folder_album}|Folder path + album photo is contained in. e.g. 'Folder/Subfolder/Album' or just 'Album' if no enclosing folder|
@@ -3736,7 +3736,7 @@ Args:
Returns: ExportResults instance
-*Note*: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db, and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) in options.export_db and options.fileutil respectively.
+*Note*: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db, and no-op fileutil (e.g. `ExportDBInMemory` and `FileUtilNoOp`) in options.export_db and options.fileutil respectively.
#### `ExportOptions`
@@ -3752,7 +3752,7 @@ Attributes:
- exiftool_flags (list of str): optional list of flags to pass to exiftool when using exiftool option, e.g ["-m", "-F"]
- exiftool: (bool, default = False): if True, will use exiftool to write metadata to export file
- export_as_hardlink: (bool, default=False): if True, will hardlink files instead of copying them
-- export_db: (ExportDB_ABC): instance of a class that conforms to ExportDB_ABC with methods for getting/setting data related to exported files to compare update state
+- export_db: (ExportDB): instance of a class that conforms to ExportDB with methods for getting/setting data related to exported files to compare update state
- fileutil: (FileUtilABC): class that conforms to FileUtilABC with various file utilities
- ignore_date_modified (bool): for use with sidecar and exiftool; if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set
- ignore_signature (bool, default=False): ignore file signature when used with update (look only at filename)
diff --git a/dev_requirements.txt b/dev_requirements.txt
index d322ae60..fd1fbbe6 100644
--- a/dev_requirements.txt
+++ b/dev_requirements.txt
@@ -8,3 +8,4 @@ sphinx_rtd_theme
twine
wheel
Sphinx
+pdbpp
\ No newline at end of file
diff --git a/docs/.buildinfo b/docs/.buildinfo
index c310cc0e..2ceb67e7 100644
--- a/docs/.buildinfo
+++ b/docs/.buildinfo
@@ -1,4 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: 30f35e310e3c2ebf8f561c37c515c685
+config: 5b6236594d7900f08d9a1afda487bf3c
tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/_static/documentation_options.js b/docs/_static/documentation_options.js
index 8d356e17..5eee5cef 100644
--- a/docs/_static/documentation_options.js
+++ b/docs/_static/documentation_options.js
@@ -1,6 +1,6 @@
var DOCUMENTATION_OPTIONS = {
URL_ROOT: document.getElementById("documentation_options").getAttribute('data-url_root'),
- VERSION: '0.45.12',
+ VERSION: '0.46.0',
LANGUAGE: 'None',
COLLAPSE_INDEX: false,
BUILDER: 'html',
diff --git a/docs/cli.html b/docs/cli.html
index a938ec4c..8e1f26d0 100644
--- a/docs/cli.html
+++ b/docs/cli.html
@@ -6,7 +6,7 @@
-
osxphotos command line interface (CLI) — osxphotos 0.45.12 documentation
+ osxphotos command line interface (CLI) — osxphotos 0.46.0 documentation
diff --git a/docs/genindex.html b/docs/genindex.html
index b1d9c556..8cd0385c 100644
--- a/docs/genindex.html
+++ b/docs/genindex.html
@@ -5,7 +5,7 @@
- Index — osxphotos 0.45.12 documentation
+ Index — osxphotos 0.46.0 documentation
diff --git a/docs/index.html b/docs/index.html
index bac94ce1..dd6ea4f3 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -6,7 +6,7 @@
- Welcome to osxphotos’s documentation! — osxphotos 0.45.12 documentation
+ Welcome to osxphotos’s documentation! — osxphotos 0.46.0 documentation
diff --git a/docs/modules.html b/docs/modules.html
index ac89b12a..1a7ed36a 100644
--- a/docs/modules.html
+++ b/docs/modules.html
@@ -6,7 +6,7 @@
- osxphotos — osxphotos 0.45.12 documentation
+ osxphotos — osxphotos 0.46.0 documentation
diff --git a/docs/reference.html b/docs/reference.html
index 566dc0bf..33a2ed55 100644
--- a/docs/reference.html
+++ b/docs/reference.html
@@ -6,7 +6,7 @@
- osxphotos package — osxphotos 0.45.12 documentation
+ osxphotos package — osxphotos 0.46.0 documentation
diff --git a/docs/search.html b/docs/search.html
index b965cf3f..509c15c4 100644
--- a/docs/search.html
+++ b/docs/search.html
@@ -5,7 +5,7 @@
- Search — osxphotos 0.45.12 documentation
+ Search — osxphotos 0.46.0 documentation
diff --git a/osxphotos/__init__.py b/osxphotos/__init__.py
index 482c734d..a28dc6c4 100644
--- a/osxphotos/__init__.py
+++ b/osxphotos/__init__.py
@@ -1,7 +1,7 @@
from ._constants import AlbumSortOrder
from ._version import __version__
from .exiftool import ExifTool
-from .export_db import ExportDB, ExportDBInMemory, ExportDBNoOp
+from .export_db import ExportDB
from .fileutil import FileUtil, FileUtilNoOp
from .momentinfo import MomentInfo
from .personinfo import PersonInfo
@@ -25,8 +25,7 @@ __all__ = [
"CommentInfo",
"ExifTool",
"ExportDB",
- "ExportDBInMemory",
- "ExportDBNoOp",
+ "ExportDBTemp",
"ExportOptions",
"ExportResults",
"FileUtil",
diff --git a/osxphotos/_version.py b/osxphotos/_version.py
index 0e28e942..8d0b24ed 100644
--- a/osxphotos/_version.py
+++ b/osxphotos/_version.py
@@ -1,3 +1,3 @@
""" version info """
-__version__ = "0.45.12"
+__version__ = "0.46.0"
diff --git a/osxphotos/cli.py b/osxphotos/cli.py
index 328d37d9..67a971a4 100644
--- a/osxphotos/cli.py
+++ b/osxphotos/cli.py
@@ -65,6 +65,16 @@ from .crash_reporter import crash_reporter
from .datetime_formatter import DateTimeFormatter
from .exiftool import get_exiftool_path
from .export_db import ExportDB, ExportDBInMemory
+from .export_db_utils import (
+ OSXPHOTOS_EXPORTDB_VERSION,
+ export_db_check_signatures,
+ export_db_get_last_run,
+ export_db_get_version,
+ export_db_save_config_to_file,
+ export_db_touch_files,
+ export_db_update_signatures,
+ export_db_vacuum,
+)
from .fileutil import FileUtil, FileUtilNoOp
from .path_utils import is_valid_filepath, sanitize_filename, sanitize_filepath
from .photoexporter import ExportOptions, ExportResults, PhotoExporter
@@ -274,6 +284,24 @@ class FunctionCall(click.ParamType):
return (function, value)
+class ExportDBType(click.ParamType):
+
+ name = "EXPORTDB"
+
+ def convert(self, value, param, ctx):
+ try:
+ export_db_name = pathlib.Path(value)
+ if export_db_name.is_dir():
+ raise click.BadParameter(f"{value} is a directory")
+ if export_db_name.is_file():
+ # verify it's actually an osxphotos export_db
+ # export_db_get_version will raise an error if it's not valid
+ osxphotos_ver, export_db_ver = export_db_get_version(value)
+ return value
+ except Exception:
+ self.fail(f"{value} exists but is not a valid osxphotos export database. ")
+
+
class IncompatibleQueryOptions(Exception):
pass
@@ -1160,7 +1188,7 @@ def cli(ctx, db, json_, debug):
f"If --exportdb is not specified, export database will be saved to '{OSXPHOTOS_EXPORT_DB}' "
"in the export directory. If --exportdb is specified, it will be saved to the specified file. "
),
- type=click.Path(),
+ type=ExportDBType(),
)
@click.option(
"--load-config",
@@ -1182,9 +1210,15 @@ def cli(ctx, db, json_, debug):
required=False,
metavar="",
default=None,
- help=("Save options to file for use with --load-config. File format is TOML."),
+ help="Save options to file for use with --load-config. File format is TOML. "
+ "See also --config-only.",
type=click.Path(),
)
+@click.option(
+ "--config-only",
+ is_flag=True,
+ help="If specified, saves the config file but does not export any files; must be used with --save-config.",
+)
@click.option(
"--beta",
is_flag=True,
@@ -1351,6 +1385,7 @@ def export(
exportdb,
load_config,
save_config,
+ config_only,
is_reference,
beta,
in_album,
@@ -1413,7 +1448,7 @@ def export(
cfg = ConfigOptions(
"export",
locals(),
- ignore=["ctx", "cli_obj", "dest", "load_config", "save_config"],
+ ignore=["ctx", "cli_obj", "dest", "load_config", "save_config", "config_only"],
)
global VERBOSE
@@ -1620,6 +1655,14 @@ def export(
)
sys.exit(1)
+ if config_only and not save_config:
+ click.secho(
+ "--config-only must be used with --save-config",
+ fg=CLI_COLOR_ERROR,
+ err=True,
+ )
+ sys.exit(1)
+
if all(x in [s.lower() for s in sidecar] for x in ["json", "exiftool"]):
click.echo(
click.style(
@@ -1644,8 +1687,11 @@ def export(
sys.exit(1)
if save_config:
- verbose_(f"Saving options to file {save_config}")
+ verbose_(f"Saving options to config file '{save_config}'")
cfg.write_to_file(save_config)
+ if config_only:
+ click.echo(f"Saved config file to '{save_config}'")
+ sys.exit(0)
# set defaults for options that need them
jpeg_quality = DEFAULT_JPEG_QUALITY if jpeg_quality is None else jpeg_quality
@@ -1797,6 +1843,9 @@ def export(
f"Upgraded export database {export_db_path} from version {upgraded[0]} to {upgraded[1]}"
)
+ # save config to export_db
+ export_db.set_config(cfg.write_to_str())
+
photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose_, exiftool=exiftool_path)
# enable beta features if requested
@@ -4703,6 +4752,210 @@ def run(python_file):
run_path(python_file, run_name="__main__")
+@cli.command(name="exportdb", hidden=OSXPHOTOS_HIDDEN)
+@click.option("--version", is_flag=True, help="Print export database version and exit.")
+@click.option("--vacuum", is_flag=True, help="Run VACUUM to defragment the database.")
+@click.option(
+ "--check-signatures",
+ is_flag=True,
+ help="Check signatures for all exported photos in the database to find signatures that don't match.",
+)
+@click.option(
+ "--update-signatures",
+ is_flag=True,
+ help="Update signatures for all exported photos in the database to match on-disk signatures.",
+)
+@click.option(
+ "--touch-file",
+ is_flag=True,
+ help="Touch files on disk to match created date in Photos library and update export database signatures",
+)
+@click.option(
+ "--last-run",
+ is_flag=True,
+ help="Show last run osxphotos commands used with this database.",
+)
+@click.option(
+ "--save-config",
+ metavar="CONFIG_FILE",
+ help="Save last run configuration to TOML file for use by --load-config.",
+)
+@click.option(
+ "--info",
+ metavar="FILE_PATH",
+ nargs=1,
+ help="Print information about FILE_PATH contained in the database.",
+)
+@click.option(
+ "--migrate",
+ is_flag=True,
+ help="Migrate (if needed) export database to current version."
+)
+@click.option(
+ "--export-dir",
+ help="Optional path to export directory (if not parent of export database).",
+ type=click.Path(exists=True, file_okay=False, dir_okay=True),
+)
+@click.option("--verbose", "-V", is_flag=True, help="Print verbose output.")
+@click.option(
+ "--dry-run",
+ is_flag=True,
+ help="Run in dry-run mode (don't actually update files), e.g. for use with --update-signatures.",
+)
+@click.argument("export_db", metavar="EXPORT_DATABASE", type=click.Path(exists=True))
+def exportdb(
+ version,
+ vacuum,
+ check_signatures,
+ update_signatures,
+ touch_file,
+ last_run,
+ save_config,
+ info,
+ migrate,
+ export_dir,
+ verbose,
+ dry_run,
+ export_db,
+):
+ """Utilities for working with the osxphotos export database"""
+ export_db = pathlib.Path(export_db)
+ if export_db.is_dir():
+ # assume it's the export folder
+ export_db = export_db / OSXPHOTOS_EXPORT_DB
+ if not export_db.is_file():
+ print(
+ f"[red]Error: {OSXPHOTOS_EXPORT_DB} missing from {export_db.parent}[/red]"
+ )
+ sys.exit(1)
+
+ export_dir = export_dir or export_db.parent
+
+ sub_commands = [
+ version,
+ check_signatures,
+ update_signatures,
+ touch_file,
+ last_run,
+ bool(save_config),
+ bool(info),
+ migrate,
+ ]
+ if sum(sub_commands) > 1:
+ print(f"[red]Only a single sub-command may be specified at a time[/red]")
+ sys.exit(1)
+
+ if version:
+ try:
+ osxphotos_ver, export_db_ver = export_db_get_version(export_db)
+ except Exception as e:
+ print(f"[red]Error: could not read version from {export_db}: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(
+ f"osxphotos version: {osxphotos_ver}, export database version: {export_db_ver}"
+ )
+ sys.exit(0)
+
+ if vacuum:
+ try:
+ start_size = pathlib.Path(export_db).stat().st_size
+ export_db_vacuum(export_db)
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(
+ f"Vacuumed {export_db}! {start_size} bytes -> {pathlib.Path(export_db).stat().st_size} bytes"
+ )
+ sys.exit(0)
+
+ if update_signatures:
+ try:
+ updated, skipped = export_db_update_signatures(
+ export_db, export_dir, verbose, dry_run
+ )
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(f"Done. Updated {updated} files, skipped {skipped} files.")
+ sys.exit(0)
+
+ if last_run:
+ try:
+ last_run_info = export_db_get_last_run(export_db)
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(f"last run at {last_run_info[0]}:")
+ print(f"osxphotos {last_run_info[1]}")
+ sys.exit(0)
+
+ if save_config:
+ try:
+ export_db_save_config_to_file(export_db, save_config)
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(f"Saved configuration to {save_config}")
+ sys.exit(0)
+
+ if check_signatures:
+ try:
+ matched, notmatched, skipped = export_db_check_signatures(
+ export_db, export_dir, verbose=verbose
+ )
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(
+ f"Done. Found {matched} matching signatures and {notmatched} signatures that don't match. Skipped {skipped} missing files."
+ )
+ sys.exit(0)
+
+ if touch_file:
+ try:
+ touched, not_touched, skipped = export_db_touch_files(
+ export_db, export_dir, verbose=verbose, dry_run=dry_run
+ )
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ print(
+ f"Done. Touched {touched} files, skipped {not_touched} up to date files, skipped {skipped} missing files."
+ )
+ sys.exit(0)
+
+ if info:
+ exportdb = ExportDB(export_db, export_dir)
+ try:
+ info_rec = exportdb.get_file_record(info)
+ except Exception as e:
+ print(f"[red]Error: {e}[/red]")
+ sys.exit(1)
+ else:
+ if info_rec:
+ print(info_rec.asdict())
+ else:
+ print(f"[red]File '{info}' not found in export database[/red]")
+ sys.exit(0)
+
+ if migrate:
+ exportdb = ExportDB(export_db, export_dir)
+ upgraded = exportdb.was_upgraded
+ if upgraded:
+ print(
+ f"Migrated export database {export_db} from version {upgraded[0]} to {upgraded[1]}"
+ )
+ else:
+ print(f"Export database {export_db} is already at latest version {OSXPHOTOS_EXPORTDB_VERSION}")
+ sys.exit(0)
+
def _query_options_from_kwargs(**kwargs) -> QueryOptions:
"""Validate query options and create a QueryOptions instance"""
# sanity check input args
diff --git a/osxphotos/configoptions.py b/osxphotos/configoptions.py
index a1afdd6d..7e71b574 100644
--- a/osxphotos/configoptions.py
+++ b/osxphotos/configoptions.py
@@ -134,18 +134,12 @@ class ConfigOptions:
filename: full path to TOML file to write; filename will be overwritten if it exists
"""
# todo: add overwrite and option to merge contents already in TOML file (under different [section] with new content)
- data = {}
- for attr in sorted(self._attrs.keys()):
- val = getattr(self, attr)
- if val in [False, ()]:
- val = None
- else:
- val = list(val) if type(val) == tuple else val
-
- data[attr] = val
-
with open(filename, "w") as fd:
- toml.dump({self._name: data}, fd)
+ toml.dump(self._get_toml_dict(), fd)
+
+ def write_to_str(self) -> str:
+ """Write self to TOML str"""
+ return toml.dumps(self._get_toml_dict())
def load_from_file(self, filename, override=False):
"""Load options from a TOML file.
@@ -178,3 +172,17 @@ class ConfigOptions:
def asdict(self):
return {attr: getattr(self, attr) for attr in sorted(self._attrs.keys())}
+
+ def _get_toml_dict(self):
+ """Return dict for writing to TOML file"""
+ data = {}
+ for attr in sorted(self._attrs.keys()):
+ val = getattr(self, attr)
+ if val in [False, ()]:
+ val = None
+ else:
+ val = list(val) if type(val) == tuple else val
+
+ data[attr] = val
+
+ return {self._name: data}
diff --git a/osxphotos/export_db.py b/osxphotos/export_db.py
index cf762d72..092784ab 100644
--- a/osxphotos/export_db.py
+++ b/osxphotos/export_db.py
@@ -1,224 +1,45 @@
-""" Helper class for managing a database used by PhotoInfo.export for tracking state of exports and updates """
+""" Helper class for managing database used by PhotoExporter for tracking state of exports and updates """
import datetime
+import json
import logging
import os
import pathlib
import sqlite3
import sys
-from abc import ABC, abstractmethod
from io import StringIO
from sqlite3 import Error
-from typing import Union
+from tempfile import TemporaryDirectory
+from typing import Optional, Tuple, Union
from ._constants import OSXPHOTOS_EXPORT_DB
from ._version import __version__
+from .fileutil import FileUtil
from .utils import normalize_fs_path
-__all__ = ["ExportDB_ABC", "ExportDBNoOp", "ExportDB", "ExportDBInMemory"]
-
-OSXPHOTOS_EXPORTDB_VERSION = "5.0"
-OSXPHOTOS_EXPORTDB_VERSION_MIGRATE_FILEPATH = "4.3"
-OSXPHOTOS_EXPORTDB_VERSION_MIGRATE_TABLES = "4.3"
+__all__ = [
+ "ExportDB",
+ "ExportDBInMemory",
+ "ExportDBTemp",
+]
+OSXPHOTOS_EXPORTDB_VERSION = "6.0"
OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
-class ExportDB_ABC(ABC):
- """abstract base class for ExportDB"""
-
- @abstractmethod
- def get_uuid_for_file(self, filename):
- pass
-
- @abstractmethod
- def set_uuid_for_file(self, filename, uuid):
- pass
-
- @abstractmethod
- def set_stat_orig_for_file(self, filename, stats):
- pass
-
- @abstractmethod
- def get_stat_orig_for_file(self, filename):
- pass
-
- @abstractmethod
- def set_stat_edited_for_file(self, filename, stats):
- pass
-
- @abstractmethod
- def get_stat_edited_for_file(self, filename):
- pass
-
- @abstractmethod
- def set_stat_converted_for_file(self, filename, stats):
- pass
-
- @abstractmethod
- def get_stat_converted_for_file(self, filename):
- pass
-
- @abstractmethod
- def set_stat_exif_for_file(self, filename, stats):
- pass
-
- @abstractmethod
- def get_stat_exif_for_file(self, filename):
- pass
-
- @abstractmethod
- def get_info_for_uuid(self, uuid):
- pass
-
- @abstractmethod
- def set_info_for_uuid(self, uuid, info):
- pass
-
- @abstractmethod
- def get_exifdata_for_file(self, uuid):
- pass
-
- @abstractmethod
- def set_exifdata_for_file(self, uuid, exifdata):
- pass
-
- @abstractmethod
- def set_sidecar_for_file(self, filename, sidecar_data, sidecar_sig):
- pass
-
- @abstractmethod
- def get_sidecar_for_file(self, filename):
- pass
-
- @abstractmethod
- def get_previous_uuids(self):
- pass
-
- @abstractmethod
- def get_detected_text_for_uuid(self, uuid):
- pass
-
- @abstractmethod
- def set_detected_text_for_uuid(self, uuid, json_text):
- pass
-
- @abstractmethod
- def set_metadata_for_file(self, filename, metadata):
- pass
-
- @abstractmethod
- def get_metadata_for_file(self, filename):
- pass
-
- @abstractmethod
- def set_data(
- self,
- filename,
- uuid,
- orig_stat=None,
- exif_stat=None,
- converted_stat=None,
- edited_stat=None,
- info_json=None,
- exif_json=None,
- metadata=None,
- ):
- pass
-
-
-class ExportDBNoOp(ExportDB_ABC):
- """An ExportDB with NoOp methods"""
-
- def __init__(self):
- self.was_created = True
- self.was_upgraded = False
- self.version = OSXPHOTOS_EXPORTDB_VERSION
-
- def get_uuid_for_file(self, filename):
- pass
-
- def set_uuid_for_file(self, filename, uuid):
- pass
-
- def set_stat_orig_for_file(self, filename, stats):
- pass
-
- def get_stat_orig_for_file(self, filename):
- pass
-
- def set_stat_edited_for_file(self, filename, stats):
- pass
-
- def get_stat_edited_for_file(self, filename):
- pass
-
- def set_stat_converted_for_file(self, filename, stats):
- pass
-
- def get_stat_converted_for_file(self, filename):
- pass
-
- def set_stat_exif_for_file(self, filename, stats):
- pass
-
- def get_stat_exif_for_file(self, filename):
- pass
-
- def get_info_for_uuid(self, uuid):
- pass
-
- def set_info_for_uuid(self, uuid, info):
- pass
-
- def get_exifdata_for_file(self, uuid):
- pass
-
- def set_exifdata_for_file(self, uuid, exifdata):
- pass
-
- def set_sidecar_for_file(self, filename, sidecar_data, sidecar_sig):
- pass
-
- def get_sidecar_for_file(self, filename):
- return None, (None, None, None)
-
- def get_previous_uuids(self):
- return []
-
- def get_detected_text_for_uuid(self, uuid):
- return None
-
- def set_detected_text_for_uuid(self, uuid, json_text):
- pass
-
- def set_metadata_for_file(self, filename, metadata):
- pass
-
- def get_metadata_for_file(self, filename):
- pass
-
- def set_data(
- self,
- filename,
- uuid,
- orig_stat=None,
- exif_stat=None,
- converted_stat=None,
- edited_stat=None,
- info_json=None,
- exif_json=None,
- metadata=None,
- ):
- pass
-
-
-class ExportDB(ExportDB_ABC):
+class ExportDB:
"""Interface to sqlite3 database used to store state information for osxphotos export command"""
def __init__(self, dbfile, export_dir):
- """dbfile: path to osxphotos export database file"""
+ """create a new ExportDB object
+
+ Args:
+ dbfile: path to osxphotos export database file
+ export_dir: path to directory where exported files are stored
+ memory: if True, use in-memory database
+ """
+
self._dbfile = dbfile
# export_dir is required as all files referenced by get_/set_uuid_for_file will be converted to
# relative paths to this path
@@ -226,8 +47,63 @@ class ExportDB(ExportDB_ABC):
# whilst preserving the UUID to filename mapping
self._path = export_dir
self._conn = self._open_export_db(dbfile)
+ self._perform_db_maintenace(self._conn)
self._insert_run_info()
+ def get_file_record(self, filename: Union[pathlib.Path, str]) -> "ExportRecord":
+ """get info for filename and uuid
+
+ Returns: an ExportRecord object
+ """
+ filename = self._relative_filepath(filename)
+ filename_normalized = self._normalize_filepath(filename)
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT uuid FROM export_data WHERE filepath_normalized = ?;",
+ (filename_normalized,),
+ ).fetchone()
+
+ if not row:
+ return None
+ return ExportRecord(conn, filename_normalized)
+
+ def create_file_record(
+ self, filename: Union[pathlib.Path, str], uuid: str
+ ) -> "ExportRecord":
+ """create a new record for filename and uuid
+
+ Returns: an ExportRecord object
+ """
+ filename = self._relative_filepath(filename)
+ filename_normalized = self._normalize_filepath(filename)
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "INSERT INTO export_data (filepath, filepath_normalized, uuid) VALUES (?, ?, ?);",
+ (filename, filename_normalized, uuid),
+ )
+ conn.commit()
+ return ExportRecord(conn, filename_normalized)
+
+ def create_or_get_file_record(
+ self, filename: Union[pathlib.Path, str], uuid: str
+ ) -> "ExportRecord":
+ """create a new record for filename and uuid or return existing record
+
+ Returns: an ExportRecord object
+ """
+ filename = self._relative_filepath(filename)
+ filename_normalized = self._normalize_filepath(filename)
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "INSERT OR IGNORE INTO export_data (filepath, filepath_normalized, uuid) VALUES (?, ?, ?);",
+ (filename, filename_normalized, uuid),
+ )
+ conn.commit()
+ return ExportRecord(conn, filename_normalized)
+
def get_uuid_for_file(self, filename):
"""query database for filename and return UUID
returns None if filename not found in database
@@ -237,7 +113,7 @@ class ExportDB(ExportDB_ABC):
try:
c = conn.cursor()
c.execute(
- "SELECT uuid FROM files WHERE filepath_normalized = ?",
+ "SELECT uuid FROM export_data WHERE filepath_normalized = ?",
(filepath_normalized,),
)
results = c.fetchone()
@@ -247,142 +123,12 @@ class ExportDB(ExportDB_ABC):
uuid = None
return uuid
- def set_uuid_for_file(self, filename, uuid):
- """set UUID of filename to uuid in the database"""
- filename = str(pathlib.Path(filename).relative_to(self._path))
- filename_normalized = self._normalize_filepath(filename)
+ def get_photoinfo_for_uuid(self, uuid):
+ """returns the photoinfo JSON struct for a UUID"""
conn = self._conn
try:
c = conn.cursor()
- c.execute(
- "INSERT OR REPLACE INTO files(filepath, filepath_normalized, uuid) VALUES (?, ?, ?);",
- (filename, filename_normalized, uuid),
- )
-
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def set_stat_orig_for_file(self, filename, stats):
- """set stat info for filename
- filename: filename to set the stat info for
- stat: a tuple of length 3: mode, size, mtime"""
- filename = self._normalize_filepath_relative(filename)
- if len(stats) != 3:
- raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
-
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "UPDATE files "
- + "SET orig_mode = ?, orig_size = ?, orig_mtime = ? "
- + "WHERE filepath_normalized = ?;",
- (*stats, filename),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def get_stat_orig_for_file(self, filename):
- """get stat info for filename
- returns: tuple of (mode, size, mtime)
- """
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "SELECT orig_mode, orig_size, orig_mtime FROM files WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- if results:
- stats = results[:3]
- mtime = int(stats[2]) if stats[2] is not None else None
- stats = (stats[0], stats[1], mtime)
- else:
- stats = (None, None, None)
- except Error as e:
- logging.warning(e)
- stats = None, None, None
- return stats
-
- def set_stat_edited_for_file(self, filename, stats):
- """set stat info for edited version of image (in Photos' library)
- filename: filename to set the stat info for
- stat: a tuple of length 3: mode, size, mtime"""
- return self._set_stat_for_file("edited", filename, stats)
-
- def get_stat_edited_for_file(self, filename):
- """get stat info for edited version of image (in Photos' library)
- filename: filename to set the stat info for
- stat: a tuple of length 3: mode, size, mtime"""
- return self._get_stat_for_file("edited", filename)
-
- def set_stat_exif_for_file(self, filename, stats):
- """set stat info for filename (after exiftool has updated it)
- filename: filename to set the stat info for
- stat: a tuple of length 3: mode, size, mtime"""
- filename = self._normalize_filepath_relative(filename)
- if len(stats) != 3:
- raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
-
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "UPDATE files "
- + "SET exif_mode = ?, exif_size = ?, exif_mtime = ? "
- + "WHERE filepath_normalized = ?;",
- (*stats, filename),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def get_stat_exif_for_file(self, filename):
- """get stat info for filename (after exiftool has updated it)
- returns: tuple of (mode, size, mtime)
- """
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "SELECT exif_mode, exif_size, exif_mtime FROM files WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- if results:
- stats = results[:3]
- mtime = int(stats[2]) if stats[2] is not None else None
- stats = (stats[0], stats[1], mtime)
- else:
- stats = (None, None, None)
- except Error as e:
- logging.warning(e)
- stats = None, None, None
- return stats
-
- def set_stat_converted_for_file(self, filename, stats):
- """set stat info for filename (after image converted to jpeg)
- filename: filename to set the stat info for
- stat: a tuple of length 3: mode, size, mtime"""
- return self._set_stat_for_file("converted", filename, stats)
-
- def get_stat_converted_for_file(self, filename):
- """get stat info for filename (after jpeg conversion)
- returns: tuple of (mode, size, mtime)
- """
- return self._get_stat_for_file("converted", filename)
-
- def get_info_for_uuid(self, uuid):
- """returns the info JSON struct for a UUID"""
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute("SELECT json_info FROM info WHERE uuid = ?", (uuid,))
+ c.execute("SELECT photoinfo FROM photoinfo WHERE uuid = ?", (uuid,))
results = c.fetchone()
info = results[0] if results else None
except Error as e:
@@ -391,245 +137,42 @@ class ExportDB(ExportDB_ABC):
return info
- def set_info_for_uuid(self, uuid, info):
- """sets the info JSON struct for a UUID"""
+ def set_photoinfo_for_uuid(self, uuid, info):
+ """sets the photoinfo JSON struct for a UUID"""
conn = self._conn
try:
c = conn.cursor()
c.execute(
- "INSERT OR REPLACE INTO info(uuid, json_info) VALUES (?, ?);",
+ "INSERT OR REPLACE INTO photoinfo(uuid, photoinfo) VALUES (?, ?);",
(uuid, info),
)
conn.commit()
except Error as e:
logging.warning(e)
- def get_exifdata_for_file(self, filename):
- """returns the exifdata JSON struct for a file"""
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "SELECT json_exifdata FROM exifdata WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- exifdata = results[0] if results else None
- except Error as e:
- logging.warning(e)
- exifdata = None
-
- return exifdata
-
- def set_exifdata_for_file(self, filename, exifdata):
- """sets the exifdata JSON struct for a file"""
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "INSERT OR REPLACE INTO exifdata(filepath_normalized, json_exifdata) VALUES (?, ?);",
- (filename, exifdata),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def get_sidecar_for_file(self, filename):
- """returns the sidecar data and signature for a file"""
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "SELECT sidecar_data, mode, size, mtime FROM sidecar WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- if results:
- sidecar_data = results[0]
- sidecar_sig = (
- results[1],
- results[2],
- int(results[3]) if results[3] is not None else None,
- )
- else:
- sidecar_data = None
- sidecar_sig = (None, None, None)
- except Error as e:
- logging.warning(e)
- sidecar_data = None
- sidecar_sig = (None, None, None)
-
- return sidecar_data, sidecar_sig
-
- def set_sidecar_for_file(self, filename, sidecar_data, sidecar_sig):
- """sets the sidecar data and signature for a file"""
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "INSERT OR REPLACE INTO sidecar(filepath_normalized, sidecar_data, mode, size, mtime) VALUES (?, ?, ?, ?, ?);",
- (filename, sidecar_data, *sidecar_sig),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
def get_previous_uuids(self):
"""returns list of UUIDs of previously exported photos found in export database"""
conn = self._conn
previous_uuids = []
try:
c = conn.cursor()
- c.execute("SELECT DISTINCT uuid FROM files")
+ c.execute("SELECT DISTINCT uuid FROM export_data")
results = c.fetchall()
previous_uuids = [row[0] for row in results]
except Error as e:
logging.warning(e)
return previous_uuids
- def get_detected_text_for_uuid(self, uuid):
- """Get the detected_text for a uuid"""
+ def set_config(self, config_data):
+ """set config in the database"""
conn = self._conn
try:
+ dt = datetime.datetime.now().isoformat()
c = conn.cursor()
c.execute(
- "SELECT text_data FROM detected_text WHERE uuid = ?",
- (uuid,),
+ "INSERT OR REPLACE INTO config(datetime, config) VALUES (?, ?);",
+ (dt, config_data),
)
- results = c.fetchone()
- detected_text = results[0] if results else None
- except Error as e:
- logging.warning(e)
- detected_text = None
-
- return detected_text
-
- def set_detected_text_for_uuid(self, uuid, text_json):
- """Set the detected text for uuid"""
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "INSERT OR REPLACE INTO detected_text(uuid, text_data) VALUES (?, ?);",
- (
- uuid,
- text_json,
- ),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def set_metadata_for_file(self, filename, metadata):
- """set metadata of filename in the database"""
- filename = str(pathlib.Path(filename).relative_to(self._path))
- filename_normalized = self._normalize_filepath(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "UPDATE files SET metadata = ? WHERE filepath_normalized = ?;",
- (metadata, filename_normalized),
- )
- conn.commit()
- except Error as e:
- logging.warning(e)
-
- def get_metadata_for_file(self, filename):
- """get metadata value for file"""
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- c.execute(
- "SELECT metadata FROM files WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- metadata = results[0] if results else None
- except Error as e:
- logging.warning(e)
- metadata = None
-
- return metadata
-
- def set_data(
- self,
- filename,
- uuid,
- orig_stat=None,
- exif_stat=None,
- converted_stat=None,
- edited_stat=None,
- info_json=None,
- exif_json=None,
- metadata=None,
- ):
- """sets all the data for file and uuid at once; if any value is None, does not set it"""
- filename = str(pathlib.Path(filename).relative_to(self._path))
- filename_normalized = self._normalize_filepath(filename)
- conn = self._conn
- try:
- c = conn.cursor()
- # update files table (if needed);
- # this statement works around fact that there was no unique constraint on files.filepath_normalized
- c.execute(
- """INSERT OR IGNORE INTO files(filepath, filepath_normalized, uuid) VALUES (?, ?, ?);""",
- (filename, filename_normalized, uuid),
- )
-
- if orig_stat is not None:
- c.execute(
- "UPDATE files "
- + "SET orig_mode = ?, orig_size = ?, orig_mtime = ? "
- + "WHERE filepath_normalized = ?;",
- (*orig_stat, filename_normalized),
- )
-
- if exif_stat is not None:
- c.execute(
- "UPDATE files "
- + "SET exif_mode = ?, exif_size = ?, exif_mtime = ? "
- + "WHERE filepath_normalized = ?;",
- (*exif_stat, filename_normalized),
- )
-
- if converted_stat is not None:
- c.execute(
- "INSERT OR REPLACE INTO converted(filepath_normalized, mode, size, mtime) VALUES (?, ?, ?, ?);",
- (filename_normalized, *converted_stat),
- )
-
- if edited_stat is not None:
- c.execute(
- "INSERT OR REPLACE INTO edited(filepath_normalized, mode, size, mtime) VALUES (?, ?, ?, ?);",
- (filename_normalized, *edited_stat),
- )
-
- if info_json is not None:
- c.execute(
- "INSERT OR REPLACE INTO info(uuid, json_info) VALUES (?, ?);",
- (uuid, info_json),
- )
-
- if exif_json is not None:
- c.execute(
- "INSERT OR REPLACE INTO exifdata(filepath_normalized, json_exifdata) VALUES (?, ?);",
- (filename_normalized, exif_json),
- )
-
- if metadata is not None:
- c.execute(
- "UPDATE files "
- + "SET metadata = ? "
- + "WHERE filepath_normalized = ?;",
- (metadata, filename_normalized),
- )
-
conn.commit()
except Error as e:
logging.warning(e)
@@ -641,40 +184,10 @@ class ExportDB(ExportDB_ABC):
except Error as e:
logging.warning(e)
- def _set_stat_for_file(self, table, filename, stats):
- filename = self._normalize_filepath_relative(filename)
- if len(stats) != 3:
- raise ValueError(f"expected 3 elements for stat, got {len(stats)}")
-
- conn = self._conn
- c = conn.cursor()
- c.execute(
- f"INSERT OR REPLACE INTO {table}(filepath_normalized, mode, size, mtime) VALUES (?, ?, ?, ?);",
- (filename, *stats),
- )
- conn.commit()
-
- def _get_stat_for_file(self, table, filename):
- filename = self._normalize_filepath_relative(filename)
- conn = self._conn
- c = conn.cursor()
- c.execute(
- f"SELECT mode, size, mtime FROM {table} WHERE filepath_normalized = ?",
- (filename,),
- )
- results = c.fetchone()
- if results:
- stats = results[:3]
- mtime = int(stats[2]) if stats[2] is not None else None
- stats = (stats[0], stats[1], mtime)
- else:
- stats = (None, None, None)
-
- return stats
-
def _open_export_db(self, dbfile):
"""open export database and return a db connection
if dbfile does not exist, will create and initialize the database
+ if dbfile needs to be upgraded, will perform needed migrations
returns: connection to the database
"""
@@ -731,7 +244,7 @@ class ExportDB(ExportDB_ABC):
try:
version = self._get_database_version(conn)
except Exception as e:
- version = (__version__, OSXPHOTOS_EXPORTDB_VERSION_MIGRATE_TABLES)
+ version = (__version__, "4.3")
# Current for version 4.3, for anything greater, do a migration after creation
sql_commands = [
@@ -824,17 +337,18 @@ class ExportDB(ExportDB_ABC):
logging.warning(e)
# perform needed migrations
- if version[1] < OSXPHOTOS_EXPORTDB_VERSION_MIGRATE_FILEPATH:
+ if version[1] < "4.3":
self._migrate_normalized_filepath(conn)
- if version[1] < OSXPHOTOS_EXPORTDB_VERSION:
- try:
- c = conn.cursor()
- # add metadata column to files to support --force-update
- c.execute("ALTER TABLE files ADD COLUMN metadata TEXT;")
- conn.commit()
- except Error as e:
- logging.warning(e)
+ if version[1] < "5.0":
+ self._migrate_4_3_to_5_0(conn)
+
+ if version[1] < "6.0":
+ # create export_data table
+ self._migrate_5_0_to_6_0(conn)
+
+ conn.execute("VACUUM;")
+ conn.commit()
def __del__(self):
"""ensure the database connection is closed"""
@@ -861,13 +375,17 @@ class ExportDB(ExportDB_ABC):
except Error as e:
logging.warning(e)
+ def _relative_filepath(self, filepath: Union[str, pathlib.Path]) -> str:
+ """return filepath relative to self._path"""
+ return str(pathlib.Path(filepath).relative_to(self._path))
+
def _normalize_filepath(self, filepath: Union[str, pathlib.Path]) -> str:
"""normalize filepath for unicode, lower case"""
return normalize_fs_path(str(filepath)).lower()
def _normalize_filepath_relative(self, filepath: Union[str, pathlib.Path]) -> str:
"""normalize filepath for unicode, relative path (to export dir), lower case"""
- filepath = str(pathlib.Path(filepath).relative_to(self._path))
+ filepath = self._relative_filepath(filepath)
return normalize_fs_path(str(filepath)).lower()
def _migrate_normalized_filepath(self, conn):
@@ -909,6 +427,138 @@ class ExportDB(ExportDB_ABC):
)
conn.commit()
+ def _migrate_4_3_to_5_0(self, conn):
+ """Migrate database from version 4.3 to 5.0"""
+ try:
+ c = conn.cursor()
+ # add metadata column to files to support --force-update
+ c.execute("ALTER TABLE files ADD COLUMN metadata TEXT;")
+ conn.commit()
+ except Error as e:
+ logging.warning(e)
+
+ def _migrate_5_0_to_6_0(self, conn):
+ try:
+ c = conn.cursor()
+
+ # add export_data table
+ c.execute(
+ """ CREATE TABLE IF NOT EXISTS export_data(
+ id INTEGER PRIMARY KEY,
+ filepath_normalized TEXT NOT NULL,
+ filepath TEXT NOT NULL,
+ uuid TEXT NOT NULL,
+ src_mode INTEGER,
+ src_size INTEGER,
+ src_mtime REAL,
+ dest_mode INTEGER,
+ dest_size INTEGER,
+ dest_mtime REAL,
+ digest TEXT,
+ exifdata JSON,
+ export_options INTEGER,
+ UNIQUE(filepath_normalized)
+ ); """,
+ )
+ c.execute(
+ """ CREATE UNIQUE INDEX IF NOT EXISTS idx_export_data_filepath_normalized on export_data (filepath_normalized); """,
+ )
+
+ # migrate data
+ c.execute(
+ """ INSERT INTO export_data (filepath_normalized, filepath, uuid) SELECT filepath_normalized, filepath, uuid FROM files;""",
+ )
+ c.execute(
+ """ UPDATE export_data
+ SET (src_mode, src_size, src_mtime) =
+ (SELECT mode, size, mtime
+ FROM edited
+ WHERE export_data.filepath_normalized = edited.filepath_normalized);
+ """,
+ )
+ c.execute(
+ """ UPDATE export_data
+ SET (dest_mode, dest_size, dest_mtime) =
+ (SELECT orig_mode, orig_size, orig_mtime
+ FROM files
+ WHERE export_data.filepath_normalized = files.filepath_normalized);
+ """,
+ )
+ c.execute(
+ """ UPDATE export_data SET digest =
+ (SELECT metadata FROM files
+ WHERE files.filepath_normalized = export_data.filepath_normalized
+ ); """
+ )
+ c.execute(
+ """ UPDATE export_data SET exifdata =
+ (SELECT json_exifdata FROM exifdata
+ WHERE exifdata.filepath_normalized = export_data.filepath_normalized
+ ); """
+ )
+
+ # create config table
+ c.execute(
+ """ CREATE TABLE IF NOT EXISTS config (
+ id INTEGER PRIMARY KEY,
+ datetime TEXT,
+ config TEXT
+ ); """
+ )
+
+ # create photoinfo table
+ c.execute(
+ """ CREATE TABLE IF NOT EXISTS photoinfo (
+ id INTEGER PRIMARY KEY,
+ uuid TEXT NOT NULL,
+ photoinfo JSON,
+ UNIQUE(uuid)
+ ); """
+ )
+ c.execute(
+ """CREATE UNIQUE INDEX IF NOT EXISTS idx_photoinfo_uuid on photoinfo (uuid);"""
+ )
+ c.execute(
+ """ INSERT INTO photoinfo (uuid, photoinfo) SELECT uuid, json_info FROM info;"""
+ )
+
+ # drop indexes no longer needed
+ c.execute("DROP INDEX IF EXISTS idx_files_filepath_normalized;")
+ c.execute("DROP INDEX IF EXISTS idx_exifdata_filename;")
+ c.execute("DROP INDEX IF EXISTS idx_edited_filename;")
+ c.execute("DROP INDEX IF EXISTS idx_converted_filename;")
+ c.execute("DROP INDEX IF EXISTS idx_sidecar_filename;")
+ c.execute("DROP INDEX IF EXISTS idx_detected_text;")
+
+ # drop tables no longer needed
+ c.execute("DROP TABLE IF EXISTS files;")
+ c.execute("DROP TABLE IF EXISTS info;")
+ c.execute("DROP TABLE IF EXISTS exifdata;")
+ c.execute("DROP TABLE IF EXISTS edited;")
+ c.execute("DROP TABLE IF EXISTS converted;")
+ c.execute("DROP TABLE IF EXISTS sidecar;")
+ c.execute("DROP TABLE IF EXISTS detected_text;")
+
+ conn.commit()
+ except Error as e:
+ logging.warning(e)
+
+ def _perform_db_maintenace(self, conn):
+ """Perform database maintenance"""
+ try:
+ c = conn.cursor()
+ c.execute(
+ """DELETE FROM config
+ WHERE id < (
+ SELECT MIN(id)
+ FROM (SELECT id FROM config ORDER BY id DESC LIMIT 9)
+ );
+ """
+ )
+ conn.commit()
+ except Error as e:
+ logging.warning(e)
+
class ExportDBInMemory(ExportDB):
"""In memory version of ExportDB
@@ -942,7 +592,7 @@ class ExportDBInMemory(ExportDB):
conn = sqlite3.connect(dbfile)
except Error as e:
logging.warning(e)
- raise e
+ raise e from e
tempfile = StringIO()
for line in conn.iterdump():
@@ -955,13 +605,14 @@ class ExportDBInMemory(ExportDB):
conn.cursor().executescript(tempfile.read())
conn.commit()
self.was_created = False
- _, exportdb_ver = self._get_database_version(conn)
- if exportdb_ver < OSXPHOTOS_EXPORTDB_VERSION:
+ version_info = self._get_database_version(conn)
+ if version_info[1] < OSXPHOTOS_EXPORTDB_VERSION:
self._create_or_migrate_db_tables(conn)
- self.was_upgraded = (exportdb_ver, OSXPHOTOS_EXPORTDB_VERSION)
+ self.was_upgraded = (version_info[1], OSXPHOTOS_EXPORTDB_VERSION)
else:
self.was_upgraded = ()
self.version = OSXPHOTOS_EXPORTDB_VERSION
+
return conn
def _get_db_connection(self):
@@ -973,3 +624,269 @@ class ExportDBInMemory(ExportDB):
conn = None
return conn
+
+
+class ExportDBTemp(ExportDBInMemory):
+ """Temporary in-memory version of ExportDB"""
+
+ def __init__(self):
+ self._temp_dir = TemporaryDirectory()
+ self._dbfile = f"{self._temp_dir.name}/{OSXPHOTOS_EXPORT_DB}"
+ self._path = self._temp_dir.name
+ self._conn = self._open_export_db(self._dbfile)
+ self._insert_run_info()
+
+ def _relative_filepath(self, filepath: Union[str, pathlib.Path]) -> str:
+ """Overrides _relative_filepath to return a path for use in the temp db"""
+ filepath = str(filepath)
+ if filepath[0] == "/":
+ return filepath[1:]
+ return filepath
+
+
+class ExportRecord:
+ """ExportRecord class"""
+
+ __slots__ = [
+ "_conn",
+ "_context_manager",
+ "_filepath_normalized",
+ ]
+
+ def __init__(self, conn, filepath_normalized):
+ self._conn = conn
+ self._filepath_normalized = filepath_normalized
+ self._context_manager = False
+
+ @property
+ def filepath(self):
+ """return filepath"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT filepath FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ return row[0]
+
+ raise ValueError(
+ f"No filepath found in database for {self._filepath_normalized}"
+ )
+
+ @property
+ def filepath_normalized(self):
+ """return filepath_normalized"""
+ return self._filepath_normalized
+
+ @property
+ def uuid(self):
+ """return uuid"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT uuid FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ return row[0]
+
+ raise ValueError(f"No uuid found in database for {self._filepath_normalized}")
+
+ @property
+ def digest(self):
+ """returns the digest value"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT digest FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ return row[0]
+
+ raise ValueError(f"No digest found in database for {self._filepath_normalized}")
+
+ @digest.setter
+ def digest(self, value):
+ """set digest value"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "UPDATE export_data SET digest = ? WHERE filepath_normalized = ?;",
+ (value, self._filepath_normalized),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ @property
+ def exifdata(self):
+ """returns exifdata value for record"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT exifdata FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ return row[0]
+
+ raise ValueError(
+ f"No exifdata found in database for {self._filepath_normalized}"
+ )
+
+ @exifdata.setter
+ def exifdata(self, value):
+ """set exifdata value"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "UPDATE export_data SET exifdata = ? WHERE filepath_normalized = ?;",
+ (
+ value,
+ self._filepath_normalized,
+ ),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ @property
+ def src_sig(self):
+ """return source file signature value"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT src_mode, src_size, src_mtime FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ mtime = int(row[2]) if row[2] is not None else None
+ return (row[0], row[1], mtime)
+
+ raise ValueError(
+ f"No src_sig found in database for {self._filepath_normalized}"
+ )
+
+ @src_sig.setter
+ def src_sig(self, value):
+ """set source file signature value"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "UPDATE export_data SET src_mode = ?, src_size = ?, src_mtime = ? WHERE filepath_normalized = ?;",
+ (
+ value[0],
+ value[1],
+ value[2],
+ self._filepath_normalized,
+ ),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ @property
+ def dest_sig(self):
+ """return destination file signature"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT dest_mode, dest_size, dest_mtime FROM export_data WHERE filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ if row:
+ mtime = int(row[2]) if row[2] is not None else None
+ return (row[0], row[1], mtime)
+
+ raise ValueError(
+ f"No dest_sig found in database for {self._filepath_normalized}"
+ )
+
+ @dest_sig.setter
+ def dest_sig(self, value):
+ """set destination file signature"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "UPDATE export_data SET dest_mode = ?, dest_size = ?, dest_mtime = ? WHERE filepath_normalized = ?;",
+ (
+ value[0],
+ value[1],
+ value[2],
+ self._filepath_normalized,
+ ),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ @property
+ def photoinfo(self):
+ """Returns info value"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT photoinfo from photoinfo where uuid = ?;",
+ (self.uuid,),
+ ).fetchone()
+ return row[0] if row else None
+
+ @photoinfo.setter
+ def photoinfo(self, value):
+ """Sets info value"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "INSERT OR REPLACE INTO photoinfo (uuid, photoinfo) VALUES (?, ?);",
+ (self.uuid, value),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ @property
+ def export_options(self):
+ """Get export_options value"""
+ conn = self._conn
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT export_options from export_data where filepath_normalized = ?;",
+ (self._filepath_normalized,),
+ ).fetchone()
+ return row[0] if row else None
+
+ @export_options.setter
+ def export_options(self, value):
+ """Set export_options value"""
+ conn = self._conn
+ c = conn.cursor()
+ c.execute(
+ "UPDATE export_data SET export_options = ? WHERE filepath_normalized = ?;",
+ (value, self._filepath_normalized),
+ )
+ if not self._context_manager:
+ conn.commit()
+
+ def asdict(self):
+ """Return dict of self"""
+ exifdata = json.loads(self.exifdata) if self.exifdata else None
+ photoinfo = json.loads(self.photoinfo) if self.photoinfo else None
+ return {
+ "filepath": self.filepath,
+ "filepath_normalized": self.filepath_normalized,
+ "uuid": self.uuid,
+ "digest": self.digest,
+ "src_sig": self.src_sig,
+ "dest_sig": self.dest_sig,
+ "export_options": self.export_options,
+ "exifdata": exifdata,
+ "photoinfo": photoinfo,
+ }
+
+ def __enter__(self):
+ self._context_manager = True
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type:
+ self._conn.rollback()
+ else:
+ self._conn.commit()
+ self._context_manager = False
diff --git a/osxphotos/export_db_utils.py b/osxphotos/export_db_utils.py
new file mode 100644
index 00000000..bf78c65b
--- /dev/null
+++ b/osxphotos/export_db_utils.py
@@ -0,0 +1,264 @@
+""" Utility functions for working with export_db """
+
+
+import pathlib
+import sqlite3
+from typing import Optional, Tuple, Union
+import datetime
+import os
+
+import toml
+from rich import print
+
+from ._constants import OSXPHOTOS_EXPORT_DB
+from ._version import __version__
+from .export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDB
+from .fileutil import FileUtil
+from .photosdb import PhotosDB
+
+__all__ = [
+ "export_db_check_signatures",
+ "export_db_get_last_run",
+ "export_db_get_version",
+ "export_db_save_config_to_file",
+ "export_db_touch_files",
+ "export_db_update_signatures",
+ "export_db_vacuum",
+]
+
+
+def isotime_from_ts(ts: int) -> str:
+ """Convert timestamp to ISO 8601 time string"""
+ return datetime.datetime.fromtimestamp(ts).isoformat()
+
+
+def export_db_get_version(
+ dbfile: Union[str, pathlib.Path]
+) -> Tuple[Optional[int], Optional[int]]:
+ """returns version from export database as tuple of (osxphotos version, export_db version)"""
+ conn = sqlite3.connect(str(dbfile))
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT osxphotos, exportdb FROM version ORDER BY id DESC LIMIT 1;"
+ ).fetchone()
+ if row:
+ return (row[0], row[1])
+ return (None, None)
+
+
+def export_db_vacuum(dbfile: Union[str, pathlib.Path]) -> None:
+ """Vacuum export database"""
+ conn = sqlite3.connect(str(dbfile))
+ c = conn.cursor()
+ c.execute("VACUUM;")
+ conn.commit()
+
+
+def export_db_update_signatures(
+ dbfile: Union[str, pathlib.Path],
+ export_dir: Union[str, pathlib.Path],
+ verbose: bool = False,
+ dry_run: bool = False,
+) -> Tuple[int, int]:
+ """Update signatures for all files found in the export database to match what's on disk
+
+ Returns: tuple of (updated, skipped)
+ """
+ export_dir = pathlib.Path(export_dir)
+ fileutil = FileUtil
+ conn = sqlite3.connect(str(dbfile))
+ c = conn.cursor()
+ c.execute("SELECT filepath_normalized, filepath FROM export_data;")
+ rows = c.fetchall()
+ updated = 0
+ skipped = 0
+ for row in rows:
+ filepath_normalized = row[0]
+ filepath = row[1]
+ filepath = export_dir / filepath
+ if not os.path.exists(filepath):
+ skipped += 1
+ if verbose:
+ print(f"[dark_orange]Skipping missing file[/dark_orange]: '{filepath}'")
+ continue
+ updated += 1
+ file_sig = fileutil.file_sig(filepath)
+ if verbose:
+ print(f"[green]Updating signature for[/green]: '{filepath}'")
+ if not dry_run:
+ c.execute(
+ "UPDATE export_data SET dest_mode = ?, dest_size = ?, dest_mtime = ? WHERE filepath_normalized = ?;",
+ (file_sig[0], file_sig[1], file_sig[2], filepath_normalized),
+ )
+
+ if not dry_run:
+ conn.commit()
+
+ return (updated, skipped)
+
+
+def export_db_get_last_run(
+ export_db: Union[str, pathlib.Path]
+) -> Tuple[Optional[str], Optional[str]]:
+ """Get last run from export database"""
+ conn = sqlite3.connect(str(export_db))
+ c = conn.cursor()
+ row = c.execute(
+ "SELECT datetime, args FROM runs ORDER BY id DESC LIMIT 1;"
+ ).fetchone()
+ if row:
+ return row[0], row[1]
+ return None, None
+
+
+def export_db_save_config_to_file(
+ export_db: Union[str, pathlib.Path], config_file: Union[str, pathlib.Path]
+) -> None:
+ """Save export_db last run config to file"""
+ export_db = pathlib.Path(export_db)
+ config_file = pathlib.Path(config_file)
+ conn = sqlite3.connect(str(export_db))
+ c = conn.cursor()
+ row = c.execute("SELECT config FROM config ORDER BY id DESC LIMIT 1;").fetchone()
+ if not row:
+ return ValueError("No config found in export_db")
+ with config_file.open("w") as f:
+ f.write(row[0])
+
+
+def export_db_check_signatures(
+ dbfile: Union[str, pathlib.Path],
+ export_dir: Union[str, pathlib.Path],
+ verbose: bool = False,
+) -> Tuple[int, int, int]:
+ """Check signatures for all files found in the export database to verify what matches the on disk files
+
+ Returns: tuple of (updated, skipped)
+ """
+ export_dir = pathlib.Path(export_dir)
+ fileutil = FileUtil
+ conn = sqlite3.connect(str(dbfile))
+ c = conn.cursor()
+ c.execute("SELECT filepath_normalized, filepath FROM export_data;")
+ rows = c.fetchall()
+ exportdb = ExportDB(dbfile, export_dir)
+ matched = 0
+ notmatched = 0
+ skipped = 0
+ for row in rows:
+ filepath_normalized = row[0]
+ filepath = row[1]
+ filepath = export_dir / filepath
+ if not filepath.exists():
+ skipped += 1
+ if verbose:
+ print(f"[dark_orange]Skipping missing file[/dark_orange]: '{filepath}'")
+ continue
+ file_sig = fileutil.file_sig(filepath)
+ file_rec = exportdb.get_file_record(filepath)
+ if file_rec.dest_sig == file_sig:
+ matched += 1
+ if verbose:
+ print(f"[green]Signatures matched[/green]: '{filepath}'")
+ else:
+ notmatched += 1
+ if verbose:
+ print(f"[deep_pink3]Signatures do not match[/deep_pink3]: '{filepath}'")
+
+ return (matched, notmatched, skipped)
+
+
+def export_db_touch_files(
+ dbfile: Union[str, pathlib.Path],
+ export_dir: Union[str, pathlib.Path],
+ verbose: bool = False,
+ dry_run: bool = False,
+) -> Tuple[int, int, int]:
+ """Touch files on disk to match the Photos library created date
+
+ Returns: tuple of (touched, not_touched, skipped)
+ """
+ export_dir = pathlib.Path(export_dir)
+
+ # open and close exportdb to ensure it gets migrated
+ exportdb = ExportDB(dbfile, export_dir)
+ upgraded = exportdb.was_upgraded
+ if upgraded and verbose:
+ print(
+ f"Upgraded export database {dbfile} from version {upgraded[0]} to {upgraded[1]}"
+ )
+ exportdb.close()
+
+ conn = sqlite3.connect(str(dbfile))
+ c = conn.cursor()
+ # get most recent config
+ row = c.execute("SELECT config FROM config ORDER BY id DESC LIMIT 1;").fetchone()
+ if row:
+ config = toml.loads(row[0])
+ try:
+ photos_db_path = config["export"].get("db", None)
+ except KeyError:
+ photos_db_path = None
+ else:
+ # TODO: parse the runs table to get the last --db
+ # in the mean time, photos_db_path = None will use the default library
+ photos_db_path = None
+
+ verbose_ = print if verbose else lambda *args, **kwargs: None
+ photosdb = PhotosDB(dbfile=photos_db_path, verbose=verbose_)
+ exportdb = ExportDB(dbfile, export_dir)
+ c.execute(
+ "SELECT filepath_normalized, filepath, uuid, dest_mode, dest_size FROM export_data;"
+ )
+ rows = c.fetchall()
+ touched = 0
+ not_touched = 0
+ skipped = 0
+ for row in rows:
+ filepath_normalized = row[0]
+ filepath = row[1]
+ filepath = export_dir / filepath
+ uuid = row[2]
+ dest_mode = row[3]
+ dest_size = row[4]
+ if not filepath.exists():
+ skipped += 1
+ if verbose:
+ print(
+ f"[dark_orange]Skipping missing file (not in export directory)[/dark_orange]: '{filepath}'"
+ )
+ continue
+
+ photo = photosdb.get_photo(uuid)
+ if not photo:
+ skipped += 1
+ if verbose:
+ print(
+ f"[dark_orange]Skipping missing photo (did not find in Photos Library)[/dark_orange]: '{filepath}' ({uuid})"
+ )
+ continue
+
+ ts = int(photo.date.timestamp())
+ stat = os.stat(str(filepath))
+ mtime = stat.st_mtime
+ if mtime == ts:
+ not_touched += 1
+ if verbose:
+ print(
+ f"[green]Skipping file (timestamp matches)[/green]: '{filepath}' [dodger_blue1]{isotime_from_ts(ts)} ({ts})[/dodger_blue1]"
+ )
+ continue
+
+ touched += 1
+ if verbose:
+ print(
+ f"[deep_pink3]Touching file[/deep_pink3]: '{filepath}' "
+ f"[dodger_blue1]{isotime_from_ts(mtime)} ({mtime}) -> {isotime_from_ts(ts)} ({ts})[/dodger_blue1]"
+ )
+
+ if not dry_run:
+ os.utime(str(filepath), (ts, ts))
+ rec = exportdb.get_file_record(filepath)
+ rec.dest_sig = (dest_mode, dest_size, ts)
+
+ return (touched, not_touched, skipped)
diff --git a/osxphotos/fileutil.py b/osxphotos/fileutil.py
index d135c156..7769d00b 100644
--- a/osxphotos/fileutil.py
+++ b/osxphotos/fileutil.py
@@ -143,7 +143,7 @@ class FileUtilMacOS(FileUtilABC):
@classmethod
def utime(cls, path, times):
"""Set the access and modified time of path."""
- os.utime(path, times)
+ os.utime(path, times=times)
@classmethod
def cmp(cls, f1, f2, mtime1=None):
@@ -187,7 +187,7 @@ class FileUtilMacOS(FileUtilABC):
@classmethod
def file_sig(cls, f1):
- """return os.stat signature for file f1"""
+ """return os.stat signature for file f1 as tuple of (mode, size, mtime)"""
return cls._sig(os.stat(f1))
@classmethod
diff --git a/osxphotos/photoexporter.py b/osxphotos/photoexporter.py
index b94e0ab2..18c86373 100644
--- a/osxphotos/photoexporter.py
+++ b/osxphotos/photoexporter.py
@@ -33,7 +33,7 @@ from ._constants import (
from ._version import __version__
from .datetime_utils import datetime_tz_to_utc
from .exiftool import ExifTool, exiftool_can_write
-from .export_db import ExportDB_ABC, ExportDBNoOp
+from .export_db import ExportDB, ExportDBTemp
from .fileutil import FileUtil
from .photokit import (
PHOTOS_VERSION_CURRENT,
@@ -81,7 +81,7 @@ class ExportOptions:
exiftool_flags (list of str): optional list of flags to pass to exiftool when using exiftool option, e.g ["-m", "-F"]
exiftool: (bool, default = False): if True, will use exiftool to write metadata to export file
export_as_hardlink: (bool, default=False): if True, will hardlink files instead of copying them
- export_db: (ExportDB_ABC): instance of a class that conforms to ExportDB_ABC with methods for getting/setting data related to exported files to compare update state
+ export_db: (ExportDB): instance of a class that conforms to ExportDB with methods for getting/setting data related to exported files to compare update state
face_regions: (bool, default=True): if True, will export face regions
fileutil: (FileUtilABC): class that conforms to FileUtilABC with various file utilities
force_update: (bool, default=False): if True, will export photo if any metadata has changed but export otherwise would not be triggered (e.g. metadata changed but not using exiftool)
@@ -128,7 +128,7 @@ class ExportOptions:
exiftool_flags: Optional[List] = None
exiftool: bool = False
export_as_hardlink: bool = False
- export_db: Optional[ExportDB_ABC] = None
+ export_db: Optional[ExportDB] = None
face_regions: bool = True
fileutil: Optional[FileUtil] = None
force_update: bool = False
@@ -164,6 +164,12 @@ class ExportOptions:
def asdict(self):
return asdict(self)
+ @property
+ def bit_flags(self):
+ """Return bit flags representing options that affect export"""
+ # currently only exiftool makes a difference
+ return self.exiftool << 1
+
class StagedFiles:
"""Represents files staged for export"""
@@ -403,8 +409,8 @@ class PhotoExporter:
"Cannot use export_as_hardlink with download_missing or use_photos_export"
)
- # when called from export(), won't get an export_db, so use no-op version
- options.export_db = options.export_db or ExportDBNoOp()
+ # when called from export(), won't get an export_db, so use temp version
+ options.export_db = options.export_db or ExportDBTemp()
# ensure there's a FileUtil class to use
options.fileutil = options.fileutil or FileUtil
@@ -443,6 +449,7 @@ class PhotoExporter:
# get the right destination path depending on options.update, etc.
dest = self._get_dest_path(src, dest, options)
+
self._render_options.filepath = str(dest)
all_results = ExportResults()
@@ -545,23 +552,20 @@ class PhotoExporter:
all_results += self._write_sidecar_files(dest=dest, options=options)
- if options.touch_file:
- all_results += self._touch_files(all_results, options)
-
return all_results
- def _touch_files(
- self, results: ExportResults, options: ExportOptions
- ) -> ExportResults:
- """touch file date/time to match photo creation date/time"""
+ def _touch_files(self, touch_files: List, options: ExportOptions) -> ExportResults:
+ """touch file date/time to match photo creation date/time; only touches files if needed"""
fileutil = options.fileutil
- touch_files = set(results.to_touch)
- touch_results = ExportResults()
- for touch_file in touch_files:
+ touch_results = []
+ for touch_file in set(touch_files):
ts = int(self.photo.date.timestamp())
- fileutil.utime(touch_file, (ts, ts))
- touch_results.touched.append(touch_file)
- return touch_results
+ stat = os.stat(touch_file)
+ if stat.st_mtime != ts:
+ if not options.dry_run:
+ fileutil.utime(touch_file, (ts, ts))
+ touch_results.append(touch_file)
+ return ExportResults(touched=touch_results)
def _get_edited_filename(self, original_filename):
"""Return the filename for the exported edited photo
@@ -610,21 +614,11 @@ class PhotoExporter:
):
return pathlib.Path(increment_filename(dest))
- # if update and file exists, need to check to see if it's the write file by checking export db
+ # if update and file exists, need to check to see if it's the right file by checking export db
if (options.update or options.force_update) and dest.exists() and src:
export_db = options.export_db
- fileutil = options.fileutil
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
- if dest_uuid is None and fileutil.cmp(src, dest):
- # might be exporting into a pre-ExportDB folder or the DB got deleted
- dest_uuid = self.photo.uuid
- export_db.set_data(
- filename=dest,
- uuid=self.photo.uuid,
- orig_stat=fileutil.file_sig(dest),
- info_json=self.photo.json(),
- )
if dest_uuid != self.photo.uuid:
# not the right file, find the right one
# find files that match "dest_name (*.ext" (e.g. "dest_name (1).jpg", "dest_name (2).jpg)", ...)
@@ -639,16 +633,6 @@ class PhotoExporter:
if dest_uuid == self.photo.uuid:
dest = pathlib.Path(file_)
break
- elif dest_uuid is None and fileutil.cmp(src, file_):
- # files match, update the UUID
- dest = pathlib.Path(file_)
- export_db.set_data(
- filename=dest,
- uuid=self.photo.uuid,
- orig_stat=fileutil.file_sig(dest),
- info_json=self.photo.json(),
- )
- break
else:
# increment the destination file
dest = pathlib.Path(increment_filename(dest))
@@ -656,6 +640,57 @@ class PhotoExporter:
# either dest was updated in the if clause above or not updated at all
return dest
+ def _should_update_photo(
+ self, src: pathlib.Path, dest: pathlib.Path, options: ExportOptions
+ ) -> bool:
+ """Return True if photo should be updated, else False"""
+ export_db = options.export_db
+ fileutil = options.fileutil
+
+ file_record = export_db.get_file_record(dest)
+
+ if not file_record:
+ # photo doesn't exist in database, should update
+ return True
+
+ if options.export_as_hardlink and not dest.samefile(src):
+ # different files, should update
+ return True
+
+ if not options.export_as_hardlink and dest.samefile(src):
+ # same file but not exporting as hardlink, should update
+ return True
+
+ if not options.ignore_signature and not fileutil.cmp_file_sig(
+ dest, file_record.dest_sig
+ ):
+ # destination file doesn't match what was last exported
+ return True
+
+ if file_record.export_options != options.bit_flags:
+ # exporting with different set of options (e.g. exiftool), should update
+ # need to check this before exiftool in case exiftool options are different
+ # and export database is missing; this will always be True if database is missing
+ # as it'll be None and bit_flags will be an int
+ return True
+
+ if options.exiftool:
+ current_exifdata = self._exiftool_json_sidecar(options=options)
+ return current_exifdata != file_record.exifdata
+
+ if options.edited and not fileutil.cmp_file_sig(src, file_record.src_sig):
+ # edited file in Photos doesn't match what was last exported
+ return True
+
+ if options.force_update:
+ current_digest = hexdigest(self.photo.json())
+ if current_digest != file_record.digest:
+ # metadata in Photos changed, force update
+ return True
+
+ # photo should not be updated
+ return False
+
def _stage_photos_for_export(self, options: ExportOptions) -> StagedFiles:
"""Stages photos for export
@@ -989,11 +1024,8 @@ class PhotoExporter:
update_updated_files = []
update_new_files = []
update_skipped_files = [] # skip files that are already up to date
- touched_files = []
converted_to_jpeg_files = []
exif_results = ExportResults()
- converted_stat = None
- edited_stat = None
dest_str = str(dest)
dest_exists = dest.exists()
@@ -1002,98 +1034,25 @@ class PhotoExporter:
export_db = options.export_db
if options.update or options.force_update: # updating
- cmp_touch, cmp_orig = False, False
if dest_exists:
- # update, destination exists, but we might not need to replace it...
- if options.exiftool:
- sig_exif = export_db.get_stat_exif_for_file(dest_str)
- cmp_orig = fileutil.cmp_file_sig(dest_str, sig_exif)
- if cmp_orig:
- # if signatures match also need to compare exifdata to see if metadata changed
- cmp_orig = not self._should_run_exiftool(dest_str, options)
- sig_exif = (
- sig_exif[0],
- sig_exif[1],
- int(self.photo.date.timestamp()),
- )
- cmp_touch = fileutil.cmp_file_sig(dest_str, sig_exif)
- elif options.convert_to_jpeg:
- sig_converted = export_db.get_stat_converted_for_file(dest_str)
- cmp_orig = fileutil.cmp_file_sig(dest_str, sig_converted)
- sig_converted = (
- sig_converted[0],
- sig_converted[1],
- int(self.photo.date.timestamp()),
- )
- cmp_touch = fileutil.cmp_file_sig(dest_str, sig_converted)
- else:
- cmp_orig = options.ignore_signature or fileutil.cmp(src, dest)
- cmp_touch = fileutil.cmp(
- src, dest, mtime1=int(self.photo.date.timestamp())
- )
- if options.force_update:
- # need to also check the photo's metadata to that in the database
- # and if anything changed, we need to update the file
- # ony the hex digest of the metadata is stored in the database
- photo_digest = hexdigest(self.photo.json())
- db_digest = export_db.get_metadata_for_file(dest_str)
- cmp_orig = photo_digest == db_digest
-
- sig_cmp = cmp_touch if options.touch_file else cmp_orig
-
- if options.edited:
- # requested edited version of photo
- # need to see if edited version in Photos library has changed
- # (e.g. it's been edited again)
- sig_edited = export_db.get_stat_edited_for_file(dest_str)
- cmp_edited = (
- fileutil.cmp_file_sig(src, sig_edited)
- if sig_edited != (None, None, None)
- else False
- )
- sig_cmp = sig_cmp and (options.force_update or cmp_edited)
-
- if (options.export_as_hardlink and dest.samefile(src)) or (
- not options.export_as_hardlink
- and not dest.samefile(src)
- and sig_cmp
- ):
- # destination exists and signatures match, skip it
- update_skipped_files.append(dest_str)
- elif options.touch_file and cmp_orig and not cmp_touch:
- # destination exists, signature matches original but does not match expected touch time
- # skip exporting but update touch time
- update_skipped_files.append(dest_str)
- touched_files.append(dest_str)
- elif not options.touch_file and cmp_touch and not cmp_orig:
- # destination exists, signature matches expected touch but not original
- # user likely exported with touch_file and is now exporting without touch_file
- # don't update the file because it's same but leave touch time
- update_skipped_files.append(dest_str)
- else:
- # destination exists but is different
+ if self._should_update_photo(src, dest, options):
update_updated_files.append(dest_str)
- if options.touch_file:
- touched_files.append(dest_str)
+ else:
+ update_skipped_files.append(dest_str)
else:
# update, destination doesn't exist (new file)
update_new_files.append(dest_str)
- if options.touch_file:
- touched_files.append(dest_str)
else:
# not update, export the file
exported_files.append(dest_str)
- if options.touch_file:
- sig = fileutil.file_sig(src)
- sig = (sig[0], sig[1], int(self.photo.date.timestamp()))
- if not fileutil.cmp_file_sig(src, sig):
- touched_files.append(dest_str)
- if not update_skipped_files:
- # have file to export
- edited_stat = (
- fileutil.file_sig(src) if options.edited else (None, None, None)
+ export_files = update_new_files + update_updated_files + exported_files
+ for export_dest in export_files:
+ # set src_sig before any modifications by convert_to_jpeg or exiftool
+ export_record = export_db.create_or_get_file_record(
+ export_dest, self.photo.uuid
)
+ export_record.src_sig = fileutil.file_sig(src)
if dest_exists and any(
[options.overwrite, options.update, options.force_update]
):
@@ -1123,7 +1082,6 @@ class PhotoExporter:
src, tmp_file, compression_quality=options.jpeg_quality
)
src = tmp_file
- converted_stat = fileutil.file_sig(tmp_file)
converted_to_jpeg_files.append(dest_str)
if options.exiftool:
@@ -1139,20 +1097,7 @@ class PhotoExporter:
f"Error copying file {src} to {dest_str}: {e} ({lineno(__file__)})"
) from e
- json_info = self.photo.json()
- # don't set the metadata digest if not force_update so that future use of force_update catches metadata change
- metadata_digest = hexdigest(json_info) if options.force_update else None
- export_db.set_data(
- filename=dest_str,
- uuid=self.photo.uuid,
- orig_stat=fileutil.file_sig(dest_str),
- converted_stat=converted_stat,
- edited_stat=edited_stat,
- info_json=json_info,
- metadata=metadata_digest,
- )
-
- return ExportResults(
+ results = ExportResults(
converted_to_jpeg=converted_to_jpeg_files,
error=exif_results.error,
exif_updated=exif_results.exif_updated,
@@ -1161,10 +1106,34 @@ class PhotoExporter:
exported=exported_files + update_new_files + update_updated_files,
new=update_new_files,
skipped=update_skipped_files,
- to_touch=touched_files,
updated=update_updated_files,
)
+ # touch files if needed
+ if options.touch_file:
+ results += self._touch_files(
+ exported_files
+ + update_new_files
+ + update_updated_files
+ + update_skipped_files,
+ options,
+ )
+
+ # set data in the database
+ with export_db.create_or_get_file_record(dest_str, self.photo.uuid) as rec:
+ photoinfo = self.photo.json()
+ rec.photoinfo = photoinfo
+ rec.export_options = options.bit_flags
+ # don't set src_sig as that is set above before any modifications by convert_to_jpeg or exiftool
+ if not options.ignore_signature:
+ rec.dest_sig = fileutil.file_sig(dest)
+ if options.exiftool:
+ rec.exifdata = self._exiftool_json_sidecar(options)
+ if options.force_update:
+ rec.digest = hexdigest(photoinfo)
+
+ return results
+
def _write_sidecar_files(
self,
dest: pathlib.Path,
@@ -1245,8 +1214,8 @@ class PhotoExporter:
sidecar_type = data[4]
sidecar_digest = hexdigest(sidecar_str)
- old_sidecar_digest, sidecar_sig = export_db.get_sidecar_for_file(
- sidecar_filename
+ sidecar_record = export_db.create_or_get_file_record(
+ sidecar_filename, self.photo.uuid
)
write_sidecar = (
not (options.update or options.force_update)
@@ -1256,8 +1225,10 @@ class PhotoExporter:
)
or (
(options.update or options.force_update)
- and (sidecar_digest != old_sidecar_digest)
- or not fileutil.cmp_file_sig(sidecar_filename, sidecar_sig)
+ and (sidecar_digest != sidecar_record.digest)
+ or not fileutil.cmp_file_sig(
+ sidecar_filename, sidecar_record.dest_sig
+ )
)
)
if write_sidecar:
@@ -1265,16 +1236,13 @@ class PhotoExporter:
files_written.append(str(sidecar_filename))
if not options.dry_run:
self._write_sidecar(sidecar_filename, sidecar_str)
- export_db.set_sidecar_for_file(
- sidecar_filename,
- sidecar_digest,
- fileutil.file_sig(sidecar_filename),
- )
+ sidecar_record.digest = sidecar_digest
+ sidecar_record.dest_sig = fileutil.file_sig(sidecar_filename)
else:
verbose(f"Skipped up to date {sidecar_type} sidecar {sidecar_filename}")
files_skipped.append(str(sidecar_filename))
- return ExportResults(
+ results = ExportResults(
sidecar_json_written=sidecar_json_files_written,
sidecar_json_skipped=sidecar_json_files_skipped,
sidecar_exiftool_written=sidecar_exiftool_files_written,
@@ -1283,6 +1251,26 @@ class PhotoExporter:
sidecar_xmp_skipped=sidecar_xmp_files_skipped,
)
+ if options.touch_file:
+ all_sidecars = (
+ sidecar_json_files_written
+ + sidecar_exiftool_files_written
+ + sidecar_xmp_files_written
+ + sidecar_json_files_skipped
+ + sidecar_exiftool_files_skipped
+ + sidecar_xmp_files_skipped
+ )
+ results += self._touch_files(all_sidecars, options)
+
+ # update destination signatures in database
+ for sidecar_filename in all_sidecars:
+ sidecar_record = export_db.create_or_get_file_record(
+ sidecar_filename, self.photo.uuid
+ )
+ sidecar_record.dest_sig = fileutil.file_sig(sidecar_filename)
+
+ return results
+
def _write_exif_metadata_to_file(
self,
src,
@@ -1297,10 +1285,7 @@ class PhotoExporter:
local machine prior to being copied to the export destination which may be on a
network drive or other slower external storage."""
- export_db = options.export_db
- fileutil = options.fileutil
verbose = options.verbose or self._verbose
-
exiftool_results = ExportResults()
# don't try to write if unsupported file type for exiftool
@@ -1312,53 +1297,35 @@ class PhotoExporter:
)
)
# set file signature so the file doesn't get re-exported with --update
- export_db.set_data(
- dest,
- uuid=self.photo.uuid,
- exif_stat=fileutil.file_sig(src),
- exif_json=self._exiftool_json_sidecar(options=options),
- )
return exiftool_results
# determine if we need to write the exif metadata
# if we are not updating, we always write
# else, need to check the database to determine if we need to write
- run_exiftool = self._should_run_exiftool(dest, options)
- if run_exiftool:
- verbose(f"Writing metadata with exiftool for {pathlib.Path(dest).name}")
- if not options.dry_run:
- warning_, error_ = self._write_exif_data(src, options=options)
- if warning_:
- exiftool_results.exiftool_warning.append((dest, warning_))
- if error_:
- exiftool_results.exiftool_error.append((dest, error_))
- exiftool_results.error.append((dest, error_))
+ verbose(f"Writing metadata with exiftool for {pathlib.Path(dest).name}")
+ if not options.dry_run:
+ warning_, error_ = self._write_exif_data(src, options=options)
+ if warning_:
+ exiftool_results.exiftool_warning.append((dest, warning_))
+ if error_:
+ exiftool_results.exiftool_error.append((dest, error_))
+ exiftool_results.error.append((dest, error_))
- export_db.set_data(
- dest,
- uuid=self.photo.uuid,
- exif_stat=fileutil.file_sig(src),
- exif_json=self._exiftool_json_sidecar(options=options),
- )
- exiftool_results.exif_updated.append(dest)
- exiftool_results.to_touch.append(dest)
- else:
- verbose(
- f"Skipped up to date exiftool metadata for {pathlib.Path(dest).name}"
- )
+ exiftool_results.exif_updated.append(dest)
+ exiftool_results.to_touch.append(dest)
return exiftool_results
def _should_run_exiftool(self, dest, options: ExportOptions) -> bool:
"""Return True if exiftool should be run to update metadata"""
- run_exiftool = not (options.update or options.force_update)
+ run_exiftool = not options.update and not options.force_update
if options.update or options.force_update:
files_are_different = False
- old_data = options.export_db.get_exifdata_for_file(dest)
+ exif_record = options.export_db.get_file_record(dest)
+ old_data = exif_record.exifdata if exif_record else None
if old_data is not None:
old_data = json.loads(old_data)[0]
- current_data = json.loads(self._exiftool_json_sidecar(options=options))[
- 0
- ]
+ current_data = json.loads(self._exiftool_json_sidecar(options=options))
+ current_data = current_data[0]
if old_data != current_data:
files_are_different = True
diff --git a/tests/test_cli.py b/tests/test_cli.py
index d7717c02..6e59fba0 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -3,6 +3,7 @@
import os
import sqlite3
import tempfile
+from tempfile import TemporaryDirectory
import pytest
from click.testing import CliRunner
@@ -675,6 +676,8 @@ CLI_EXIFTOOL_IGNORE_DATE_MODIFIED = {
CLI_EXIFTOOL_ERROR = ["E2078879-A29C-4D6F-BACB-E3BBE6C3EB91"]
+CLI_NOT_REALLY_A_JPEG = "E2078879-A29C-4D6F-BACB-E3BBE6C3EB91"
+
CLI_EXIFTOOL_DUPLICATE_KEYWORDS = {
"E9BC5C36-7CD1-40A1-A72B-8B8FAC227D51": "wedding.jpg"
}
@@ -4936,12 +4939,126 @@ def test_export_force_update():
export, [os.path.join(cwd, photos_db_path), ".", "--force-update"]
)
assert result.exit_code == 0
+ print(result.output)
assert (
f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: 0, updated: 0, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7+PHOTOS_EDITED_15_7}, updated EXIF data: 0, missing: 3, error: 0"
in result.output
)
+@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
+def test_export_update_complex():
+ """test complex --update scenario, #630"""
+ import glob
+ import os
+ import os.path
+
+ import osxphotos
+ from osxphotos.cli import OSXPHOTOS_EXPORT_DB, export
+
+ runner = CliRunner()
+ cwd = os.getcwd()
+ # pylint: disable=not-context-manager
+ with runner.isolated_filesystem():
+ # basic export
+ result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"])
+ assert result.exit_code == 0
+ files = glob.glob("*")
+ assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
+ assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
+
+ src = os.path.join(cwd, CLI_PHOTOS_DB)
+ dest = os.path.join(os.getcwd(), "export_complex_update.photoslibrary")
+ photos_db_path = copy_photos_library_to_path(src, dest)
+
+ tempdir = TemporaryDirectory()
+
+ options = [
+ "--verbose",
+ "--update",
+ "--cleanup",
+ "--directory",
+ "{created.year}/{created.month}",
+ "--description-template",
+ "Album:{album,}{newline}Description:{descr,}",
+ "--exiftool",
+ "--exiftool-merge-keywords",
+ "--exiftool-merge-persons",
+ "--keyword-template",
+ "{keyword}",
+ "--not-hidden",
+ "--retry",
+ "2",
+ "--skip-original-if-edited",
+ "--timestamp",
+ "--strip",
+ "--skip-uuid",
+ CLI_NOT_REALLY_A_JPEG,
+ ]
+ # update
+ result = runner.invoke(
+ export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
+ )
+ assert result.exit_code == 0
+ assert (
+ f"exported: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, updated: 0, skipped: 0, updated EXIF data: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}"
+ in result.output
+ )
+
+ result = runner.invoke(
+ export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
+ )
+ assert result.exit_code == 0
+ assert "exported: 0" in result.output
+
+ # update a file
+ dbpath = os.path.join(photos_db_path, "database/Photos.sqlite")
+ try:
+ conn = sqlite3.connect(dbpath)
+ c = conn.cursor()
+ except sqlite3.Error as e:
+ pytest.exit(f"An error occurred opening sqlite file")
+
+ # photo is IMG_4547.jpg
+ c.execute(
+ "UPDATE ZADDITIONALASSETATTRIBUTES SET Z_OPT=9, ZTITLE='My Updated Title' WHERE Z_PK=8;"
+ )
+ conn.commit()
+
+ # run --update to see if updated metadata forced update
+ result = runner.invoke(
+ export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
+ )
+ assert result.exit_code == 0
+ assert (
+ f"exported: 0, updated: 1, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7-2}, updated EXIF data: 1"
+ in result.output
+ )
+
+ # update, nothing should export
+ result = runner.invoke(
+ export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
+ )
+ assert result.exit_code == 0
+ assert (
+ f"exported: 0, updated: 0, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, updated EXIF data: 0"
+ in result.output
+ )
+
+ # change the template and run again
+ options.extend(["--keyword-template", "FOO"])
+
+ # run update and all photos should be updated
+ result = runner.invoke(
+ export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
+ )
+ assert result.exit_code == 0
+ assert (
+ f"exported: 0, updated: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, skipped: 0, updated EXIF data: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}"
+ in result.output
+ )
+
+
@pytest.mark.skipif(
"OSXPHOTOS_TEST_EXPORT" not in os.environ,
reason="Skip if not running on author's personal library.",
@@ -5266,17 +5383,14 @@ def test_export_update_no_db():
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
os.unlink(OSXPHOTOS_EXPORT_DB)
- # update
+ # update, will re-export all files with different names
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"]
)
assert result.exit_code == 0
- # unedited files will be skipped because their signatures will compare but
- # edited files will be re-exported because there won't be an edited signature
- # in the database
assert (
- f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: 0, updated: {PHOTOS_EDITED_15_7}, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7}, updated EXIF data: 0, missing: 3, error: 0"
+ f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: {PHOTOS_NOT_IN_TRASH_LEN_15_7+PHOTOS_EDITED_15_7}, updated: 0"
in result.output
)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
@@ -5590,7 +5704,7 @@ def test_export_touch_files_update():
# touch one file and run update again
ts = time.time()
- os.utime(CLI_EXPORT_BY_DATE[0], (ts, ts))
+ os.utime(CLI_EXPORT_BY_DATE_NEED_TOUCH[1], (ts, ts))
result = runner.invoke(
export,
@@ -5922,7 +6036,12 @@ def test_export_ignore_signature_sidecar():
# should result in a new sidecar being exported but not the image itself
exportdb = osxphotos.export_db.ExportDB("./.osxphotos_export.db", ".")
for filename in CLI_EXPORT_IGNORE_SIGNATURE_FILENAMES:
- exportdb.set_sidecar_for_file(f"{filename}.xmp", "FOO", (0, 1, 2))
+ record = exportdb.get_file_record(filename)
+ sidecar_record = exportdb.create_or_get_file_record(
+ f"{filename}.xmp", record.uuid
+ )
+ sidecar_record.dest_sig = (0, 1, 2)
+ sidecar_record.digest = "FOO"
result = runner.invoke(
export,
@@ -6426,7 +6545,7 @@ def test_save_load_config():
],
)
assert result.exit_code == 0
- assert "Saving options to file" in result.output
+ assert "Saving options to config file" in result.output
files = glob.glob("*")
assert "config.toml" in files
@@ -6462,7 +6581,7 @@ def test_save_load_config():
],
)
assert result.exit_code == 0
- assert "Saving options to file" in result.output
+ assert "Saving options to config file" in result.output
files = glob.glob("*")
assert "config.toml" in files
@@ -6499,6 +6618,41 @@ def test_save_load_config():
assert "Writing XMP sidecar" not in result.output
+def test_config_only():
+ """test --save-config, --config-only"""
+ import glob
+ import os
+ import os.path
+
+ from osxphotos.cli import export
+
+ runner = CliRunner()
+ cwd = os.getcwd()
+ # pylint: disable=not-context-manager
+ with runner.isolated_filesystem():
+ # test save config file
+ result = runner.invoke(
+ export,
+ [
+ os.path.join(cwd, CLI_PHOTOS_DB),
+ ".",
+ "-V",
+ "--sidecar",
+ "XMP",
+ "--touch-file",
+ "--update",
+ "--save-config",
+ "config.toml",
+ "--config-only",
+ ],
+ )
+ assert result.exit_code == 0
+ assert "Saved config file" in result.output
+ assert "Processed:" not in result.output
+ files = glob.glob("*")
+ assert "config.toml" in files
+
+
def test_export_exportdb():
"""test --exportdb"""
import glob
diff --git a/tests/test_export_db.py b/tests/test_export_db.py
index 75b46f42..43291284 100644
--- a/tests/test_export_db.py
+++ b/tests/test_export_db.py
@@ -1,26 +1,36 @@
""" Test ExportDB """
-import json
+import os
+import pathlib
+import sqlite3
+import tempfile
import pytest
+from osxphotos._version import __version__
+from osxphotos.export_db import (
+ OSXPHOTOS_EXPORTDB_VERSION,
+ ExportDB,
+ ExportDBInMemory,
+ ExportDBTemp,
+ ExportRecord,
+)
+from osxphotos.export_db_utils import export_db_get_version
+
EXIF_DATA = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "EXIF:ImageDescription": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Title": "Elder Park", "EXIF:GPSLatitude": "34 deg 55' 8.01\" S", "EXIF:GPSLongitude": "138 deg 35' 48.70\" E", "Composite:GPSPosition": "34 deg 55' 8.01\" S, 138 deg 35' 48.70\" E", "EXIF:GPSLatitudeRef": "South", "EXIF:GPSLongitudeRef": "East", "EXIF:DateTimeOriginal": "2017:06:20 17:18:56", "EXIF:OffsetTimeOriginal": "+09:30", "EXIF:ModifyDate": "2020:05:18 14:42:04"}]"""
INFO_DATA = """{"uuid": "3DD2C897-F19E-4CA6-8C22-B027D5A71907", "filename": "3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "original_filename": "IMG_4547.jpg", "date": "2017-06-20T17:18:56.518000+09:30", "description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "title": "Elder Park", "keywords": [], "labels": ["Statue", "Art"], "albums": ["AlbumInFolder"], "folders": {"AlbumInFolder": ["Folder1", "SubFolder2"]}, "persons": [], "path": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/originals/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "ismissing": false, "hasadjustments": true, "external_edit": false, "favorite": false, "hidden": false, "latitude": -34.91889167000001, "longitude": 138.59686167, "path_edited": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/resources/renders/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907_1_201_a.jpeg", "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": false, "incloud": null, "date_modified": "2020-05-18T14:42:04.608664+09:30", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Elder Park, Adelaide, South Australia, Australia, River Torrens", "names": {"field0": [], "country": ["Australia"], "state_province": ["South Australia"], "sub_administrative_area": ["Adelaide"], "city": ["Adelaide", "Adelaide"], "field5": [], "additional_city_info": ["Adelaide CBD", "Tarndanya"], "ocean": [], "area_of_interest": ["Elder Park", ""], "inland_water": ["River Torrens", "River Torrens"], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": ["River Torrens", "River Torrens"]}, "country_code": "AU", "ishome": false, "address_str": "River Torrens, Adelaide SA, Australia", "address": {"street": null, "sub_locality": "Tarndanya", "city": "Adelaide", "sub_administrative_area": "Adelaide", "state_province": "SA", "postal_code": null, "country": "Australia", "iso_country_code": "AU"}}, "exif": {"flash_fired": false, "iso": 320, "metering_mode": 3, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.058823529411764705, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}"""
SIDECAR_DATA = """FOO_BAR"""
METADATA_DATA = "FIZZ"
+DIGEST_DATA = "FIZZ"
EXIF_DATA2 = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "XMP:Title": "St. James's Park", "XMP:TagsList": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "IPTC:Keywords": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "XMP:Subject": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "EXIF:GPSLatitude": "51 deg 30' 12.86\" N", "EXIF:GPSLongitude": "0 deg 7' 54.50\" W", "Composite:GPSPosition": "51 deg 30' 12.86\" N, 0 deg 7' 54.50\" W", "EXIF:GPSLatitudeRef": "North", "EXIF:GPSLongitudeRef": "West", "EXIF:DateTimeOriginal": "2018:10:13 09:18:12", "EXIF:OffsetTimeOriginal": "-04:00", "EXIF:ModifyDate": "2019:12:08 14:06:44"}]"""
INFO_DATA2 = """{"uuid": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529", "filename": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "original_filename": "IMG_8440.JPG", "date": "2019-06-11T11:42:06.711805-07:00", "description": null, "title": null, "keywords": [], "labels": ["Sky", "Cloudy", "Fence", "Land", "Outdoor", "Park", "Amusement Park", "Roller Coaster"], "albums": [], "folders": {}, "persons": [], "path": "/Volumes/MacBook Catalina - Data/Users/rhet/Pictures/Photos Library.photoslibrary/originals/F/F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "ismissing": false, "hasadjustments": false, "external_edit": false, "favorite": false, "hidden": false, "latitude": 33.81558666666667, "longitude": -117.99298, "path_edited": null, "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": true, "incloud": true, "date_modified": "2019-10-14T00:51:47.141950-07:00", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Adventure City, Stanton, California, United States", "names": {"field0": [], "country": ["United States"], "state_province": ["California"], "sub_administrative_area": ["Orange"], "city": ["Stanton", "Anaheim", "Anaheim"], "field5": [], "additional_city_info": ["West Anaheim"], "ocean": [], "area_of_interest": ["Adventure City", "Adventure City"], "inland_water": [], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": []}, "country_code": "US", "ishome": false, "address_str": "Adventure City, 1240 S Beach Blvd, Anaheim, CA 92804, United States", "address": {"street": "1240 S Beach Blvd", "sub_locality": "West Anaheim", "city": "Stanton", "sub_administrative_area": "Orange", "state_province": "CA", "postal_code": "92804", "country": "United States", "iso_country_code": "US"}}, "exif": {"flash_fired": false, "iso": 25, "metering_mode": 5, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.0004940711462450593, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}"""
+DIGEST_DATA2 = "BUZZ"
DATABASE_VERSION1 = "tests/export_db_version1.db"
def test_export_db():
"""test ExportDB"""
- import os
- import tempfile
-
- from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDB
-
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
@@ -32,104 +42,51 @@ def test_export_db():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
- db.set_uuid_for_file(filepath, "FOO-BAR")
- # filename should be case-insensitive
- assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR"
- db.set_info_for_uuid("FOO-BAR", INFO_DATA)
- assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA
- db.set_exifdata_for_file(filepath, EXIF_DATA)
- assert db.get_exifdata_for_file(filepath) == EXIF_DATA
- db.set_stat_orig_for_file(filepath, (1, 2, 3))
- assert db.get_stat_orig_for_file(filepath) == (1, 2, 3)
- db.set_stat_exif_for_file(filepath, (4, 5, 6))
- assert db.get_stat_exif_for_file(filepath) == (4, 5, 6)
- db.set_stat_edited_for_file(filepath, (10, 11, 12))
- assert db.get_stat_edited_for_file(filepath) == (10, 11, 12)
- db.set_stat_converted_for_file(filepath, (7, 8, 9))
- assert db.get_stat_converted_for_file(filepath) == (7, 8, 9)
- db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
- assert db.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
- assert db.get_previous_uuids() == ["FOO-BAR"]
+ uuid = "FOOBAR"
+ assert db.get_photoinfo_for_uuid(uuid) is None
+ db.set_photoinfo_for_uuid(uuid, INFO_DATA)
+ assert db.get_photoinfo_for_uuid(uuid) == INFO_DATA
- db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
- assert json.loads(db.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
+ assert db.get_uuid_for_file(filepath) is None
+ db.create_file_record(filepath, uuid)
+ assert db.get_uuid_for_file(filepath) == uuid
- # test set_data which sets all at the same time
- filepath2 = os.path.join(tempdir.name, "test2.jpg")
- db.set_data(
- filepath2,
- "BAR-FOO",
- (1, 2, 3),
- (4, 5, 6),
- (7, 8, 9),
- (10, 11, 12),
- INFO_DATA,
- EXIF_DATA,
- METADATA_DATA,
- )
- assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
- assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
- assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
- assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
- assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
- assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
- assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
- assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
- assert db.get_metadata_for_file(filepath2) == METADATA_DATA
+ record = db.get_file_record(filepath)
+ assert record.uuid == uuid
+ assert record.photoinfo == INFO_DATA
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ assert record.src_sig == (None, None, None)
+ assert record.dest_sig == (None, None, None)
+ assert record.digest is None
+ assert record.exifdata is None
+ record.digest = DIGEST_DATA # for next assert
- # test set_data value=None doesn't overwrite existing data
- db.set_data(
- filepath2,
- "BAR-FOO",
- None,
- None,
- None,
- None,
- None,
- None,
- None,
- )
- assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
- assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
- assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
- assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
- assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
- assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
- assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
- assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
- assert db.get_metadata_for_file(filepath2) == METADATA_DATA
+ # test create_or_get_file_record
+ # existing record
+ record2 = db.create_or_get_file_record(filepath, uuid)
+ assert record2.uuid == uuid
+ assert record.photoinfo == INFO_DATA
+ assert record.digest == DIGEST_DATA
- # close and re-open
- db.close()
- db = ExportDB(dbname, tempdir.name)
- assert not db.was_created
- assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
- assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
- assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
- assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
- assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
- assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
- assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
- assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
- assert json.loads(db.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
- assert db.get_metadata_for_file(filepath2) == METADATA_DATA
+ # new record
+ filepath3 = os.path.join(tempdir.name, "test3.JPG")
+ record3 = db.create_or_get_file_record(filepath3, "new_uuid")
+ assert record3.uuid == "new_uuid"
+ assert record3.photoinfo is None
+ assert record3.digest is None
+
+ # all uuids
+ uuids = db.get_previous_uuids()
+ assert sorted(uuids) == sorted(["new_uuid", uuid])
- # update data
- db.set_uuid_for_file(filepath, "FUBAR")
- assert db.get_uuid_for_file(filepath) == "FUBAR"
- assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FUBAR"])
-
-
-def test_export_db_no_op():
- """test ExportDBNoOp"""
- import os
- import tempfile
-
- from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDBNoOp
-
+def test_export_db_constraints():
+ """test ExportDB constraints"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
- db = ExportDBNoOp()
+ dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
+ db = ExportDB(dbname, tempdir.name)
+ assert os.path.isfile(dbname)
assert db.was_created
assert not db.was_upgraded
assert db.version == OSXPHOTOS_EXPORTDB_VERSION
@@ -137,68 +94,28 @@ def test_export_db_no_op():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
- db.set_uuid_for_file(filepath, "FOO-BAR")
- # filename should be case-insensitive
- assert db.get_uuid_for_file(filepath_lower) is None
- db.set_info_for_uuid("FOO-BAR", INFO_DATA)
- assert db.get_info_for_uuid("FOO-BAR") is None
- db.set_exifdata_for_file(filepath, EXIF_DATA)
- assert db.get_exifdata_for_file(filepath) is None
- db.set_stat_orig_for_file(filepath, (1, 2, 3))
- assert db.get_stat_orig_for_file(filepath) is None
- db.set_stat_exif_for_file(filepath, (4, 5, 6))
- assert db.get_stat_exif_for_file(filepath) is None
- db.set_stat_converted_for_file(filepath, (7, 8, 9))
- assert db.get_stat_converted_for_file(filepath) is None
- db.set_stat_edited_for_file(filepath, (10, 11, 12))
- assert db.get_stat_edited_for_file(filepath) is None
- db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
- assert db.get_sidecar_for_file(filepath) == (None, (None, None, None))
- assert db.get_previous_uuids() == []
- db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
- assert db.get_detected_text_for_uuid("FOO-BAR") is None
- db.set_metadata_for_file(filepath, METADATA_DATA)
- assert db.get_metadata_for_file(filepath) is None
+ uuid = "FOOBAR"
+ db.set_photoinfo_for_uuid(uuid, INFO_DATA)
+ record = db.create_file_record(filepath, uuid)
+ record.photoinfo = INFO_DATA
+ record.exifdata = EXIF_DATA
+ record.digest = DIGEST_DATA
+ record.src_sig = (7, 8, 9)
+ record.dest_sig = (10, 11, 12)
- # test set_data which sets all at the same time
- filepath2 = os.path.join(tempdir.name, "test2.jpg")
- db.set_data(
- filepath2,
- "BAR-FOO",
- (1, 2, 3),
- (4, 5, 6),
- (7, 8, 9),
- (10, 11, 12),
- INFO_DATA,
- EXIF_DATA,
- METADATA_DATA,
- )
- assert db.get_uuid_for_file(filepath2) is None
- assert db.get_info_for_uuid("BAR-FOO") is None
- assert db.get_exifdata_for_file(filepath2) is None
- assert db.get_stat_orig_for_file(filepath2) is None
- assert db.get_stat_exif_for_file(filepath2) is None
- assert db.get_stat_converted_for_file(filepath) is None
- assert db.get_stat_edited_for_file(filepath) is None
- assert db.get_previous_uuids() == []
- assert db.get_metadata_for_file(filepath) is None
+ with pytest.raises(AttributeError):
+ record.uuid = "BARFOO"
- # update data
- db.set_uuid_for_file(filepath, "FUBAR")
- assert db.get_uuid_for_file(filepath) is None
+ with pytest.raises(sqlite3.IntegrityError):
+ record2 = db.create_file_record(filepath, "NEW_UUID")
+
+ with pytest.raises(AttributeError):
+ # verify we can't add new attributes
+ record.src_stats = (7, 8, 9)
def test_export_db_in_memory():
"""test ExportDBInMemory"""
- import os
- import tempfile
-
- from osxphotos.export_db import (
- OSXPHOTOS_EXPORTDB_VERSION,
- ExportDB,
- ExportDBInMemory,
- )
-
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
@@ -207,87 +124,61 @@ def test_export_db_in_memory():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
- db.set_uuid_for_file(filepath, "FOO-BAR")
- db.set_info_for_uuid("FOO-BAR", INFO_DATA)
- db.set_exifdata_for_file(filepath, EXIF_DATA)
- db.set_stat_orig_for_file(filepath, (1, 2, 3))
- db.set_stat_exif_for_file(filepath, (4, 5, 6))
- db.set_stat_converted_for_file(filepath, (7, 8, 9))
- db.set_stat_edited_for_file(filepath, (10, 11, 12))
- db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
- assert db.get_previous_uuids() == ["FOO-BAR"]
- db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
- db.set_metadata_for_file(filepath, METADATA_DATA)
+ uuid = "FOOBAR"
+ record = db.create_file_record(filepath, uuid)
+ record.photoinfo = INFO_DATA
+ record.exifdata = EXIF_DATA
+ record.digest = DIGEST_DATA
+ record.src_sig = (7, 8, 9)
+ record.dest_sig = (10, 11, 12)
db.close()
+ # create in memory version
dbram = ExportDBInMemory(dbname, tempdir.name)
- assert not dbram.was_created
- assert not dbram.was_upgraded
- assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
+ record2 = dbram.get_file_record(filepath)
+ assert record2.uuid == uuid
+ assert record2.photoinfo == INFO_DATA
+ assert record2.exifdata == EXIF_DATA
+ assert record2.digest == DIGEST_DATA
+ assert record2.src_sig == (7, 8, 9)
+ assert record2.dest_sig == (10, 11, 12)
- # verify values as expected
- assert dbram.get_uuid_for_file(filepath_lower) == "FOO-BAR"
- assert dbram.get_info_for_uuid("FOO-BAR") == INFO_DATA
- assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA
- assert dbram.get_stat_orig_for_file(filepath) == (1, 2, 3)
- assert dbram.get_stat_exif_for_file(filepath) == (4, 5, 6)
- assert dbram.get_stat_converted_for_file(filepath) == (7, 8, 9)
- assert dbram.get_stat_edited_for_file(filepath) == (10, 11, 12)
- assert dbram.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
- assert dbram.get_previous_uuids() == ["FOO-BAR"]
- assert json.loads(dbram.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
- assert dbram.get_metadata_for_file(filepath) == METADATA_DATA
+ # change some values
+ record2.photoinfo = INFO_DATA2
+ record2.exifdata = EXIF_DATA2
+ record2.digest = DIGEST_DATA2
+ record2.src_sig = (13, 14, 15)
+ record2.dest_sig = (16, 17, 18)
- # change a value
- dbram.set_uuid_for_file(filepath, "FUBAR")
- dbram.set_info_for_uuid("FUBAR", INFO_DATA2)
- dbram.set_exifdata_for_file(filepath, EXIF_DATA2)
- dbram.set_stat_orig_for_file(filepath, (7, 8, 9))
- dbram.set_stat_exif_for_file(filepath, (10, 11, 12))
- dbram.set_stat_converted_for_file(filepath, (1, 2, 3))
- dbram.set_stat_edited_for_file(filepath, (4, 5, 6))
- dbram.set_sidecar_for_file(filepath, "FUBAR", (20, 21, 22))
- dbram.set_detected_text_for_uuid("FUBAR", json.dumps([["bar", 0.5]]))
- dbram.set_metadata_for_file(filepath, "FUBAR")
+ assert record2.photoinfo == INFO_DATA2
+ assert record2.exifdata == EXIF_DATA2
+ assert record2.digest == DIGEST_DATA2
+ assert record2.src_sig == (13, 14, 15)
+ assert record2.dest_sig == (16, 17, 18)
- assert dbram.get_uuid_for_file(filepath_lower) == "FUBAR"
- assert dbram.get_info_for_uuid("FUBAR") == INFO_DATA2
- assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA2
- assert dbram.get_stat_orig_for_file(filepath) == (7, 8, 9)
- assert dbram.get_stat_exif_for_file(filepath) == (10, 11, 12)
- assert dbram.get_stat_converted_for_file(filepath) == (1, 2, 3)
- assert dbram.get_stat_edited_for_file(filepath) == (4, 5, 6)
- assert dbram.get_sidecar_for_file(filepath) == ("FUBAR", (20, 21, 22))
- assert dbram.get_previous_uuids() == ["FUBAR"]
- assert json.loads(dbram.get_detected_text_for_uuid("FUBAR")) == [["bar", 0.5]]
- assert dbram.get_metadata_for_file(filepath) == "FUBAR"
+ # all uuids
+ uuids = dbram.get_previous_uuids()
+ assert uuids == [uuid]
dbram.close()
- # re-open on disk and verify no changes
+ # re-open original, assert no changes
db = ExportDB(dbname, tempdir.name)
- assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR"
- assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA
- assert db.get_exifdata_for_file(filepath) == EXIF_DATA
- assert db.get_stat_orig_for_file(filepath) == (1, 2, 3)
- assert db.get_stat_exif_for_file(filepath) == (4, 5, 6)
- assert db.get_stat_converted_for_file(filepath) == (7, 8, 9)
- assert db.get_stat_edited_for_file(filepath) == (10, 11, 12)
- assert db.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
- assert db.get_previous_uuids() == ["FOO-BAR"]
+ record = db.get_file_record(filepath)
+ assert record.uuid == uuid
+ assert record.photoinfo == INFO_DATA
+ assert record.exifdata == EXIF_DATA
+ assert record.digest == DIGEST_DATA
+ assert record.src_sig == (7, 8, 9)
+ assert record.dest_sig == (10, 11, 12)
- assert db.get_info_for_uuid("FUBAR") is None
- assert db.get_detected_text_for_uuid("FUBAR") is None
- assert db.get_metadata_for_file(filepath) == METADATA_DATA
+ # all uuids
+ uuids = db.get_previous_uuids()
+ assert uuids == [uuid]
def test_export_db_in_memory_nofile():
"""test ExportDBInMemory with no dbfile"""
- import os
- import tempfile
-
- from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDBInMemory
-
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
@@ -299,28 +190,187 @@ def test_export_db_in_memory_nofile():
assert not dbram.was_upgraded
assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
- # change a value
- dbram.set_uuid_for_file(filepath, "FUBAR")
- dbram.set_info_for_uuid("FUBAR", INFO_DATA2)
- dbram.set_exifdata_for_file(filepath, EXIF_DATA2)
- dbram.set_stat_orig_for_file(filepath, (7, 8, 9))
- dbram.set_stat_exif_for_file(filepath, (10, 11, 12))
- dbram.set_stat_converted_for_file(filepath, (1, 2, 3))
- dbram.set_stat_edited_for_file(filepath, (4, 5, 6))
- dbram.set_sidecar_for_file(filepath, "FUBAR", (20, 21, 22))
- dbram.set_detected_text_for_uuid("FUBAR", json.dumps([["bar", 0.5]]))
- dbram.set_metadata_for_file(filepath, METADATA_DATA)
+ # set values
+ uuid = "FOOBAR"
+ record = dbram.create_file_record(filepath, uuid)
+ record.photoinfo = INFO_DATA
+ record.exifdata = EXIF_DATA
+ record.digest = DIGEST_DATA
+ record.src_sig = (7, 8, 9)
+ record.dest_sig = (10, 11, 12)
- assert dbram.get_uuid_for_file(filepath_lower) == "FUBAR"
- assert dbram.get_info_for_uuid("FUBAR") == INFO_DATA2
- assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA2
- assert dbram.get_stat_orig_for_file(filepath) == (7, 8, 9)
- assert dbram.get_stat_exif_for_file(filepath) == (10, 11, 12)
- assert dbram.get_stat_converted_for_file(filepath) == (1, 2, 3)
- assert dbram.get_stat_edited_for_file(filepath) == (4, 5, 6)
- assert dbram.get_sidecar_for_file(filepath) == ("FUBAR", (20, 21, 22))
- assert dbram.get_previous_uuids() == ["FUBAR"]
- assert json.loads(dbram.get_detected_text_for_uuid("FUBAR")) == [["bar", 0.5]]
- assert dbram.get_metadata_for_file(filepath) == METADATA_DATA
+ assert record.photoinfo == INFO_DATA
+ assert record.exifdata == EXIF_DATA
+ assert record.digest == DIGEST_DATA
+ assert record.src_sig == (7, 8, 9)
+ assert record.dest_sig == (10, 11, 12)
+ assert record.uuid == uuid
+
+ # change some values
+ record.photoinfo = INFO_DATA2
+ record.digest = DIGEST_DATA2
+ assert record.photoinfo == INFO_DATA2
+ assert record.digest == DIGEST_DATA2
+ assert record.exifdata == EXIF_DATA
+
+
+def test_export_db_temp():
+ """test ExportDBTemp"""
+ tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
+ filepath = os.path.join(tempdir.name, "test.JPG")
+ filepath_lower = os.path.join(tempdir.name, "test.jpg")
+
+ dbram = ExportDBTemp()
+ assert dbram.was_created
+ assert not dbram.was_upgraded
+ assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
+
+ # set values
+ uuid = "FOOBAR"
+ record = dbram.create_file_record(filepath, uuid)
+ record.photoinfo = INFO_DATA
+ record.exifdata = EXIF_DATA
+ record.digest = DIGEST_DATA
+ record.src_sig = (7, 8, 9)
+ record.dest_sig = (10, 11, 12)
+
+ assert record.photoinfo == INFO_DATA
+ assert record.exifdata == EXIF_DATA
+ assert record.digest == DIGEST_DATA
+ assert record.src_sig == (7, 8, 9)
+ assert record.dest_sig == (10, 11, 12)
+ assert record.uuid == uuid
+
+ # change some values
+ record.photoinfo = INFO_DATA2
+ record.digest = DIGEST_DATA2
+ assert record.photoinfo == INFO_DATA2
+ assert record.digest == DIGEST_DATA2
+ assert record.exifdata == EXIF_DATA
dbram.close()
+
+
+def test_export_record():
+ """Test ExportRecord"""
+ tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
+ filepath = os.path.join(tempdir.name, "test.JPG")
+ uuid = "FOOBAR"
+ dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
+ db = ExportDB(dbname, tempdir.name)
+
+ assert db.get_file_record(filepath) is None
+ record = db.create_file_record(filepath, uuid)
+ assert record.uuid == uuid
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ record.src_sig = (1, 2, 3.0)
+ assert record.src_sig == (1, 2, 3)
+ record.dest_sig = (4, 5, 6.0)
+ assert record.dest_sig == (4, 5, 6)
+ record.digest = DIGEST_DATA
+ assert record.digest == DIGEST_DATA
+ record.exifdata = EXIF_DATA
+ assert record.exifdata == EXIF_DATA
+ record.photoinfo = INFO_DATA
+ assert record.photoinfo == INFO_DATA
+ record.export_options = 1
+ assert record.export_options == 1
+
+ # close and re-open
+ db.close()
+ db2 = ExportDB(dbname, tempdir.name)
+ record = db2.get_file_record(filepath)
+ assert record.uuid == uuid
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ assert record.src_sig == (1, 2, 3)
+ assert record.dest_sig == (4, 5, 6)
+ assert record.digest == "FIZZ"
+ assert record.exifdata == EXIF_DATA
+ assert record.photoinfo == INFO_DATA
+ assert record.export_options == 1
+
+
+def test_export_record_context_manager():
+ """Test ExportRecord as context manager"""
+ tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
+ filepath = os.path.join(tempdir.name, "test.JPG")
+ uuid = "FOOBAR_CONTEXT"
+ dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
+ db = ExportDB(dbname, tempdir.name)
+
+ assert db.get_file_record(filepath) is None
+
+ with db.create_file_record(filepath, uuid) as record:
+ record.src_sig = (1, 2, 3.0)
+ record.dest_sig = (4, 5, 6.0)
+ record.digest = DIGEST_DATA
+ record.exifdata = EXIF_DATA
+ record.photoinfo = INFO_DATA
+ record.export_options = 1
+
+ assert record.uuid == uuid
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ assert record.src_sig == (1, 2, 3)
+ assert record.dest_sig == (4, 5, 6)
+ assert record.digest == "FIZZ"
+ assert record.exifdata == EXIF_DATA
+ assert record.photoinfo == INFO_DATA
+ assert record.export_options == 1
+
+ # close and re-open
+ db.close()
+ db2 = ExportDB(dbname, tempdir.name)
+ record = db2.get_file_record(filepath)
+ assert record.uuid == uuid
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ assert record.src_sig == (1, 2, 3)
+ assert record.dest_sig == (4, 5, 6)
+ assert record.digest == "FIZZ"
+ assert record.exifdata == EXIF_DATA
+ assert record.photoinfo == INFO_DATA
+ assert record.export_options == 1
+
+
+def test_export_record_context_manager_error():
+ """Test ExportRecord as context manager doesn't commit data on error"""
+ tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
+ filepath = os.path.join(tempdir.name, "test_boom.JPG")
+ uuid = "FOOBAR_CONTEXT_BOOM"
+ dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
+ db = ExportDB(dbname, tempdir.name)
+
+ try:
+ with db.create_file_record(filepath, uuid) as record:
+ record.src_sig = (1, 2, 3.0)
+ record.dest_sig = (4, 5, 6.0)
+ record.digest = DIGEST_DATA
+ record.exifdata = EXIF_DATA
+ record.photoinfo = INFO_DATA
+ raise Exception("Boom")
+ except Exception:
+ pass
+
+ record = db.get_file_record(filepath)
+ assert record.uuid == uuid
+ assert record.filepath == pathlib.Path(filepath).name
+ assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
+ assert record.src_sig == (None, None, None)
+ assert record.dest_sig == (None, None, None)
+ assert record.digest is None
+ assert record.exifdata is None
+ assert record.photoinfo is None
+
+
+def test_get_export_db_version():
+ """Test export_db_get_version"""
+ tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
+ dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
+ db = ExportDB(dbname, tempdir.name)
+
+ osxphotos_ver, export_db_ver = export_db_get_version(dbname)
+ assert osxphotos_ver == __version__
+ assert export_db_ver == OSXPHOTOS_EXPORTDB_VERSION