* Initial implementation of ReportWriter for #309 * Initial implementation of ReportWriterJSON * Added sqlite report format * Added auto-flush to report writer, fixed exportdb --info to output json * Added exportdb --report
This commit is contained in:
@@ -2103,6 +2103,7 @@ Attributes:
|
||||
|
||||
`ExportResults` has the following properties:
|
||||
|
||||
* datetime: date/time of export in ISO 8601 format
|
||||
* exported: list of all exported files (A single call to export could export more than one file, e.g. original file, preview, live video, raw, etc.)
|
||||
* new: list of new files exported when used with update=True
|
||||
* updated: list of updated files when used with update=True
|
||||
|
||||
@@ -4161,6 +4161,7 @@ Attributes:
|
||||
|
||||
`ExportResults` has the following properties:
|
||||
|
||||
* datetime: date/time of export in ISO 8601 format
|
||||
* exported: list of all exported files (A single call to export could export more than one file, e.g. original file, preview, live video, raw, etc.)
|
||||
* new: list of new files exported when used with update=True
|
||||
* updated: list of updated files when used with update=True
|
||||
|
||||
@@ -86,6 +86,7 @@ from .common import (
|
||||
from .help import ExportCommand, get_help_msg
|
||||
from .list import _list_libraries
|
||||
from .param_types import ExportDBType, FunctionCall, TemplateString
|
||||
from .report_writer import report_writer_factory, ReportWriterNoOp
|
||||
from .rich_progress import rich_progress
|
||||
from .verbose import get_verbose_console, time_stamp, verbose_print
|
||||
|
||||
@@ -514,11 +515,21 @@ from .verbose import get_verbose_console, time_stamp, verbose_print
|
||||
@click.option(
|
||||
"--report",
|
||||
metavar="REPORT_FILE",
|
||||
help="Write a CSV formatted report of all files that were exported. "
|
||||
help="Write a report of all files that were exported. "
|
||||
"The extension of the report filename will be used to determine the format. "
|
||||
"Valid extensions are: "
|
||||
".csv (CSV file), .json (JSON), .db and .sqlite (SQLite database). "
|
||||
"REPORT_FILE may be a template string (see Templating System), for example, "
|
||||
"--report 'export_{today.date}.csv' will write a report file named with today's date.",
|
||||
"--report 'export_{today.date}.csv' will write a CSV report file named with today's date. "
|
||||
"See also --append.",
|
||||
type=TemplateString(),
|
||||
)
|
||||
@click.option(
|
||||
"--append",
|
||||
is_flag=True,
|
||||
help="If used with --report, add data to existing report file instead of overwriting it. "
|
||||
"See also --report.",
|
||||
)
|
||||
@click.option(
|
||||
"--cleanup",
|
||||
is_flag=True,
|
||||
@@ -690,6 +701,7 @@ def export(
|
||||
added_in_last,
|
||||
album_keyword,
|
||||
album,
|
||||
append,
|
||||
beta,
|
||||
burst,
|
||||
cleanup,
|
||||
@@ -909,6 +921,7 @@ def export(
|
||||
add_skipped_to_album = cfg.add_skipped_to_album
|
||||
album = cfg.album
|
||||
album_keyword = cfg.album_keyword
|
||||
append = cfg.append
|
||||
beta = cfg.beta
|
||||
burst = cfg.burst
|
||||
cleanup = cfg.cleanup
|
||||
@@ -1094,6 +1107,7 @@ def export(
|
||||
("jpeg_quality", ("convert_to_jpeg")),
|
||||
("missing", ("download_missing", "use_photos_export")),
|
||||
("only_new", ("update", "force_update")),
|
||||
("append", ("report")),
|
||||
]
|
||||
try:
|
||||
cfg.validate(exclusive=exclusive_options, dependent=dependent_options, cli=True)
|
||||
@@ -1154,6 +1168,9 @@ def export(
|
||||
|
||||
if report:
|
||||
report = render_and_validate_report(report, exiftool_path, dest)
|
||||
report_writer = report_writer_factory(report, append)
|
||||
else:
|
||||
report_writer = ReportWriterNoOp()
|
||||
|
||||
# if use_photokit and not check_photokit_authorization():
|
||||
# click.echo(
|
||||
@@ -1575,6 +1592,8 @@ def export(
|
||||
export_dir=dest,
|
||||
verbose_=verbose_,
|
||||
)
|
||||
export_results.xattr_written.extend(tags_written)
|
||||
export_results.xattr_skipped.extend(tags_skipped)
|
||||
results.xattr_written.extend(tags_written)
|
||||
results.xattr_skipped.extend(tags_skipped)
|
||||
|
||||
@@ -1587,9 +1606,13 @@ def export(
|
||||
export_dir=dest,
|
||||
verbose_=verbose_,
|
||||
)
|
||||
export_results.xattr_written.extend(xattr_written)
|
||||
export_results.xattr_skipped.extend(xattr_skipped)
|
||||
results.xattr_written.extend(xattr_written)
|
||||
results.xattr_skipped.extend(xattr_skipped)
|
||||
|
||||
report_writer.write(export_results)
|
||||
|
||||
progress.advance(task)
|
||||
|
||||
# handle limit
|
||||
@@ -1602,6 +1625,9 @@ def export(
|
||||
progress.advance(task, num_photos - photo_num)
|
||||
break
|
||||
|
||||
# store results so they can be used by `osxphotos exportdb --report`
|
||||
export_db.set_export_results(results)
|
||||
|
||||
photo_str_total = pluralize(len(photos), "photo", "photos")
|
||||
if update or force_update:
|
||||
summary = (
|
||||
@@ -1661,12 +1687,15 @@ def export(
|
||||
rich_echo(
|
||||
f"Deleted: [num]{len(cleaned_files)}[/num] {file_str}, [num]{len(cleaned_dirs)}[/num] {dir_str}"
|
||||
)
|
||||
report_writer.write(
|
||||
ExportResults(deleted_files=cleaned_files, deleted_directories=cleaned_dirs)
|
||||
)
|
||||
results.deleted_files = cleaned_files
|
||||
results.deleted_directories = cleaned_dirs
|
||||
|
||||
if report:
|
||||
verbose_(f"Writing export report to [filepath]{report}")
|
||||
write_export_report(report, results)
|
||||
verbose_(f"Wrote export report to [filepath]{report}")
|
||||
report_writer.close()
|
||||
|
||||
# close export_db and write changes if needed
|
||||
if ramdb and not dry_run:
|
||||
@@ -1675,18 +1704,6 @@ def export(
|
||||
export_db.close()
|
||||
|
||||
|
||||
def _export_with_profiler(args: Dict):
|
||||
""" "Run export with cProfile"""
|
||||
try:
|
||||
args.pop("profile")
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
cProfile.runctx(
|
||||
"_export(**args)", globals=globals(), locals=locals(), sort="tottime"
|
||||
)
|
||||
|
||||
|
||||
def export_photo(
|
||||
photo=None,
|
||||
dest=None,
|
||||
@@ -2436,150 +2453,6 @@ def find_files_in_branch(pathname, filename):
|
||||
return files
|
||||
|
||||
|
||||
def write_export_report(report_file, results):
|
||||
|
||||
"""write CSV report with results from export
|
||||
|
||||
Args:
|
||||
report_file: path to report file
|
||||
results: ExportResults object
|
||||
"""
|
||||
|
||||
# Collect results for reporting
|
||||
all_results = {
|
||||
result: {
|
||||
"filename": result,
|
||||
"exported": 0,
|
||||
"new": 0,
|
||||
"updated": 0,
|
||||
"skipped": 0,
|
||||
"exif_updated": 0,
|
||||
"touched": 0,
|
||||
"converted_to_jpeg": 0,
|
||||
"sidecar_xmp": 0,
|
||||
"sidecar_json": 0,
|
||||
"sidecar_exiftool": 0,
|
||||
"missing": 0,
|
||||
"error": "",
|
||||
"exiftool_warning": "",
|
||||
"exiftool_error": "",
|
||||
"extended_attributes_written": 0,
|
||||
"extended_attributes_skipped": 0,
|
||||
"cleanup_deleted_file": 0,
|
||||
"cleanup_deleted_directory": 0,
|
||||
"exported_album": "",
|
||||
}
|
||||
for result in results.all_files()
|
||||
+ results.deleted_files
|
||||
+ results.deleted_directories
|
||||
}
|
||||
|
||||
for result in results.exported:
|
||||
all_results[result]["exported"] = 1
|
||||
|
||||
for result in results.new:
|
||||
all_results[result]["new"] = 1
|
||||
|
||||
for result in results.updated:
|
||||
all_results[result]["updated"] = 1
|
||||
|
||||
for result in results.skipped:
|
||||
all_results[result]["skipped"] = 1
|
||||
|
||||
for result in results.exif_updated:
|
||||
all_results[result]["exif_updated"] = 1
|
||||
|
||||
for result in results.touched:
|
||||
all_results[result]["touched"] = 1
|
||||
|
||||
for result in results.converted_to_jpeg:
|
||||
all_results[result]["converted_to_jpeg"] = 1
|
||||
|
||||
for result in results.sidecar_xmp_written:
|
||||
all_results[result]["sidecar_xmp"] = 1
|
||||
all_results[result]["exported"] = 1
|
||||
|
||||
for result in results.sidecar_xmp_skipped:
|
||||
all_results[result]["sidecar_xmp"] = 1
|
||||
all_results[result]["skipped"] = 1
|
||||
|
||||
for result in results.sidecar_json_written:
|
||||
all_results[result]["sidecar_json"] = 1
|
||||
all_results[result]["exported"] = 1
|
||||
|
||||
for result in results.sidecar_json_skipped:
|
||||
all_results[result]["sidecar_json"] = 1
|
||||
all_results[result]["skipped"] = 1
|
||||
|
||||
for result in results.sidecar_exiftool_written:
|
||||
all_results[result]["sidecar_exiftool"] = 1
|
||||
all_results[result]["exported"] = 1
|
||||
|
||||
for result in results.sidecar_exiftool_skipped:
|
||||
all_results[result]["sidecar_exiftool"] = 1
|
||||
all_results[result]["skipped"] = 1
|
||||
|
||||
for result in results.missing:
|
||||
all_results[result]["missing"] = 1
|
||||
|
||||
for result in results.error:
|
||||
all_results[result[0]]["error"] = result[1]
|
||||
|
||||
for result in results.exiftool_warning:
|
||||
all_results[result[0]]["exiftool_warning"] = result[1]
|
||||
|
||||
for result in results.exiftool_error:
|
||||
all_results[result[0]]["exiftool_error"] = result[1]
|
||||
|
||||
for result in results.xattr_written:
|
||||
all_results[result]["extended_attributes_written"] = 1
|
||||
|
||||
for result in results.xattr_skipped:
|
||||
all_results[result]["extended_attributes_skipped"] = 1
|
||||
|
||||
for result in results.deleted_files:
|
||||
all_results[result]["cleanup_deleted_file"] = 1
|
||||
|
||||
for result in results.deleted_directories:
|
||||
all_results[result]["cleanup_deleted_directory"] = 1
|
||||
|
||||
for result, album in results.exported_album:
|
||||
all_results[result]["exported_album"] = album
|
||||
|
||||
report_columns = [
|
||||
"filename",
|
||||
"exported",
|
||||
"new",
|
||||
"updated",
|
||||
"skipped",
|
||||
"exif_updated",
|
||||
"touched",
|
||||
"converted_to_jpeg",
|
||||
"sidecar_xmp",
|
||||
"sidecar_json",
|
||||
"sidecar_exiftool",
|
||||
"missing",
|
||||
"error",
|
||||
"exiftool_warning",
|
||||
"exiftool_error",
|
||||
"extended_attributes_written",
|
||||
"extended_attributes_skipped",
|
||||
"cleanup_deleted_file",
|
||||
"cleanup_deleted_directory",
|
||||
"exported_album",
|
||||
]
|
||||
|
||||
try:
|
||||
with open(report_file, "w") as csvfile:
|
||||
writer = csv.DictWriter(csvfile, fieldnames=report_columns)
|
||||
writer.writeheader()
|
||||
for data in [result for result in all_results.values()]:
|
||||
writer.writerow(data)
|
||||
except IOError:
|
||||
rich_echo_error("[error]Could not open output file for writing"),
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cleanup_files(dest_path, files_to_keep, fileutil, verbose_):
|
||||
"""cleanup dest_path by deleting and files and empty directories
|
||||
not in files_to_keep
|
||||
@@ -2855,3 +2728,15 @@ def render_and_validate_report(report: str, exiftool_path: str, export_dir: str)
|
||||
sys.exit(1)
|
||||
|
||||
return report
|
||||
|
||||
|
||||
# def _export_with_profiler(args: Dict):
|
||||
# """ "Run export with cProfile"""
|
||||
# try:
|
||||
# args.pop("profile")
|
||||
# except KeyError:
|
||||
# pass
|
||||
|
||||
# cProfile.runctx(
|
||||
# "_export(**args)", globals=globals(), locals=locals(), sort="tottime"
|
||||
# )
|
||||
|
||||
@@ -8,7 +8,11 @@ from rich import print
|
||||
|
||||
from osxphotos._constants import OSXPHOTOS_EXPORT_DB
|
||||
from osxphotos._version import __version__
|
||||
from osxphotos.export_db import OSXPHOTOS_EXPORTDB_VERSION, ExportDB
|
||||
from osxphotos.export_db import (
|
||||
MAX_EXPORT_RESULTS_DATA_ROWS,
|
||||
OSXPHOTOS_EXPORTDB_VERSION,
|
||||
ExportDB,
|
||||
)
|
||||
from osxphotos.export_db_utils import (
|
||||
export_db_check_signatures,
|
||||
export_db_get_last_run,
|
||||
@@ -20,6 +24,9 @@ from osxphotos.export_db_utils import (
|
||||
)
|
||||
|
||||
from .common import OSXPHOTOS_HIDDEN
|
||||
from .export import render_and_validate_report
|
||||
from .param_types import TemplateString
|
||||
from .report_writer import report_writer_factory
|
||||
from .verbose import verbose_print
|
||||
|
||||
|
||||
@@ -57,6 +64,23 @@ from .verbose import verbose_print
|
||||
nargs=1,
|
||||
help="Print information about FILE_PATH contained in the database.",
|
||||
)
|
||||
@click.option(
|
||||
"--report",
|
||||
metavar="REPORT_FILE RUN_ID",
|
||||
help="Generate an export report as `osxphotos export ... --report REPORT_FILE` would have done. "
|
||||
"This allows you to re-create an export report if you didn't use the --report option "
|
||||
"when running `osxphotos export`. "
|
||||
"The extension of the report file is used to determine the format. "
|
||||
"Valid extensions are: "
|
||||
".csv (CSV file), .json (JSON), .db and .sqlite (SQLite database). "
|
||||
f"RUN_ID may be any integer from {-MAX_EXPORT_RESULTS_DATA_ROWS} to 0 specifying which run to use. "
|
||||
"For example, `--report report.csv 0` will generate a CSV report for the last run and "
|
||||
"`--report report.json -1` will generate a JSON report for the second-to-last run "
|
||||
"(one run prior to last run). "
|
||||
"REPORT_FILE may be a template string (see Templating System), for example, "
|
||||
"--report 'export_{today.date}.csv' will write a CSV report file named with today's date. ",
|
||||
type=(TemplateString(), click.IntRange(-MAX_EXPORT_RESULTS_DATA_ROWS, 0)),
|
||||
)
|
||||
@click.option(
|
||||
"--migrate",
|
||||
is_flag=True,
|
||||
@@ -88,6 +112,7 @@ def exportdb(
|
||||
last_run,
|
||||
save_config,
|
||||
info,
|
||||
report,
|
||||
migrate,
|
||||
sql,
|
||||
export_dir,
|
||||
@@ -112,15 +137,20 @@ def exportdb(
|
||||
export_dir = export_dir or export_db.parent
|
||||
|
||||
sub_commands = [
|
||||
version,
|
||||
check_signatures,
|
||||
update_signatures,
|
||||
touch_file,
|
||||
last_run,
|
||||
bool(save_config),
|
||||
bool(info),
|
||||
migrate,
|
||||
bool(sql),
|
||||
bool(cmd)
|
||||
for cmd in [
|
||||
check_signatures,
|
||||
info,
|
||||
last_run,
|
||||
migrate,
|
||||
report,
|
||||
save_config,
|
||||
sql,
|
||||
touch_file,
|
||||
update_signatures,
|
||||
vacuum,
|
||||
version,
|
||||
]
|
||||
]
|
||||
if sum(sub_commands) > 1:
|
||||
print("[red]Only a single sub-command may be specified at a time[/red]")
|
||||
@@ -221,11 +251,29 @@ def exportdb(
|
||||
sys.exit(1)
|
||||
else:
|
||||
if info_rec:
|
||||
print(info_rec.asdict())
|
||||
print(info_rec.json(indent=2))
|
||||
else:
|
||||
print(f"[red]File '{info}' not found in export database[/red]")
|
||||
sys.exit(0)
|
||||
|
||||
if report:
|
||||
exportdb = ExportDB(export_db, export_dir)
|
||||
report_template, run_id = report
|
||||
report_filename = render_and_validate_report(report_template, "", export_dir)
|
||||
export_results = exportdb.get_export_results(run_id)
|
||||
if not export_results:
|
||||
print(f"[red]No report results found for run ID {run_id}[/red]")
|
||||
sys.exit(1)
|
||||
try:
|
||||
report_writer = report_writer_factory(report_filename)
|
||||
except ValueError as e:
|
||||
print(f"[red]Error: {e}[/red]")
|
||||
sys.exit(1)
|
||||
report_writer.write(export_results)
|
||||
report_writer.close()
|
||||
print(f"Wrote report to {report_filename}")
|
||||
sys.exit(0)
|
||||
|
||||
if migrate:
|
||||
exportdb = ExportDB(export_db, export_dir)
|
||||
if upgraded := exportdb.was_upgraded:
|
||||
|
||||
372
osxphotos/cli/report_writer.py
Normal file
372
osxphotos/cli/report_writer.py
Normal file
@@ -0,0 +1,372 @@
|
||||
"""Report writer for the --report option of `osxphotos export`"""
|
||||
|
||||
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
import os.path
|
||||
import sqlite3
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import suppress
|
||||
from typing import Union, Dict
|
||||
|
||||
from osxphotos.photoexporter import ExportResults
|
||||
from osxphotos.export_db import OSXPHOTOS_ABOUT_STRING
|
||||
|
||||
__all__ = [
|
||||
"report_writer_factory",
|
||||
"ReportWriterABC",
|
||||
"ReportWriterCSV",
|
||||
"ReportWriterSqlite",
|
||||
"ReportWriterNoOp",
|
||||
]
|
||||
|
||||
|
||||
class ReportWriterABC(ABC):
|
||||
"""Abstract base class for report writers"""
|
||||
|
||||
@abstractmethod
|
||||
def write(self, export_results: ExportResults):
|
||||
"""Write results to the output file"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""Close the output file"""
|
||||
pass
|
||||
|
||||
|
||||
class ReportWriterNoOp(ABC):
|
||||
"""Report writer that does nothing"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def write(self, export_results: ExportResults):
|
||||
"""Write results to the output file"""
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
"""Close the output file"""
|
||||
pass
|
||||
|
||||
|
||||
class ReportWriterCSV(ReportWriterABC):
|
||||
"""Write CSV report file"""
|
||||
|
||||
def __init__(
|
||||
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
|
||||
):
|
||||
self.output_file = output_file
|
||||
self.append = append
|
||||
|
||||
report_columns = [
|
||||
"datetime",
|
||||
"filename",
|
||||
"exported",
|
||||
"new",
|
||||
"updated",
|
||||
"skipped",
|
||||
"exif_updated",
|
||||
"touched",
|
||||
"converted_to_jpeg",
|
||||
"sidecar_xmp",
|
||||
"sidecar_json",
|
||||
"sidecar_exiftool",
|
||||
"missing",
|
||||
"error",
|
||||
"exiftool_warning",
|
||||
"exiftool_error",
|
||||
"extended_attributes_written",
|
||||
"extended_attributes_skipped",
|
||||
"cleanup_deleted_file",
|
||||
"cleanup_deleted_directory",
|
||||
"exported_album",
|
||||
]
|
||||
|
||||
mode = "a" if append else "w"
|
||||
self._output_fh = open(self.output_file, mode)
|
||||
|
||||
self._csv_writer = csv.DictWriter(self._output_fh, fieldnames=report_columns)
|
||||
if not append:
|
||||
self._csv_writer.writeheader()
|
||||
|
||||
def write(self, export_results: ExportResults):
|
||||
"""Write results to the output file"""
|
||||
all_results = prepare_results_for_writing(export_results)
|
||||
for data in list(all_results.values()):
|
||||
self._csv_writer.writerow(data)
|
||||
self._output_fh.flush()
|
||||
|
||||
def close(self):
|
||||
"""Close the output file"""
|
||||
self._output_fh.close()
|
||||
|
||||
def __del__(self):
|
||||
with suppress(Exception):
|
||||
self._output_fh.close()
|
||||
|
||||
|
||||
class ReportWriterJSON(ReportWriterABC):
|
||||
"""Write JSON report file"""
|
||||
|
||||
def __init__(
|
||||
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
|
||||
):
|
||||
self.output_file = output_file
|
||||
self.append = append
|
||||
self.indent = 4
|
||||
|
||||
self._first_record_written = False
|
||||
if append:
|
||||
with open(self.output_file, "r") as fh:
|
||||
existing_data = json.load(fh)
|
||||
self._output_fh = open(self.output_file, "w")
|
||||
self._output_fh.write("[")
|
||||
for data in existing_data:
|
||||
self._output_fh.write(json.dumps(data, indent=self.indent))
|
||||
self._output_fh.write(",\n")
|
||||
else:
|
||||
self._output_fh = open(self.output_file, "w")
|
||||
self._output_fh.write("[")
|
||||
|
||||
def write(self, export_results: ExportResults):
|
||||
"""Write results to the output file"""
|
||||
all_results = prepare_results_for_writing(export_results, bool_values=True)
|
||||
for data in list(all_results.values()):
|
||||
if self._first_record_written:
|
||||
self._output_fh.write(",\n")
|
||||
else:
|
||||
self._first_record_written = True
|
||||
self._output_fh.write(json.dumps(data, indent=self.indent))
|
||||
self._output_fh.flush()
|
||||
|
||||
def close(self):
|
||||
"""Close the output file"""
|
||||
self._output_fh.write("]")
|
||||
self._output_fh.close()
|
||||
|
||||
def __del__(self):
|
||||
with suppress(Exception):
|
||||
self.close()
|
||||
|
||||
|
||||
class ReportWriterSQLite(ReportWriterABC):
|
||||
"""Write sqlite report file"""
|
||||
|
||||
def __init__(
|
||||
self, output_file: Union[str, bytes, os.PathLike], append: bool = False
|
||||
):
|
||||
self.output_file = output_file
|
||||
self.append = append
|
||||
|
||||
if not append:
|
||||
with suppress(FileNotFoundError):
|
||||
os.unlink(self.output_file)
|
||||
|
||||
self._conn = sqlite3.connect(self.output_file)
|
||||
self._create_tables()
|
||||
|
||||
def write(self, export_results: ExportResults):
|
||||
"""Write results to the output file"""
|
||||
|
||||
all_results = prepare_results_for_writing(export_results)
|
||||
for data in list(all_results.values()):
|
||||
cursor = self._conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT INTO report "
|
||||
"(datetime, filename, exported, new, updated, skipped, exif_updated, touched, converted_to_jpeg, sidecar_xmp, sidecar_json, sidecar_exiftool, missing, error, exiftool_warning, exiftool_error, extended_attributes_written, extended_attributes_skipped, cleanup_deleted_file, cleanup_deleted_directory, exported_album) "
|
||||
"VALUES "
|
||||
"(:datetime, :filename, :exported, :new, :updated, :skipped, :exif_updated, :touched, :converted_to_jpeg, :sidecar_xmp, :sidecar_json, :sidecar_exiftool, :missing, :error, :exiftool_warning, :exiftool_error, :extended_attributes_written, :extended_attributes_skipped, :cleanup_deleted_file, :cleanup_deleted_directory, :exported_album);",
|
||||
data,
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def close(self):
|
||||
"""Close the output file"""
|
||||
self._conn.close()
|
||||
|
||||
def _create_tables(self):
|
||||
c = self._conn.cursor()
|
||||
c.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS report (
|
||||
datetime text,
|
||||
filename text,
|
||||
exported integer,
|
||||
new integer,
|
||||
updated integer,
|
||||
skipped integer,
|
||||
exif_updated integer,
|
||||
touched integer,
|
||||
converted_to_jpeg integer,
|
||||
sidecar_xmp integer,
|
||||
sidecar_json integer,
|
||||
sidecar_exiftool integer,
|
||||
missing integer,
|
||||
error text,
|
||||
exiftool_warning text,
|
||||
exiftool_error text,
|
||||
extended_attributes_written integer,
|
||||
extended_attributes_skipped integer,
|
||||
cleanup_deleted_file integer,
|
||||
cleanup_deleted_directory integer,
|
||||
exported_album text
|
||||
)
|
||||
"""
|
||||
)
|
||||
c.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS about (
|
||||
id INTEGER PRIMARY KEY,
|
||||
about TEXT
|
||||
);"""
|
||||
)
|
||||
c.execute(
|
||||
"INSERT INTO about(about) VALUES (?);",
|
||||
(f"OSXPhotos Export Report. {OSXPHOTOS_ABOUT_STRING}",),
|
||||
)
|
||||
|
||||
self._conn.commit()
|
||||
|
||||
def __del__(self):
|
||||
with suppress(Exception):
|
||||
self.close()
|
||||
|
||||
|
||||
def prepare_results_for_writing(
|
||||
export_results: ExportResults, bool_values: bool = False
|
||||
) -> Dict:
|
||||
"""Return all results for writing to report
|
||||
|
||||
Args:
|
||||
export_results: ExportResults object
|
||||
bool_values: Return a boolean value instead of a integer (e.g. for use with JSON)
|
||||
|
||||
Returns:
|
||||
Dict: All results
|
||||
"""
|
||||
false = False if bool_values else 0
|
||||
true = True if bool_values else 1
|
||||
|
||||
all_results = {}
|
||||
for result in (
|
||||
export_results.all_files()
|
||||
+ export_results.deleted_files
|
||||
+ export_results.deleted_directories
|
||||
):
|
||||
result = str(result)
|
||||
if result not in all_results:
|
||||
all_results[str(result)] = {
|
||||
"datetime": export_results.datetime,
|
||||
"filename": str(result),
|
||||
"exported": false,
|
||||
"new": false,
|
||||
"updated": false,
|
||||
"skipped": false,
|
||||
"exif_updated": false,
|
||||
"touched": false,
|
||||
"converted_to_jpeg": false,
|
||||
"sidecar_xmp": false,
|
||||
"sidecar_json": false,
|
||||
"sidecar_exiftool": false,
|
||||
"missing": false,
|
||||
"error": "",
|
||||
"exiftool_warning": "",
|
||||
"exiftool_error": "",
|
||||
"extended_attributes_written": false,
|
||||
"extended_attributes_skipped": false,
|
||||
"cleanup_deleted_file": false,
|
||||
"cleanup_deleted_directory": false,
|
||||
"exported_album": "",
|
||||
}
|
||||
|
||||
for result in export_results.exported:
|
||||
all_results[str(result)]["exported"] = true
|
||||
|
||||
for result in export_results.new:
|
||||
all_results[str(result)]["new"] = true
|
||||
|
||||
for result in export_results.updated:
|
||||
all_results[str(result)]["updated"] = true
|
||||
|
||||
for result in export_results.skipped:
|
||||
all_results[str(result)]["skipped"] = true
|
||||
|
||||
for result in export_results.exif_updated:
|
||||
all_results[str(result)]["exif_updated"] = true
|
||||
|
||||
for result in export_results.touched:
|
||||
all_results[str(result)]["touched"] = true
|
||||
|
||||
for result in export_results.converted_to_jpeg:
|
||||
all_results[str(result)]["converted_to_jpeg"] = true
|
||||
|
||||
for result in export_results.sidecar_xmp_written:
|
||||
all_results[str(result)]["sidecar_xmp"] = true
|
||||
all_results[str(result)]["exported"] = true
|
||||
|
||||
for result in export_results.sidecar_xmp_skipped:
|
||||
all_results[str(result)]["sidecar_xmp"] = true
|
||||
all_results[str(result)]["skipped"] = true
|
||||
|
||||
for result in export_results.sidecar_json_written:
|
||||
all_results[str(result)]["sidecar_json"] = true
|
||||
all_results[str(result)]["exported"] = true
|
||||
|
||||
for result in export_results.sidecar_json_skipped:
|
||||
all_results[str(result)]["sidecar_json"] = true
|
||||
all_results[str(result)]["skipped"] = true
|
||||
|
||||
for result in export_results.sidecar_exiftool_written:
|
||||
all_results[str(result)]["sidecar_exiftool"] = true
|
||||
all_results[str(result)]["exported"] = true
|
||||
|
||||
for result in export_results.sidecar_exiftool_skipped:
|
||||
all_results[str(result)]["sidecar_exiftool"] = true
|
||||
all_results[str(result)]["skipped"] = true
|
||||
|
||||
for result in export_results.missing:
|
||||
all_results[str(result)]["missing"] = true
|
||||
|
||||
for result in export_results.error:
|
||||
all_results[str(result[0])]["error"] = result[1]
|
||||
|
||||
for result in export_results.exiftool_warning:
|
||||
all_results[str(result[0])]["exiftool_warning"] = result[1]
|
||||
|
||||
for result in export_results.exiftool_error:
|
||||
all_results[str(result[0])]["exiftool_error"] = result[1]
|
||||
|
||||
for result in export_results.xattr_written:
|
||||
all_results[str(result)]["extended_attributes_written"] = true
|
||||
|
||||
for result in export_results.xattr_skipped:
|
||||
all_results[str(result)]["extended_attributes_skipped"] = true
|
||||
|
||||
for result in export_results.deleted_files:
|
||||
all_results[str(result)]["cleanup_deleted_file"] = true
|
||||
|
||||
for result in export_results.deleted_directories:
|
||||
all_results[str(result)]["cleanup_deleted_directory"] = true
|
||||
|
||||
for result, album in export_results.exported_album:
|
||||
all_results[str(result)]["exported_album"] = album
|
||||
|
||||
return all_results
|
||||
|
||||
|
||||
def report_writer_factory(
|
||||
output_file: Union[str, bytes, os.PathLike], append: bool = False
|
||||
) -> ReportWriterABC:
|
||||
"""Return a ReportWriter instance appropriate for the output file type"""
|
||||
output_type = os.path.splitext(output_file)[1]
|
||||
output_type = output_type.lower()[1:]
|
||||
if output_type == "csv":
|
||||
return ReportWriterCSV(output_file, append)
|
||||
elif output_type == "json":
|
||||
return ReportWriterJSON(output_file, append)
|
||||
elif output_type in ["sqlite", "db"]:
|
||||
return ReportWriterSQLite(output_file, append)
|
||||
else:
|
||||
raise ValueError(f"Unknown report file type: {output_file}")
|
||||
@@ -2,17 +2,20 @@
|
||||
|
||||
|
||||
import datetime
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import pickle
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
from contextlib import suppress
|
||||
from io import StringIO
|
||||
from sqlite3 import Error
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Optional, Tuple, Union
|
||||
from typing import Any, Optional, Tuple, Union
|
||||
|
||||
from tenacity import retry, stop_after_attempt
|
||||
|
||||
@@ -27,12 +30,42 @@ __all__ = [
|
||||
"ExportDBTemp",
|
||||
]
|
||||
|
||||
OSXPHOTOS_EXPORTDB_VERSION = "6.0"
|
||||
OSXPHOTOS_EXPORTDB_VERSION = "7.0"
|
||||
OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
|
||||
|
||||
# max retry attempts for methods which use tenacity.retry
|
||||
MAX_RETRY_ATTEMPTS = 5
|
||||
|
||||
# maximum number of export results rows to save
|
||||
MAX_EXPORT_RESULTS_DATA_ROWS = 10
|
||||
|
||||
|
||||
def pickle_and_zip(data: Any) -> bytes:
|
||||
"""
|
||||
Pickle and gzip data.
|
||||
|
||||
Args:
|
||||
data: data to pickle and gzip (must be pickle-able)
|
||||
|
||||
Returns:
|
||||
bytes of gzipped pickled data
|
||||
"""
|
||||
pickled = pickle.dumps(data)
|
||||
return gzip.compress(pickled)
|
||||
|
||||
|
||||
def unzip_and_unpickle(data: bytes) -> Any:
|
||||
"""
|
||||
Unzip and unpickle data.
|
||||
|
||||
Args:
|
||||
data: data to unzip and unpickle
|
||||
|
||||
Returns:
|
||||
unpickled data
|
||||
"""
|
||||
return pickle.loads(gzip.decompress(data))
|
||||
|
||||
|
||||
class ExportDB:
|
||||
"""Interface to sqlite3 database used to store state information for osxphotos export command"""
|
||||
@@ -192,6 +225,63 @@ class ExportDB:
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
|
||||
def set_export_results(self, results):
|
||||
"""Store export results in database; data is pickled and gzipped for storage"""
|
||||
|
||||
results_data = pickle_and_zip(results)
|
||||
|
||||
conn = self._conn
|
||||
try:
|
||||
dt = datetime.datetime.now().isoformat()
|
||||
c = conn.cursor()
|
||||
c.execute(
|
||||
"""
|
||||
UPDATE export_results_data
|
||||
SET datetime = ?,
|
||||
export_results = ?
|
||||
WHERE datetime = (SELECT MIN(datetime) FROM export_results_data);
|
||||
""",
|
||||
(dt, results_data),
|
||||
)
|
||||
conn.commit()
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
|
||||
def get_export_results(self, run: int = 0):
|
||||
"""Retrieve export results from database
|
||||
|
||||
Args:
|
||||
run: which run to retrieve results for;
|
||||
0 = most recent run, -1 = previous run, -2 = run prior to that, etc.
|
||||
|
||||
Returns:
|
||||
ExportResults object or None if no results found
|
||||
"""
|
||||
if run > 0:
|
||||
raise ValueError("run must be 0 or negative")
|
||||
run = -run
|
||||
|
||||
conn = self._conn
|
||||
try:
|
||||
c = conn.cursor()
|
||||
c.execute(
|
||||
"""
|
||||
SELECT export_results
|
||||
FROM export_results_data
|
||||
ORDER BY datetime DESC
|
||||
""",
|
||||
)
|
||||
rows = c.fetchall()
|
||||
try:
|
||||
data = rows[run][0]
|
||||
results = unzip_and_unpickle(data) if data else None
|
||||
except IndexError:
|
||||
results = None
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
results = None
|
||||
return results
|
||||
|
||||
def close(self):
|
||||
"""close the database connection"""
|
||||
try:
|
||||
@@ -362,6 +452,10 @@ class ExportDB:
|
||||
# create export_data table
|
||||
self._migrate_5_0_to_6_0(conn)
|
||||
|
||||
if version[1] < "7.0":
|
||||
# create report_data table
|
||||
self._migrate_6_0_to_7_0(conn)
|
||||
|
||||
conn.execute("VACUUM;")
|
||||
conn.commit()
|
||||
|
||||
@@ -556,6 +650,29 @@ class ExportDB:
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
|
||||
def _migrate_6_0_to_7_0(self, conn):
|
||||
try:
|
||||
c = conn.cursor()
|
||||
c.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS export_results_data (
|
||||
id INTEGER PRIMARY KEY,
|
||||
datetime TEXT,
|
||||
export_results BLOB
|
||||
);"""
|
||||
)
|
||||
# pre-populate report_data table with blank fields
|
||||
# ExportDB will use these as circular buffer always writing to the oldest record
|
||||
for _ in range(MAX_EXPORT_RESULTS_DATA_ROWS):
|
||||
c.execute(
|
||||
"""INSERT INTO export_results_data (datetime, export_results) VALUES (?, ?);""",
|
||||
(datetime.datetime.now().isoformat(), b""),
|
||||
)
|
||||
# sleep a tiny bit just to ensure time stamps increment
|
||||
time.sleep(0.001)
|
||||
conn.commit()
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
|
||||
def _perform_db_maintenace(self, conn):
|
||||
"""Perform database maintenance"""
|
||||
try:
|
||||
@@ -630,7 +747,7 @@ class ExportDBInMemory(ExportDB):
|
||||
except Error as e:
|
||||
logging.warning(e)
|
||||
|
||||
def _open_export_db(self, dbfile):
|
||||
def _open_export_db(self, dbfile): # sourcery skip: raise-specific-error
|
||||
"""open export database and return a db connection
|
||||
returns: connection to the database
|
||||
"""
|
||||
@@ -933,6 +1050,10 @@ class ExportRecord:
|
||||
"photoinfo": photoinfo,
|
||||
}
|
||||
|
||||
def json(self, indent=None):
|
||||
"""Return json of self"""
|
||||
return json.dumps(self.asdict(), indent=indent)
|
||||
|
||||
def __enter__(self):
|
||||
self._context_manager = True
|
||||
return self
|
||||
|
||||
@@ -10,6 +10,7 @@ import re
|
||||
import typing as t
|
||||
from collections import namedtuple # pylint: disable=syntax-error
|
||||
from dataclasses import asdict, dataclass
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
import photoscript
|
||||
@@ -238,59 +239,62 @@ class ExportResults:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
exported=None,
|
||||
new=None,
|
||||
updated=None,
|
||||
skipped=None,
|
||||
exif_updated=None,
|
||||
touched=None,
|
||||
to_touch=None,
|
||||
converted_to_jpeg=None,
|
||||
sidecar_json_written=None,
|
||||
sidecar_json_skipped=None,
|
||||
sidecar_exiftool_written=None,
|
||||
sidecar_exiftool_skipped=None,
|
||||
sidecar_xmp_written=None,
|
||||
sidecar_xmp_skipped=None,
|
||||
missing=None,
|
||||
error=None,
|
||||
exiftool_warning=None,
|
||||
exiftool_error=None,
|
||||
xattr_written=None,
|
||||
xattr_skipped=None,
|
||||
deleted_files=None,
|
||||
deleted_directories=None,
|
||||
deleted_files=None,
|
||||
error=None,
|
||||
exif_updated=None,
|
||||
exiftool_error=None,
|
||||
exiftool_warning=None,
|
||||
exported_album=None,
|
||||
skipped_album=None,
|
||||
missing_album=None,
|
||||
exported=None,
|
||||
metadata_changed=None,
|
||||
missing_album=None,
|
||||
missing=None,
|
||||
new=None,
|
||||
sidecar_exiftool_skipped=None,
|
||||
sidecar_exiftool_written=None,
|
||||
sidecar_json_skipped=None,
|
||||
sidecar_json_written=None,
|
||||
sidecar_xmp_skipped=None,
|
||||
sidecar_xmp_written=None,
|
||||
skipped_album=None,
|
||||
skipped=None,
|
||||
to_touch=None,
|
||||
touched=None,
|
||||
updated=None,
|
||||
xattr_skipped=None,
|
||||
xattr_written=None,
|
||||
):
|
||||
self.exported = exported or []
|
||||
self.new = new or []
|
||||
self.updated = updated or []
|
||||
self.skipped = skipped or []
|
||||
self.exif_updated = exif_updated or []
|
||||
self.touched = touched or []
|
||||
self.to_touch = to_touch or []
|
||||
|
||||
self.datetime = datetime.now().isoformat()
|
||||
|
||||
self.converted_to_jpeg = converted_to_jpeg or []
|
||||
self.sidecar_json_written = sidecar_json_written or []
|
||||
self.sidecar_json_skipped = sidecar_json_skipped or []
|
||||
self.sidecar_exiftool_written = sidecar_exiftool_written or []
|
||||
self.sidecar_exiftool_skipped = sidecar_exiftool_skipped or []
|
||||
self.sidecar_xmp_written = sidecar_xmp_written or []
|
||||
self.sidecar_xmp_skipped = sidecar_xmp_skipped or []
|
||||
self.missing = missing or []
|
||||
self.error = error or []
|
||||
self.exiftool_warning = exiftool_warning or []
|
||||
self.exiftool_error = exiftool_error or []
|
||||
self.xattr_written = xattr_written or []
|
||||
self.xattr_skipped = xattr_skipped or []
|
||||
self.deleted_files = deleted_files or []
|
||||
self.deleted_directories = deleted_directories or []
|
||||
self.deleted_files = deleted_files or []
|
||||
self.error = error or []
|
||||
self.exif_updated = exif_updated or []
|
||||
self.exiftool_error = exiftool_error or []
|
||||
self.exiftool_warning = exiftool_warning or []
|
||||
self.exported = exported or []
|
||||
self.exported_album = exported_album or []
|
||||
self.skipped_album = skipped_album or []
|
||||
self.missing_album = missing_album or []
|
||||
self.metadata_changed = metadata_changed or []
|
||||
self.missing = missing or []
|
||||
self.missing_album = missing_album or []
|
||||
self.new = new or []
|
||||
self.sidecar_exiftool_skipped = sidecar_exiftool_skipped or []
|
||||
self.sidecar_exiftool_written = sidecar_exiftool_written or []
|
||||
self.sidecar_json_skipped = sidecar_json_skipped or []
|
||||
self.sidecar_json_written = sidecar_json_written or []
|
||||
self.sidecar_xmp_skipped = sidecar_xmp_skipped or []
|
||||
self.sidecar_xmp_written = sidecar_xmp_written or []
|
||||
self.skipped = skipped or []
|
||||
self.skipped_album = skipped_album or []
|
||||
self.to_touch = to_touch or []
|
||||
self.touched = touched or []
|
||||
self.updated = updated or []
|
||||
self.xattr_skipped = xattr_skipped or []
|
||||
self.xattr_written = xattr_written or []
|
||||
|
||||
def all_files(self):
|
||||
"""return all filenames contained in results"""
|
||||
@@ -348,7 +352,8 @@ class ExportResults:
|
||||
def __str__(self):
|
||||
return (
|
||||
"ExportResults("
|
||||
+ f"exported={self.exported}"
|
||||
+ f"datetime={self.datetime}"
|
||||
+ f",exported={self.exported}"
|
||||
+ f",new={self.new}"
|
||||
+ f",updated={self.updated}"
|
||||
+ f",skipped={self.skipped}"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
""" Test the command line interface (CLI) """
|
||||
import csv
|
||||
import datetime
|
||||
import glob
|
||||
import json
|
||||
@@ -9,7 +10,6 @@ import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import sqlite3
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from tempfile import TemporaryDirectory
|
||||
@@ -19,8 +19,8 @@ from click.testing import CliRunner
|
||||
from osxmetadata import OSXMetaData, Tag
|
||||
|
||||
import osxphotos
|
||||
from osxphotos._version import __version__
|
||||
from osxphotos._constants import OSXPHOTOS_EXPORT_DB
|
||||
from osxphotos._version import __version__
|
||||
from osxphotos.cli import (
|
||||
about,
|
||||
albums,
|
||||
@@ -960,6 +960,18 @@ EXPORT_UNICODE_TITLE_FILENAMES = [
|
||||
"Frítest (3).jpg",
|
||||
]
|
||||
|
||||
# data for --report
|
||||
UUID_REPORT = [
|
||||
{
|
||||
"uuid": "4D521201-92AC-43E5-8F7C-59BC41C37A96",
|
||||
"filenames": ["IMG_1997.JPG", "IMG_1997.cr2"],
|
||||
},
|
||||
{
|
||||
"uuid": "7783E8E6-9CAC-40F3-BE22-81FB7051C266",
|
||||
"filenames": ["IMG_3092.heic", "IMG_3092_edited.jpeg"],
|
||||
},
|
||||
]
|
||||
|
||||
# data for --exif
|
||||
QUERY_EXIF_DATA = [("EXIF:Make", "FUJIFILM", ["6191423D-8DB8-4D4C-92BE-9BBBA308AAC4"])]
|
||||
QUERY_EXIF_DATA_CASE_INSENSITIVE = [
|
||||
@@ -5473,13 +5485,217 @@ def test_export_report():
|
||||
cwd = os.getcwd()
|
||||
# pylint: disable=not-context-manager
|
||||
with runner.isolated_filesystem():
|
||||
# test report creation
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[os.path.join(cwd, CLI_PHOTOS_DB), ".", "-V", "--report", "report.csv"],
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
"report.csv",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Writing export report" in result.output
|
||||
assert "Wrote export report" in result.output
|
||||
assert os.path.exists("report.csv")
|
||||
with open("report.csv", "r") as f:
|
||||
reader = csv.DictReader(f)
|
||||
rows = list(reader)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[0]["filenames"])
|
||||
|
||||
# test report gets overwritten
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[1]["uuid"],
|
||||
"--report",
|
||||
"report.csv",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
with open("report.csv", "r") as f:
|
||||
reader = csv.DictReader(f)
|
||||
rows = list(reader)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[1]["filenames"])
|
||||
|
||||
# test report with --append
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
"report.csv",
|
||||
"--overwrite",
|
||||
"--append",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
with open("report.csv", "r") as f:
|
||||
reader = csv.DictReader(f)
|
||||
rows = list(reader)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(
|
||||
UUID_REPORT[0]["filenames"] + UUID_REPORT[1]["filenames"]
|
||||
)
|
||||
|
||||
|
||||
def test_export_report_json():
|
||||
"""test export with --report option for JSON report"""
|
||||
|
||||
runner = CliRunner()
|
||||
cwd = os.getcwd()
|
||||
# pylint: disable=not-context-manager
|
||||
with runner.isolated_filesystem():
|
||||
# test report creation
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
"report.json",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Wrote export report" in result.output
|
||||
assert os.path.exists("report.json")
|
||||
with open("report.json", "r") as f:
|
||||
rows = json.load(f)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[0]["filenames"])
|
||||
|
||||
# test report gets overwritten
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[1]["uuid"],
|
||||
"--report",
|
||||
"report.json",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
with open("report.json", "r") as f:
|
||||
rows = json.load(f)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[1]["filenames"])
|
||||
|
||||
# test report with --append
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
"report.json",
|
||||
"--overwrite",
|
||||
"--append",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
with open("report.json", "r") as f:
|
||||
rows = json.load(f)
|
||||
filenames = [str(pathlib.Path(row["filename"]).name) for row in rows]
|
||||
assert sorted(filenames) == sorted(
|
||||
UUID_REPORT[0]["filenames"] + UUID_REPORT[1]["filenames"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_file", ["report.db", "report.sqlite"])
|
||||
def test_export_report_sqlite(report_file):
|
||||
"""test export with --report option with sqlite report"""
|
||||
|
||||
runner = CliRunner()
|
||||
cwd = os.getcwd()
|
||||
# pylint: disable=not-context-manager
|
||||
with runner.isolated_filesystem():
|
||||
# test report creation
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
report_file,
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Wrote export report" in result.output
|
||||
assert os.path.exists(report_file)
|
||||
conn = sqlite3.connect(report_file)
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT filename FROM report")
|
||||
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[0]["filenames"])
|
||||
|
||||
# test report gets overwritten
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[1]["uuid"],
|
||||
"--report",
|
||||
report_file,
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
conn = sqlite3.connect(report_file)
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT filename FROM report")
|
||||
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
|
||||
assert sorted(filenames) == sorted(UUID_REPORT[1]["filenames"])
|
||||
|
||||
# test report with --append
|
||||
result = runner.invoke(
|
||||
export,
|
||||
[
|
||||
os.path.join(cwd, CLI_PHOTOS_DB),
|
||||
".",
|
||||
"-V",
|
||||
"--uuid",
|
||||
UUID_REPORT[0]["uuid"],
|
||||
"--report",
|
||||
report_file,
|
||||
"--overwrite",
|
||||
"--append",
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
conn = sqlite3.connect(report_file)
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT filename FROM report")
|
||||
filenames = [str(pathlib.Path(row[0]).name) for row in c.fetchall()]
|
||||
assert sorted(filenames) == sorted(
|
||||
UUID_REPORT[0]["filenames"] + UUID_REPORT[1]["filenames"]
|
||||
)
|
||||
|
||||
|
||||
def test_export_report_template():
|
||||
@@ -5500,7 +5716,7 @@ def test_export_report_template():
|
||||
],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Writing export report" in result.output
|
||||
assert "Wrote export report" in result.output
|
||||
assert os.path.exists(f"report_{__version__}.csv")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user