Feature: add sync command #887 (#921)

* Starting work on sync command, #887

* Added parsing for --set, --merge

* Added query options

* Added --incloud, --not-incloud, --not-missing, --cloudasset, --not-cloudasset to query options, #800 (#902)

* Got basic import logic working

* Got basic set/merge logic working

* Add to album now working

* Resolve paths for --import

* Refactored report writer to reuse code from export report

* Removed report_writer_sync.py

* add oPromessa as a contributor for code (#914)

* update README.md [skip ci]

* update .all-contributorsrc [skip ci]

Co-authored-by: allcontributors[bot] <46447321+allcontributors[bot]@users.noreply.github.com>

* Added --profile, --watch, --breakpoint, --debug as global options (#917)

* Removed unnecessary import

* Release 0.56.0 (#918)

* Release 0.56.0 (#919)

* Release 0.56.0

* Release 0.56.0

* Updated CHANGELOG.md [skip ci]

* Got CSV reporting, summary results done

* Added json report for sync results

* Added sqlite report for sync

* Basic set/merge working for sync

* Sync MVP working

* Added help text for sync

* Added test for sync

* Updated tests for sync

Co-authored-by: allcontributors[bot] <46447321+allcontributors[bot]@users.noreply.github.com>
Rhet Turnbull 2023-01-14 22:06:20 -08:00 committed by GitHub
parent 5fc8c022ab
commit c383212822
12 changed files with 1459 additions and 129 deletions
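
In brief, the new sync command exports metadata to a SQLite file and imports it back with --set/--merge. A hedged round-trip example (paths hypothetical), using only options added in this commit:

    osxphotos sync --export /Volumes/Shared/mac1.db
    osxphotos sync --import /Volumes/Shared/mac2.db --set title,description --merge keywords,albums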

View File

@@ -34,6 +34,7 @@ from .places import places
from .query import query
from .repl import repl
from .snap_diff import diff, snap
from .sync import sync
from .theme import theme
from .timewarp import timewarp
from .tutorial import tutorial
@@ -128,6 +129,7 @@ for command in [
repl,
run,
snap,
sync,
theme,
timewarp,
tutorial,

View File

@@ -1,6 +1,7 @@
"""Globals and constants use by the CLI commands"""
import dataclasses
import os
import pathlib
from datetime import datetime
@@ -10,6 +11,7 @@ from packaging import version
from xdg import xdg_config_home, xdg_data_home
import osxphotos
from osxphotos import QueryOptions
from osxphotos._constants import APP_NAME
from osxphotos._version import __version__
from osxphotos.utils import get_latest_version
@@ -42,10 +44,17 @@ __all__ = [
"get_photos_db",
"load_uuid_from_file",
"noop",
"query_options_from_kwargs",
"time_stamp",
]
class IncompatibleQueryOptions(Exception):
"""Incompatible query options"""
pass
def noop(*args, **kwargs):
"""no-op function"""
pass
@@ -660,3 +669,103 @@ def check_version():
"to suppress this message and prevent osxphotos from checking for latest version.",
err=True,
)
def query_options_from_kwargs(**kwargs) -> QueryOptions:
"""Validate query options and create a QueryOptions instance"""
# sanity check input args
nonexclusive = [
"added_after",
"added_before",
"added_in_last",
"album",
"duplicate",
"edited",
"exif",
"external_edit",
"folder",
"from_date",
"from_time",
"has_raw",
"keyword",
"label",
"max_size",
"min_size",
"name",
"person",
"query_eval",
"query_function",
"regex",
"selected",
"to_date",
"to_time",
"uti",
"uuid_from_file",
"uuid",
"year",
]
exclusive = [
("burst", "not_burst"),
("cloudasset", "not_cloudasset"),
("favorite", "not_favorite"),
("has_comment", "no_comment"),
("has_likes", "no_likes"),
("hdr", "not_hdr"),
("hidden", "not_hidden"),
("in_album", "not_in_album"),
("incloud", "not_incloud"),
("live", "not_live"),
("location", "no_location"),
("keyword", "no_keyword"),
("missing", "not_missing"),
("only_photos", "only_movies"),
("panorama", "not_panorama"),
("portrait", "not_portrait"),
("screenshot", "not_screenshot"),
("selfie", "not_selfie"),
("shared", "not_shared"),
("slow_mo", "not_slow_mo"),
("time_lapse", "not_time_lapse"),
("is_reference", "not_reference"),
]
# print help if no non-exclusive term or a double exclusive term is given
# TODO: add option to validate requiring at least one query arg
if any(all([kwargs[b], kwargs[n]]) for b, n in exclusive) or any(
[
all([any(kwargs["title"]), kwargs["no_title"]]),
all([any(kwargs["description"]), kwargs["no_description"]]),
all([any(kwargs["place"]), kwargs["no_place"]]),
all([any(kwargs["keyword"]), kwargs["no_keyword"]]),
]
):
raise IncompatibleQueryOptions
# can also be used with --deleted/--not-deleted which are not part of
# standard query options
try:
if kwargs["deleted"] and kwargs["not_deleted"]:
raise IncompatibleQueryOptions
except KeyError:
pass
# actually have something to query
include_photos = True
include_movies = True # default searches for everything
if kwargs["only_movies"]:
include_photos = False
if kwargs["only_photos"]:
include_movies = False
# load UUIDs if necessary and append to any uuids passed with --uuid
uuid = None
if kwargs["uuid_from_file"]:
uuid_list = list(kwargs["uuid"]) # Click option is a tuple
uuid_list.extend(load_uuid_from_file(kwargs["uuid_from_file"]))
uuid = tuple(uuid_list)
query_fields = [field.name for field in dataclasses.fields(QueryOptions)]
query_dict = {field: kwargs.get(field) for field in query_fields}
query_dict["photos"] = include_photos
query_dict["movies"] = include_movies
query_dict["uuid"] = uuid
return QueryOptions(**query_dict)
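
A minimal sketch of how a command consumes query_options_from_kwargs, mirroring the repl and sync call sites below. The `count` command here is hypothetical; it assumes click, DB_OPTION, and QUERY_OPTIONS are imported as in repl.py, with QUERY_OPTIONS supplying the keyword arguments:

    @click.command(name="count")  # hypothetical command, for illustration only
    @DB_OPTION
    @QUERY_OPTIONS
    @click.pass_context
    def count(ctx, db, **kwargs):
        """Count photos matching the given query options."""
        try:
            options = query_options_from_kwargs(**kwargs)
        except IncompatibleQueryOptions:
            click.echo("Incompatible query options", err=True)
            ctx.exit(1)
        photosdb = osxphotos.PhotosDB(dbfile=db)
        click.echo(len(photosdb.query(options)))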

View File

@@ -28,7 +28,7 @@ from .color_themes import get_theme
from .common import DB_OPTION, THEME_OPTION, get_photos_db
from .export import export, render_and_validate_report
from .param_types import ExportDBType, TemplateString
from .report_writer import ReportWriterNoOp, report_writer_factory
from .report_writer import ReportWriterNoOp, export_report_writer_factory
from .rich_progress import rich_progress
from .verbose import get_verbose_console, verbose_print
@@ -320,7 +320,7 @@ def process_files(
report = render_and_validate_report(
options.report, options.exiftool_path, export_dir
)
report_writer = report_writer_factory(report, options.append)
report_writer = export_report_writer_factory(report, options.append)
else:
report_writer = ReportWriterNoOp()

View File

@@ -29,7 +29,6 @@ from osxphotos._constants import (
EXTENDED_ATTRIBUTE_NAMES_QUOTED,
OSXPHOTOS_EXPORT_DB,
POST_COMMAND_CATEGORIES,
PROFILE_SORT_KEYS,
SIDECAR_EXIFTOOL,
SIDECAR_JSON,
SIDECAR_XMP,
@@ -86,7 +85,7 @@ from .common import (
from .help import ExportCommand, get_help_msg
from .list import _list_libraries
from .param_types import ExportDBType, FunctionCall, TemplateString
from .report_writer import ReportWriterNoOp, report_writer_factory
from .report_writer import ReportWriterNoOp, export_report_writer_factory
from .rich_progress import rich_progress
from .verbose import get_verbose_console, time_stamp, verbose_print
@@ -1218,7 +1217,7 @@ def export(
if report:
report = render_and_validate_report(report, exiftool_path, dest)
report_writer = report_writer_factory(report, append)
report_writer = export_report_writer_factory(report, append)
else:
report_writer = ReportWriterNoOp()

View File

@@ -36,7 +36,7 @@ from .click_rich_echo import (
from .color_themes import get_theme
from .export import render_and_validate_report
from .param_types import TemplateString
from .report_writer import report_writer_factory
from .report_writer import export_report_writer_factory
from .verbose import get_verbose_console, verbose_print
@@ -439,7 +439,7 @@ def exportdb(
rich_echo(f"[error]No report results found for run ID {run_id}[/error]")
sys.exit(1)
try:
report_writer = report_writer_factory(report_filename, append=append)
report_writer = export_report_writer_factory(report_filename, append=append)
except ValueError as e:
rich_echo(f"[error]Error: {e}[/error]")
sys.exit(1)

View File

@@ -1,6 +1,5 @@
"""repl command for osxphotos CLI"""
import dataclasses
import os
import os.path
import pathlib
@@ -23,16 +22,13 @@ from .common import (
DB_ARGUMENT,
DB_OPTION,
DELETED_OPTIONS,
IncompatibleQueryOptions,
QUERY_OPTIONS,
get_photos_db,
load_uuid_from_file,
query_options_from_kwargs,
)
class IncompatibleQueryOptions(Exception):
pass
@click.command(name="repl")
@DB_OPTION
@click.pass_obj
@@ -85,7 +81,7 @@ def repl(ctx, cli_obj, db, emacs, beta, **kwargs):
print("Getting photos")
tic = time.perf_counter()
try:
query_options = _query_options_from_kwargs(**kwargs)
query_options = query_options_from_kwargs(**kwargs)
except IncompatibleQueryOptions:
click.echo("Incompatible query options", err=True)
click.echo(ctx.obj.group.commands["repl"].get_help(ctx), err=True)
@@ -211,99 +207,6 @@ def _spotlight_photo(photo: PhotoInfo):
photo_.spotlight()
def _query_options_from_kwargs(**kwargs) -> QueryOptions:
"""Validate query options and create a QueryOptions instance"""
# sanity check input args
nonexclusive = [
"added_after",
"added_before",
"added_in_last",
"album",
"duplicate",
"edited",
"exif",
"external_edit",
"folder",
"from_date",
"from_time",
"has_raw",
"keyword",
"label",
"max_size",
"min_size",
"name",
"person",
"query_eval",
"query_function",
"regex",
"selected",
"to_date",
"to_time",
"uti",
"uuid_from_file",
"uuid",
"year",
]
exclusive = [
("burst", "not_burst"),
("cloudasset", "not_cloudasset"),
("deleted", "deleted_only"),
("favorite", "not_favorite"),
("has_comment", "no_comment"),
("has_likes", "no_likes"),
("hdr", "not_hdr"),
("hidden", "not_hidden"),
("in_album", "not_in_album"),
("incloud", "not_incloud"),
("live", "not_live"),
("location", "no_location"),
("keyword", "no_keyword"),
("missing", "not_missing"),
("only_photos", "only_movies"),
("panorama", "not_panorama"),
("portrait", "not_portrait"),
("screenshot", "not_screenshot"),
("selfie", "not_selfie"),
("shared", "not_shared"),
("slow_mo", "not_slow_mo"),
("time_lapse", "not_time_lapse"),
("is_reference", "not_reference"),
]
# print help if no non-exclusive term or a double exclusive term is given
# TODO: add option to validate requiring at least one query arg
if any(all([kwargs[b], kwargs[n]]) for b, n in exclusive) or any(
[
all([any(kwargs["title"]), kwargs["no_title"]]),
all([any(kwargs["description"]), kwargs["no_description"]]),
all([any(kwargs["place"]), kwargs["no_place"]]),
all([any(kwargs["keyword"]), kwargs["no_keyword"]]),
]
):
raise IncompatibleQueryOptions
# actually have something to query
include_photos = True
include_movies = True # default searches for everything
if kwargs["only_movies"]:
include_photos = False
if kwargs["only_photos"]:
include_movies = False
# load UUIDs if necessary and append to any uuids passed with --uuid
uuid = None
if kwargs["uuid_from_file"]:
uuid_list = list(kwargs["uuid"]) # Click option is a tuple
uuid_list.extend(load_uuid_from_file(kwargs["uuid_from_file"]))
uuid = tuple(uuid_list)
query_fields = [field.name for field in dataclasses.fields(QueryOptions)]
query_dict = {field: kwargs.get(field) for field in query_fields}
query_dict["photos"] = include_photos
query_dict["movies"] = include_movies
query_dict["uuid"] = uuid
return QueryOptions(**query_dict)
def _query_photos(photosdb: PhotosDB, query_options: QueryOptions) -> List:
"""Query photos given a QueryOptions instance"""
try:

View File

@@ -1,5 +1,6 @@
"""Report writer for the --report option of `osxphotos export`"""
from __future__ import annotations
import csv
import datetime
@@ -15,20 +16,28 @@ from osxphotos.export_db import OSXPHOTOS_ABOUT_STRING
from osxphotos.photoexporter import ExportResults
from osxphotos.sqlite_utils import sqlite_columns
from .sync_results import SyncResults
__all__ = [
"report_writer_factory",
"ExportReportWriterCSV",
"ExportReportWriterJSON",
"ExportReportWriterSqlite",
"ReportWriterABC",
"ReportWriterCSV",
"ReportWriterSqlite",
"ReportWriterNoOp",
"SyncReportWriterCSV",
"SyncReportWriterJSON",
"SyncReportWriterSqlite",
"export_report_writer_factory",
"sync_report_writer_factory",
]
# Abstract base class for report writers
class ReportWriterABC(ABC):
"""Abstract base class for report writers"""
@abstractmethod
def write(self, export_results: ExportResults):
def write(self, results: ExportResults | SyncResults):
"""Write results to the output file"""
pass
@@ -38,13 +47,16 @@ class ReportWriterABC(ABC):
pass
# Report writer that does nothing, used for --dry-run or when --report not specified
class ReportWriterNoOp(ABC):
"""Report writer that does nothing"""
def __init__(self):
pass
def write(self, export_results: ExportResults):
def write(self, results: ExportResults | SyncResults):
"""Write results to the output file"""
pass
@@ -53,8 +65,9 @@ class ReportWriterNoOp(ABC):
pass
class ReportWriterCSV(ReportWriterABC):
"""Write CSV report file"""
# Classes for writing ExportResults to report file
class ExportReportWriterCSV(ReportWriterABC):
"""Write CSV report file for export results"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
@@ -95,7 +108,7 @@ class ReportWriterCSV(ReportWriterABC):
def write(self, export_results: ExportResults):
"""Write results to the output file"""
all_results = prepare_results_for_writing(export_results)
all_results = prepare_export_results_for_writing(export_results)
for data in list(all_results.values()):
self._csv_writer.writerow(data)
self._output_fh.flush()
@@ -109,8 +122,8 @@
self._output_fh.close()
class ReportWriterJSON(ReportWriterABC):
"""Write JSON report file"""
class ExportReportWriterJSON(ReportWriterABC):
"""Write JSON report file for export results"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
@@ -134,7 +147,9 @@ class ReportWriterJSON(ReportWriterABC):
def write(self, export_results: ExportResults):
"""Write results to the output file"""
all_results = prepare_results_for_writing(export_results, bool_values=True)
all_results = prepare_export_results_for_writing(
export_results, bool_values=True
)
for data in list(all_results.values()):
if self._first_record_written:
self._output_fh.write(",\n")
@@ -153,8 +168,8 @@
self.close()
class ReportWriterSQLite(ReportWriterABC):
"""Write sqlite report file"""
class ExportReportWriterSQLite(ReportWriterABC):
"""Write sqlite report file for export data"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
@@ -173,7 +188,7 @@ class ReportWriterSQLite(ReportWriterABC):
def write(self, export_results: ExportResults):
"""Write results to the output file"""
all_results = prepare_results_for_writing(export_results)
all_results = prepare_export_results_for_writing(export_results)
for data in list(all_results.values()):
data["report_id"] = self.report_id
cursor = self._conn.cursor()
@@ -284,7 +299,7 @@ class ReportWriterSQLite(ReportWriterABC):
self.close()
def prepare_results_for_writing(
def prepare_export_results_for_writing(
export_results: ExportResults, bool_values: bool = False
) -> Dict:
"""Return all results for writing to report
@@ -406,17 +421,250 @@ def prepare_results_for_writing(
return all_results
def report_writer_factory(
def export_report_writer_factory(
output_file: Union[str, bytes, os.PathLike], append: bool = False
) -> ReportWriterABC:
"""Return a ReportWriter instance appropriate for the output file type"""
output_type = os.path.splitext(output_file)[1]
output_type = output_type.lower()[1:]
if output_type == "csv":
return ReportWriterCSV(output_file, append)
return ExportReportWriterCSV(output_file, append)
elif output_type == "json":
return ReportWriterJSON(output_file, append)
return ExportReportWriterJSON(output_file, append)
elif output_type in ["sqlite", "db"]:
return ReportWriterSQLite(output_file, append)
return ExportReportWriterSQLite(output_file, append)
else:
raise ValueError(f"Unknown report file type: {output_file}")
# Classes for writing Sync results to a report file
class SyncReportWriterCSV(ReportWriterABC):
"""Write CSV report file"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
):
self.output_file = output_file
self.append = append
mode = "a" if append else "w"
self._output_fh = open(self.output_file, mode)
def write(self, sync_results: SyncResults):
"""Write results to the output file"""
report_columns = sync_results.results_header
self._csv_writer = csv.DictWriter(self._output_fh, fieldnames=report_columns)
if not self.append:
self._csv_writer.writeheader()
for data in sync_results.results_list:
self._csv_writer.writerow(dict(zip(report_columns, data)))
self._output_fh.flush()
def close(self):
"""Close the output file"""
self._output_fh.close()
def __del__(self):
with suppress(Exception):
self._output_fh.close()
class SyncReportWriterJSON(ReportWriterABC):
"""Write JSON SyncResults report file"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
):
self.output_file = output_file
self.append = append
self.indent = 4
self._first_record_written = False
if append:
with open(self.output_file, "r") as fh:
existing_data = json.load(fh)
self._output_fh = open(self.output_file, "w")
self._output_fh.write("[")
for data in existing_data:
self._output_fh.write(json.dumps(data, indent=self.indent))
self._output_fh.write(",\n")
else:
self._output_fh = open(self.output_file, "w")
self._output_fh.write("[")
def write(self, results: SyncResults):
"""Write results to the output file"""
# convert datetimes to strings
def default(o):
if isinstance(o, (datetime.date, datetime.datetime)):
return o.isoformat()
for data in list(results.results_dict.values()):
if self._first_record_written:
self._output_fh.write(",\n")
else:
self._first_record_written = True
self._output_fh.write(json.dumps(data, indent=self.indent, default=default))
self._output_fh.flush()
def close(self):
"""Close the output file"""
self._output_fh.write("]")
self._output_fh.close()
def __del__(self):
with suppress(Exception):
self.close()
class SyncReportWriterSQLite(ReportWriterABC):
"""Write sqlite SyncResults report file"""
def __init__(
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
):
self.output_file = output_file
self.append = append
if not append:
with suppress(FileNotFoundError):
os.unlink(self.output_file)
self._conn = sqlite3.connect(self.output_file)
self._create_tables()
self.report_id = self._generate_report_id()
def write(self, results: SyncResults):
"""Write results to the output file"""
# insert rows of values into sqlite report table
for row in list(results.results_list):
report_id = self.report_id
data = [str(v) if v else "" for v in row]
cursor = self._conn.cursor()
cursor.execute(
"INSERT INTO report "
"(report_id, uuid, filename, fingerprint, updated, "
"albums_updated, albums_datetime, albums_before, albums_after, "
"description_updated, description_datetime, description_before, description_after, "
"favorite_updated, favorite_datetime, favorite_before, favorite_after, "
"keywords_updated, keywords_datetime, keywords_before, keywords_after, "
"title_updated, title_datetime, title_before, title_after)"
"VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(report_id, *data),
)
self._conn.commit()
def close(self):
"""Close the output file"""
self._conn.close()
def _create_tables(self):
c = self._conn.cursor()
c.execute(
"""
CREATE TABLE IF NOT EXISTS report (
report_id TEXT,
uuid TEXT,
filename TEXT,
fingerprint TEXT,
updated INT,
albums_updated INT,
albums_datetime TEXT,
albums_before TEXT,
albums_after TEXT,
description_updated INT,
description_datetime TEXT,
description_before TEXT,
description_after TEXT,
favorite_updated INT,
favorite_datetime TEXT,
favorite_before TEXT,
favorite_after TEXT,
keywords_updated INT,
keywords_datetime TEXT,
keywords_before TEXT,
keywords_after TEXT,
title_updated INT,
title_datetime TEXT,
title_before TEXT,
title_after TEXT
);
"""
)
c.execute(
"""
CREATE TABLE IF NOT EXISTS about (
id INTEGER PRIMARY KEY,
about TEXT
);"""
)
c.execute(
"INSERT INTO about(about) VALUES (?);",
(f"OSXPhotos Sync Report. {OSXPHOTOS_ABOUT_STRING}",),
)
c.execute(
"""
CREATE TABLE IF NOT EXISTS report_id (
report_id INTEGER PRIMARY KEY,
datetime TEXT
);"""
)
self._conn.commit()
# create report_summary view
c.execute("DROP VIEW IF EXISTS report_summary;")
c.execute(
"""
CREATE VIEW report_summary AS
SELECT
r.report_id,
i.datetime AS report_datetime,
COUNT(r.uuid) as processed,
COUNT(CASE r.updated WHEN 'True' THEN 1 ELSE NULL END) as updated,
COUNT(case r.albums_updated WHEN 'True' THEN 1 ELSE NULL END) as albums_updated,
COUNT(case r.description_updated WHEN 'True' THEN 1 ELSE NULL END) as description_updated,
COUNT(case r.favorite_updated WHEN 'True' THEN 1 ELSE NULL END) as favorite_updated,
COUNT(case r.keywords_updated WHEN 'True' THEN 1 ELSE NULL END) as keywords_updated,
COUNT(case r.title_updated WHEN 'True' THEN 1 ELSE NULL END) as title_updated
FROM report as r
INNER JOIN report_id as i ON r.report_id = i.report_id
GROUP BY r.report_id;
"""
)
self._conn.commit()
def _generate_report_id(self) -> int:
"""Get a new report ID for this report"""
c = self._conn.cursor()
c.execute(
"INSERT INTO report_id(datetime) VALUES (?);",
(datetime.datetime.now().isoformat(),),
)
report_id = c.lastrowid
self._conn.commit()
return report_id
def __del__(self):
with suppress(Exception):
self.close()
def sync_report_writer_factory(
output_file: Union[str, bytes, os.PathLike], append: bool = False
) -> ReportWriterABC:
"""Return a ReportWriter instance appropriate for the output file type"""
output_type = os.path.splitext(output_file)[1]
output_type = output_type.lower()[1:]
if output_type == "csv":
return SyncReportWriterCSV(output_file, append)
elif output_type == "json":
return SyncReportWriterJSON(output_file, append)
elif output_type in ["sqlite", "db"]:
return SyncReportWriterSQLite(output_file, append)
else:
raise ValueError(f"Unknown report file type: {output_file}")

osxphotos/cli/sync.py (new file, 749 lines)
View File

@@ -0,0 +1,749 @@
"""Sync metadata and albums between Photos libraries"""
from __future__ import annotations
import datetime
import json
import os
import pathlib
from typing import Any, Callable, Literal
import click
import photoscript
from osxphotos import PhotoInfo, PhotosDB, __version__
from osxphotos.photoinfo import PhotoInfoNone
from osxphotos.photosalbum import PhotosAlbum
from osxphotos.photosdb.photosdb_utils import get_db_version
from osxphotos.phototemplate import PhotoTemplate, RenderOptions
from osxphotos.sqlitekvstore import SQLiteKVStore
from osxphotos.utils import pluralize
from .click_rich_echo import (
rich_click_echo,
rich_echo_error,
set_rich_console,
set_rich_theme,
set_rich_timestamp,
)
from .color_themes import get_theme
from .common import DB_OPTION, QUERY_OPTIONS, THEME_OPTION, query_options_from_kwargs
from .param_types import TemplateString
from .report_writer import sync_report_writer_factory
from .rich_progress import rich_progress
from .sync_results import SYNC_PROPERTIES, SyncResults
from .verbose import get_verbose_console, verbose_print
SYNC_ABOUT_STRING = (
f"Sync Metadata Database created by osxphotos version {__version__} "
+ f"(https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
)
SYNC_IMPORT_TYPES = [
"keywords",
"albums",
"title",
"description",
"favorite",
]
SYNC_IMPORT_TYPES_ALL = ["all"] + SYNC_IMPORT_TYPES
class SyncImportPath(click.ParamType):
"""A path to a Photos library or a metadata export file created by --export"""
name = "SYNC_IMPORT_PATH"
def convert(self, value, param, ctx):
try:
if not pathlib.Path(value).exists():
self.fail(f"{value} is not a file or directory")
value = str(pathlib.Path(value).expanduser().resolve())
# call get_import_type to raise exception if not a valid import type
get_import_type(value)
return value
except Exception as e:
self.fail(f"Could not determine import type for {value}: {e}")
class SyncImportType(click.ParamType):
"""A string indicating which metadata to set or merge from the import source"""
# valid values are specified in SYNC_IMPORT_TYPES_ALL
name = "SYNC_IMPORT_TYPE"
def convert(self, value, param, ctx):
try:
if value not in SYNC_IMPORT_TYPES_ALL:
values = [v.strip() for v in value.split(",")]
for v in values:
if v not in SYNC_IMPORT_TYPES_ALL:
self.fail(
f"{v} is not a valid import type, valid values are {', '.join(SYNC_IMPORT_TYPES_ALL)}"
)
return value
except Exception as e:
self.fail(f"Could not determine import type for {value}: {e}")
def render_and_validate_report(report: str) -> str:
"""Render a report file template and validate the filename
Args:
report: the template string
Returns:
the rendered report filename
Note:
Exits with error if the report filename is invalid
"""
# render report template and validate the filename
template = PhotoTemplate(PhotoInfoNone())
render_options = RenderOptions()
report_file, _ = template.render(report, options=render_options)
report = report_file[0]
if os.path.isdir(report):
rich_click_echo(
f"[error]Report '{report}' is a directory, must be file name",
err=True,
)
raise click.Abort()
return report
def parse_set_merge(values: tuple[str]) -> tuple[str]:
"""Parse --set and --merge options which may be passed individually or as a comma-separated list"""
new_values = []
for value in values:
new_values.extend([v.strip() for v in value.split(",")])
return tuple(new_values)
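# For example (illustration only; values hypothetical):
#   parse_set_merge(("keywords,title", "favorite")) -> ("keywords", "title", "favorite")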
def open_metadata_db(db_path: str):
"""Open metadata database at db_path"""
metadata_db = SQLiteKVStore(
db_path,
wal=False, # don't use WAL to keep database a single file
)
if not metadata_db.about:
metadata_db.about = f"osxphotos metadata sync database\n{SYNC_ABOUT_STRING}"
return metadata_db
def key_from_photo(photo: PhotoInfo) -> str:
"""Return key for photo used to correlate photos between libraries"""
return f"{photo.fingerprint}:{photo.original_filename}"
def get_photo_metadata(photos: list[PhotoInfo]) -> str:
"""Return JSON string of metadata for photos; if more than one photo, merge metadata"""
if len(photos) == 1:
return photos[0].json()
# more than one photo with same fingerprint; merge metadata
merge_fields = ["keywords", "persons", "albums", "title", "description", "uuid"]
photos_dict = {}
for photo in photos:
data = photo.asdict()
for k, v in data.items():
if k not in photos_dict:
photos_dict[k] = v.copy() if isinstance(v, (list, dict)) else v
else:
# merge data if it's a merge field
if k in merge_fields and v:
if isinstance(v, (list, tuple)):
photos_dict[k] = sorted(list(set(photos_dict[k]) | set(v)))
else:
if v:
if not photos_dict[k]:
photos_dict[k] = v
elif photos_dict[k] and v != photos_dict[k]:
photos_dict[k] = f"{photos_dict[k]} {v}"
# convert photos_dict to JSON string
# wouldn't it be nice if json encoder handled datetimes...
def default(o):
if isinstance(o, (datetime.date, datetime.datetime)):
return o.isoformat()
return json.dumps(photos_dict, sort_keys=True, default=default)
def export_metadata(
photos: list[PhotoInfo], output_path: str, verbose: Callable[..., None]
):
"""Export metadata to metadata_db"""
metadata_db = open_metadata_db(output_path)
verbose(f"Exporting metadata to [filepath]{output_path}[/]")
num_photos = len(photos)
photo_word = pluralize(num_photos, "photo", "photos")
verbose(f"Analyzing [num]{num_photos}[/] {photo_word} to export")
verbose(f"Exporting [num]{len(photos)}[/] {photo_word} to {output_path}")
export_metadata_to_db(photos, metadata_db, progress=True)
rich_click_echo(
f"Done: exported metadata for [num]{len(photos)}[/] {photo_word} to [filepath]{output_path}[/]"
)
metadata_db.close()
def export_metadata_to_db(
photos: list[PhotoInfo],
metadata_db: SQLiteKVStore,
progress: bool = True,
):
"""Export metadata for photos to metadata database
Args:
photos: list of PhotoInfo objects
metadata_db: SQLiteKVStore object
progress: if True, show progress bar
"""
# it is possible to have multiple photos with the same fingerprint
# for example, the same photo was imported twice or the photo was duplicated in Photos
# in this case, we need to merge the metadata for the photos with the same fingerprint
# as there is no way to know which photo is the "correct" one
key_to_photos = {}
for photo in photos:
key = key_from_photo(photo)
if key in key_to_photos:
key_to_photos[key].append(photo)
else:
key_to_photos[key] = [photo]
with rich_progress(console=get_verbose_console(), mock=not progress) as progress:
task = progress.add_task("Exporting metadata", total=len(key_to_photos))
for key, key_photos in key_to_photos.items():
metadata_db[key] = get_photo_metadata(key_photos)
progress.advance(task)
def get_import_type(import_path: str) -> Literal["library", "export"]:
"""Determine if import_path is a Photos library, Photos database, or metadata export file"""
if pathlib.Path(import_path).is_dir():
if import_path.endswith(".photoslibrary"):
return "library"
else:
raise ValueError(
f"Unable to determine type of import library: {import_path}"
)
else:
# import_path is a file, need to determine if it's a Photos database or metadata export file
try:
get_db_version(import_path)
except Exception as e:
try:
db = SQLiteKVStore(import_path)
if db.about:
return "export"
else:
raise ValueError(
f"Unable to determine type of import file: {import_path}"
) from e
except Exception as e:
raise ValueError(
f"Unable to determine type of import file: {import_path}"
) from e
else:
return "library"
def import_metadata(
photos: list[PhotoInfo],
import_path: str,
set_: tuple[str, ...],
merge: tuple[str, ...],
dry_run: bool,
verbose: Callable[..., None],
) -> SyncResults:
"""Import metadata from metadata_db"""
import_type = get_import_type(import_path)
photo_word = pluralize(len(photos), "photo", "photos")
verbose(
f"Importing metadata for [num]{len(photos)}[/] {photo_word} from [filepath]{import_path}[/]"
)
# build mapping of key to photo
key_to_photo = {}
for photo in photos:
key = key_from_photo(photo)
if key in key_to_photo:
key_to_photo[key].append(photo)
else:
key_to_photo[key] = [photo]
# find keys in import_path that match keys in photos
if import_type == "library":
# create an in memory database of the import library
# so that the rest of the comparison code can be the same
photosdb = PhotosDB(import_path, verbose=verbose)
photos = photosdb.photos()
import_db = SQLiteKVStore(":memory:")
verbose(f"Loading metadata from import library: [filepath]{import_path}[/]")
export_metadata_to_db(photos, import_db, progress=False)
elif import_type == "export":
import_db = open_metadata_db(import_path)
else:
rich_echo_error(
f"Unable to determine type of import file: [filepath]{import_path}[/]"
)
raise click.Abort()
results = SyncResults()
for key, key_photos in key_to_photo.items():
if key in import_db:
# import metadata from import_db
for photo in key_photos:
rich_click_echo(
f"Importing metadata for [filename]{photo.original_filename}[/] ([uuid]{photo.uuid}[/])"
)
metadata = import_db[key]
results += import_metadata_for_photo(
photo, metadata, set_, merge, dry_run, verbose
)
else:
# unable to find metadata for photo in import_db
for photo in key_photos:
rich_click_echo(
f"Unable to find metadata for [filename]{photo.original_filename}[/] ([uuid]{photo.uuid}[/]) in [filepath]{import_path}[/]"
)
# find any keys in import_db that don't match keys in photos
for key in import_db.keys():
if key not in key_to_photo:
rich_click_echo(f"Unable to find [uuid]{key}[/] in current library.")
return results
def import_metadata_for_photo(
photo: PhotoInfo,
metadata: str,
set_: tuple[str, ...],
merge: tuple[str, ...],
dry_run: bool,
verbose: Callable[..., None],
) -> SyncResults:
"""Update metadata for photo from metadata
Args:
photo: PhotoInfo object
metadata: metadata to import (JSON string)
set_: tuple of metadata fields to set
merge: tuple of metadata fields to merge
dry_run: if True, don't actually update metadata
verbose: verbose function
"""
# convert metadata to dict
metadata = json.loads(metadata)
results = SyncResults()
if "albums" in set_ or "albums" in merge:
# behavior is the same for albums for set and merge:
# add photo to any new albums but do not remove from existing albums
results += _update_albums_for_photo(photo, metadata, dry_run, verbose)
results += _set_metadata_for_photo(photo, metadata, set_, dry_run, verbose)
results += _merge_metadata_for_photo(photo, metadata, merge, dry_run, verbose)
return results
def _update_albums_for_photo(
photo: PhotoInfo,
metadata: dict[str, Any],
dry_run: bool,
verbose: Callable[..., None],
) -> SyncResults:
"""Add photo to new albums if necessary"""
# add photo to any new albums but do not remove from existing albums
results = SyncResults()
value = sorted(metadata["albums"])
before = sorted(photo.albums)
albums_to_add = set(value) - set(before)
if not albums_to_add:
verbose(f"\tNothing to do for albums")
results.add_result(
photo.uuid,
photo.original_filename,
photo.fingerprint,
"albums",
False,
before,
value,
)
return results
for album in albums_to_add:
verbose(f"\tAdding to album [filepath]{album}[/]")
if not dry_run:
PhotosAlbum(album, verbose=lambda x: verbose(f"\t{x}"), rich=True).add(
photo
)
results.add_result(
photo.uuid,
photo.original_filename,
photo.fingerprint,
"albums",
True,
before,
value,
)
return results
def _set_metadata_for_photo(
photo: PhotoInfo,
metadata: dict[str, Any],
set_: tuple[str, ...],
dry_run: bool,
verbose: Callable[..., None],
) -> SyncResults:
"""Set metadata for photo"""
results = SyncResults()
photo_ = photoscript.Photo(photo.uuid)
for field in set_:
if field == "albums":
continue
value = metadata[field]
before = getattr(photo, field)
if isinstance(value, list):
value = sorted(value)
if isinstance(before, list):
before = sorted(before)
if value != before:
verbose(f"\tSetting {field} to {value} from {before}")
if not dry_run:
set_photo_property(photo_, field, value)
else:
verbose(f"\tNothing to do for {field}")
results.add_result(
photo.uuid,
photo.original_filename,
photo.fingerprint,
field,
value != before,
before,
value,
)
return results
def _merge_metadata_for_photo(
photo: PhotoInfo,
metadata: dict[str, Any],
merge: tuple[str, ...],
dry_run: bool,
verbose: Callable[..., None],
) -> SyncResults:
"""Merge metadata for photo"""
results = SyncResults()
photo_ = photoscript.Photo(photo.uuid)
for field in merge:
if field == "albums":
continue
value = metadata[field]
before = getattr(photo, field)
if isinstance(value, list):
value = sorted(value)
if isinstance(before, list):
before = sorted(before)
if value == before:
verbose(f"\tNothing to do for {field}")
results.add_result(
photo.uuid,
photo.original_filename,
photo.fingerprint,
field,
False,
before,
value,
)
continue
if isinstance(value, list) and isinstance(before, list):
new_value = sorted(set(value + before))
elif isinstance(before, bool):
new_value = value or bool(before)
elif isinstance(before, str):
value = value or ""
new_value = f"{before} {value}" if value and value not in before else before
elif before is None:
new_value = value
else:
rich_echo_error(
f"Unable to merge {field} for [filename]{photo.original_filename}[filename]"
)
raise click.Abort()
if new_value != before:
verbose(f"\tMerging {field} to {new_value} from {before}")
if not dry_run:
set_photo_property(photo_, field, new_value)
else:
# Merge'd value might still be the same as original value
# (e.g. if value is str and has previously been merged)
verbose(f"\tNothing to do for {field}")
results.add_result(
photo.uuid,
photo.original_filename,
photo.fingerprint,
field,
new_value != before,
before,
new_value,
)
return results
def set_photo_property(photo: photoscript.Photo, property: str, value: Any):
"""Set property on photo"""
# do some basic validation
if property == "keywords" and not isinstance(value, list):
raise ValueError(f"keywords must be a list, not {type(value)}")
elif property in {"title", "description"} and not isinstance(value, str):
raise ValueError(f"{property} must be a str, not {type(value)}")
elif property == "favorite":
value = bool(value)
elif property not in {"title", "description", "favorite", "keywords"}:
raise ValueError(f"Unknown property: {property}")
setattr(photo, property, value)
def print_import_summary(results: SyncResults):
"""Print summary of import results"""
summary = results.results_summary()
property_summary = ", ".join(
f"updated {property}: [num]{summary.get(property,0)}[/]"
for property in SYNC_PROPERTIES
)
rich_click_echo(
f"Processed [num]{summary['total']}[/] photos, updated: [num]{summary['updated']}[/], {property_summary}"
)
@click.command()
@click.option(
"--export",
"-e",
"export_path",
metavar="EXPORT_FILE",
help="Export metadata to file EXPORT_FILE for later use with --import. "
"The export file will be a SQLite database; it is recommended to use the "
".db extension though this is not required.",
type=click.Path(dir_okay=False, writable=True),
)
@click.option(
"--import",
"-i",
"import_path",
metavar="IMPORT_PATH",
help="Import metadata from file IMPORT_PATH. "
"IMPORT_PATH can a Photos library, a Photos database, or a metadata export file "
"created with --export.",
type=SyncImportPath(),
)
@click.option(
"--set",
"-s",
"set_",
metavar="METADATA",
multiple=True,
help="When used with --import, set metadata in local Photos library to match import data. "
"Multiple metadata properties can be specified by repeating the --set option "
"or by using a comma-separated list. "
f"METADATA can be one of: {', '.join(SYNC_IMPORT_TYPES_ALL)}. "
"For example, to set keywords and favorite, use `--set keywords --set favorite` "
"or `--set keywords,favorite`. "
"If `--set all` is specified, all metadata will be set. "
"Note that using --set overwrites any existing metadata in the local Photos library. "
"For example, if a photo is marked as favorite in the local library but not in the import source, "
"--set favorite will clear the favorite status in the local library. "
"The exception to this is that `--set album` will not remove the photo "
"from any existing albums in the local library but will add the photo to any new albums specified "
"in the import source."
"See also --merge.",
type=SyncImportType(),
)
@click.option(
"--merge",
"-m",
"merge",
metavar="METADATA",
multiple=True,
help="When used with --import, merge metadata in local Photos library with import data. "
"Multiple metadata properties can be specified by repeating the --merge option "
"or by using a comma-separated list. "
f"METADATA can be one of: {', '.join(SYNC_IMPORT_TYPES_ALL)}. "
"For example, to merge keywords and favorite, use `--merge keywords --merge favorite` "
"or `--merge keywords,favorite`. "
"If `--merge all` is specified, all metadata will be merged. "
"Note that using --merge does not overwrite any existing metadata in the local Photos library. "
"For example, if a photo is marked as favorite in the local library but not in the import source, "
"--merge favorite will not change the favorite status in the local library. "
"See also --set.",
type=SyncImportType(),
)
@click.option(
"--report",
"-R",
metavar="REPORT_FILE",
help="Write a report of all photos that were processed with --import. "
"The extension of the report filename will be used to determine the format. "
"Valid extensions are: "
".csv (CSV file), .json (JSON), .db and .sqlite (SQLite database). "
"REPORT_FILE may be a an osxphotos template string, for example, "
"--report 'update_{today.date}.csv' will write a CSV report file named with today's date. "
"See also --append.",
type=TemplateString(),
)
@click.option(
"--append",
"-A",
is_flag=True,
help="If used with --report, add data to existing report file instead of overwriting it. "
"See also --report.",
)
@click.option(
"--dry-run",
is_flag=True,
help="Dry run; " "when used with --import, don't actually update metadata.",
)
@click.option("--verbose", "-V", "verbose_", is_flag=True, help="Print verbose output.")
@click.option(
"--timestamp", "-T", is_flag=True, help="Add time stamp to verbose output."
)
@QUERY_OPTIONS
@DB_OPTION
@THEME_OPTION
@click.pass_obj
@click.pass_context
def sync(
ctx,
cli_obj,
db,
append,
dry_run,
export_path,
import_path,
merge,
report,
set_,
theme,
timestamp,
verbose_,
**kwargs, # query options
):
"""Sync metadata and albums between Photos libraries.
Use sync to update metadata in a local Photos library to match
metadata in another Photos library. The sync command works by
finding identical photos in the local library and the import source
and then updating the metadata in the local library to match the
metadata in the import source. Photos are considered identical if
their original filename and fingerprint match.
The import source can be a Photos library or a metadata export file
created with the --export option.
The sync command can be useful if you have imported the same photos to
multiple Photos libraries and want to keep the metadata in all libraries
in sync.
Metadata can be overwritten (--set) or merged (--merge) with the metadata
in the import source. You may specify specific metadata to sync or sync
all metadata. See --set and --merge for more details.
The sync command can be used to sync metadata between an iPhone or iPad
and a Mac, for example, in the case where you do not use iCloud but
manually import photos from your iPhone or iPad to your Mac. To do this,
you'll first need to copy the Photos database from the iPhone or iPad to
your Mac. This can be done by connecting your iPhone or iPad to your Mac
using a USB cable and then copying the Photos database from the iPhone
using a third-party tool such as iMazing (https://imazing.com/). You can
then use the sync command and set the import source to the Photos database
you copied from the iPhone or iPad.
The sync command can also be used to sync metadata between users using
iCloud Shared Photo Library. NOTE: This use case has not yet been
tested. If you use iCloud Shared Photo Library and would like to help
test this use case, please connect with me on GitHub:
https://github.com/RhetTbull/osxphotos/issues/887
You can run the --export and --import options together. In this case,
the import will be run first and then the export will be run.
For example, if you want to sync two Photos libraries between users or
two different computers, you can export the metadata to a shared folder.
On the first computer, run:
osxphotos sync --export /path/to/export/folder/computer1.db --merge all --import /path/to/export/folder/computer2.db
On the second computer, run:
osxphotos sync --export /path/to/export/folder/computer2.db --merge all --import /path/to/export/folder/computer1.db
"""
color_theme = get_theme(theme)
verbose = verbose_print(
verbose_, timestamp, rich=True, theme=color_theme, highlight=False
)
# set console for rich_echo to be same as for verbose_
set_rich_console(get_verbose_console())
set_rich_theme(color_theme)
set_rich_timestamp(timestamp)
if (set_ or merge) and not import_path:
rich_echo_error("--set and --merge can only be used with --import")
ctx.exit(1)
set_ = parse_set_merge(set_)
merge = parse_set_merge(merge)
if "all" in set_:
set_ = tuple(SYNC_IMPORT_TYPES)
if "all" in merge:
merge = tuple(SYNC_IMPORT_TYPES)
if set_ and merge:
# fields in set cannot be in merge and vice versa
set_ = set(set_)
merge = set(merge)
if set_ & merge:
rich_echo_error(
"--set and --merge cannot be used with the same fields: "
f"set: {set_}, merge: {merge}"
)
ctx.exit(1)
if import_path:
query_options = query_options_from_kwargs(**kwargs)
photosdb = PhotosDB(dbfile=db, verbose=verbose)
photos = photosdb.query(query_options)
results = import_metadata(photos, import_path, set_, merge, dry_run, verbose)
if report:
report_path = render_and_validate_report(report)
verbose(f"Writing report to {report_path}")
report_writer = sync_report_writer_factory(report_path, append=append)
report_writer.write(results)
report_writer.close()
print_import_summary(results)
if export_path:
photosdb = PhotosDB(dbfile=db, verbose=verbose)
query_options = query_options_from_kwargs(**kwargs)
photos = photosdb.query(query_options)
export_metadata(photos, export_path, verbose)
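
Because sync accepts the standard query options, an import can be scoped to a subset of the library. A hedged example (paths hypothetical) combining a query with --merge, --dry-run, and a report:

    osxphotos sync --import /path/to/Other.photoslibrary --album "Vacation" --merge keywords,albums --dry-run --report sync_{today.date}.json --verbose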

View File

@@ -0,0 +1,175 @@
"""SyncResults class for osxphotos sync command"""
from __future__ import annotations
import datetime
import json
from osxphotos.photoinfo import PhotoInfo
SYNC_PROPERTIES = [
"albums",
"description",
"favorite",
"keywords",
"title",
]
class SyncResults:
"""Results of sync set/merge"""
def __init__(self):
self._results = {}
self._datetime = datetime.datetime.now()
def add_result(
self,
uuid: str,
filename: str,
fingerprint: str,
property: str,
updated: bool,
before: str | list[str] | bool | None,
after: str | list[str] | bool | None,
):
"""Add result for a single photo"""
if uuid not in self._results:
self._results[uuid] = {
"filename": filename,
"fingerprint": fingerprint,
"properties": {
property: {
"updated": updated,
"datetime": datetime.datetime.now().isoformat(),
"before": before,
"after": after,
},
},
}
else:
self._results[uuid]["properties"][property] = {
"updated": updated,
"datetime": datetime.datetime.now().isoformat(),
"before": before,
"after": after,
}
@property
def results(self):
"""Return results"""
return self._results
@property
def results_list(self):
"""Return results as list lists where each sublist is values for a single photo"""
results = []
for uuid, record in self._results.items():
row = [
uuid,
record["filename"],
record["fingerprint"],
self._any_updated(uuid),
]
for property in SYNC_PROPERTIES:
if property in record["properties"]:
row.extend(
record["properties"][property][column]
for column in ["updated", "datetime", "before", "after"]
)
else:
row.extend([False, "", "", ""])
results.append(row)
return results
@property
def results_header(self):
"""Return headers for results_list"""
header = ["uuid", "filename", "fingerprint", "updated"]
for property in SYNC_PROPERTIES:
header.extend(
f"{property}_{column}"
for column in ["updated", "datetime", "before", "after"]
)
return header
@property
def results_dict(self):
"""Return dictionary of results"""
results = {}
for uuid, record in self._results.items():
results[uuid] = {
"uuid": uuid,
"filename": record["filename"],
"fingerprint": record["fingerprint"],
"updated": self._any_updated(uuid),
}
for property in SYNC_PROPERTIES:
if property in record["properties"]:
results[uuid][property] = record["properties"][property]
else:
results[uuid][property] = {
"updated": False,
"datetime": None,
"before": None,
"after": None,
}
return results
def results_summary(self):
"""Get summary of results"""
updated = sum(bool(self._any_updated(uuid)) for uuid in self._results.keys())
property_updated = {}
for property in SYNC_PROPERTIES:
property_updated[property] = 0
for uuid in self._results.keys():
if self._results[uuid]["properties"].get(property, {"updated": False})[
"updated"
]:
property_updated[property] += 1
return {
"total": len(self._results),
"updated": updated,
} | property_updated
def _any_updated(self, uuid: str) -> bool:
"""Return True if any property was updated for this photo"""
return any(
self._results[uuid]["properties"].get(property, {"updated": False})[
"updated"
]
for property in SYNC_PROPERTIES
)
def __add__(self, other):
"""Add results from another SyncResults"""
for uuid in other._results.keys():
for property, values in other._results[uuid]["properties"].items():
self.add_result(
uuid,
other._results[uuid]["filename"],
other._results[uuid]["fingerprint"],
property,
values["updated"],
values["before"],
values["after"],
)
return self
def __iadd__(self, other):
"""Add results from another SyncResults"""
for uuid in other._results.keys():
for property, values in other._results[uuid]["properties"].items():
self.add_result(
uuid,
other._results[uuid]["filename"],
other._results[uuid]["fingerprint"],
property,
values["updated"],
values["before"],
values["after"],
)
return self
def __str__(self):
return json.dumps(self._results, indent=2)
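
A small sketch (values hypothetical) of how results accumulate and summarize; `+=` folds one instance's per-property records into another:

    results = SyncResults()
    results.add_result("UUID-1", "IMG_0001.jpg", "fp1", "keywords", True, ["old"], ["new", "old"])
    other = SyncResults()
    other.add_result("UUID-1", "IMG_0001.jpg", "fp1", "title", False, "Trip", "Trip")
    results += other
    print(results.results_summary())
    # -> {'total': 1, 'updated': 1, 'albums': 0, 'description': 0, 'favorite': 0, 'keywords': 1, 'title': 0}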

View File

@@ -3,6 +3,7 @@
import logging
import pathlib
import plistlib
import sys
from .._constants import (
_PHOTOS_2_VERSION,
@@ -62,7 +63,7 @@ def get_db_version(db_file):
if version not in _TESTED_DB_VERSIONS:
print(
f"WARNING: Only tested on database versions [{', '.join(_TESTED_DB_VERSIONS)}]"
+ f" You have database version={version} which has not been tested"
+ f" You have database version={version} which has not been tested", file=sys.stderr
)
return version

View File

@@ -15,9 +15,12 @@ from .test_catalina_10_15_7 import UUID_DICT_LOCAL
# run timewarp tests (configured with --timewarp)
TEST_TIMEWARP = False
# run import tests (configured with --import)
# run import tests (configured with --test-import)
TEST_IMPORT = False
# run sync tests (configured with --test-sync)
TEST_SYNC = False
# don't clean up crash logs (configured with --no-cleanup)
NO_CLEANUP = False
@@ -46,10 +49,12 @@ OS_VER = get_os_version()[1]
if OS_VER == "15":
TEST_LIBRARY = "tests/Test-10.15.7.photoslibrary"
TEST_LIBRARY_IMPORT = TEST_LIBRARY
TEST_LIBRARY_SYNC = TEST_LIBRARY
from tests.config_timewarp_catalina import TEST_LIBRARY_TIMEWARP
else:
TEST_LIBRARY = None
TEST_LIBRARY_TIMEWARP = None
TEST_LIBRARY_SYNC = None
# pytest.exit("This test suite currently only runs on MacOS Catalina ")
@@ -67,6 +72,13 @@ def setup_photos_import():
copy_photos_library(TEST_LIBRARY_IMPORT, delay=10)
@pytest.fixture(scope="session", autouse=True)
def setup_photos_sync():
if not TEST_SYNC:
return
copy_photos_library(TEST_LIBRARY_SYNC, delay=10)
@pytest.fixture(autouse=True)
def reset_singletons():
"""Need to clean up any ExifTool singletons between tests"""
@@ -89,6 +101,12 @@ def pytest_addoption(parser):
default=False,
help="run `osxphotos import` tests",
)
parser.addoption(
"--test-sync",
action="store_true",
default=False,
help="run `osxphotos sync` tests",
)
parser.addoption(
"--no-cleanup",
action="store_true",
@@ -105,11 +123,14 @@ def pytest_configure(config):
config.getoption("--addalbum"),
config.getoption("--timewarp"),
config.getoption("--test-import"),
config.getoption("--test-sync"),
]
)
> 1
):
pytest.exit("--addalbum, --timewarp, --test-import are mutually exclusive")
pytest.exit(
"--addalbum, --timewarp, --test-import, --test-sync are mutually exclusive"
)
config.addinivalue_line(
"markers", "addalbum: mark test as requiring --addalbum to run"
@@ -120,6 +141,9 @@ def pytest_configure(config):
config.addinivalue_line(
"markers", "test_import: mark test as requiring --test-import to run"
)
config.addinivalue_line(
"markers", "test_sync: mark test as requiring --test-sync to run"
)
# this is hacky but I can't figure out how to check config options in other fixtures
if config.getoption("--timewarp"):
@@ -130,6 +154,10 @@
global TEST_IMPORT
TEST_IMPORT = True
if config.getoption("--test-sync"):
global TEST_SYNC
TEST_SYNC = True
if config.getoption("--no-cleanup"):
global NO_CLEANUP
NO_CLEANUP = True
@@ -160,6 +188,14 @@ def pytest_collection_modifyitems(config, items):
if "test_import" in item.keywords:
item.add_marker(skip_test_import)
if not (config.getoption("--test-sync") and TEST_LIBRARY_SYNC is not None):
skip_test_sync = pytest.mark.skip(
reason="need --test-sync option and MacOS Catalina to run"
)
for item in items:
if "test_sync" in item.keywords:
item.add_marker(skip_test_sync)
def copy_photos_library(photos_library, delay=0):
"""copy the test library and open Photos, returns path to copied library"""

tests/test_cli_sync.py (new file, 108 lines)
View File

@@ -0,0 +1,108 @@
"""Test osxphotos sync command"""
import os
import json
import photoscript
import pytest
from click.testing import CliRunner
from osxphotos.cli.sync import sync
UUID_TEST_PHOTO_1 = "D79B8D77-BFFC-460B-9312-034F2877D35B" # Pumkins2.jpg
UUID_TEST_PHOTO_2 = "E9BC5C36-7CD1-40A1-A72B-8B8FAC227D51" # wedding.jpg
TEST_ALBUM_NAME = "SyncTestAlbum"
@pytest.mark.test_sync
def test_sync_export():
"""Test --export"""
with CliRunner().isolated_filesystem():
result = CliRunner().invoke(
sync,
[
"--export",
"test.db",
],
)
assert result.exit_code == 0
assert os.path.exists("test.db")
@pytest.mark.test_sync
def test_sync_export_import():
"""Test --export and --import"""
photoslib = photoscript.PhotosLibrary()
# create a new album and initialize metadata
test_album = photoslib.create_album(TEST_ALBUM_NAME)
for uuid in [UUID_TEST_PHOTO_1, UUID_TEST_PHOTO_2]:
photo = photoscript.Photo(uuid)
photo.favorite = True
test_album.add([photo])
# export data
with CliRunner().isolated_filesystem():
result = CliRunner().invoke(
sync,
[
"--export",
"test.db",
],
)
assert result.exit_code == 0
# preserve metadata for comparison and clear metadata
metadata_before = {}
for uuid in [UUID_TEST_PHOTO_1, UUID_TEST_PHOTO_2]:
photo = photoscript.Photo(uuid)
metadata_before[uuid] = {
"title": photo.title,
"description": photo.description,
"keywords": photo.keywords,
"favorites": photo.favorite,
}
photo.title = ""
photo.description = ""
photo.keywords = ["NewKeyword"]
photo.favorite = False
# delete the test album
photoslib.delete_album(test_album)
# import metadata
result = CliRunner().invoke(
sync,
[
"--import",
"test.db",
"--set",
"title,description,favorite,albums",
"--merge",
"keywords",
"--report",
"test_report.json",
],
)
assert result.exit_code == 0
assert os.path.exists("test_report.json")
# check metadata
for uuid in [UUID_TEST_PHOTO_1, UUID_TEST_PHOTO_2]:
photo = photoscript.Photo(uuid)
assert photo.title == metadata_before[uuid]["title"]
assert photo.description == metadata_before[uuid]["description"]
assert sorted(photo.keywords) == sorted(
["NewKeyword", *metadata_before[uuid]["keywords"]]
)
assert photo.favorite == metadata_before[uuid]["favorites"]
assert TEST_ALBUM_NAME in [album.title for album in photo.albums]
# check report
with open("test_report.json", "r") as f:
report = json.load(f)
report_data = {record["uuid"]: record for record in report}
for uuid in [UUID_TEST_PHOTO_1, UUID_TEST_PHOTO_2]:
assert report_data[uuid]["updated"]
assert report_data[uuid]["albums"]["updated"]