Added --report to import command (#766)

This commit is contained in:
Rhet Turnbull
2022-08-22 07:29:53 -07:00
committed by GitHub
parent 3c98906158
commit 991511af07
2 changed files with 513 additions and 5 deletions

View File

@@ -1,16 +1,26 @@
"""import command for osxphotos CLI to import photos into Photos"""
# Note: the style in this module is a bit different than much of the other osxphotos code
# As an experiment, I've used mostly functions instead of classes (e.g. the report writer
# functions vs ReportWriter class used by export) and I've kept everything for import
# self-contained in this one file
import csv
import datetime
import fnmatch
import json
import logging
import os
import os.path
import sqlite3
import sys
import uuid
from collections import namedtuple
from pathlib import Path
from contextlib import suppress
from dataclasses import asdict, dataclass, field
from pathlib import Path, PosixPath
from textwrap import dedent
from typing import Callable, List, Optional, Tuple, Union
from typing import Callable, Dict, List, Optional, Tuple, Union
import click
from photoscript import Photo, PhotosLibrary
@@ -18,13 +28,15 @@ from rich.console import Console
from rich.markdown import Markdown
from osxphotos._constants import _OSXPHOTOS_NONE_SENTINEL
from osxphotos._version import __version__
from osxphotos.cli.help import HELP_WIDTH
from osxphotos.cli.param_types import TemplateString
from osxphotos.datetime_utils import datetime_naive_to_local
from osxphotos.exiftool import ExifToolCaching, get_exiftool_path
from osxphotos.photoinfo import PhotoInfoNone
from osxphotos.photosalbum import PhotosAlbumPhotoScript
from osxphotos.phototemplate import PhotoTemplate, RenderOptions
from osxphotos.utils import pluralize
from osxphotos.cli.param_types import TemplateString
from .click_rich_echo import (
rich_click_echo,
@@ -39,6 +51,8 @@ from .verbose import get_verbose_console, verbose_print
MetaData = namedtuple("MetaData", ["title", "description", "keywords", "location"])
OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
def echo(message, emoji=True, **kwargs):
"""Echo text with rich"""
@@ -478,7 +492,6 @@ def check_templates_and_exit(
description: Optional[str],
keyword: Tuple[str],
album: Tuple[str],
split_folder: Optional[str],
exiftool_path: Optional[str],
exiftool: bool,
):
@@ -524,6 +537,240 @@ def check_templates_and_exit(
sys.exit(0)
@dataclass
class ReportRecord:
albums: List[str] = field(default_factory=list)
description: str = ""
error: bool = False
filename: str = ""
filepath: Path = field(default_factory=Path)
import_datetime: datetime.datetime = datetime.datetime.now()
imported: bool = False
keywords: List[str] = field(default_factory=list)
location: Tuple[float, float] = field(default_factory=tuple)
title: str = ""
uuid: str = ""
def asdict(self):
return asdict(self)
def asjsondict(self):
"""Return a JSON serializable dict"""
dict_data = self.asdict()
dict_data["filepath"] = str(dict_data["filepath"])
dict_data["import_datetime"] = dict_data["import_datetime"].isoformat()
return dict_data
def update_report_record(report_record: ReportRecord, photo: Photo, filepath: Path):
"""Update a ReportRecord with data from a Photo"""
report_record.albums = [a.title for a in photo.albums]
report_record.description = photo.description
report_record.filename = filepath.name
report_record.filepath = filepath
report_record.imported = True
report_record.keywords = photo.keywords
report_record.location = photo.location
report_record.title = photo.title
report_record.uuid = photo.uuid
return report_record
def write_report(report_file: str, report_data: Dict[Path, ReportRecord], append: bool):
"""Write report to file"""
report_type = os.path.splitext(report_file)[1][1:].lower()
if report_type == "csv":
write_csv_report(report_file, report_data, append)
elif report_type == "json":
write_json_report(report_file, report_data, append)
elif report_type in ["db", "sqlite"]:
write_sqlite_report(report_file, report_data, append)
else:
echo(f"Unknown report type: {report_type}", err=True)
raise click.Abort()
def write_csv_report(
report_file: str, report_data: Dict[Path, ReportRecord], append: bool
):
"""Write report to csv file"""
with open(report_file, "a" if append else "w") as f:
writer = csv.writer(f)
if not append:
writer.writerow(
[
"filepath",
"filename",
"datetime",
"uuid",
"imported",
"error",
"title",
"description",
"keywords",
"albums",
"location",
]
)
for report_record in report_data.values():
writer.writerow(
[
report_record.filepath,
report_record.filename,
report_record.import_datetime,
report_record.uuid,
report_record.imported,
report_record.error,
report_record.title,
report_record.description,
",".join(report_record.keywords),
",".join(report_record.albums),
report_record.location,
]
)
def write_json_report(
report_file: str, report_data: Dict[Path, ReportRecord], append: bool
):
"""Write report to JSON file"""
records = [v.asjsondict() for v in report_data.values()]
if append:
with open(report_file, "r") as f:
existing_records = json.load(f)
records.extend(existing_records)
with open(report_file, "w") as f:
json.dump(records, f, indent=4)
def write_sqlite_report(
report_file: str, report_data: Dict[Path, ReportRecord], append: bool
):
"""Write report to SQLite file"""
if not append:
with suppress(FileNotFoundError):
os.unlink(report_file)
file_exists = os.path.isfile(report_file)
conn = sqlite3.connect(report_file)
c = conn.cursor()
if not append or not file_exists:
# Create the tables
c.execute(
"""CREATE TABLE IF NOT EXISTS report (
report_id INTEGER,
filepath TEXT,
filename TEXT,
datetime TEXT,
uuid TEXT,
imported INTEGER,
error INTEGER,
title TEXT,
description TEXT,
keywords TEXT,
albums TEXT,
location TEXT
)"""
)
c.execute(
"""
CREATE TABLE IF NOT EXISTS about (
id INTEGER PRIMARY KEY,
about TEXT
);"""
)
c.execute(
"INSERT INTO about(about) VALUES (?);",
(f"OSXPhotos Import Report. {OSXPHOTOS_ABOUT_STRING}",),
)
c.execute(
"""
CREATE TABLE IF NOT EXISTS report_id (
report_id INTEGER PRIMARY KEY,
datetime TEXT
);"""
)
# Insert report_id
c.execute(
"INSERT INTO report_id(datetime) VALUES (?);",
(datetime.datetime.now().isoformat(),),
)
report_id = c.lastrowid
for report_record in report_data.values():
c.execute(
"""INSERT INTO report (
report_id,
filepath,
filename,
datetime,
uuid,
imported,
error,
title,
description,
keywords,
albums,
location
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""",
(
report_id,
str(report_record.filepath),
report_record.filename,
report_record.import_datetime,
report_record.uuid,
report_record.imported,
report_record.error,
report_record.title,
report_record.description,
",".join(report_record.keywords),
",".join(report_record.albums),
f"{report_record.location[0]},{report_record.location[1]}",
),
)
conn.commit()
conn.close()
def render_and_validate_report(report: str) -> str:
"""Render a report file template and validate the filename
Args:
report: the template string
Returns:
the rendered report filename
Note:
Exits with error if the report filename is invalid
"""
# render report template and validate the filename
template = PhotoTemplate(PhotoInfoNone())
render_options = RenderOptions(caller="import")
report_file, _ = template.render(report, options=render_options)
report = report_file[0]
if os.path.isdir(report):
rich_click_echo(
f"[error]Report '{report}' is a directory, must be file name",
err=True,
)
sys.exit(1)
extension = os.path.splitext(report)[1]
if extension.lower() not in [".csv", ".json", ".db", ".sqlite"]:
rich_click_echo(
f"[error]Report '{report}' has invalid extension, must be .csv, .json, .db, or .sqlite",
err=True,
)
sys.exit(1)
return report
def filename_matches_patterns(filename: str, patterns: Tuple[str]) -> bool:
"""Return True if filename matches any pattern in patterns"""
return any(fnmatch.fnmatch(filename, pattern) for pattern in patterns)
@@ -911,6 +1158,24 @@ class ImportCommand(click.Command):
"GLOB is a Unix shell-style glob pattern, for example: '--glob \"*.jpg\"'. "
"GLOB may be repeated to import multiple patterns.",
)
@click.option(
"--report",
metavar="REPORT_FILE",
help="Write a report of all files that were imported. "
"The extension of the report filename will be used to determine the format. "
"Valid extensions are: "
".csv (CSV file), .json (JSON), .db and .sqlite (SQLite database). "
"REPORT_FILE may be a template string (see Template System), for example, "
"--report 'export_{today.date}.csv' will write a CSV report file named with today's date. "
"See also --append.",
type=TemplateString(),
)
@click.option(
"--append",
is_flag=True,
help="If used with --report, add data to existing report file instead of overwriting it. "
"See also --report.",
)
@click.option("--verbose", "-V", "verbose_", is_flag=True, help="Print verbose output.")
@click.option(
"--timestamp", "-T", is_flag=True, help="Add time stamp to verbose output"
@@ -932,6 +1197,7 @@ def import_cli(
ctx,
cli_obj,
album,
append,
check_templates,
clear_location,
clear_metadata,
@@ -946,6 +1212,7 @@ def import_cli(
merge_keywords,
no_progress,
relative_to,
report,
split_folder,
theme,
timestamp,
@@ -977,6 +1244,7 @@ def import_cli(
# _list_libraries()
# return
report_file = render_and_validate_report(report) if report else None
relative_to = Path(relative_to) if relative_to else None
imported_count = 0
@@ -990,11 +1258,14 @@ def import_cli(
description,
keyword,
album,
split_folder,
exiftool_path,
exiftool,
)
# initialize report data
# report data is set even if no report is generated
report_data: Dict[Path, ReportRecord] = {}
filecount = len(files)
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
task = progress.add_task(
@@ -1006,9 +1277,13 @@ def import_cli(
relative_filepath = get_relative_filepath(filepath, relative_to)
verbose(f"Importing [filepath]{filepath}[/]")
report_data[filepath] = ReportRecord(
filepath=filepath, filename=filepath.name
)
photo, error = import_photo(filepath, dup_check, verbose)
if error:
error_count += 1
report_data[filepath].error = True
continue
imported_count += 1
@@ -1063,8 +1338,13 @@ def import_cli(
verbose,
)
update_report_record(report_data[filepath], photo, filepath)
progress.advance(task)
if report:
write_report(report_file, report_data, append)
verbose(f"Wrote import report to [filepath]{report_file}[/]")
echo(
f"Done: imported [num]{imported_count}[/] {pluralize(imported_count, 'file', 'files')}, "
f"[num]{error_count}[/] {pluralize(error_count, 'error', 'errors')}",

View File

@@ -1,10 +1,13 @@
""" Tests which require user interaction to run for osxphotos import command; run with pytest --test-import """
import csv
import json
import os
import os.path
import pathlib
import re
import shutil
import sqlite3
import time
from tempfile import TemporaryDirectory
from typing import Dict
@@ -680,3 +683,228 @@ def test_import_function_template():
assert photo_1.filename == file_1
albums = [a.title for a in photo_1.albums]
assert albums == ["MyAlbum"]
@pytest.mark.test_import
def test_import_report():
"""test import with --report option"""
runner = CliRunner()
cwd = os.getcwd()
test_image_1 = os.path.join(cwd, TEST_IMAGE_1)
with runner.isolated_filesystem():
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.csv",
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists("report.csv")
with open("report.csv", "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report gets overwritten
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.csv",
"--verbose",
],
)
assert result.exit_code == 0
with open("report.csv", "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report with --append
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.csv",
"--append",
"--verbose",
],
)
assert result.exit_code == 0
with open("report.csv", "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [
pathlib.Path(TEST_IMAGE_1).name,
pathlib.Path(TEST_IMAGE_1).name,
]
@pytest.mark.test_import
def test_import_report_json():
"""test import with --report option with json output"""
runner = CliRunner()
cwd = os.getcwd()
test_image_1 = os.path.join(cwd, TEST_IMAGE_1)
with runner.isolated_filesystem():
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.json",
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists("report.json")
with open("report.json", "r") as f:
rows = json.load(f)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report gets overwritten
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.json",
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists("report.json")
with open("report.json", "r") as f:
rows = json.load(f)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report with --append
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report.json",
"--append",
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists("report.json")
with open("report.json", "r") as f:
rows = json.load(f)
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
assert filenames == [
pathlib.Path(TEST_IMAGE_1).name,
pathlib.Path(TEST_IMAGE_1).name,
]
@pytest.mark.test_import
@pytest.mark.parametrize("report_file", ["report.db", "report.sqlite"])
def test_import_report_sqlite(report_file):
"""test import with --report option with sqlite output"""
runner = CliRunner()
cwd = os.getcwd()
test_image_1 = os.path.join(cwd, TEST_IMAGE_1)
with runner.isolated_filesystem():
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
report_file,
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists(report_file)
conn = sqlite3.connect(report_file)
c = conn.cursor()
c.execute("SELECT filename FROM report")
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report gets overwritten
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
report_file,
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists(report_file)
conn = sqlite3.connect(report_file)
c = conn.cursor()
c.execute("SELECT filename FROM report")
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
assert filenames == [pathlib.Path(TEST_IMAGE_1).name]
# test report with --append
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
report_file,
"--append",
"--verbose",
],
)
assert result.exit_code == 0
assert "Wrote import report" in result.output
assert os.path.exists(report_file)
conn = sqlite3.connect(report_file)
c = conn.cursor()
c.execute("SELECT filename FROM report")
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
assert filenames == [
pathlib.Path(TEST_IMAGE_1).name,
pathlib.Path(TEST_IMAGE_1).name,
]
@pytest.mark.test_import
def test_import_report_invalid_name():
"""test import with --report option with invalid report"""
runner = CliRunner()
cwd = os.getcwd()
test_image_1 = os.path.join(cwd, TEST_IMAGE_1)
with runner.isolated_filesystem():
result = runner.invoke(
import_cli,
[
test_image_1,
"--report",
"report", # invalid filename, no extension
"--verbose",
],
)
assert result.exit_code != 0