@@ -18,7 +18,7 @@ import uuid
|
||||
from collections import namedtuple
|
||||
from contextlib import suppress
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path, PosixPath
|
||||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
@@ -29,6 +29,7 @@ from rich.markdown import Markdown
|
||||
|
||||
from osxphotos._constants import _OSXPHOTOS_NONE_SENTINEL
|
||||
from osxphotos._version import __version__
|
||||
from osxphotos.cli.common import get_data_dir
|
||||
from osxphotos.cli.help import HELP_WIDTH
|
||||
from osxphotos.cli.param_types import TemplateString
|
||||
from osxphotos.datetime_utils import datetime_naive_to_local
|
||||
@@ -36,6 +37,7 @@ from osxphotos.exiftool import ExifToolCaching, get_exiftool_path
|
||||
from osxphotos.photoinfo import PhotoInfoNone
|
||||
from osxphotos.photosalbum import PhotosAlbumPhotoScript
|
||||
from osxphotos.phototemplate import PhotoTemplate, RenderOptions
|
||||
from osxphotos.sqlitekvstore import SQLiteKeyValueStore
|
||||
from osxphotos.utils import pluralize
|
||||
|
||||
from .click_rich_echo import (
|
||||
@@ -53,6 +55,9 @@ MetaData = namedtuple("MetaData", ["title", "description", "keywords", "location
|
||||
|
||||
OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
|
||||
|
||||
# stores import status so imports can be resumed
|
||||
IMPORT_DB = "osxphotos_import.db"
|
||||
|
||||
|
||||
def echo(message, emoji=True, **kwargs):
|
||||
"""Echo text with rich"""
|
||||
@@ -551,6 +556,21 @@ class ReportRecord:
|
||||
title: str = ""
|
||||
uuid: str = ""
|
||||
|
||||
@classmethod
|
||||
def serialize(cls, record: "ReportRecord") -> str:
|
||||
"""Serialize class instance to JSON"""
|
||||
return json.dumps(record.asjsondict())
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, json_string: str) -> "ReportRecord":
|
||||
"""Deserialize class from JSON"""
|
||||
dict_data = json.loads(json_string)
|
||||
dict_data["filepath"] = Path(dict_data["filepath"])
|
||||
dict_data["import_datetime"] = datetime.datetime.fromisoformat(
|
||||
dict_data["import_datetime"]
|
||||
)
|
||||
return cls(**dict_data)
|
||||
|
||||
def asdict(self):
|
||||
return asdict(self)
|
||||
|
||||
@@ -1170,6 +1190,14 @@ class ImportCommand(click.Command):
|
||||
"See also --append.",
|
||||
type=TemplateString(),
|
||||
)
|
||||
@click.option(
|
||||
"--resume",
|
||||
"-R",
|
||||
is_flag=True,
|
||||
help="Resume previous import. "
|
||||
f"Note: data on each imported file is kept in a database in '{get_data_dir() / IMPORT_DB}'. "
|
||||
"This data can be used to resume a previous import if there was an error or the import was cancelled.",
|
||||
)
|
||||
@click.option(
|
||||
"--append",
|
||||
is_flag=True,
|
||||
@@ -1213,6 +1241,7 @@ def import_cli(
|
||||
no_progress,
|
||||
relative_to,
|
||||
report,
|
||||
resume,
|
||||
split_folder,
|
||||
theme,
|
||||
timestamp,
|
||||
@@ -1247,8 +1276,6 @@ def import_cli(
|
||||
report_file = render_and_validate_report(report) if report else None
|
||||
relative_to = Path(relative_to) if relative_to else None
|
||||
|
||||
imported_count = 0
|
||||
error_count = 0
|
||||
files = collect_files_to_import(files, walk, glob)
|
||||
if check_templates:
|
||||
check_templates_and_exit(
|
||||
@@ -1266,6 +1293,17 @@ def import_cli(
|
||||
# report data is set even if no report is generated
|
||||
report_data: Dict[Path, ReportRecord] = {}
|
||||
|
||||
import_db = SQLiteKeyValueStore(
|
||||
get_data_dir() / IMPORT_DB,
|
||||
wal=True,
|
||||
serialize=ReportRecord.serialize,
|
||||
deserialize=ReportRecord.deserialize,
|
||||
)
|
||||
import_db.about = f"osxphotos import database\n{OSXPHOTOS_ABOUT_STRING}"
|
||||
|
||||
imported_count = 0
|
||||
error_count = 0
|
||||
skipped_count = 0
|
||||
filecount = len(files)
|
||||
with rich_progress(console=get_verbose_console(), mock=no_progress) as progress:
|
||||
task = progress.add_task(
|
||||
@@ -1276,6 +1314,20 @@ def import_cli(
|
||||
filepath = Path(filepath).resolve().absolute()
|
||||
relative_filepath = get_relative_filepath(filepath, relative_to)
|
||||
|
||||
# check if file already imported
|
||||
if resume:
|
||||
if record := import_db.get(str(filepath)):
|
||||
if record.imported and not record.error:
|
||||
# file already imported
|
||||
verbose(
|
||||
f"Skipping [filepath]{filepath}[/], "
|
||||
f"already imported on [time]{record.import_datetime.isoformat()}[/] "
|
||||
f"with UUID [uuid]{record.uuid}[/]"
|
||||
)
|
||||
skipped_count += 1
|
||||
progress.advance(task)
|
||||
continue
|
||||
|
||||
verbose(f"Importing [filepath]{filepath}[/]")
|
||||
report_data[filepath] = ReportRecord(
|
||||
filepath=filepath, filename=filepath.name
|
||||
@@ -1339,14 +1391,18 @@ def import_cli(
|
||||
)
|
||||
|
||||
update_report_record(report_data[filepath], photo, filepath)
|
||||
import_db.set(str(filepath), report_data[filepath])
|
||||
|
||||
progress.advance(task)
|
||||
|
||||
if report:
|
||||
write_report(report_file, report_data, append)
|
||||
verbose(f"Wrote import report to [filepath]{report_file}[/]")
|
||||
|
||||
skipped_str = f"[num]{skipped_count}[/] skipped" if resume else ""
|
||||
echo(
|
||||
f"Done: imported [num]{imported_count}[/] {pluralize(imported_count, 'file', 'files')}, "
|
||||
f"[num]{error_count}[/] {pluralize(error_count, 'error', 'errors')}",
|
||||
f"[num]{error_count}[/] {pluralize(error_count, 'error', 'errors')}"
|
||||
f", {skipped_str}",
|
||||
emoji=False,
|
||||
)
|
||||
|
||||
233
osxphotos/sqlitekvstore.py
Normal file
233
osxphotos/sqlitekvstore.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""Simple key-value store using sqlite3"""
|
||||
|
||||
|
||||
import contextlib
|
||||
import os.path
|
||||
import sqlite3
|
||||
from typing import Callable, Dict, Generator, Iterable, Optional, Tuple, TypeVar, Union
|
||||
|
||||
# keep mypy happy
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class SQLiteKeyValueStore:
|
||||
"""Simple Key-Value Store that uses sqlite3 database as backend"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dbpath: str,
|
||||
serialize: Optional[Callable[[T], T]] = None,
|
||||
deserialize: Optional[Callable[[T], T]] = None,
|
||||
wal: bool = False,
|
||||
):
|
||||
"""Opens the database if it exists, otherwise creates it
|
||||
|
||||
Args:
|
||||
dbpath: path to the database
|
||||
serialize: optional function to serialize values on set
|
||||
deserialize: optional function to deserialize values on get
|
||||
wal: enable write-ahead logging which may offer significant speed boost;
|
||||
once enabled, WAL mode will not be disabled, even if wal=False
|
||||
"""
|
||||
|
||||
if serialize and not callable(serialize):
|
||||
raise TypeError("serialize must be callable")
|
||||
if deserialize and not callable(deserialize):
|
||||
raise TypeError("deserialize must be callable")
|
||||
|
||||
self._dbpath = dbpath
|
||||
self._serialize_func = serialize
|
||||
self._deserialize_func = deserialize
|
||||
self._conn = (
|
||||
sqlite3.Connection(dbpath)
|
||||
if os.path.exists(dbpath)
|
||||
else self._create_database(dbpath)
|
||||
)
|
||||
|
||||
if wal:
|
||||
self._conn.execute("PRAGMA journal_mode=WAL;")
|
||||
self._conn.execute("PRAGMA synchronous=NORMAL;")
|
||||
self._conn.commit()
|
||||
|
||||
def _create_database(self, dbpath: str):
|
||||
"""Create the progress database"""
|
||||
conn = sqlite3.Connection(dbpath)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS _about (
|
||||
id INTEGER PRIMARY KEY,
|
||||
description TEXT);
|
||||
"""
|
||||
)
|
||||
cursor.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS
|
||||
data (key BLOB PRIMARY KEY NOT NULL, value BLOB);"""
|
||||
)
|
||||
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_key ON data (key);")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
def connection(self) -> sqlite3.Connection:
|
||||
"""Return connection to underlying sqlite3 database"""
|
||||
return self._conn
|
||||
|
||||
def set(self, key: T, value: T):
|
||||
"""Set key:value pair"""
|
||||
serialized_value = self._serialize(value)
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT OR REPLACE INTO data VALUES (?, ?);", (key, serialized_value)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def set_many(self, items: Union[Iterable[Tuple[T, T]], Dict[T, T]]):
|
||||
"""Set multiple key:value pairs
|
||||
|
||||
Args:
|
||||
items: iterable of (key, value) tuples or dictionary of key:value pairs
|
||||
"""
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
_items = items.items() if isinstance(items, dict) else items
|
||||
cursor.executemany(
|
||||
"INSERT OR REPLACE INTO data VALUES (?, ?);",
|
||||
((key, self._serialize(value)) for key, value in _items),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def get(self, key: T, default: Optional[T] = None) -> Optional[T]:
|
||||
"""Get value for key
|
||||
|
||||
Args:
|
||||
key: key to get from key-value store
|
||||
default: optional default value to return if key not found
|
||||
Returns: value for key or default (Note: does not insert key:default into database if key does not exist)
|
||||
"""
|
||||
try:
|
||||
return self._get(key)
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
def delete(self, key: T):
|
||||
"""Delete key from key-value store"""
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM data WHERE key = ?;", (key,))
|
||||
conn.commit()
|
||||
|
||||
def pop(self, key) -> Optional[T]:
|
||||
"""Delete key and return value"""
|
||||
value = self[key]
|
||||
del self[key]
|
||||
return value
|
||||
|
||||
def keys(self) -> Generator[T, None, None]:
|
||||
"""Return keys as generator"""
|
||||
return iter(self)
|
||||
|
||||
def values(self) -> Generator[T, None, None]:
|
||||
"""Return values as generator"""
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT value FROM data;")
|
||||
for value in cursor:
|
||||
yield self._deserialize(value[0])
|
||||
|
||||
def items(self) -> Generator[Tuple[T, T], None, None]:
|
||||
"""Return items (key, value) as generator"""
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT key, value FROM data;")
|
||||
for key, value in cursor:
|
||||
yield key, self._deserialize(value)
|
||||
|
||||
def close(self):
|
||||
"""Close the database"""
|
||||
self.connection().close()
|
||||
|
||||
@property
|
||||
def about(self) -> str:
|
||||
"""Return description for the database"""
|
||||
results = (
|
||||
self.connection()
|
||||
.cursor()
|
||||
.execute("SELECT description FROM _about;")
|
||||
.fetchone()
|
||||
)
|
||||
return results[0] if results else ""
|
||||
|
||||
@about.setter
|
||||
def about(self, description: str):
|
||||
"""Set description of the database"""
|
||||
conn = self.connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT OR REPLACE INTO _about VALUES (?, ?);",
|
||||
(
|
||||
1,
|
||||
description,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def vacuum(self):
|
||||
"""Vacuum the database, ref: https://www.sqlite.org/matrix/lang_vacuum.html"""
|
||||
self.connection().execute("VACUUM;")
|
||||
|
||||
def _get(self, key: T) -> T:
|
||||
"""Get value for key or raise KeyError if key not found"""
|
||||
cursor = self.connection().cursor()
|
||||
cursor.execute("SELECT value FROM data WHERE key = ?;", (key,))
|
||||
if result := cursor.fetchone():
|
||||
return self._deserialize(result[0])
|
||||
raise KeyError(key)
|
||||
|
||||
def _serialize(self, value: T) -> T:
|
||||
"""Serialize value using serialize function if provided"""
|
||||
return self._serialize_func(value) if self._serialize_func else value
|
||||
|
||||
def _deserialize(self, value: T) -> T:
|
||||
"""Deserialize value using deserialize function if provided"""
|
||||
return self._deserialize_func(value) if self._deserialize_func else value
|
||||
|
||||
def __getitem__(self, key: T) -> T:
|
||||
return self._get(key)
|
||||
|
||||
def __setitem__(self, key: T, value: T):
|
||||
self.set(key, value)
|
||||
|
||||
def __delitem__(self, key: T):
|
||||
# try to get the key which will raise KeyError if key does not exist
|
||||
if key in self:
|
||||
self.delete(key)
|
||||
else:
|
||||
raise KeyError(key)
|
||||
|
||||
def __iter__(self):
|
||||
cursor = self.connection().cursor()
|
||||
cursor.execute("SELECT key FROM data;")
|
||||
for key in cursor:
|
||||
yield key[0]
|
||||
|
||||
def __contains__(self, key: T) -> bool:
|
||||
# Implement in operator, don't use _get to avoid deserializing value unnecessarily
|
||||
cursor = self.connection().cursor()
|
||||
cursor.execute("SELECT 1 FROM data WHERE key = ?;", (key,))
|
||||
return bool(cursor.fetchone())
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
self.close()
|
||||
|
||||
def __len__(self):
|
||||
cursor = self.connection().cursor()
|
||||
cursor.execute("SELECT COUNT(*) FROM data;")
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def __del__(self):
|
||||
"""Try to close the database in case it wasn't already closed. Don't count on this!"""
|
||||
with contextlib.suppress(Exception):
|
||||
self.close()
|
||||
@@ -15,7 +15,7 @@ from typing import Dict
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
from photoscript import Photo
|
||||
from pytest import approx
|
||||
from pytest import MonkeyPatch, approx
|
||||
|
||||
from osxphotos.cli.import_cli import import_cli
|
||||
from osxphotos.exiftool import get_exiftool_path
|
||||
@@ -908,3 +908,41 @@ def test_import_report_invalid_name():
|
||||
],
|
||||
)
|
||||
assert result.exit_code != 0
|
||||
|
||||
|
||||
@pytest.mark.test_import
|
||||
def test_import_resume(monkeypatch: MonkeyPatch, tmpdir):
|
||||
"""Test import with --resume"""
|
||||
|
||||
monkeypatch.delenv("XDG_DATA_HOME", raising=False)
|
||||
monkeypatch.setenv("XDG_DATA_HOME", os.fspath(str(tmpdir)))
|
||||
|
||||
cwd = os.getcwd()
|
||||
test_image_1 = os.path.join(cwd, TEST_IMAGE_1)
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
import_cli,
|
||||
["--verbose", test_image_1],
|
||||
terminal_width=TERMINAL_WIDTH,
|
||||
)
|
||||
|
||||
assert result.exit_code == 0
|
||||
|
||||
import_data = parse_import_output(result.output)
|
||||
file_1 = pathlib.Path(test_image_1).name
|
||||
uuid_1 = import_data[file_1]
|
||||
photo_1 = Photo(uuid_1)
|
||||
|
||||
assert photo_1.filename == file_1
|
||||
|
||||
# test resume
|
||||
test_image_2 = os.path.join(cwd, TEST_IMAGE_2)
|
||||
result = runner.invoke(
|
||||
import_cli,
|
||||
["--verbose", "--resume", test_image_1, test_image_2],
|
||||
terminal_width=TERMINAL_WIDTH,
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Skipping" in result.output
|
||||
assert "1 skipped" in result.output
|
||||
assert "imported 1" in result.output
|
||||
|
||||
249
tests/test_sqlitekvstore.py
Normal file
249
tests/test_sqlitekvstore.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""Test osxphotos.sqlitekvstore"""
|
||||
|
||||
import gzip
|
||||
import json
|
||||
import pickle
|
||||
import sqlite3
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
import osxphotos.sqlitekvstore
|
||||
|
||||
|
||||
def pickle_and_zip(data: Any) -> bytes:
|
||||
"""
|
||||
Pickle and gzip data.
|
||||
|
||||
Args:
|
||||
data: data to pickle and gzip (must be pickle-able)
|
||||
|
||||
Returns:
|
||||
bytes of gzipped pickled data
|
||||
"""
|
||||
pickled = pickle.dumps(data)
|
||||
return gzip.compress(pickled)
|
||||
|
||||
|
||||
def unzip_and_unpickle(data: bytes) -> Any:
|
||||
"""
|
||||
Unzip and unpickle data.
|
||||
|
||||
Args:
|
||||
data: data to unzip and unpickle
|
||||
|
||||
Returns:
|
||||
unpickled data
|
||||
"""
|
||||
return pickle.loads(gzip.decompress(data))
|
||||
|
||||
|
||||
def test_basic_get_set(tmpdir):
|
||||
"""Test basic functionality"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
kvstore.set("foo", "bar")
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("FOOBAR") is None
|
||||
kvstore.delete("foo")
|
||||
assert kvstore.get("foo") is None
|
||||
kvstore.set("baz", None)
|
||||
assert kvstore.get("baz") is None
|
||||
|
||||
kvstore.close()
|
||||
|
||||
# verify that the connection is closed
|
||||
conn = kvstore.connection()
|
||||
with pytest.raises(sqlite3.ProgrammingError):
|
||||
conn.execute("PRAGMA user_version;")
|
||||
|
||||
|
||||
def test_basic_get_set_wal(tmpdir):
|
||||
"""Test basic functionality with WAL mode"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath, wal=True)
|
||||
kvstore.set("foo", "bar")
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("FOOBAR") is None
|
||||
kvstore.delete("foo")
|
||||
assert kvstore.get("foo") is None
|
||||
kvstore.set("baz", None)
|
||||
assert kvstore.get("baz") is None
|
||||
|
||||
kvstore.vacuum()
|
||||
|
||||
kvstore.close()
|
||||
|
||||
# verify that the connection is closed
|
||||
conn = kvstore.connection()
|
||||
with pytest.raises(sqlite3.ProgrammingError):
|
||||
conn.execute("PRAGMA user_version;")
|
||||
|
||||
|
||||
def test_set_many(tmpdir):
|
||||
"""Test set_many()"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
kvstore.set_many([("foo", "bar"), ("baz", "qux")])
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("baz") == "qux"
|
||||
kvstore.close()
|
||||
|
||||
# make sure values got committed
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("baz") == "qux"
|
||||
kvstore.close()
|
||||
|
||||
|
||||
def test_set_many_dict(tmpdir):
|
||||
"""Test set_many() with dict of values"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
kvstore.set_many({"foo": "bar", "baz": "qux"})
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("baz") == "qux"
|
||||
kvstore.close()
|
||||
|
||||
# make sure values got committed
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("baz") == "qux"
|
||||
kvstore.close()
|
||||
|
||||
|
||||
def test_basic_context_handler(tmpdir):
|
||||
"""Test basic functionality with context handler"""
|
||||
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
with osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath) as kvstore:
|
||||
kvstore.set("foo", "bar")
|
||||
assert kvstore.get("foo") == "bar"
|
||||
assert kvstore.get("FOOBAR") is None
|
||||
kvstore.delete("foo")
|
||||
assert kvstore.get("foo") is None
|
||||
|
||||
# verify that the connection is closed
|
||||
conn = kvstore.connection()
|
||||
with pytest.raises(sqlite3.ProgrammingError):
|
||||
conn.execute("PRAGMA user_version;")
|
||||
|
||||
|
||||
def test_about(tmpdir):
|
||||
"""Test about property"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
with osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath) as kvstore:
|
||||
kvstore.about = "My description"
|
||||
assert kvstore.about == "My description"
|
||||
kvstore.about = "My new description"
|
||||
assert kvstore.about == "My new description"
|
||||
|
||||
|
||||
def test_existing_db(tmpdir):
|
||||
"""Test that opening an existing database works as expected"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
with osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath) as kvstore:
|
||||
kvstore.set("foo", "bar")
|
||||
|
||||
with osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath) as kvstore:
|
||||
assert kvstore.get("foo") == "bar"
|
||||
|
||||
|
||||
def test_dict_interface(tmpdir):
|
||||
""" "Test dict interface"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
with osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath) as kvstore:
|
||||
kvstore["foo"] = "bar"
|
||||
assert kvstore["foo"] == "bar"
|
||||
assert len(kvstore) == 1
|
||||
assert kvstore.get("foo") == "bar"
|
||||
|
||||
assert "foo" in kvstore
|
||||
assert "FOOBAR" not in kvstore
|
||||
|
||||
assert kvstore.pop("foo") == "bar"
|
||||
assert kvstore.get("foo") is None
|
||||
|
||||
kvstore["❤️"] = "💖"
|
||||
assert kvstore["❤️"] == "💖"
|
||||
assert kvstore.get("❤️") == "💖"
|
||||
|
||||
del kvstore["❤️"]
|
||||
assert kvstore.get("❤️") is None
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
kvstore["baz"]
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
del kvstore["notakey"]
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
kvstore.pop("foo")
|
||||
|
||||
|
||||
def test_serialize_deserialize(tmpdir):
|
||||
"""Test serialize/deserialize"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(
|
||||
dbpath, serialize=json.dumps, deserialize=json.loads
|
||||
)
|
||||
kvstore.set("foo", {"bar": "baz"})
|
||||
assert kvstore.get("foo") == {"bar": "baz"}
|
||||
assert kvstore.get("FOOBAR") is None
|
||||
|
||||
|
||||
def test_serialize_deserialize_binary_data(tmpdir):
|
||||
"""Test serialize/deserialize with binary data"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(
|
||||
dbpath, serialize=pickle_and_zip, deserialize=unzip_and_unpickle
|
||||
)
|
||||
kvstore.set("foo", {"bar": "baz"})
|
||||
assert kvstore.get("foo") == {"bar": "baz"}
|
||||
assert kvstore.get("FOOBAR") is None
|
||||
|
||||
|
||||
def test_serialize_deserialize_bad_callable(tmpdir):
|
||||
"""Test serialize/deserialize with bad values"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
with pytest.raises(TypeError):
|
||||
osxphotos.sqlitekvstore.SQLiteKeyValueStore(
|
||||
dbpath, serialize=1, deserialize=None
|
||||
)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
osxphotos.sqlitekvstore.SQLiteKeyValueStore(
|
||||
dbpath, serialize=None, deserialize=1
|
||||
)
|
||||
|
||||
|
||||
def test_iter(tmpdir):
|
||||
"""Test generator behavior"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
kvstore.set("foo", "bar")
|
||||
kvstore.set("baz", "qux")
|
||||
kvstore.set("quux", "corge")
|
||||
kvstore.set("grault", "garply")
|
||||
assert len(kvstore) == 4
|
||||
assert sorted(iter(kvstore)) == ["baz", "foo", "grault", "quux"]
|
||||
|
||||
|
||||
def test_keys_values_items(tmpdir):
|
||||
"""Test keys, values, items"""
|
||||
dbpath = tmpdir / "kvtest.db"
|
||||
kvstore = osxphotos.sqlitekvstore.SQLiteKeyValueStore(dbpath)
|
||||
kvstore.set("foo", "bar")
|
||||
kvstore.set("baz", "qux")
|
||||
kvstore.set("quux", "corge")
|
||||
kvstore.set("grault", "garply")
|
||||
assert sorted(kvstore.keys()) == ["baz", "foo", "grault", "quux"]
|
||||
assert sorted(kvstore.values()) == ["bar", "corge", "garply", "qux"]
|
||||
assert sorted(kvstore.items()) == [
|
||||
("baz", "qux"),
|
||||
("foo", "bar"),
|
||||
("grault", "garply"),
|
||||
("quux", "corge"),
|
||||
]
|
||||
Reference in New Issue
Block a user