Exportdb refactor (#638)

* Working on export_db refactor

* Added exportdb command, removed logic for missing export_db, #630

* Updated tests

* updated docs

* Added --config-only, #606

* Added validation for --exportdb

* Added --info to exportdb command

* Fixed exportdb --touch-file to migrate database if needed

* Added exportdb --migrate
This commit is contained in:
Rhet Turnbull
2022-02-21 10:15:01 -07:00
committed by GitHub
parent d8204e65eb
commit ecbd370a47
20 changed files with 1673 additions and 1060 deletions

View File

@@ -1732,7 +1732,7 @@ Substitution Description
{lf} A line feed: '\n', alias for {newline}
{cr} A carriage return: '\r'
{crlf} a carriage return + line feed: '\r\n'
{osxphotos_version} The osxphotos version, e.g. '0.45.12'
{osxphotos_version} The osxphotos version, e.g. '0.46.0'
{osxphotos_cmd_line} The full command line used to run osxphotos
The following substitutions may result in multiple values. Thus if specified for
@@ -3636,7 +3636,7 @@ The following template field substitutions are availabe for use the templating s
|{lf}|A line feed: '\n', alias for {newline}|
|{cr}|A carriage return: '\r'|
|{crlf}|a carriage return + line feed: '\r\n'|
|{osxphotos_version}|The osxphotos version, e.g. '0.45.12'|
|{osxphotos_version}|The osxphotos version, e.g. '0.46.0'|
|{osxphotos_cmd_line}|The full command line used to run osxphotos|
|{album}|Album(s) photo is contained in|
|{folder_album}|Folder path + album photo is contained in. e.g. 'Folder/Subfolder/Album' or just 'Album' if no enclosing folder|
@@ -3736,7 +3736,7 @@ Args:
Returns: ExportResults instance
*Note*: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db, and no-op fileutil (e.g. ExportDBInMemory and FileUtilNoOp) in options.export_db and options.fileutil respectively.
*Note*: to use dry run mode, you must set options.dry_run=True and also pass in memory version of export_db, and no-op fileutil (e.g. `ExportDBInMemory` and `FileUtilNoOp`) in options.export_db and options.fileutil respectively.
#### `ExportOptions`
@@ -3752,7 +3752,7 @@ Attributes:
- exiftool_flags (list of str): optional list of flags to pass to exiftool when using exiftool option, e.g ["-m", "-F"]
- exiftool: (bool, default = False): if True, will use exiftool to write metadata to export file
- export_as_hardlink: (bool, default=False): if True, will hardlink files instead of copying them
- export_db: (ExportDB_ABC): instance of a class that conforms to ExportDB_ABC with methods for getting/setting data related to exported files to compare update state
- export_db: (ExportDB): instance of a class that conforms to ExportDB with methods for getting/setting data related to exported files to compare update state
- fileutil: (FileUtilABC): class that conforms to FileUtilABC with various file utilities
- ignore_date_modified (bool): for use with sidecar and exiftool; if True, sets EXIF:ModifyDate to EXIF:DateTimeOriginal even if date_modified is set
- ignore_signature (bool, default=False): ignore file signature when used with update (look only at filename)

View File

@@ -8,3 +8,4 @@ sphinx_rtd_theme
twine
wheel
Sphinx
pdbpp

View File

@@ -1,4 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
config: 30f35e310e3c2ebf8f561c37c515c685
config: 5b6236594d7900f08d9a1afda487bf3c
tags: 645f666f9bcd5a90fca523b33c5a78b7

View File

@@ -1,6 +1,6 @@
var DOCUMENTATION_OPTIONS = {
URL_ROOT: document.getElementById("documentation_options").getAttribute('data-url_root'),
VERSION: '0.45.12',
VERSION: '0.46.0',
LANGUAGE: 'None',
COLLAPSE_INDEX: false,
BUILDER: 'html',

View File

@@ -6,7 +6,7 @@
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />
<title>osxphotos command line interface (CLI) &#8212; osxphotos 0.45.12 documentation</title>
<title>osxphotos command line interface (CLI) &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />
<script data-url_root="./" id="documentation_options" src="_static/documentation_options.js"></script>

View File

@@ -5,7 +5,7 @@
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Index &#8212; osxphotos 0.45.12 documentation</title>
<title>Index &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />
<script data-url_root="./" id="documentation_options" src="_static/documentation_options.js"></script>

View File

@@ -6,7 +6,7 @@
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />
<title>Welcome to osxphotoss documentation! &#8212; osxphotos 0.45.12 documentation</title>
<title>Welcome to osxphotoss documentation! &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />
<script data-url_root="./" id="documentation_options" src="_static/documentation_options.js"></script>

View File

@@ -6,7 +6,7 @@
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />
<title>osxphotos &#8212; osxphotos 0.45.12 documentation</title>
<title>osxphotos &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />
<script data-url_root="./" id="documentation_options" src="_static/documentation_options.js"></script>

View File

@@ -6,7 +6,7 @@
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />
<title>osxphotos package &#8212; osxphotos 0.45.12 documentation</title>
<title>osxphotos package &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />
<script data-url_root="./" id="documentation_options" src="_static/documentation_options.js"></script>

View File

@@ -5,7 +5,7 @@
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Search &#8212; osxphotos 0.45.12 documentation</title>
<title>Search &#8212; osxphotos 0.46.0 documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="_static/alabaster.css" />

View File

@@ -1,7 +1,7 @@
from ._constants import AlbumSortOrder
from ._version import __version__
from .exiftool import ExifTool
from .export_db import ExportDB, ExportDBInMemory, ExportDBNoOp
from .export_db import ExportDB
from .fileutil import FileUtil, FileUtilNoOp
from .momentinfo import MomentInfo
from .personinfo import PersonInfo
@@ -25,8 +25,7 @@ __all__ = [
"CommentInfo",
"ExifTool",
"ExportDB",
"ExportDBInMemory",
"ExportDBNoOp",
"ExportDBTemp",
"ExportOptions",
"ExportResults",
"FileUtil",

View File

@@ -1,3 +1,3 @@
""" version info """
__version__ = "0.45.12"
__version__ = "0.46.0"

View File

@@ -65,6 +65,16 @@ from .crash_reporter import crash_reporter
from .datetime_formatter import DateTimeFormatter
from .exiftool import get_exiftool_path
from .export_db import ExportDB, ExportDBInMemory
from .export_db_utils import (
OSXPHOTOS_EXPORTDB_VERSION,
export_db_check_signatures,
export_db_get_last_run,
export_db_get_version,
export_db_save_config_to_file,
export_db_touch_files,
export_db_update_signatures,
export_db_vacuum,
)
from .fileutil import FileUtil, FileUtilNoOp
from .path_utils import is_valid_filepath, sanitize_filename, sanitize_filepath
from .photoexporter import ExportOptions, ExportResults, PhotoExporter
@@ -274,6 +284,24 @@ class FunctionCall(click.ParamType):
return (function, value)
class ExportDBType(click.ParamType):
name = "EXPORTDB"
def convert(self, value, param, ctx):
try:
export_db_name = pathlib.Path(value)
if export_db_name.is_dir():
raise click.BadParameter(f"{value} is a directory")
if export_db_name.is_file():
# verify it's actually an osxphotos export_db
# export_db_get_version will raise an error if it's not valid
osxphotos_ver, export_db_ver = export_db_get_version(value)
return value
except Exception:
self.fail(f"{value} exists but is not a valid osxphotos export database. ")
class IncompatibleQueryOptions(Exception):
pass
@@ -1160,7 +1188,7 @@ def cli(ctx, db, json_, debug):
f"If --exportdb is not specified, export database will be saved to '{OSXPHOTOS_EXPORT_DB}' "
"in the export directory. If --exportdb is specified, it will be saved to the specified file. "
),
type=click.Path(),
type=ExportDBType(),
)
@click.option(
"--load-config",
@@ -1182,9 +1210,15 @@ def cli(ctx, db, json_, debug):
required=False,
metavar="<config file path>",
default=None,
help=("Save options to file for use with --load-config. File format is TOML."),
help="Save options to file for use with --load-config. File format is TOML. "
"See also --config-only.",
type=click.Path(),
)
@click.option(
"--config-only",
is_flag=True,
help="If specified, saves the config file but does not export any files; must be used with --save-config.",
)
@click.option(
"--beta",
is_flag=True,
@@ -1351,6 +1385,7 @@ def export(
exportdb,
load_config,
save_config,
config_only,
is_reference,
beta,
in_album,
@@ -1413,7 +1448,7 @@ def export(
cfg = ConfigOptions(
"export",
locals(),
ignore=["ctx", "cli_obj", "dest", "load_config", "save_config"],
ignore=["ctx", "cli_obj", "dest", "load_config", "save_config", "config_only"],
)
global VERBOSE
@@ -1620,6 +1655,14 @@ def export(
)
sys.exit(1)
if config_only and not save_config:
click.secho(
"--config-only must be used with --save-config",
fg=CLI_COLOR_ERROR,
err=True,
)
sys.exit(1)
if all(x in [s.lower() for s in sidecar] for x in ["json", "exiftool"]):
click.echo(
click.style(
@@ -1644,8 +1687,11 @@ def export(
sys.exit(1)
if save_config:
verbose_(f"Saving options to file {save_config}")
verbose_(f"Saving options to config file '{save_config}'")
cfg.write_to_file(save_config)
if config_only:
click.echo(f"Saved config file to '{save_config}'")
sys.exit(0)
# set defaults for options that need them
jpeg_quality = DEFAULT_JPEG_QUALITY if jpeg_quality is None else jpeg_quality
@@ -1797,6 +1843,9 @@ def export(
f"Upgraded export database {export_db_path} from version {upgraded[0]} to {upgraded[1]}"
)
# save config to export_db
export_db.set_config(cfg.write_to_str())
photosdb = osxphotos.PhotosDB(dbfile=db, verbose=verbose_, exiftool=exiftool_path)
# enable beta features if requested
@@ -4703,6 +4752,210 @@ def run(python_file):
run_path(python_file, run_name="__main__")
@cli.command(name="exportdb", hidden=OSXPHOTOS_HIDDEN)
@click.option("--version", is_flag=True, help="Print export database version and exit.")
@click.option("--vacuum", is_flag=True, help="Run VACUUM to defragment the database.")
@click.option(
"--check-signatures",
is_flag=True,
help="Check signatures for all exported photos in the database to find signatures that don't match.",
)
@click.option(
"--update-signatures",
is_flag=True,
help="Update signatures for all exported photos in the database to match on-disk signatures.",
)
@click.option(
"--touch-file",
is_flag=True,
help="Touch files on disk to match created date in Photos library and update export database signatures",
)
@click.option(
"--last-run",
is_flag=True,
help="Show last run osxphotos commands used with this database.",
)
@click.option(
"--save-config",
metavar="CONFIG_FILE",
help="Save last run configuration to TOML file for use by --load-config.",
)
@click.option(
"--info",
metavar="FILE_PATH",
nargs=1,
help="Print information about FILE_PATH contained in the database.",
)
@click.option(
"--migrate",
is_flag=True,
help="Migrate (if needed) export database to current version."
)
@click.option(
"--export-dir",
help="Optional path to export directory (if not parent of export database).",
type=click.Path(exists=True, file_okay=False, dir_okay=True),
)
@click.option("--verbose", "-V", is_flag=True, help="Print verbose output.")
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode (don't actually update files), e.g. for use with --update-signatures.",
)
@click.argument("export_db", metavar="EXPORT_DATABASE", type=click.Path(exists=True))
def exportdb(
version,
vacuum,
check_signatures,
update_signatures,
touch_file,
last_run,
save_config,
info,
migrate,
export_dir,
verbose,
dry_run,
export_db,
):
"""Utilities for working with the osxphotos export database"""
export_db = pathlib.Path(export_db)
if export_db.is_dir():
# assume it's the export folder
export_db = export_db / OSXPHOTOS_EXPORT_DB
if not export_db.is_file():
print(
f"[red]Error: {OSXPHOTOS_EXPORT_DB} missing from {export_db.parent}[/red]"
)
sys.exit(1)
export_dir = export_dir or export_db.parent
sub_commands = [
version,
check_signatures,
update_signatures,
touch_file,
last_run,
bool(save_config),
bool(info),
migrate,
]
if sum(sub_commands) > 1:
print(f"[red]Only a single sub-command may be specified at a time[/red]")
sys.exit(1)
if version:
try:
osxphotos_ver, export_db_ver = export_db_get_version(export_db)
except Exception as e:
print(f"[red]Error: could not read version from {export_db}: {e}[/red]")
sys.exit(1)
else:
print(
f"osxphotos version: {osxphotos_ver}, export database version: {export_db_ver}"
)
sys.exit(0)
if vacuum:
try:
start_size = pathlib.Path(export_db).stat().st_size
export_db_vacuum(export_db)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(
f"Vacuumed {export_db}! {start_size} bytes -> {pathlib.Path(export_db).stat().st_size} bytes"
)
sys.exit(0)
if update_signatures:
try:
updated, skipped = export_db_update_signatures(
export_db, export_dir, verbose, dry_run
)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(f"Done. Updated {updated} files, skipped {skipped} files.")
sys.exit(0)
if last_run:
try:
last_run_info = export_db_get_last_run(export_db)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(f"last run at {last_run_info[0]}:")
print(f"osxphotos {last_run_info[1]}")
sys.exit(0)
if save_config:
try:
export_db_save_config_to_file(export_db, save_config)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(f"Saved configuration to {save_config}")
sys.exit(0)
if check_signatures:
try:
matched, notmatched, skipped = export_db_check_signatures(
export_db, export_dir, verbose=verbose
)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(
f"Done. Found {matched} matching signatures and {notmatched} signatures that don't match. Skipped {skipped} missing files."
)
sys.exit(0)
if touch_file:
try:
touched, not_touched, skipped = export_db_touch_files(
export_db, export_dir, verbose=verbose, dry_run=dry_run
)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
print(
f"Done. Touched {touched} files, skipped {not_touched} up to date files, skipped {skipped} missing files."
)
sys.exit(0)
if info:
exportdb = ExportDB(export_db, export_dir)
try:
info_rec = exportdb.get_file_record(info)
except Exception as e:
print(f"[red]Error: {e}[/red]")
sys.exit(1)
else:
if info_rec:
print(info_rec.asdict())
else:
print(f"[red]File '{info}' not found in export database[/red]")
sys.exit(0)
if migrate:
exportdb = ExportDB(export_db, export_dir)
upgraded = exportdb.was_upgraded
if upgraded:
print(
f"Migrated export database {export_db} from version {upgraded[0]} to {upgraded[1]}"
)
else:
print(f"Export database {export_db} is already at latest version {OSXPHOTOS_EXPORTDB_VERSION}")
sys.exit(0)
def _query_options_from_kwargs(**kwargs) -> QueryOptions:
"""Validate query options and create a QueryOptions instance"""
# sanity check input args

View File

@@ -134,18 +134,12 @@ class ConfigOptions:
filename: full path to TOML file to write; filename will be overwritten if it exists
"""
# todo: add overwrite and option to merge contents already in TOML file (under different [section] with new content)
data = {}
for attr in sorted(self._attrs.keys()):
val = getattr(self, attr)
if val in [False, ()]:
val = None
else:
val = list(val) if type(val) == tuple else val
data[attr] = val
with open(filename, "w") as fd:
toml.dump({self._name: data}, fd)
toml.dump(self._get_toml_dict(), fd)
def write_to_str(self) -> str:
"""Write self to TOML str"""
return toml.dumps(self._get_toml_dict())
def load_from_file(self, filename, override=False):
"""Load options from a TOML file.
@@ -178,3 +172,17 @@ class ConfigOptions:
def asdict(self):
return {attr: getattr(self, attr) for attr in sorted(self._attrs.keys())}
def _get_toml_dict(self):
"""Return dict for writing to TOML file"""
data = {}
for attr in sorted(self._attrs.keys()):
val = getattr(self, attr)
if val in [False, ()]:
val = None
else:
val = list(val) if type(val) == tuple else val
data[attr] = val
return {self._name: data}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,264 @@
""" Utility functions for working with export_db """
import pathlib
import sqlite3
from typing import Optional, Tuple, Union
import datetime
import os
import toml
from rich import print
from ._constants import OSXPHOTOS_EXPORT_DB
from ._version import __version__
from .export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDB
from .fileutil import FileUtil
from .photosdb import PhotosDB
__all__ = [
"export_db_check_signatures",
"export_db_get_last_run",
"export_db_get_version",
"export_db_save_config_to_file",
"export_db_touch_files",
"export_db_update_signatures",
"export_db_vacuum",
]
def isotime_from_ts(ts: int) -> str:
"""Convert timestamp to ISO 8601 time string"""
return datetime.datetime.fromtimestamp(ts).isoformat()
def export_db_get_version(
dbfile: Union[str, pathlib.Path]
) -> Tuple[Optional[int], Optional[int]]:
"""returns version from export database as tuple of (osxphotos version, export_db version)"""
conn = sqlite3.connect(str(dbfile))
c = conn.cursor()
row = c.execute(
"SELECT osxphotos, exportdb FROM version ORDER BY id DESC LIMIT 1;"
).fetchone()
if row:
return (row[0], row[1])
return (None, None)
def export_db_vacuum(dbfile: Union[str, pathlib.Path]) -> None:
"""Vacuum export database"""
conn = sqlite3.connect(str(dbfile))
c = conn.cursor()
c.execute("VACUUM;")
conn.commit()
def export_db_update_signatures(
dbfile: Union[str, pathlib.Path],
export_dir: Union[str, pathlib.Path],
verbose: bool = False,
dry_run: bool = False,
) -> Tuple[int, int]:
"""Update signatures for all files found in the export database to match what's on disk
Returns: tuple of (updated, skipped)
"""
export_dir = pathlib.Path(export_dir)
fileutil = FileUtil
conn = sqlite3.connect(str(dbfile))
c = conn.cursor()
c.execute("SELECT filepath_normalized, filepath FROM export_data;")
rows = c.fetchall()
updated = 0
skipped = 0
for row in rows:
filepath_normalized = row[0]
filepath = row[1]
filepath = export_dir / filepath
if not os.path.exists(filepath):
skipped += 1
if verbose:
print(f"[dark_orange]Skipping missing file[/dark_orange]: '{filepath}'")
continue
updated += 1
file_sig = fileutil.file_sig(filepath)
if verbose:
print(f"[green]Updating signature for[/green]: '{filepath}'")
if not dry_run:
c.execute(
"UPDATE export_data SET dest_mode = ?, dest_size = ?, dest_mtime = ? WHERE filepath_normalized = ?;",
(file_sig[0], file_sig[1], file_sig[2], filepath_normalized),
)
if not dry_run:
conn.commit()
return (updated, skipped)
def export_db_get_last_run(
export_db: Union[str, pathlib.Path]
) -> Tuple[Optional[str], Optional[str]]:
"""Get last run from export database"""
conn = sqlite3.connect(str(export_db))
c = conn.cursor()
row = c.execute(
"SELECT datetime, args FROM runs ORDER BY id DESC LIMIT 1;"
).fetchone()
if row:
return row[0], row[1]
return None, None
def export_db_save_config_to_file(
export_db: Union[str, pathlib.Path], config_file: Union[str, pathlib.Path]
) -> None:
"""Save export_db last run config to file"""
export_db = pathlib.Path(export_db)
config_file = pathlib.Path(config_file)
conn = sqlite3.connect(str(export_db))
c = conn.cursor()
row = c.execute("SELECT config FROM config ORDER BY id DESC LIMIT 1;").fetchone()
if not row:
return ValueError("No config found in export_db")
with config_file.open("w") as f:
f.write(row[0])
def export_db_check_signatures(
dbfile: Union[str, pathlib.Path],
export_dir: Union[str, pathlib.Path],
verbose: bool = False,
) -> Tuple[int, int, int]:
"""Check signatures for all files found in the export database to verify what matches the on disk files
Returns: tuple of (updated, skipped)
"""
export_dir = pathlib.Path(export_dir)
fileutil = FileUtil
conn = sqlite3.connect(str(dbfile))
c = conn.cursor()
c.execute("SELECT filepath_normalized, filepath FROM export_data;")
rows = c.fetchall()
exportdb = ExportDB(dbfile, export_dir)
matched = 0
notmatched = 0
skipped = 0
for row in rows:
filepath_normalized = row[0]
filepath = row[1]
filepath = export_dir / filepath
if not filepath.exists():
skipped += 1
if verbose:
print(f"[dark_orange]Skipping missing file[/dark_orange]: '{filepath}'")
continue
file_sig = fileutil.file_sig(filepath)
file_rec = exportdb.get_file_record(filepath)
if file_rec.dest_sig == file_sig:
matched += 1
if verbose:
print(f"[green]Signatures matched[/green]: '{filepath}'")
else:
notmatched += 1
if verbose:
print(f"[deep_pink3]Signatures do not match[/deep_pink3]: '{filepath}'")
return (matched, notmatched, skipped)
def export_db_touch_files(
dbfile: Union[str, pathlib.Path],
export_dir: Union[str, pathlib.Path],
verbose: bool = False,
dry_run: bool = False,
) -> Tuple[int, int, int]:
"""Touch files on disk to match the Photos library created date
Returns: tuple of (touched, not_touched, skipped)
"""
export_dir = pathlib.Path(export_dir)
# open and close exportdb to ensure it gets migrated
exportdb = ExportDB(dbfile, export_dir)
upgraded = exportdb.was_upgraded
if upgraded and verbose:
print(
f"Upgraded export database {dbfile} from version {upgraded[0]} to {upgraded[1]}"
)
exportdb.close()
conn = sqlite3.connect(str(dbfile))
c = conn.cursor()
# get most recent config
row = c.execute("SELECT config FROM config ORDER BY id DESC LIMIT 1;").fetchone()
if row:
config = toml.loads(row[0])
try:
photos_db_path = config["export"].get("db", None)
except KeyError:
photos_db_path = None
else:
# TODO: parse the runs table to get the last --db
# in the mean time, photos_db_path = None will use the default library
photos_db_path = None
verbose_ = print if verbose else lambda *args, **kwargs: None
photosdb = PhotosDB(dbfile=photos_db_path, verbose=verbose_)
exportdb = ExportDB(dbfile, export_dir)
c.execute(
"SELECT filepath_normalized, filepath, uuid, dest_mode, dest_size FROM export_data;"
)
rows = c.fetchall()
touched = 0
not_touched = 0
skipped = 0
for row in rows:
filepath_normalized = row[0]
filepath = row[1]
filepath = export_dir / filepath
uuid = row[2]
dest_mode = row[3]
dest_size = row[4]
if not filepath.exists():
skipped += 1
if verbose:
print(
f"[dark_orange]Skipping missing file (not in export directory)[/dark_orange]: '{filepath}'"
)
continue
photo = photosdb.get_photo(uuid)
if not photo:
skipped += 1
if verbose:
print(
f"[dark_orange]Skipping missing photo (did not find in Photos Library)[/dark_orange]: '{filepath}' ({uuid})"
)
continue
ts = int(photo.date.timestamp())
stat = os.stat(str(filepath))
mtime = stat.st_mtime
if mtime == ts:
not_touched += 1
if verbose:
print(
f"[green]Skipping file (timestamp matches)[/green]: '{filepath}' [dodger_blue1]{isotime_from_ts(ts)} ({ts})[/dodger_blue1]"
)
continue
touched += 1
if verbose:
print(
f"[deep_pink3]Touching file[/deep_pink3]: '{filepath}' "
f"[dodger_blue1]{isotime_from_ts(mtime)} ({mtime}) -> {isotime_from_ts(ts)} ({ts})[/dodger_blue1]"
)
if not dry_run:
os.utime(str(filepath), (ts, ts))
rec = exportdb.get_file_record(filepath)
rec.dest_sig = (dest_mode, dest_size, ts)
return (touched, not_touched, skipped)

View File

@@ -143,7 +143,7 @@ class FileUtilMacOS(FileUtilABC):
@classmethod
def utime(cls, path, times):
"""Set the access and modified time of path."""
os.utime(path, times)
os.utime(path, times=times)
@classmethod
def cmp(cls, f1, f2, mtime1=None):
@@ -187,7 +187,7 @@ class FileUtilMacOS(FileUtilABC):
@classmethod
def file_sig(cls, f1):
"""return os.stat signature for file f1"""
"""return os.stat signature for file f1 as tuple of (mode, size, mtime)"""
return cls._sig(os.stat(f1))
@classmethod

View File

@@ -33,7 +33,7 @@ from ._constants import (
from ._version import __version__
from .datetime_utils import datetime_tz_to_utc
from .exiftool import ExifTool, exiftool_can_write
from .export_db import ExportDB_ABC, ExportDBNoOp
from .export_db import ExportDB, ExportDBTemp
from .fileutil import FileUtil
from .photokit import (
PHOTOS_VERSION_CURRENT,
@@ -81,7 +81,7 @@ class ExportOptions:
exiftool_flags (list of str): optional list of flags to pass to exiftool when using exiftool option, e.g ["-m", "-F"]
exiftool: (bool, default = False): if True, will use exiftool to write metadata to export file
export_as_hardlink: (bool, default=False): if True, will hardlink files instead of copying them
export_db: (ExportDB_ABC): instance of a class that conforms to ExportDB_ABC with methods for getting/setting data related to exported files to compare update state
export_db: (ExportDB): instance of a class that conforms to ExportDB with methods for getting/setting data related to exported files to compare update state
face_regions: (bool, default=True): if True, will export face regions
fileutil: (FileUtilABC): class that conforms to FileUtilABC with various file utilities
force_update: (bool, default=False): if True, will export photo if any metadata has changed but export otherwise would not be triggered (e.g. metadata changed but not using exiftool)
@@ -128,7 +128,7 @@ class ExportOptions:
exiftool_flags: Optional[List] = None
exiftool: bool = False
export_as_hardlink: bool = False
export_db: Optional[ExportDB_ABC] = None
export_db: Optional[ExportDB] = None
face_regions: bool = True
fileutil: Optional[FileUtil] = None
force_update: bool = False
@@ -164,6 +164,12 @@ class ExportOptions:
def asdict(self):
return asdict(self)
@property
def bit_flags(self):
"""Return bit flags representing options that affect export"""
# currently only exiftool makes a difference
return self.exiftool << 1
class StagedFiles:
"""Represents files staged for export"""
@@ -403,8 +409,8 @@ class PhotoExporter:
"Cannot use export_as_hardlink with download_missing or use_photos_export"
)
# when called from export(), won't get an export_db, so use no-op version
options.export_db = options.export_db or ExportDBNoOp()
# when called from export(), won't get an export_db, so use temp version
options.export_db = options.export_db or ExportDBTemp()
# ensure there's a FileUtil class to use
options.fileutil = options.fileutil or FileUtil
@@ -443,6 +449,7 @@ class PhotoExporter:
# get the right destination path depending on options.update, etc.
dest = self._get_dest_path(src, dest, options)
self._render_options.filepath = str(dest)
all_results = ExportResults()
@@ -545,23 +552,20 @@ class PhotoExporter:
all_results += self._write_sidecar_files(dest=dest, options=options)
if options.touch_file:
all_results += self._touch_files(all_results, options)
return all_results
def _touch_files(
self, results: ExportResults, options: ExportOptions
) -> ExportResults:
"""touch file date/time to match photo creation date/time"""
def _touch_files(self, touch_files: List, options: ExportOptions) -> ExportResults:
"""touch file date/time to match photo creation date/time; only touches files if needed"""
fileutil = options.fileutil
touch_files = set(results.to_touch)
touch_results = ExportResults()
for touch_file in touch_files:
touch_results = []
for touch_file in set(touch_files):
ts = int(self.photo.date.timestamp())
fileutil.utime(touch_file, (ts, ts))
touch_results.touched.append(touch_file)
return touch_results
stat = os.stat(touch_file)
if stat.st_mtime != ts:
if not options.dry_run:
fileutil.utime(touch_file, (ts, ts))
touch_results.append(touch_file)
return ExportResults(touched=touch_results)
def _get_edited_filename(self, original_filename):
"""Return the filename for the exported edited photo
@@ -610,21 +614,11 @@ class PhotoExporter:
):
return pathlib.Path(increment_filename(dest))
# if update and file exists, need to check to see if it's the write file by checking export db
# if update and file exists, need to check to see if it's the right file by checking export db
if (options.update or options.force_update) and dest.exists() and src:
export_db = options.export_db
fileutil = options.fileutil
# destination exists, check to see if destination is the right UUID
dest_uuid = export_db.get_uuid_for_file(dest)
if dest_uuid is None and fileutil.cmp(src, dest):
# might be exporting into a pre-ExportDB folder or the DB got deleted
dest_uuid = self.photo.uuid
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
info_json=self.photo.json(),
)
if dest_uuid != self.photo.uuid:
# not the right file, find the right one
# find files that match "dest_name (*.ext" (e.g. "dest_name (1).jpg", "dest_name (2).jpg)", ...)
@@ -639,16 +633,6 @@ class PhotoExporter:
if dest_uuid == self.photo.uuid:
dest = pathlib.Path(file_)
break
elif dest_uuid is None and fileutil.cmp(src, file_):
# files match, update the UUID
dest = pathlib.Path(file_)
export_db.set_data(
filename=dest,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest),
info_json=self.photo.json(),
)
break
else:
# increment the destination file
dest = pathlib.Path(increment_filename(dest))
@@ -656,6 +640,57 @@ class PhotoExporter:
# either dest was updated in the if clause above or not updated at all
return dest
def _should_update_photo(
self, src: pathlib.Path, dest: pathlib.Path, options: ExportOptions
) -> bool:
"""Return True if photo should be updated, else False"""
export_db = options.export_db
fileutil = options.fileutil
file_record = export_db.get_file_record(dest)
if not file_record:
# photo doesn't exist in database, should update
return True
if options.export_as_hardlink and not dest.samefile(src):
# different files, should update
return True
if not options.export_as_hardlink and dest.samefile(src):
# same file but not exporting as hardlink, should update
return True
if not options.ignore_signature and not fileutil.cmp_file_sig(
dest, file_record.dest_sig
):
# destination file doesn't match what was last exported
return True
if file_record.export_options != options.bit_flags:
# exporting with different set of options (e.g. exiftool), should update
# need to check this before exiftool in case exiftool options are different
# and export database is missing; this will always be True if database is missing
# as it'll be None and bit_flags will be an int
return True
if options.exiftool:
current_exifdata = self._exiftool_json_sidecar(options=options)
return current_exifdata != file_record.exifdata
if options.edited and not fileutil.cmp_file_sig(src, file_record.src_sig):
# edited file in Photos doesn't match what was last exported
return True
if options.force_update:
current_digest = hexdigest(self.photo.json())
if current_digest != file_record.digest:
# metadata in Photos changed, force update
return True
# photo should not be updated
return False
def _stage_photos_for_export(self, options: ExportOptions) -> StagedFiles:
"""Stages photos for export
@@ -989,11 +1024,8 @@ class PhotoExporter:
update_updated_files = []
update_new_files = []
update_skipped_files = [] # skip files that are already up to date
touched_files = []
converted_to_jpeg_files = []
exif_results = ExportResults()
converted_stat = None
edited_stat = None
dest_str = str(dest)
dest_exists = dest.exists()
@@ -1002,98 +1034,25 @@ class PhotoExporter:
export_db = options.export_db
if options.update or options.force_update: # updating
cmp_touch, cmp_orig = False, False
if dest_exists:
# update, destination exists, but we might not need to replace it...
if options.exiftool:
sig_exif = export_db.get_stat_exif_for_file(dest_str)
cmp_orig = fileutil.cmp_file_sig(dest_str, sig_exif)
if cmp_orig:
# if signatures match also need to compare exifdata to see if metadata changed
cmp_orig = not self._should_run_exiftool(dest_str, options)
sig_exif = (
sig_exif[0],
sig_exif[1],
int(self.photo.date.timestamp()),
)
cmp_touch = fileutil.cmp_file_sig(dest_str, sig_exif)
elif options.convert_to_jpeg:
sig_converted = export_db.get_stat_converted_for_file(dest_str)
cmp_orig = fileutil.cmp_file_sig(dest_str, sig_converted)
sig_converted = (
sig_converted[0],
sig_converted[1],
int(self.photo.date.timestamp()),
)
cmp_touch = fileutil.cmp_file_sig(dest_str, sig_converted)
else:
cmp_orig = options.ignore_signature or fileutil.cmp(src, dest)
cmp_touch = fileutil.cmp(
src, dest, mtime1=int(self.photo.date.timestamp())
)
if options.force_update:
# need to also check the photo's metadata to that in the database
# and if anything changed, we need to update the file
# ony the hex digest of the metadata is stored in the database
photo_digest = hexdigest(self.photo.json())
db_digest = export_db.get_metadata_for_file(dest_str)
cmp_orig = photo_digest == db_digest
sig_cmp = cmp_touch if options.touch_file else cmp_orig
if options.edited:
# requested edited version of photo
# need to see if edited version in Photos library has changed
# (e.g. it's been edited again)
sig_edited = export_db.get_stat_edited_for_file(dest_str)
cmp_edited = (
fileutil.cmp_file_sig(src, sig_edited)
if sig_edited != (None, None, None)
else False
)
sig_cmp = sig_cmp and (options.force_update or cmp_edited)
if (options.export_as_hardlink and dest.samefile(src)) or (
not options.export_as_hardlink
and not dest.samefile(src)
and sig_cmp
):
# destination exists and signatures match, skip it
update_skipped_files.append(dest_str)
elif options.touch_file and cmp_orig and not cmp_touch:
# destination exists, signature matches original but does not match expected touch time
# skip exporting but update touch time
update_skipped_files.append(dest_str)
touched_files.append(dest_str)
elif not options.touch_file and cmp_touch and not cmp_orig:
# destination exists, signature matches expected touch but not original
# user likely exported with touch_file and is now exporting without touch_file
# don't update the file because it's same but leave touch time
update_skipped_files.append(dest_str)
else:
# destination exists but is different
if self._should_update_photo(src, dest, options):
update_updated_files.append(dest_str)
if options.touch_file:
touched_files.append(dest_str)
else:
update_skipped_files.append(dest_str)
else:
# update, destination doesn't exist (new file)
update_new_files.append(dest_str)
if options.touch_file:
touched_files.append(dest_str)
else:
# not update, export the file
exported_files.append(dest_str)
if options.touch_file:
sig = fileutil.file_sig(src)
sig = (sig[0], sig[1], int(self.photo.date.timestamp()))
if not fileutil.cmp_file_sig(src, sig):
touched_files.append(dest_str)
if not update_skipped_files:
# have file to export
edited_stat = (
fileutil.file_sig(src) if options.edited else (None, None, None)
export_files = update_new_files + update_updated_files + exported_files
for export_dest in export_files:
# set src_sig before any modifications by convert_to_jpeg or exiftool
export_record = export_db.create_or_get_file_record(
export_dest, self.photo.uuid
)
export_record.src_sig = fileutil.file_sig(src)
if dest_exists and any(
[options.overwrite, options.update, options.force_update]
):
@@ -1123,7 +1082,6 @@ class PhotoExporter:
src, tmp_file, compression_quality=options.jpeg_quality
)
src = tmp_file
converted_stat = fileutil.file_sig(tmp_file)
converted_to_jpeg_files.append(dest_str)
if options.exiftool:
@@ -1139,20 +1097,7 @@ class PhotoExporter:
f"Error copying file {src} to {dest_str}: {e} ({lineno(__file__)})"
) from e
json_info = self.photo.json()
# don't set the metadata digest if not force_update so that future use of force_update catches metadata change
metadata_digest = hexdigest(json_info) if options.force_update else None
export_db.set_data(
filename=dest_str,
uuid=self.photo.uuid,
orig_stat=fileutil.file_sig(dest_str),
converted_stat=converted_stat,
edited_stat=edited_stat,
info_json=json_info,
metadata=metadata_digest,
)
return ExportResults(
results = ExportResults(
converted_to_jpeg=converted_to_jpeg_files,
error=exif_results.error,
exif_updated=exif_results.exif_updated,
@@ -1161,10 +1106,34 @@ class PhotoExporter:
exported=exported_files + update_new_files + update_updated_files,
new=update_new_files,
skipped=update_skipped_files,
to_touch=touched_files,
updated=update_updated_files,
)
# touch files if needed
if options.touch_file:
results += self._touch_files(
exported_files
+ update_new_files
+ update_updated_files
+ update_skipped_files,
options,
)
# set data in the database
with export_db.create_or_get_file_record(dest_str, self.photo.uuid) as rec:
photoinfo = self.photo.json()
rec.photoinfo = photoinfo
rec.export_options = options.bit_flags
# don't set src_sig as that is set above before any modifications by convert_to_jpeg or exiftool
if not options.ignore_signature:
rec.dest_sig = fileutil.file_sig(dest)
if options.exiftool:
rec.exifdata = self._exiftool_json_sidecar(options)
if options.force_update:
rec.digest = hexdigest(photoinfo)
return results
def _write_sidecar_files(
self,
dest: pathlib.Path,
@@ -1245,8 +1214,8 @@ class PhotoExporter:
sidecar_type = data[4]
sidecar_digest = hexdigest(sidecar_str)
old_sidecar_digest, sidecar_sig = export_db.get_sidecar_for_file(
sidecar_filename
sidecar_record = export_db.create_or_get_file_record(
sidecar_filename, self.photo.uuid
)
write_sidecar = (
not (options.update or options.force_update)
@@ -1256,8 +1225,10 @@ class PhotoExporter:
)
or (
(options.update or options.force_update)
and (sidecar_digest != old_sidecar_digest)
or not fileutil.cmp_file_sig(sidecar_filename, sidecar_sig)
and (sidecar_digest != sidecar_record.digest)
or not fileutil.cmp_file_sig(
sidecar_filename, sidecar_record.dest_sig
)
)
)
if write_sidecar:
@@ -1265,16 +1236,13 @@ class PhotoExporter:
files_written.append(str(sidecar_filename))
if not options.dry_run:
self._write_sidecar(sidecar_filename, sidecar_str)
export_db.set_sidecar_for_file(
sidecar_filename,
sidecar_digest,
fileutil.file_sig(sidecar_filename),
)
sidecar_record.digest = sidecar_digest
sidecar_record.dest_sig = fileutil.file_sig(sidecar_filename)
else:
verbose(f"Skipped up to date {sidecar_type} sidecar {sidecar_filename}")
files_skipped.append(str(sidecar_filename))
return ExportResults(
results = ExportResults(
sidecar_json_written=sidecar_json_files_written,
sidecar_json_skipped=sidecar_json_files_skipped,
sidecar_exiftool_written=sidecar_exiftool_files_written,
@@ -1283,6 +1251,26 @@ class PhotoExporter:
sidecar_xmp_skipped=sidecar_xmp_files_skipped,
)
if options.touch_file:
all_sidecars = (
sidecar_json_files_written
+ sidecar_exiftool_files_written
+ sidecar_xmp_files_written
+ sidecar_json_files_skipped
+ sidecar_exiftool_files_skipped
+ sidecar_xmp_files_skipped
)
results += self._touch_files(all_sidecars, options)
# update destination signatures in database
for sidecar_filename in all_sidecars:
sidecar_record = export_db.create_or_get_file_record(
sidecar_filename, self.photo.uuid
)
sidecar_record.dest_sig = fileutil.file_sig(sidecar_filename)
return results
def _write_exif_metadata_to_file(
self,
src,
@@ -1297,10 +1285,7 @@ class PhotoExporter:
local machine prior to being copied to the export destination which may be on a
network drive or other slower external storage."""
export_db = options.export_db
fileutil = options.fileutil
verbose = options.verbose or self._verbose
exiftool_results = ExportResults()
# don't try to write if unsupported file type for exiftool
@@ -1312,53 +1297,35 @@ class PhotoExporter:
)
)
# set file signature so the file doesn't get re-exported with --update
export_db.set_data(
dest,
uuid=self.photo.uuid,
exif_stat=fileutil.file_sig(src),
exif_json=self._exiftool_json_sidecar(options=options),
)
return exiftool_results
# determine if we need to write the exif metadata
# if we are not updating, we always write
# else, need to check the database to determine if we need to write
run_exiftool = self._should_run_exiftool(dest, options)
if run_exiftool:
verbose(f"Writing metadata with exiftool for {pathlib.Path(dest).name}")
if not options.dry_run:
warning_, error_ = self._write_exif_data(src, options=options)
if warning_:
exiftool_results.exiftool_warning.append((dest, warning_))
if error_:
exiftool_results.exiftool_error.append((dest, error_))
exiftool_results.error.append((dest, error_))
verbose(f"Writing metadata with exiftool for {pathlib.Path(dest).name}")
if not options.dry_run:
warning_, error_ = self._write_exif_data(src, options=options)
if warning_:
exiftool_results.exiftool_warning.append((dest, warning_))
if error_:
exiftool_results.exiftool_error.append((dest, error_))
exiftool_results.error.append((dest, error_))
export_db.set_data(
dest,
uuid=self.photo.uuid,
exif_stat=fileutil.file_sig(src),
exif_json=self._exiftool_json_sidecar(options=options),
)
exiftool_results.exif_updated.append(dest)
exiftool_results.to_touch.append(dest)
else:
verbose(
f"Skipped up to date exiftool metadata for {pathlib.Path(dest).name}"
)
exiftool_results.exif_updated.append(dest)
exiftool_results.to_touch.append(dest)
return exiftool_results
def _should_run_exiftool(self, dest, options: ExportOptions) -> bool:
"""Return True if exiftool should be run to update metadata"""
run_exiftool = not (options.update or options.force_update)
run_exiftool = not options.update and not options.force_update
if options.update or options.force_update:
files_are_different = False
old_data = options.export_db.get_exifdata_for_file(dest)
exif_record = options.export_db.get_file_record(dest)
old_data = exif_record.exifdata if exif_record else None
if old_data is not None:
old_data = json.loads(old_data)[0]
current_data = json.loads(self._exiftool_json_sidecar(options=options))[
0
]
current_data = json.loads(self._exiftool_json_sidecar(options=options))
current_data = current_data[0]
if old_data != current_data:
files_are_different = True

View File

@@ -3,6 +3,7 @@
import os
import sqlite3
import tempfile
from tempfile import TemporaryDirectory
import pytest
from click.testing import CliRunner
@@ -675,6 +676,8 @@ CLI_EXIFTOOL_IGNORE_DATE_MODIFIED = {
CLI_EXIFTOOL_ERROR = ["E2078879-A29C-4D6F-BACB-E3BBE6C3EB91"]
CLI_NOT_REALLY_A_JPEG = "E2078879-A29C-4D6F-BACB-E3BBE6C3EB91"
CLI_EXIFTOOL_DUPLICATE_KEYWORDS = {
"E9BC5C36-7CD1-40A1-A72B-8B8FAC227D51": "wedding.jpg"
}
@@ -4936,12 +4939,126 @@ def test_export_force_update():
export, [os.path.join(cwd, photos_db_path), ".", "--force-update"]
)
assert result.exit_code == 0
print(result.output)
assert (
f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: 0, updated: 0, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7+PHOTOS_EDITED_15_7}, updated EXIF data: 0, missing: 3, error: 0"
in result.output
)
@pytest.mark.skipif(exiftool is None, reason="exiftool not installed")
def test_export_update_complex():
"""test complex --update scenario, #630"""
import glob
import os
import os.path
import osxphotos
from osxphotos.cli import OSXPHOTOS_EXPORT_DB, export
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# basic export
result = runner.invoke(export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V"])
assert result.exit_code == 0
files = glob.glob("*")
assert sorted(files) == sorted(CLI_EXPORT_FILENAMES)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
src = os.path.join(cwd, CLI_PHOTOS_DB)
dest = os.path.join(os.getcwd(), "export_complex_update.photoslibrary")
photos_db_path = copy_photos_library_to_path(src, dest)
tempdir = TemporaryDirectory()
options = [
"--verbose",
"--update",
"--cleanup",
"--directory",
"{created.year}/{created.month}",
"--description-template",
"Album:{album,}{newline}Description:{descr,}",
"--exiftool",
"--exiftool-merge-keywords",
"--exiftool-merge-persons",
"--keyword-template",
"{keyword}",
"--not-hidden",
"--retry",
"2",
"--skip-original-if-edited",
"--timestamp",
"--strip",
"--skip-uuid",
CLI_NOT_REALLY_A_JPEG,
]
# update
result = runner.invoke(
export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
)
assert result.exit_code == 0
assert (
f"exported: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, updated: 0, skipped: 0, updated EXIF data: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}"
in result.output
)
result = runner.invoke(
export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
)
assert result.exit_code == 0
assert "exported: 0" in result.output
# update a file
dbpath = os.path.join(photos_db_path, "database/Photos.sqlite")
try:
conn = sqlite3.connect(dbpath)
c = conn.cursor()
except sqlite3.Error as e:
pytest.exit(f"An error occurred opening sqlite file")
# photo is IMG_4547.jpg
c.execute(
"UPDATE ZADDITIONALASSETATTRIBUTES SET Z_OPT=9, ZTITLE='My Updated Title' WHERE Z_PK=8;"
)
conn.commit()
# run --update to see if updated metadata forced update
result = runner.invoke(
export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
)
assert result.exit_code == 0
assert (
f"exported: 0, updated: 1, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7-2}, updated EXIF data: 1"
in result.output
)
# update, nothing should export
result = runner.invoke(
export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
)
assert result.exit_code == 0
assert (
f"exported: 0, updated: 0, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, updated EXIF data: 0"
in result.output
)
# change the template and run again
options.extend(["--keyword-template", "FOO"])
# run update and all photos should be updated
result = runner.invoke(
export, [os.path.join(cwd, photos_db_path), tempdir.name, *options]
)
assert result.exit_code == 0
assert (
f"exported: 0, updated: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}, skipped: 0, updated EXIF data: {PHOTOS_NOT_IN_TRASH_LEN_15_7-1}"
in result.output
)
@pytest.mark.skipif(
"OSXPHOTOS_TEST_EXPORT" not in os.environ,
reason="Skip if not running on author's personal library.",
@@ -5266,17 +5383,14 @@ def test_export_update_no_db():
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
os.unlink(OSXPHOTOS_EXPORT_DB)
# update
# update, will re-export all files with different names
result = runner.invoke(
export, [os.path.join(cwd, CLI_PHOTOS_DB), ".", "--update"]
)
assert result.exit_code == 0
# unedited files will be skipped because their signatures will compare but
# edited files will be re-exported because there won't be an edited signature
# in the database
assert (
f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: 0, updated: {PHOTOS_EDITED_15_7}, skipped: {PHOTOS_NOT_IN_TRASH_LEN_15_7}, updated EXIF data: 0, missing: 3, error: 0"
f"Processed: {PHOTOS_NOT_IN_TRASH_LEN_15_7} photos, exported: {PHOTOS_NOT_IN_TRASH_LEN_15_7+PHOTOS_EDITED_15_7}, updated: 0"
in result.output
)
assert os.path.isfile(OSXPHOTOS_EXPORT_DB)
@@ -5590,7 +5704,7 @@ def test_export_touch_files_update():
# touch one file and run update again
ts = time.time()
os.utime(CLI_EXPORT_BY_DATE[0], (ts, ts))
os.utime(CLI_EXPORT_BY_DATE_NEED_TOUCH[1], (ts, ts))
result = runner.invoke(
export,
@@ -5922,7 +6036,12 @@ def test_export_ignore_signature_sidecar():
# should result in a new sidecar being exported but not the image itself
exportdb = osxphotos.export_db.ExportDB("./.osxphotos_export.db", ".")
for filename in CLI_EXPORT_IGNORE_SIGNATURE_FILENAMES:
exportdb.set_sidecar_for_file(f"{filename}.xmp", "FOO", (0, 1, 2))
record = exportdb.get_file_record(filename)
sidecar_record = exportdb.create_or_get_file_record(
f"{filename}.xmp", record.uuid
)
sidecar_record.dest_sig = (0, 1, 2)
sidecar_record.digest = "FOO"
result = runner.invoke(
export,
@@ -6426,7 +6545,7 @@ def test_save_load_config():
],
)
assert result.exit_code == 0
assert "Saving options to file" in result.output
assert "Saving options to config file" in result.output
files = glob.glob("*")
assert "config.toml" in files
@@ -6462,7 +6581,7 @@ def test_save_load_config():
],
)
assert result.exit_code == 0
assert "Saving options to file" in result.output
assert "Saving options to config file" in result.output
files = glob.glob("*")
assert "config.toml" in files
@@ -6499,6 +6618,41 @@ def test_save_load_config():
assert "Writing XMP sidecar" not in result.output
def test_config_only():
"""test --save-config, --config-only"""
import glob
import os
import os.path
from osxphotos.cli import export
runner = CliRunner()
cwd = os.getcwd()
# pylint: disable=not-context-manager
with runner.isolated_filesystem():
# test save config file
result = runner.invoke(
export,
[
os.path.join(cwd, CLI_PHOTOS_DB),
".",
"-V",
"--sidecar",
"XMP",
"--touch-file",
"--update",
"--save-config",
"config.toml",
"--config-only",
],
)
assert result.exit_code == 0
assert "Saved config file" in result.output
assert "Processed:" not in result.output
files = glob.glob("*")
assert "config.toml" in files
def test_export_exportdb():
"""test --exportdb"""
import glob

View File

@@ -1,26 +1,36 @@
""" Test ExportDB """
import json
import os
import pathlib
import sqlite3
import tempfile
import pytest
from osxphotos._version import __version__
from osxphotos.export_db import (
OSXPHOTOS_EXPORTDB_VERSION,
ExportDB,
ExportDBInMemory,
ExportDBTemp,
ExportRecord,
)
from osxphotos.export_db_utils import export_db_get_version
EXIF_DATA = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "EXIF:ImageDescription": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "XMP:Title": "Elder Park", "EXIF:GPSLatitude": "34 deg 55' 8.01\" S", "EXIF:GPSLongitude": "138 deg 35' 48.70\" E", "Composite:GPSPosition": "34 deg 55' 8.01\" S, 138 deg 35' 48.70\" E", "EXIF:GPSLatitudeRef": "South", "EXIF:GPSLongitudeRef": "East", "EXIF:DateTimeOriginal": "2017:06:20 17:18:56", "EXIF:OffsetTimeOriginal": "+09:30", "EXIF:ModifyDate": "2020:05:18 14:42:04"}]"""
INFO_DATA = """{"uuid": "3DD2C897-F19E-4CA6-8C22-B027D5A71907", "filename": "3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "original_filename": "IMG_4547.jpg", "date": "2017-06-20T17:18:56.518000+09:30", "description": "\u2068Elder Park\u2069, \u2068Adelaide\u2069, \u2068Australia\u2069", "title": "Elder Park", "keywords": [], "labels": ["Statue", "Art"], "albums": ["AlbumInFolder"], "folders": {"AlbumInFolder": ["Folder1", "SubFolder2"]}, "persons": [], "path": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/originals/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907.jpeg", "ismissing": false, "hasadjustments": true, "external_edit": false, "favorite": false, "hidden": false, "latitude": -34.91889167000001, "longitude": 138.59686167, "path_edited": "/Users/rhet/Pictures/Test-10.15.4.photoslibrary/resources/renders/3/3DD2C897-F19E-4CA6-8C22-B027D5A71907_1_201_a.jpeg", "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": false, "incloud": null, "date_modified": "2020-05-18T14:42:04.608664+09:30", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Elder Park, Adelaide, South Australia, Australia, River Torrens", "names": {"field0": [], "country": ["Australia"], "state_province": ["South Australia"], "sub_administrative_area": ["Adelaide"], "city": ["Adelaide", "Adelaide"], "field5": [], "additional_city_info": ["Adelaide CBD", "Tarndanya"], "ocean": [], "area_of_interest": ["Elder Park", ""], "inland_water": ["River Torrens", "River Torrens"], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": ["River Torrens", "River Torrens"]}, "country_code": "AU", "ishome": false, "address_str": "River Torrens, Adelaide SA, Australia", "address": {"street": null, "sub_locality": "Tarndanya", "city": "Adelaide", "sub_administrative_area": "Adelaide", "state_province": "SA", "postal_code": null, "country": "Australia", "iso_country_code": "AU"}}, "exif": {"flash_fired": false, "iso": 320, "metering_mode": 3, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.058823529411764705, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}"""
SIDECAR_DATA = """FOO_BAR"""
METADATA_DATA = "FIZZ"
DIGEST_DATA = "FIZZ"
EXIF_DATA2 = """[{"_CreatedBy": "osxphotos, https://github.com/RhetTbull/osxphotos", "XMP:Title": "St. James's Park", "XMP:TagsList": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "IPTC:Keywords": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "XMP:Subject": ["London 2018", "St. James's Park", "England", "United Kingdom", "UK", "London"], "EXIF:GPSLatitude": "51 deg 30' 12.86\" N", "EXIF:GPSLongitude": "0 deg 7' 54.50\" W", "Composite:GPSPosition": "51 deg 30' 12.86\" N, 0 deg 7' 54.50\" W", "EXIF:GPSLatitudeRef": "North", "EXIF:GPSLongitudeRef": "West", "EXIF:DateTimeOriginal": "2018:10:13 09:18:12", "EXIF:OffsetTimeOriginal": "-04:00", "EXIF:ModifyDate": "2019:12:08 14:06:44"}]"""
INFO_DATA2 = """{"uuid": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529", "filename": "F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "original_filename": "IMG_8440.JPG", "date": "2019-06-11T11:42:06.711805-07:00", "description": null, "title": null, "keywords": [], "labels": ["Sky", "Cloudy", "Fence", "Land", "Outdoor", "Park", "Amusement Park", "Roller Coaster"], "albums": [], "folders": {}, "persons": [], "path": "/Volumes/MacBook Catalina - Data/Users/rhet/Pictures/Photos Library.photoslibrary/originals/F/F2BB3F98-90F0-4E4C-A09B-25C6822A4529.jpeg", "ismissing": false, "hasadjustments": false, "external_edit": false, "favorite": false, "hidden": false, "latitude": 33.81558666666667, "longitude": -117.99298, "path_edited": null, "shared": false, "isphoto": true, "ismovie": false, "uti": "public.jpeg", "burst": false, "live_photo": false, "path_live_photo": null, "iscloudasset": true, "incloud": true, "date_modified": "2019-10-14T00:51:47.141950-07:00", "portrait": false, "screenshot": false, "slow_mo": false, "time_lapse": false, "hdr": false, "selfie": false, "panorama": false, "has_raw": false, "uti_raw": null, "path_raw": null, "place": {"name": "Adventure City, Stanton, California, United States", "names": {"field0": [], "country": ["United States"], "state_province": ["California"], "sub_administrative_area": ["Orange"], "city": ["Stanton", "Anaheim", "Anaheim"], "field5": [], "additional_city_info": ["West Anaheim"], "ocean": [], "area_of_interest": ["Adventure City", "Adventure City"], "inland_water": [], "field10": [], "region": [], "sub_throughfare": [], "field13": [], "postal_code": [], "field15": [], "field16": [], "street_address": [], "body_of_water": []}, "country_code": "US", "ishome": false, "address_str": "Adventure City, 1240 S Beach Blvd, Anaheim, CA 92804, United States", "address": {"street": "1240 S Beach Blvd", "sub_locality": "West Anaheim", "city": "Stanton", "sub_administrative_area": "Orange", "state_province": "CA", "postal_code": "92804", "country": "United States", "iso_country_code": "US"}}, "exif": {"flash_fired": false, "iso": 25, "metering_mode": 5, "sample_rate": null, "track_format": null, "white_balance": 0, "aperture": 2.2, "bit_rate": null, "duration": null, "exposure_bias": 0.0, "focal_length": 4.15, "fps": null, "latitude": null, "longitude": null, "shutter_speed": 0.0004940711462450593, "camera_make": "Apple", "camera_model": "iPhone 6s", "codec": null, "lens_model": "iPhone 6s back camera 4.15mm f/2.2"}}"""
DIGEST_DATA2 = "BUZZ"
DATABASE_VERSION1 = "tests/export_db_version1.db"
def test_export_db():
"""test ExportDB"""
import os
import tempfile
from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDB
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
@@ -32,104 +42,51 @@ def test_export_db():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
db.set_uuid_for_file(filepath, "FOO-BAR")
# filename should be case-insensitive
assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR"
db.set_info_for_uuid("FOO-BAR", INFO_DATA)
assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA
db.set_exifdata_for_file(filepath, EXIF_DATA)
assert db.get_exifdata_for_file(filepath) == EXIF_DATA
db.set_stat_orig_for_file(filepath, (1, 2, 3))
assert db.get_stat_orig_for_file(filepath) == (1, 2, 3)
db.set_stat_exif_for_file(filepath, (4, 5, 6))
assert db.get_stat_exif_for_file(filepath) == (4, 5, 6)
db.set_stat_edited_for_file(filepath, (10, 11, 12))
assert db.get_stat_edited_for_file(filepath) == (10, 11, 12)
db.set_stat_converted_for_file(filepath, (7, 8, 9))
assert db.get_stat_converted_for_file(filepath) == (7, 8, 9)
db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
assert db.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
assert db.get_previous_uuids() == ["FOO-BAR"]
uuid = "FOOBAR"
assert db.get_photoinfo_for_uuid(uuid) is None
db.set_photoinfo_for_uuid(uuid, INFO_DATA)
assert db.get_photoinfo_for_uuid(uuid) == INFO_DATA
db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
assert json.loads(db.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
assert db.get_uuid_for_file(filepath) is None
db.create_file_record(filepath, uuid)
assert db.get_uuid_for_file(filepath) == uuid
# test set_data which sets all at the same time
filepath2 = os.path.join(tempdir.name, "test2.jpg")
db.set_data(
filepath2,
"BAR-FOO",
(1, 2, 3),
(4, 5, 6),
(7, 8, 9),
(10, 11, 12),
INFO_DATA,
EXIF_DATA,
METADATA_DATA,
)
assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
assert db.get_metadata_for_file(filepath2) == METADATA_DATA
record = db.get_file_record(filepath)
assert record.uuid == uuid
assert record.photoinfo == INFO_DATA
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
assert record.src_sig == (None, None, None)
assert record.dest_sig == (None, None, None)
assert record.digest is None
assert record.exifdata is None
record.digest = DIGEST_DATA # for next assert
# test set_data value=None doesn't overwrite existing data
db.set_data(
filepath2,
"BAR-FOO",
None,
None,
None,
None,
None,
None,
None,
)
assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
assert db.get_metadata_for_file(filepath2) == METADATA_DATA
# test create_or_get_file_record
# existing record
record2 = db.create_or_get_file_record(filepath, uuid)
assert record2.uuid == uuid
assert record.photoinfo == INFO_DATA
assert record.digest == DIGEST_DATA
# close and re-open
db.close()
db = ExportDB(dbname, tempdir.name)
assert not db.was_created
assert db.get_uuid_for_file(filepath2) == "BAR-FOO"
assert db.get_info_for_uuid("BAR-FOO") == INFO_DATA
assert db.get_exifdata_for_file(filepath2) == EXIF_DATA
assert db.get_stat_orig_for_file(filepath2) == (1, 2, 3)
assert db.get_stat_exif_for_file(filepath2) == (4, 5, 6)
assert db.get_stat_converted_for_file(filepath2) == (7, 8, 9)
assert db.get_stat_edited_for_file(filepath2) == (10, 11, 12)
assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FOO-BAR"])
assert json.loads(db.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
assert db.get_metadata_for_file(filepath2) == METADATA_DATA
# new record
filepath3 = os.path.join(tempdir.name, "test3.JPG")
record3 = db.create_or_get_file_record(filepath3, "new_uuid")
assert record3.uuid == "new_uuid"
assert record3.photoinfo is None
assert record3.digest is None
# all uuids
uuids = db.get_previous_uuids()
assert sorted(uuids) == sorted(["new_uuid", uuid])
# update data
db.set_uuid_for_file(filepath, "FUBAR")
assert db.get_uuid_for_file(filepath) == "FUBAR"
assert sorted(db.get_previous_uuids()) == (["BAR-FOO", "FUBAR"])
def test_export_db_no_op():
"""test ExportDBNoOp"""
import os
import tempfile
from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDBNoOp
def test_export_db_constraints():
"""test ExportDB constraints"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
db = ExportDBNoOp()
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
assert os.path.isfile(dbname)
assert db.was_created
assert not db.was_upgraded
assert db.version == OSXPHOTOS_EXPORTDB_VERSION
@@ -137,68 +94,28 @@ def test_export_db_no_op():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
db.set_uuid_for_file(filepath, "FOO-BAR")
# filename should be case-insensitive
assert db.get_uuid_for_file(filepath_lower) is None
db.set_info_for_uuid("FOO-BAR", INFO_DATA)
assert db.get_info_for_uuid("FOO-BAR") is None
db.set_exifdata_for_file(filepath, EXIF_DATA)
assert db.get_exifdata_for_file(filepath) is None
db.set_stat_orig_for_file(filepath, (1, 2, 3))
assert db.get_stat_orig_for_file(filepath) is None
db.set_stat_exif_for_file(filepath, (4, 5, 6))
assert db.get_stat_exif_for_file(filepath) is None
db.set_stat_converted_for_file(filepath, (7, 8, 9))
assert db.get_stat_converted_for_file(filepath) is None
db.set_stat_edited_for_file(filepath, (10, 11, 12))
assert db.get_stat_edited_for_file(filepath) is None
db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
assert db.get_sidecar_for_file(filepath) == (None, (None, None, None))
assert db.get_previous_uuids() == []
db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
assert db.get_detected_text_for_uuid("FOO-BAR") is None
db.set_metadata_for_file(filepath, METADATA_DATA)
assert db.get_metadata_for_file(filepath) is None
uuid = "FOOBAR"
db.set_photoinfo_for_uuid(uuid, INFO_DATA)
record = db.create_file_record(filepath, uuid)
record.photoinfo = INFO_DATA
record.exifdata = EXIF_DATA
record.digest = DIGEST_DATA
record.src_sig = (7, 8, 9)
record.dest_sig = (10, 11, 12)
# test set_data which sets all at the same time
filepath2 = os.path.join(tempdir.name, "test2.jpg")
db.set_data(
filepath2,
"BAR-FOO",
(1, 2, 3),
(4, 5, 6),
(7, 8, 9),
(10, 11, 12),
INFO_DATA,
EXIF_DATA,
METADATA_DATA,
)
assert db.get_uuid_for_file(filepath2) is None
assert db.get_info_for_uuid("BAR-FOO") is None
assert db.get_exifdata_for_file(filepath2) is None
assert db.get_stat_orig_for_file(filepath2) is None
assert db.get_stat_exif_for_file(filepath2) is None
assert db.get_stat_converted_for_file(filepath) is None
assert db.get_stat_edited_for_file(filepath) is None
assert db.get_previous_uuids() == []
assert db.get_metadata_for_file(filepath) is None
with pytest.raises(AttributeError):
record.uuid = "BARFOO"
# update data
db.set_uuid_for_file(filepath, "FUBAR")
assert db.get_uuid_for_file(filepath) is None
with pytest.raises(sqlite3.IntegrityError):
record2 = db.create_file_record(filepath, "NEW_UUID")
with pytest.raises(AttributeError):
# verify we can't add new attributes
record.src_stats = (7, 8, 9)
def test_export_db_in_memory():
"""test ExportDBInMemory"""
import os
import tempfile
from osxphotos.export_db import (
OSXPHOTOS_EXPORTDB_VERSION,
ExportDB,
ExportDBInMemory,
)
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
@@ -207,87 +124,61 @@ def test_export_db_in_memory():
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
db.set_uuid_for_file(filepath, "FOO-BAR")
db.set_info_for_uuid("FOO-BAR", INFO_DATA)
db.set_exifdata_for_file(filepath, EXIF_DATA)
db.set_stat_orig_for_file(filepath, (1, 2, 3))
db.set_stat_exif_for_file(filepath, (4, 5, 6))
db.set_stat_converted_for_file(filepath, (7, 8, 9))
db.set_stat_edited_for_file(filepath, (10, 11, 12))
db.set_sidecar_for_file(filepath, SIDECAR_DATA, (13, 14, 15))
assert db.get_previous_uuids() == ["FOO-BAR"]
db.set_detected_text_for_uuid("FOO-BAR", json.dumps([["foo", 0.5]]))
db.set_metadata_for_file(filepath, METADATA_DATA)
uuid = "FOOBAR"
record = db.create_file_record(filepath, uuid)
record.photoinfo = INFO_DATA
record.exifdata = EXIF_DATA
record.digest = DIGEST_DATA
record.src_sig = (7, 8, 9)
record.dest_sig = (10, 11, 12)
db.close()
# create in memory version
dbram = ExportDBInMemory(dbname, tempdir.name)
assert not dbram.was_created
assert not dbram.was_upgraded
assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
record2 = dbram.get_file_record(filepath)
assert record2.uuid == uuid
assert record2.photoinfo == INFO_DATA
assert record2.exifdata == EXIF_DATA
assert record2.digest == DIGEST_DATA
assert record2.src_sig == (7, 8, 9)
assert record2.dest_sig == (10, 11, 12)
# verify values as expected
assert dbram.get_uuid_for_file(filepath_lower) == "FOO-BAR"
assert dbram.get_info_for_uuid("FOO-BAR") == INFO_DATA
assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA
assert dbram.get_stat_orig_for_file(filepath) == (1, 2, 3)
assert dbram.get_stat_exif_for_file(filepath) == (4, 5, 6)
assert dbram.get_stat_converted_for_file(filepath) == (7, 8, 9)
assert dbram.get_stat_edited_for_file(filepath) == (10, 11, 12)
assert dbram.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
assert dbram.get_previous_uuids() == ["FOO-BAR"]
assert json.loads(dbram.get_detected_text_for_uuid("FOO-BAR")) == [["foo", 0.5]]
assert dbram.get_metadata_for_file(filepath) == METADATA_DATA
# change some values
record2.photoinfo = INFO_DATA2
record2.exifdata = EXIF_DATA2
record2.digest = DIGEST_DATA2
record2.src_sig = (13, 14, 15)
record2.dest_sig = (16, 17, 18)
# change a value
dbram.set_uuid_for_file(filepath, "FUBAR")
dbram.set_info_for_uuid("FUBAR", INFO_DATA2)
dbram.set_exifdata_for_file(filepath, EXIF_DATA2)
dbram.set_stat_orig_for_file(filepath, (7, 8, 9))
dbram.set_stat_exif_for_file(filepath, (10, 11, 12))
dbram.set_stat_converted_for_file(filepath, (1, 2, 3))
dbram.set_stat_edited_for_file(filepath, (4, 5, 6))
dbram.set_sidecar_for_file(filepath, "FUBAR", (20, 21, 22))
dbram.set_detected_text_for_uuid("FUBAR", json.dumps([["bar", 0.5]]))
dbram.set_metadata_for_file(filepath, "FUBAR")
assert record2.photoinfo == INFO_DATA2
assert record2.exifdata == EXIF_DATA2
assert record2.digest == DIGEST_DATA2
assert record2.src_sig == (13, 14, 15)
assert record2.dest_sig == (16, 17, 18)
assert dbram.get_uuid_for_file(filepath_lower) == "FUBAR"
assert dbram.get_info_for_uuid("FUBAR") == INFO_DATA2
assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA2
assert dbram.get_stat_orig_for_file(filepath) == (7, 8, 9)
assert dbram.get_stat_exif_for_file(filepath) == (10, 11, 12)
assert dbram.get_stat_converted_for_file(filepath) == (1, 2, 3)
assert dbram.get_stat_edited_for_file(filepath) == (4, 5, 6)
assert dbram.get_sidecar_for_file(filepath) == ("FUBAR", (20, 21, 22))
assert dbram.get_previous_uuids() == ["FUBAR"]
assert json.loads(dbram.get_detected_text_for_uuid("FUBAR")) == [["bar", 0.5]]
assert dbram.get_metadata_for_file(filepath) == "FUBAR"
# all uuids
uuids = dbram.get_previous_uuids()
assert uuids == [uuid]
dbram.close()
# re-open on disk and verify no changes
# re-open original, assert no changes
db = ExportDB(dbname, tempdir.name)
assert db.get_uuid_for_file(filepath_lower) == "FOO-BAR"
assert db.get_info_for_uuid("FOO-BAR") == INFO_DATA
assert db.get_exifdata_for_file(filepath) == EXIF_DATA
assert db.get_stat_orig_for_file(filepath) == (1, 2, 3)
assert db.get_stat_exif_for_file(filepath) == (4, 5, 6)
assert db.get_stat_converted_for_file(filepath) == (7, 8, 9)
assert db.get_stat_edited_for_file(filepath) == (10, 11, 12)
assert db.get_sidecar_for_file(filepath) == (SIDECAR_DATA, (13, 14, 15))
assert db.get_previous_uuids() == ["FOO-BAR"]
record = db.get_file_record(filepath)
assert record.uuid == uuid
assert record.photoinfo == INFO_DATA
assert record.exifdata == EXIF_DATA
assert record.digest == DIGEST_DATA
assert record.src_sig == (7, 8, 9)
assert record.dest_sig == (10, 11, 12)
assert db.get_info_for_uuid("FUBAR") is None
assert db.get_detected_text_for_uuid("FUBAR") is None
assert db.get_metadata_for_file(filepath) == METADATA_DATA
# all uuids
uuids = db.get_previous_uuids()
assert uuids == [uuid]
def test_export_db_in_memory_nofile():
"""test ExportDBInMemory with no dbfile"""
import os
import tempfile
from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDBInMemory
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
@@ -299,28 +190,187 @@ def test_export_db_in_memory_nofile():
assert not dbram.was_upgraded
assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
# change a value
dbram.set_uuid_for_file(filepath, "FUBAR")
dbram.set_info_for_uuid("FUBAR", INFO_DATA2)
dbram.set_exifdata_for_file(filepath, EXIF_DATA2)
dbram.set_stat_orig_for_file(filepath, (7, 8, 9))
dbram.set_stat_exif_for_file(filepath, (10, 11, 12))
dbram.set_stat_converted_for_file(filepath, (1, 2, 3))
dbram.set_stat_edited_for_file(filepath, (4, 5, 6))
dbram.set_sidecar_for_file(filepath, "FUBAR", (20, 21, 22))
dbram.set_detected_text_for_uuid("FUBAR", json.dumps([["bar", 0.5]]))
dbram.set_metadata_for_file(filepath, METADATA_DATA)
# set values
uuid = "FOOBAR"
record = dbram.create_file_record(filepath, uuid)
record.photoinfo = INFO_DATA
record.exifdata = EXIF_DATA
record.digest = DIGEST_DATA
record.src_sig = (7, 8, 9)
record.dest_sig = (10, 11, 12)
assert dbram.get_uuid_for_file(filepath_lower) == "FUBAR"
assert dbram.get_info_for_uuid("FUBAR") == INFO_DATA2
assert dbram.get_exifdata_for_file(filepath) == EXIF_DATA2
assert dbram.get_stat_orig_for_file(filepath) == (7, 8, 9)
assert dbram.get_stat_exif_for_file(filepath) == (10, 11, 12)
assert dbram.get_stat_converted_for_file(filepath) == (1, 2, 3)
assert dbram.get_stat_edited_for_file(filepath) == (4, 5, 6)
assert dbram.get_sidecar_for_file(filepath) == ("FUBAR", (20, 21, 22))
assert dbram.get_previous_uuids() == ["FUBAR"]
assert json.loads(dbram.get_detected_text_for_uuid("FUBAR")) == [["bar", 0.5]]
assert dbram.get_metadata_for_file(filepath) == METADATA_DATA
assert record.photoinfo == INFO_DATA
assert record.exifdata == EXIF_DATA
assert record.digest == DIGEST_DATA
assert record.src_sig == (7, 8, 9)
assert record.dest_sig == (10, 11, 12)
assert record.uuid == uuid
# change some values
record.photoinfo = INFO_DATA2
record.digest = DIGEST_DATA2
assert record.photoinfo == INFO_DATA2
assert record.digest == DIGEST_DATA2
assert record.exifdata == EXIF_DATA
def test_export_db_temp():
"""test ExportDBTemp"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test.JPG")
filepath_lower = os.path.join(tempdir.name, "test.jpg")
dbram = ExportDBTemp()
assert dbram.was_created
assert not dbram.was_upgraded
assert dbram.version == OSXPHOTOS_EXPORTDB_VERSION
# set values
uuid = "FOOBAR"
record = dbram.create_file_record(filepath, uuid)
record.photoinfo = INFO_DATA
record.exifdata = EXIF_DATA
record.digest = DIGEST_DATA
record.src_sig = (7, 8, 9)
record.dest_sig = (10, 11, 12)
assert record.photoinfo == INFO_DATA
assert record.exifdata == EXIF_DATA
assert record.digest == DIGEST_DATA
assert record.src_sig == (7, 8, 9)
assert record.dest_sig == (10, 11, 12)
assert record.uuid == uuid
# change some values
record.photoinfo = INFO_DATA2
record.digest = DIGEST_DATA2
assert record.photoinfo == INFO_DATA2
assert record.digest == DIGEST_DATA2
assert record.exifdata == EXIF_DATA
dbram.close()
def test_export_record():
"""Test ExportRecord"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test.JPG")
uuid = "FOOBAR"
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
assert db.get_file_record(filepath) is None
record = db.create_file_record(filepath, uuid)
assert record.uuid == uuid
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
record.src_sig = (1, 2, 3.0)
assert record.src_sig == (1, 2, 3)
record.dest_sig = (4, 5, 6.0)
assert record.dest_sig == (4, 5, 6)
record.digest = DIGEST_DATA
assert record.digest == DIGEST_DATA
record.exifdata = EXIF_DATA
assert record.exifdata == EXIF_DATA
record.photoinfo = INFO_DATA
assert record.photoinfo == INFO_DATA
record.export_options = 1
assert record.export_options == 1
# close and re-open
db.close()
db2 = ExportDB(dbname, tempdir.name)
record = db2.get_file_record(filepath)
assert record.uuid == uuid
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
assert record.src_sig == (1, 2, 3)
assert record.dest_sig == (4, 5, 6)
assert record.digest == "FIZZ"
assert record.exifdata == EXIF_DATA
assert record.photoinfo == INFO_DATA
assert record.export_options == 1
def test_export_record_context_manager():
"""Test ExportRecord as context manager"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test.JPG")
uuid = "FOOBAR_CONTEXT"
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
assert db.get_file_record(filepath) is None
with db.create_file_record(filepath, uuid) as record:
record.src_sig = (1, 2, 3.0)
record.dest_sig = (4, 5, 6.0)
record.digest = DIGEST_DATA
record.exifdata = EXIF_DATA
record.photoinfo = INFO_DATA
record.export_options = 1
assert record.uuid == uuid
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
assert record.src_sig == (1, 2, 3)
assert record.dest_sig == (4, 5, 6)
assert record.digest == "FIZZ"
assert record.exifdata == EXIF_DATA
assert record.photoinfo == INFO_DATA
assert record.export_options == 1
# close and re-open
db.close()
db2 = ExportDB(dbname, tempdir.name)
record = db2.get_file_record(filepath)
assert record.uuid == uuid
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
assert record.src_sig == (1, 2, 3)
assert record.dest_sig == (4, 5, 6)
assert record.digest == "FIZZ"
assert record.exifdata == EXIF_DATA
assert record.photoinfo == INFO_DATA
assert record.export_options == 1
def test_export_record_context_manager_error():
"""Test ExportRecord as context manager doesn't commit data on error"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
filepath = os.path.join(tempdir.name, "test_boom.JPG")
uuid = "FOOBAR_CONTEXT_BOOM"
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
try:
with db.create_file_record(filepath, uuid) as record:
record.src_sig = (1, 2, 3.0)
record.dest_sig = (4, 5, 6.0)
record.digest = DIGEST_DATA
record.exifdata = EXIF_DATA
record.photoinfo = INFO_DATA
raise Exception("Boom")
except Exception:
pass
record = db.get_file_record(filepath)
assert record.uuid == uuid
assert record.filepath == pathlib.Path(filepath).name
assert record.filepath_normalized == pathlib.Path(filepath).name.lower()
assert record.src_sig == (None, None, None)
assert record.dest_sig == (None, None, None)
assert record.digest is None
assert record.exifdata is None
assert record.photoinfo is None
def test_get_export_db_version():
"""Test export_db_get_version"""
tempdir = tempfile.TemporaryDirectory(prefix="osxphotos_")
dbname = os.path.join(tempdir.name, ".osxphotos_export.db")
db = ExportDB(dbname, tempdir.name)
osxphotos_ver, export_db_ver = export_db_get_version(dbname)
assert osxphotos_ver == __version__
assert export_db_ver == OSXPHOTOS_EXPORTDB_VERSION