From 991511af0752580f665c16a70159cf3aff4028bf Mon Sep 17 00:00:00 2001 From: Rhet Turnbull Date: Mon, 22 Aug 2022 07:29:53 -0700 Subject: [PATCH] Added --report to import command (#766) --- osxphotos/cli/import_cli.py | 290 +++++++++++++++++++++++++++++++++++- tests/test_cli_import.py | 228 ++++++++++++++++++++++++++++ 2 files changed, 513 insertions(+), 5 deletions(-) diff --git a/osxphotos/cli/import_cli.py b/osxphotos/cli/import_cli.py index 09bbe537..6c6e93ee 100644 --- a/osxphotos/cli/import_cli.py +++ b/osxphotos/cli/import_cli.py @@ -1,16 +1,26 @@ """import command for osxphotos CLI to import photos into Photos""" +# Note: the style in this module is a bit different than much of the other osxphotos code +# As an experiment, I've used mostly functions instead of classes (e.g. the report writer +# functions vs ReportWriter class used by export) and I've kept everything for import +# self-contained in this one file + +import csv import datetime import fnmatch +import json import logging import os import os.path +import sqlite3 import sys import uuid from collections import namedtuple -from pathlib import Path +from contextlib import suppress +from dataclasses import asdict, dataclass, field +from pathlib import Path, PosixPath from textwrap import dedent -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import click from photoscript import Photo, PhotosLibrary @@ -18,13 +28,15 @@ from rich.console import Console from rich.markdown import Markdown from osxphotos._constants import _OSXPHOTOS_NONE_SENTINEL +from osxphotos._version import __version__ from osxphotos.cli.help import HELP_WIDTH +from osxphotos.cli.param_types import TemplateString from osxphotos.datetime_utils import datetime_naive_to_local from osxphotos.exiftool import ExifToolCaching, get_exiftool_path +from osxphotos.photoinfo import PhotoInfoNone from osxphotos.photosalbum import PhotosAlbumPhotoScript from osxphotos.phototemplate import PhotoTemplate, RenderOptions from osxphotos.utils import pluralize -from osxphotos.cli.param_types import TemplateString from .click_rich_echo import ( rich_click_echo, @@ -39,6 +51,8 @@ from .verbose import get_verbose_console, verbose_print MetaData = namedtuple("MetaData", ["title", "description", "keywords", "location"]) +OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}" + def echo(message, emoji=True, **kwargs): """Echo text with rich""" @@ -478,7 +492,6 @@ def check_templates_and_exit( description: Optional[str], keyword: Tuple[str], album: Tuple[str], - split_folder: Optional[str], exiftool_path: Optional[str], exiftool: bool, ): @@ -524,6 +537,240 @@ def check_templates_and_exit( sys.exit(0) +@dataclass +class ReportRecord: + albums: List[str] = field(default_factory=list) + description: str = "" + error: bool = False + filename: str = "" + filepath: Path = field(default_factory=Path) + import_datetime: datetime.datetime = datetime.datetime.now() + imported: bool = False + keywords: List[str] = field(default_factory=list) + location: Tuple[float, float] = field(default_factory=tuple) + title: str = "" + uuid: str = "" + + def asdict(self): + return asdict(self) + + def asjsondict(self): + """Return a JSON serializable dict""" + dict_data = self.asdict() + dict_data["filepath"] = str(dict_data["filepath"]) + dict_data["import_datetime"] = dict_data["import_datetime"].isoformat() + return dict_data + + +def update_report_record(report_record: ReportRecord, photo: Photo, filepath: Path): + """Update a ReportRecord with data from a Photo""" + report_record.albums = [a.title for a in photo.albums] + report_record.description = photo.description + report_record.filename = filepath.name + report_record.filepath = filepath + report_record.imported = True + report_record.keywords = photo.keywords + report_record.location = photo.location + report_record.title = photo.title + report_record.uuid = photo.uuid + + return report_record + + +def write_report(report_file: str, report_data: Dict[Path, ReportRecord], append: bool): + """Write report to file""" + report_type = os.path.splitext(report_file)[1][1:].lower() + if report_type == "csv": + write_csv_report(report_file, report_data, append) + elif report_type == "json": + write_json_report(report_file, report_data, append) + elif report_type in ["db", "sqlite"]: + write_sqlite_report(report_file, report_data, append) + else: + echo(f"Unknown report type: {report_type}", err=True) + raise click.Abort() + + +def write_csv_report( + report_file: str, report_data: Dict[Path, ReportRecord], append: bool +): + """Write report to csv file""" + with open(report_file, "a" if append else "w") as f: + writer = csv.writer(f) + if not append: + writer.writerow( + [ + "filepath", + "filename", + "datetime", + "uuid", + "imported", + "error", + "title", + "description", + "keywords", + "albums", + "location", + ] + ) + for report_record in report_data.values(): + writer.writerow( + [ + report_record.filepath, + report_record.filename, + report_record.import_datetime, + report_record.uuid, + report_record.imported, + report_record.error, + report_record.title, + report_record.description, + ",".join(report_record.keywords), + ",".join(report_record.albums), + report_record.location, + ] + ) + + +def write_json_report( + report_file: str, report_data: Dict[Path, ReportRecord], append: bool +): + """Write report to JSON file""" + records = [v.asjsondict() for v in report_data.values()] + if append: + with open(report_file, "r") as f: + existing_records = json.load(f) + records.extend(existing_records) + with open(report_file, "w") as f: + json.dump(records, f, indent=4) + + +def write_sqlite_report( + report_file: str, report_data: Dict[Path, ReportRecord], append: bool +): + """Write report to SQLite file""" + if not append: + with suppress(FileNotFoundError): + os.unlink(report_file) + + file_exists = os.path.isfile(report_file) + + conn = sqlite3.connect(report_file) + c = conn.cursor() + + if not append or not file_exists: + # Create the tables + c.execute( + """CREATE TABLE IF NOT EXISTS report ( + report_id INTEGER, + filepath TEXT, + filename TEXT, + datetime TEXT, + uuid TEXT, + imported INTEGER, + error INTEGER, + title TEXT, + description TEXT, + keywords TEXT, + albums TEXT, + location TEXT + )""" + ) + c.execute( + """ + CREATE TABLE IF NOT EXISTS about ( + id INTEGER PRIMARY KEY, + about TEXT + );""" + ) + c.execute( + "INSERT INTO about(about) VALUES (?);", + (f"OSXPhotos Import Report. {OSXPHOTOS_ABOUT_STRING}",), + ) + c.execute( + """ + CREATE TABLE IF NOT EXISTS report_id ( + report_id INTEGER PRIMARY KEY, + datetime TEXT + );""" + ) + + # Insert report_id + c.execute( + "INSERT INTO report_id(datetime) VALUES (?);", + (datetime.datetime.now().isoformat(),), + ) + report_id = c.lastrowid + + for report_record in report_data.values(): + c.execute( + """INSERT INTO report ( + report_id, + filepath, + filename, + datetime, + uuid, + imported, + error, + title, + description, + keywords, + albums, + location + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""", + ( + report_id, + str(report_record.filepath), + report_record.filename, + report_record.import_datetime, + report_record.uuid, + report_record.imported, + report_record.error, + report_record.title, + report_record.description, + ",".join(report_record.keywords), + ",".join(report_record.albums), + f"{report_record.location[0]},{report_record.location[1]}", + ), + ) + conn.commit() + conn.close() + + +def render_and_validate_report(report: str) -> str: + """Render a report file template and validate the filename + + Args: + report: the template string + + Returns: + the rendered report filename + + Note: + Exits with error if the report filename is invalid + """ + # render report template and validate the filename + template = PhotoTemplate(PhotoInfoNone()) + render_options = RenderOptions(caller="import") + report_file, _ = template.render(report, options=render_options) + report = report_file[0] + + if os.path.isdir(report): + rich_click_echo( + f"[error]Report '{report}' is a directory, must be file name", + err=True, + ) + sys.exit(1) + + extension = os.path.splitext(report)[1] + if extension.lower() not in [".csv", ".json", ".db", ".sqlite"]: + rich_click_echo( + f"[error]Report '{report}' has invalid extension, must be .csv, .json, .db, or .sqlite", + err=True, + ) + sys.exit(1) + return report + + def filename_matches_patterns(filename: str, patterns: Tuple[str]) -> bool: """Return True if filename matches any pattern in patterns""" return any(fnmatch.fnmatch(filename, pattern) for pattern in patterns) @@ -911,6 +1158,24 @@ class ImportCommand(click.Command): "GLOB is a Unix shell-style glob pattern, for example: '--glob \"*.jpg\"'. " "GLOB may be repeated to import multiple patterns.", ) +@click.option( + "--report", + metavar="REPORT_FILE", + help="Write a report of all files that were imported. " + "The extension of the report filename will be used to determine the format. " + "Valid extensions are: " + ".csv (CSV file), .json (JSON), .db and .sqlite (SQLite database). " + "REPORT_FILE may be a template string (see Template System), for example, " + "--report 'export_{today.date}.csv' will write a CSV report file named with today's date. " + "See also --append.", + type=TemplateString(), +) +@click.option( + "--append", + is_flag=True, + help="If used with --report, add data to existing report file instead of overwriting it. " + "See also --report.", +) @click.option("--verbose", "-V", "verbose_", is_flag=True, help="Print verbose output.") @click.option( "--timestamp", "-T", is_flag=True, help="Add time stamp to verbose output" @@ -932,6 +1197,7 @@ def import_cli( ctx, cli_obj, album, + append, check_templates, clear_location, clear_metadata, @@ -946,6 +1212,7 @@ def import_cli( merge_keywords, no_progress, relative_to, + report, split_folder, theme, timestamp, @@ -977,6 +1244,7 @@ def import_cli( # _list_libraries() # return + report_file = render_and_validate_report(report) if report else None relative_to = Path(relative_to) if relative_to else None imported_count = 0 @@ -990,11 +1258,14 @@ def import_cli( description, keyword, album, - split_folder, exiftool_path, exiftool, ) + # initialize report data + # report data is set even if no report is generated + report_data: Dict[Path, ReportRecord] = {} + filecount = len(files) with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: task = progress.add_task( @@ -1006,9 +1277,13 @@ def import_cli( relative_filepath = get_relative_filepath(filepath, relative_to) verbose(f"Importing [filepath]{filepath}[/]") + report_data[filepath] = ReportRecord( + filepath=filepath, filename=filepath.name + ) photo, error = import_photo(filepath, dup_check, verbose) if error: error_count += 1 + report_data[filepath].error = True continue imported_count += 1 @@ -1063,8 +1338,13 @@ def import_cli( verbose, ) + update_report_record(report_data[filepath], photo, filepath) progress.advance(task) + if report: + write_report(report_file, report_data, append) + verbose(f"Wrote import report to [filepath]{report_file}[/]") + echo( f"Done: imported [num]{imported_count}[/] {pluralize(imported_count, 'file', 'files')}, " f"[num]{error_count}[/] {pluralize(error_count, 'error', 'errors')}", diff --git a/tests/test_cli_import.py b/tests/test_cli_import.py index b593da3a..83ecf02c 100644 --- a/tests/test_cli_import.py +++ b/tests/test_cli_import.py @@ -1,10 +1,13 @@ """ Tests which require user interaction to run for osxphotos import command; run with pytest --test-import """ +import csv +import json import os import os.path import pathlib import re import shutil +import sqlite3 import time from tempfile import TemporaryDirectory from typing import Dict @@ -680,3 +683,228 @@ def test_import_function_template(): assert photo_1.filename == file_1 albums = [a.title for a in photo_1.albums] assert albums == ["MyAlbum"] + + +@pytest.mark.test_import +def test_import_report(): + """test import with --report option""" + + runner = CliRunner() + cwd = os.getcwd() + test_image_1 = os.path.join(cwd, TEST_IMAGE_1) + + with runner.isolated_filesystem(): + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.csv", + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists("report.csv") + with open("report.csv", "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report gets overwritten + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.csv", + "--verbose", + ], + ) + assert result.exit_code == 0 + with open("report.csv", "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report with --append + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.csv", + "--append", + "--verbose", + ], + ) + assert result.exit_code == 0 + with open("report.csv", "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [ + pathlib.Path(TEST_IMAGE_1).name, + pathlib.Path(TEST_IMAGE_1).name, + ] + + +@pytest.mark.test_import +def test_import_report_json(): + """test import with --report option with json output""" + + runner = CliRunner() + cwd = os.getcwd() + test_image_1 = os.path.join(cwd, TEST_IMAGE_1) + + with runner.isolated_filesystem(): + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.json", + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists("report.json") + with open("report.json", "r") as f: + rows = json.load(f) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report gets overwritten + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.json", + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists("report.json") + with open("report.json", "r") as f: + rows = json.load(f) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report with --append + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report.json", + "--append", + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists("report.json") + with open("report.json", "r") as f: + rows = json.load(f) + filenames = [str(pathlib.Path(row["filename"]).name) for row in rows] + assert filenames == [ + pathlib.Path(TEST_IMAGE_1).name, + pathlib.Path(TEST_IMAGE_1).name, + ] + + +@pytest.mark.test_import +@pytest.mark.parametrize("report_file", ["report.db", "report.sqlite"]) +def test_import_report_sqlite(report_file): + """test import with --report option with sqlite output""" + + runner = CliRunner() + cwd = os.getcwd() + test_image_1 = os.path.join(cwd, TEST_IMAGE_1) + + with runner.isolated_filesystem(): + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + report_file, + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists(report_file) + conn = sqlite3.connect(report_file) + c = conn.cursor() + c.execute("SELECT filename FROM report") + filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report gets overwritten + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + report_file, + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists(report_file) + conn = sqlite3.connect(report_file) + c = conn.cursor() + c.execute("SELECT filename FROM report") + filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()] + assert filenames == [pathlib.Path(TEST_IMAGE_1).name] + + # test report with --append + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + report_file, + "--append", + "--verbose", + ], + ) + assert result.exit_code == 0 + assert "Wrote import report" in result.output + assert os.path.exists(report_file) + conn = sqlite3.connect(report_file) + c = conn.cursor() + c.execute("SELECT filename FROM report") + filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()] + assert filenames == [ + pathlib.Path(TEST_IMAGE_1).name, + pathlib.Path(TEST_IMAGE_1).name, + ] + + +@pytest.mark.test_import +def test_import_report_invalid_name(): + """test import with --report option with invalid report""" + + runner = CliRunner() + cwd = os.getcwd() + test_image_1 = os.path.join(cwd, TEST_IMAGE_1) + + with runner.isolated_filesystem(): + result = runner.invoke( + import_cli, + [ + test_image_1, + "--report", + "report", # invalid filename, no extension + "--verbose", + ], + ) + assert result.exit_code != 0