Implemented retry for export db, #569

This commit is contained in:
Rhet Turnbull
2022-05-24 09:13:44 -07:00
parent 1daf18ad9f
commit dae710b836
2 changed files with 260 additions and 296 deletions

View File

@@ -1,3 +1,3 @@
""" version info """
__version__ = "0.49.8"
__version__ = "0.49.9"

View File

@@ -19,7 +19,7 @@ from sqlite3 import Error
from tempfile import TemporaryDirectory
from typing import Any, List, Optional, Tuple, Union
from tenacity import retry, stop_after_attempt
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt
from ._constants import OSXPHOTOS_EXPORT_DB
from ._version import __version__
@@ -36,7 +36,7 @@ OSXPHOTOS_EXPORTDB_VERSION = "7.1"
OSXPHOTOS_ABOUT_STRING = f"Created by osxphotos version {__version__} (https://github.com/RhetTbull/osxphotos) on {datetime.datetime.now()}"
# max retry attempts for methods which use tenacity.retry
MAX_RETRY_ATTEMPTS = 5
MAX_RETRY_ATTEMPTS = 3
# maximum number of export results rows to save
MAX_EXPORT_RESULTS_DATA_ROWS = 10
@@ -101,6 +101,7 @@ class ExportDB:
"""returns path to export directory"""
return self._path
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_file_record(self, filename: Union[pathlib.Path, str]) -> "ExportRecord":
"""get info for filename and uuid
@@ -118,6 +119,10 @@ class ExportDB:
return ExportRecord(conn, filename_normalized)
return None
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
retry=retry_if_not_exception_type(sqlite3.IntegrityError),
)
def create_file_record(
self, filename: Union[pathlib.Path, str], uuid: str
) -> "ExportRecord":
@@ -136,6 +141,10 @@ class ExportDB:
conn.commit()
return ExportRecord(conn, filename_normalized)
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
retry=retry_if_not_exception_type(sqlite3.IntegrityError),
)
def create_or_get_file_record(
self, filename: Union[pathlib.Path, str], uuid: str
) -> "ExportRecord":
@@ -154,25 +163,22 @@ class ExportDB:
conn.commit()
return ExportRecord(conn, filename_normalized)
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_uuid_for_file(self, filename):
"""query database for filename and return UUID
returns None if filename not found in database
"""
filepath_normalized = self._normalize_filepath_relative(filename)
conn = self._conn
try:
c = conn.cursor()
c.execute(
"SELECT uuid FROM export_data WHERE filepath_normalized = ?",
(filepath_normalized,),
)
results = c.fetchone()
uuid = results[0] if results else None
except Error as e:
logging.warning(e)
uuid = None
return uuid
c = conn.cursor()
c.execute(
"SELECT uuid FROM export_data WHERE filepath_normalized = ?",
(filepath_normalized,),
)
results = c.fetchone()
return results[0] if results else None
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_files_for_uuid(self, uuid: str) -> List:
"""query database for UUID and return list of files associated with UUID or empty list"""
conn = self._conn
@@ -184,33 +190,30 @@ class ExportDB:
results = c.fetchall()
return [os.path.join(self.export_dir, r[0]) for r in results]
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_photoinfo_for_uuid(self, uuid):
"""returns the photoinfo JSON struct for a UUID"""
conn = self._conn
try:
c = conn.cursor()
c.execute("SELECT photoinfo FROM photoinfo WHERE uuid = ?", (uuid,))
results = c.fetchone()
info = results[0] if results else None
except Error as e:
logging.warning(e)
info = None
return info
c = conn.cursor()
c.execute("SELECT photoinfo FROM photoinfo WHERE uuid = ?", (uuid,))
results = c.fetchone()
return results[0] if results else None
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
retry=retry_if_not_exception_type(sqlite3.IntegrityError),
)
def set_photoinfo_for_uuid(self, uuid, info):
"""sets the photoinfo JSON struct for a UUID"""
conn = self._conn
try:
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO photoinfo(uuid, photoinfo) VALUES (?, ?);",
(uuid, info),
)
conn.commit()
except Error as e:
logging.warning(e)
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO photoinfo(uuid, photoinfo) VALUES (?, ?);",
(uuid, info),
)
conn.commit()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_target_for_file(
self, uuid: str, filename: Union[str, pathlib.Path]
) -> Optional[str]:
@@ -235,60 +238,62 @@ class ExportDB:
for result in results:
filepath_normalized = os.path.splitext(result[2])[0]
if re.match(re.escape(filepath_stem) + r"(\s\(\d+\))?$", filepath_normalized):
if re.match(
re.escape(filepath_stem) + r"(\s\(\d+\))?$", filepath_normalized
):
return os.path.join(self.export_dir, result[1])
return None
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_previous_uuids(self):
"""returns list of UUIDs of previously exported photos found in export database"""
conn = self._conn
previous_uuids = []
try:
c = conn.cursor()
c.execute("SELECT DISTINCT uuid FROM export_data")
results = c.fetchall()
previous_uuids = [row[0] for row in results]
except Error as e:
logging.warning(e)
return previous_uuids
c = conn.cursor()
c.execute("SELECT DISTINCT uuid FROM export_data")
results = c.fetchall()
return [row[0] for row in results]
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
retry=retry_if_not_exception_type(sqlite3.IntegrityError),
)
def set_config(self, config_data):
"""set config in the database"""
conn = self._conn
try:
dt = datetime.datetime.now().isoformat()
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO config(datetime, config) VALUES (?, ?);",
(dt, config_data),
)
conn.commit()
except Error as e:
logging.warning(e)
dt = datetime.datetime.now().isoformat()
c = conn.cursor()
c.execute(
"INSERT OR REPLACE INTO config(datetime, config) VALUES (?, ?);",
(dt, config_data),
)
conn.commit()
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
retry=retry_if_not_exception_type(sqlite3.IntegrityError),
)
def set_export_results(self, results):
"""Store export results in database; data is pickled and gzipped for storage"""
results_data = pickle_and_zip(results)
conn = self._conn
try:
dt = datetime.datetime.now().isoformat()
c = conn.cursor()
c.execute(
"""
UPDATE export_results_data
SET datetime = ?,
export_results = ?
WHERE datetime = (SELECT MIN(datetime) FROM export_results_data);
""",
(dt, results_data),
)
conn.commit()
except Error as e:
logging.warning(e)
dt = datetime.datetime.now().isoformat()
c = conn.cursor()
c.execute(
"""
UPDATE export_results_data
SET datetime = ?,
export_results = ?
WHERE datetime = (SELECT MIN(datetime) FROM export_results_data);
""",
(dt, results_data),
)
conn.commit()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_export_results(self, run: int = 0):
"""Retrieve export results from database
@@ -304,47 +309,39 @@ class ExportDB:
run = -run
conn = self._conn
c = conn.cursor()
c.execute(
"""
SELECT export_results
FROM export_results_data
ORDER BY datetime DESC
""",
)
rows = c.fetchall()
try:
c = conn.cursor()
c.execute(
"""
SELECT export_results
FROM export_results_data
ORDER BY datetime DESC
""",
)
rows = c.fetchall()
try:
data = rows[run][0]
results = unzip_and_unpickle(data) if data else None
except IndexError:
results = None
except Error as e:
logging.warning(e)
data = rows[run][0]
results = unzip_and_unpickle(data) if data else None
except IndexError:
results = None
return results
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def get_exported_files(self):
"""Returns tuple of (uuid, filepath) for all paths of all exported files tracked in the database"""
conn = self._conn
try:
c = conn.cursor()
c.execute("SELECT uuid, filepath FROM export_data")
except Error as e:
logging.warning(e)
return
c = conn.cursor()
c.execute("SELECT uuid, filepath FROM export_data")
while row := c.fetchone():
yield row[0], os.path.join(self.export_dir, row[1])
return
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def close(self):
"""close the database connection"""
try:
self._conn.close()
except Error as e:
logging.warning(e)
self._conn.close()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def _open_export_db(self, dbfile):
"""open export database and return a db connection
if dbfile does not exist, will create and initialize the database
@@ -354,8 +351,6 @@ class ExportDB:
if not os.path.isfile(dbfile):
conn = self._get_db_connection(dbfile)
if not conn:
raise Exception(f"Error getting connection to database {dbfile}")
self._create_or_migrate_db_tables(conn)
self.was_created = True
self.was_upgraded = ()
@@ -379,16 +374,12 @@ class ExportDB:
return conn
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def _get_db_connection(self, dbfile):
"""return db connection to dbname"""
try:
conn = sqlite3.connect(dbfile)
except Error as e:
logging.warning(e)
conn = None
return conn
return sqlite3.connect(dbfile)
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def _get_database_version(self, conn):
"""return tuple of (osxphotos, exportdb) versions for database connection conn"""
version_info = conn.execute(
@@ -484,18 +475,15 @@ class ExportDB:
""" CREATE UNIQUE INDEX IF NOT EXISTS idx_detected_text on detected_text (uuid);""",
]
# create the tables if needed
try:
c = conn.cursor()
for cmd in sql_commands:
c.execute(cmd)
c.execute(
"INSERT INTO version(osxphotos, exportdb) VALUES (?, ?);",
(__version__, OSXPHOTOS_EXPORTDB_VERSION),
)
c.execute("INSERT INTO about(about) VALUES (?);", (OSXPHOTOS_ABOUT_STRING,))
conn.commit()
except Error as e:
logging.warning(e)
c = conn.cursor()
for cmd in sql_commands:
c.execute(cmd)
c.execute(
"INSERT INTO version(osxphotos, exportdb) VALUES (?, ?);",
(__version__, OSXPHOTOS_EXPORTDB_VERSION),
)
c.execute("INSERT INTO about(about) VALUES (?);", (OSXPHOTOS_ABOUT_STRING,))
conn.commit()
# perform needed migrations
if version[1] < "4.3":
@@ -524,6 +512,7 @@ class ExportDB:
with suppress(Exception):
self._conn.close()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def _insert_run_info(self):
dt = datetime.datetime.now(datetime.timezone.utc).isoformat()
python_path = sys.executable
@@ -531,16 +520,13 @@ class ExportDB:
args = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else ""
cwd = os.getcwd()
conn = self._conn
try:
c = conn.cursor()
c.execute(
"INSERT INTO runs (datetime, python_path, script_name, args, cwd) VALUES (?, ?, ?, ?, ?)",
(dt, python_path, cmd, args, cwd),
)
c = conn.cursor()
c.execute(
"INSERT INTO runs (datetime, python_path, script_name, args, cwd) VALUES (?, ?, ?, ?, ?)",
(dt, python_path, cmd, args, cwd),
)
conn.commit()
except Error as e:
logging.warning(e)
conn.commit()
def _relative_filepath(self, filepath: Union[str, pathlib.Path]) -> str:
"""return filepath relative to self._path"""
@@ -596,184 +582,169 @@ class ExportDB:
def _migrate_4_3_to_5_0(self, conn):
"""Migrate database from version 4.3 to 5.0"""
try:
c = conn.cursor()
# add metadata column to files to support --force-update
c.execute("ALTER TABLE files ADD COLUMN metadata TEXT;")
conn.commit()
except Error as e:
logging.warning(e)
c = conn.cursor()
# add metadata column to files to support --force-update
c.execute("ALTER TABLE files ADD COLUMN metadata TEXT;")
conn.commit()
def _migrate_5_0_to_6_0(self, conn):
try:
c = conn.cursor()
c = conn.cursor()
# add export_data table
c.execute(
""" CREATE TABLE IF NOT EXISTS export_data(
id INTEGER PRIMARY KEY,
filepath_normalized TEXT NOT NULL,
filepath TEXT NOT NULL,
uuid TEXT NOT NULL,
src_mode INTEGER,
src_size INTEGER,
src_mtime REAL,
dest_mode INTEGER,
dest_size INTEGER,
dest_mtime REAL,
digest TEXT,
exifdata JSON,
export_options INTEGER,
UNIQUE(filepath_normalized)
); """,
)
c.execute(
""" CREATE UNIQUE INDEX IF NOT EXISTS idx_export_data_filepath_normalized on export_data (filepath_normalized); """,
)
# add export_data table
c.execute(
""" CREATE TABLE IF NOT EXISTS export_data(
id INTEGER PRIMARY KEY,
filepath_normalized TEXT NOT NULL,
filepath TEXT NOT NULL,
uuid TEXT NOT NULL,
src_mode INTEGER,
src_size INTEGER,
src_mtime REAL,
dest_mode INTEGER,
dest_size INTEGER,
dest_mtime REAL,
digest TEXT,
exifdata JSON,
export_options INTEGER,
UNIQUE(filepath_normalized)
); """,
)
c.execute(
""" CREATE UNIQUE INDEX IF NOT EXISTS idx_export_data_filepath_normalized on export_data (filepath_normalized); """,
)
# migrate data
c.execute(
""" INSERT INTO export_data (filepath_normalized, filepath, uuid) SELECT filepath_normalized, filepath, uuid FROM files;""",
)
c.execute(
""" UPDATE export_data
SET (src_mode, src_size, src_mtime) =
(SELECT mode, size, mtime
FROM edited
WHERE export_data.filepath_normalized = edited.filepath_normalized);
""",
)
c.execute(
""" UPDATE export_data
SET (dest_mode, dest_size, dest_mtime) =
(SELECT orig_mode, orig_size, orig_mtime
FROM files
WHERE export_data.filepath_normalized = files.filepath_normalized);
""",
)
c.execute(
""" UPDATE export_data SET digest =
(SELECT metadata FROM files
WHERE files.filepath_normalized = export_data.filepath_normalized
); """
)
c.execute(
""" UPDATE export_data SET exifdata =
(SELECT json_exifdata FROM exifdata
WHERE exifdata.filepath_normalized = export_data.filepath_normalized
); """
)
# migrate data
c.execute(
""" INSERT INTO export_data (filepath_normalized, filepath, uuid) SELECT filepath_normalized, filepath, uuid FROM files;""",
)
c.execute(
""" UPDATE export_data
SET (src_mode, src_size, src_mtime) =
(SELECT mode, size, mtime
FROM edited
WHERE export_data.filepath_normalized = edited.filepath_normalized);
""",
)
c.execute(
""" UPDATE export_data
SET (dest_mode, dest_size, dest_mtime) =
(SELECT orig_mode, orig_size, orig_mtime
FROM files
WHERE export_data.filepath_normalized = files.filepath_normalized);
""",
)
c.execute(
""" UPDATE export_data SET digest =
(SELECT metadata FROM files
WHERE files.filepath_normalized = export_data.filepath_normalized
); """
)
c.execute(
""" UPDATE export_data SET exifdata =
(SELECT json_exifdata FROM exifdata
WHERE exifdata.filepath_normalized = export_data.filepath_normalized
); """
)
# create config table
c.execute(
""" CREATE TABLE IF NOT EXISTS config (
id INTEGER PRIMARY KEY,
datetime TEXT,
config TEXT
); """
)
# create config table
c.execute(
""" CREATE TABLE IF NOT EXISTS config (
id INTEGER PRIMARY KEY,
datetime TEXT,
config TEXT
); """
)
# create photoinfo table
c.execute(
""" CREATE TABLE IF NOT EXISTS photoinfo (
id INTEGER PRIMARY KEY,
uuid TEXT NOT NULL,
photoinfo JSON,
UNIQUE(uuid)
); """
)
c.execute(
"""CREATE UNIQUE INDEX IF NOT EXISTS idx_photoinfo_uuid on photoinfo (uuid);"""
)
c.execute(
""" INSERT INTO photoinfo (uuid, photoinfo) SELECT uuid, json_info FROM info;"""
)
# create photoinfo table
c.execute(
""" CREATE TABLE IF NOT EXISTS photoinfo (
id INTEGER PRIMARY KEY,
uuid TEXT NOT NULL,
photoinfo JSON,
UNIQUE(uuid)
); """
)
c.execute(
"""CREATE UNIQUE INDEX IF NOT EXISTS idx_photoinfo_uuid on photoinfo (uuid);"""
)
c.execute(
""" INSERT INTO photoinfo (uuid, photoinfo) SELECT uuid, json_info FROM info;"""
)
# drop indexes no longer needed
c.execute("DROP INDEX IF EXISTS idx_files_filepath_normalized;")
c.execute("DROP INDEX IF EXISTS idx_exifdata_filename;")
c.execute("DROP INDEX IF EXISTS idx_edited_filename;")
c.execute("DROP INDEX IF EXISTS idx_converted_filename;")
c.execute("DROP INDEX IF EXISTS idx_sidecar_filename;")
c.execute("DROP INDEX IF EXISTS idx_detected_text;")
# drop indexes no longer needed
c.execute("DROP INDEX IF EXISTS idx_files_filepath_normalized;")
c.execute("DROP INDEX IF EXISTS idx_exifdata_filename;")
c.execute("DROP INDEX IF EXISTS idx_edited_filename;")
c.execute("DROP INDEX IF EXISTS idx_converted_filename;")
c.execute("DROP INDEX IF EXISTS idx_sidecar_filename;")
c.execute("DROP INDEX IF EXISTS idx_detected_text;")
# drop tables no longer needed
c.execute("DROP TABLE IF EXISTS files;")
c.execute("DROP TABLE IF EXISTS info;")
c.execute("DROP TABLE IF EXISTS exifdata;")
c.execute("DROP TABLE IF EXISTS edited;")
c.execute("DROP TABLE IF EXISTS converted;")
c.execute("DROP TABLE IF EXISTS sidecar;")
c.execute("DROP TABLE IF EXISTS detected_text;")
# drop tables no longer needed
c.execute("DROP TABLE IF EXISTS files;")
c.execute("DROP TABLE IF EXISTS info;")
c.execute("DROP TABLE IF EXISTS exifdata;")
c.execute("DROP TABLE IF EXISTS edited;")
c.execute("DROP TABLE IF EXISTS converted;")
c.execute("DROP TABLE IF EXISTS sidecar;")
c.execute("DROP TABLE IF EXISTS detected_text;")
conn.commit()
except Error as e:
logging.warning(e)
conn.commit()
def _migrate_6_0_to_7_0(self, conn):
try:
c = conn.cursor()
c = conn.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS export_results_data (
id INTEGER PRIMARY KEY,
datetime TEXT,
export_results BLOB
);"""
)
# pre-populate report_data table with blank fields
# ExportDB will use these as circular buffer always writing to the oldest record
for _ in range(MAX_EXPORT_RESULTS_DATA_ROWS):
c.execute(
"""CREATE TABLE IF NOT EXISTS export_results_data (
id INTEGER PRIMARY KEY,
datetime TEXT,
export_results BLOB
);"""
"""INSERT INTO export_results_data (datetime, export_results) VALUES (?, ?);""",
(datetime.datetime.now().isoformat(), b""),
)
# pre-populate report_data table with blank fields
# ExportDB will use these as circular buffer always writing to the oldest record
for _ in range(MAX_EXPORT_RESULTS_DATA_ROWS):
c.execute(
"""INSERT INTO export_results_data (datetime, export_results) VALUES (?, ?);""",
(datetime.datetime.now().isoformat(), b""),
)
# sleep a tiny bit just to ensure time stamps increment
time.sleep(0.001)
conn.commit()
except Error as e:
logging.warning(e)
# sleep a tiny bit just to ensure time stamps increment
time.sleep(0.001)
conn.commit()
def _migrate_7_0_to_7_1(self, conn):
try:
c = conn.cursor()
c.execute("""ALTER TABLE export_data ADD COLUMN timestamp DATETIME;""")
c.execute(
"""
CREATE TRIGGER insert_timestamp_trigger
AFTER INSERT ON export_data
BEGIN
UPDATE export_data SET timestamp = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') WHERE id = NEW.id;
END;
"""
)
c.execute(
"""
CREATE TRIGGER update_timestamp_trigger
AFTER UPDATE On export_data
BEGIN
UPDATE export_data SET timestamp = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') WHERE id = NEW.id;
END;
"""
)
conn.commit()
except Error as e:
logging.warning(e)
c = conn.cursor()
c.execute("""ALTER TABLE export_data ADD COLUMN timestamp DATETIME;""")
c.execute(
"""
CREATE TRIGGER insert_timestamp_trigger
AFTER INSERT ON export_data
BEGIN
UPDATE export_data SET timestamp = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') WHERE id = NEW.id;
END;
"""
)
c.execute(
"""
CREATE TRIGGER update_timestamp_trigger
AFTER UPDATE On export_data
BEGIN
UPDATE export_data SET timestamp = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') WHERE id = NEW.id;
END;
"""
)
conn.commit()
def _perform_db_maintenace(self, conn):
"""Perform database maintenance"""
try:
c = conn.cursor()
c.execute(
"""DELETE FROM config
WHERE id < (
SELECT MIN(id)
FROM (SELECT id FROM config ORDER BY id DESC LIMIT 9)
);
"""
)
conn.commit()
except Error as e:
logging.warning(e)
c = conn.cursor()
c.execute(
"""DELETE FROM config
WHERE id < (
SELECT MIN(id)
FROM (SELECT id FROM config ORDER BY id DESC LIMIT 9)
);
"""
)
conn.commit()
class ExportDBInMemory(ExportDB):
@@ -825,14 +796,13 @@ class ExportDBInMemory(ExportDB):
conn_on_disk.commit()
conn_on_disk.close()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def close(self):
"""close the database connection"""
try:
if self._conn:
self._conn.close()
except Error as e:
logging.warning(e)
if self._conn:
self._conn.close()
@retry(stop=stop_after_attempt(MAX_RETRY_ATTEMPTS))
def _open_export_db(self, dbfile): # sourcery skip: raise-specific-error
"""open export database and return a db connection
returns: connection to the database
@@ -866,13 +836,7 @@ class ExportDBInMemory(ExportDB):
def _get_db_connection(self):
"""return db connection to in memory database"""
try:
conn = sqlite3.connect(":memory:")
except Error as e:
logging.warning(e)
conn = None
return conn
return sqlite3.connect(":memory:")
def _dump_db(self, conn: sqlite3.Connection) -> StringIO:
"""dump sqlite db to a string buffer"""