* Added ability to read iPhone database * Added comments * Fixed tests * Fixed albums for iPhone database
This commit is contained in:
parent
d8d2b1c4a1
commit
3cb4be4254
@ -54,6 +54,10 @@ def _process_searchinfo(self):
|
||||
self._db_searchinfo_labels = _db_searchinfo_labels = {}
|
||||
self._db_searchinfo_labels_normalized = _db_searchinfo_labels_normalized = {}
|
||||
|
||||
if self._skip_searchinfo:
|
||||
logging.debug("Skipping search info processing")
|
||||
return
|
||||
|
||||
if self._db_version <= _PHOTOS_4_VERSION:
|
||||
raise NotImplementedError(
|
||||
f"search info not implemented for this database version"
|
||||
@ -102,7 +106,7 @@ def _process_searchinfo(self):
|
||||
# 7: groups.normalized_string,
|
||||
# 8: groups.lookup_identifier
|
||||
|
||||
for row in c:
|
||||
for row in result:
|
||||
uuid = ints_to_uuid(row[1], row[2])
|
||||
# strings have null character appended, so strip it
|
||||
record = {
|
||||
|
||||
@ -90,7 +90,14 @@ class PhotosDB:
|
||||
labels_normalized_as_dict,
|
||||
)
|
||||
|
||||
def __init__(self, dbfile=None, verbose=None, exiftool=None, rich=None):
|
||||
def __init__(
|
||||
self,
|
||||
dbfile=None,
|
||||
verbose=None,
|
||||
exiftool=None,
|
||||
rich=None,
|
||||
_skip_searchinfo=False,
|
||||
):
|
||||
"""Create a new PhotosDB object.
|
||||
|
||||
Args:
|
||||
@ -98,6 +105,7 @@ class PhotosDB:
|
||||
verbose: optional callable function to use for printing verbose text during processing; if None (default), does not print output.
|
||||
exiftool: optional path to exiftool for methods that require this (e.g. PhotoInfo.exiftool); if not provided, will search PATH
|
||||
rich: use rich with verbose output
|
||||
_skip_searchinfo: if True, will not process search data from psi.sqlite; useful for processing standalone Photos.sqlite file
|
||||
|
||||
Raises:
|
||||
FileNotFoundError if dbfile is not a valid Photos library.
|
||||
@ -119,6 +127,7 @@ class PhotosDB:
|
||||
elif not callable(verbose):
|
||||
raise TypeError("verbose must be callable")
|
||||
self._verbose = verbose
|
||||
self._skip_searchinfo = _skip_searchinfo
|
||||
|
||||
# define functions for adding markup
|
||||
self._filepath = add_rich_markup_tag("filepath", rich=rich)
|
||||
@ -1780,9 +1789,9 @@ class PhotosDB:
|
||||
"parentfolder": album[7],
|
||||
"pk": album[8],
|
||||
"intrash": False if album[9] == 0 else True,
|
||||
"creation_date": album[10],
|
||||
"start_date": album[11],
|
||||
"end_date": album[12],
|
||||
"creation_date": album[10] or 0, # iPhone Photos.sqlite can have null value
|
||||
"start_date": album[11] or 0,
|
||||
"end_date": album[12] or 0,
|
||||
"customsortascending": album[13],
|
||||
"customsortkey": album[14],
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ def get_db_version(db_file):
|
||||
"""Gets the Photos DB version from LiGlobals table
|
||||
|
||||
Args:
|
||||
db_file: path to photos.db database file containing LiGlobals table
|
||||
db_file: path to photos.db database file containing LiGlobals table or Photos.sqlite database file
|
||||
|
||||
Returns: version as str
|
||||
"""
|
||||
@ -40,8 +40,24 @@ def get_db_version(db_file):
|
||||
(conn, c) = sqlite_open_ro(db_file)
|
||||
|
||||
# get database version
|
||||
c.execute("SELECT value from LiGlobals where LiGlobals.keyPath is 'libraryVersion'")
|
||||
version = c.fetchone()[0]
|
||||
result = [
|
||||
row[0]
|
||||
for row in c.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';"
|
||||
).fetchall()
|
||||
]
|
||||
if "LiGlobals" in result:
|
||||
# it's a photos.db file
|
||||
c.execute(
|
||||
"SELECT value FROM LiGlobals WHERE LiGlobals.keyPath IS 'libraryVersion'"
|
||||
)
|
||||
version = c.fetchone()[0]
|
||||
elif "Z_METADATA" in result:
|
||||
# assume it's a Photos 5+ Photos.sqlite file
|
||||
# get_model_version will find the exact version
|
||||
version = "5001"
|
||||
else:
|
||||
raise ValueError(f"Unknown database format: {db_file}")
|
||||
conn.close()
|
||||
|
||||
if version not in _TESTED_DB_VERSIONS:
|
||||
@ -62,8 +78,6 @@ def get_model_version(db_file):
|
||||
Returns: model version as str
|
||||
"""
|
||||
|
||||
version = None
|
||||
|
||||
(conn, c) = sqlite_open_ro(db_file)
|
||||
|
||||
# get database version
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
"""sqlite utils for use by osxphotos"""
|
||||
|
||||
import os.path
|
||||
import logging
|
||||
import pathlib
|
||||
import sqlite3
|
||||
from typing import List, Tuple
|
||||
@ -25,19 +25,14 @@ def sqlite_db_is_locked(dbname):
|
||||
returns True if database is locked, otherwise False
|
||||
dbname: name of database to test"""
|
||||
|
||||
# first, check to see if lock file exists, if so, assume the file is locked
|
||||
lock_name = f"{dbname}.lock"
|
||||
if os.path.exists(lock_name):
|
||||
return True
|
||||
|
||||
# no lock file so try to read from the database to see if it's locked
|
||||
locked = None
|
||||
try:
|
||||
(conn, c) = sqlite_open_ro(dbname)
|
||||
c.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;")
|
||||
conn.close()
|
||||
locked = False
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logging.debug(f"sqlite_db_is_locked: {e}")
|
||||
locked = True
|
||||
|
||||
return locked
|
||||
|
||||
@ -6,17 +6,9 @@ import sqlite3
|
||||
from osxphotos.sqlite_utils import sqlite_open_ro, sqlite_db_is_locked
|
||||
|
||||
|
||||
DB_LOCKED_10_12 = "./tests/Test-Lock-10_12.photoslibrary/database/photos.db"
|
||||
DB_LOCKED_10_15 = "./tests/Test-Lock-10_15_1.photoslibrary/database/Photos.sqlite"
|
||||
DB_UNLOCKED_10_15 = "./tests/Test-10.15.1.photoslibrary/database/photos.db"
|
||||
|
||||
|
||||
def test_db_is_locked_locked():
|
||||
|
||||
assert sqlite_db_is_locked(DB_LOCKED_10_12)
|
||||
assert sqlite_db_is_locked(DB_LOCKED_10_15)
|
||||
|
||||
|
||||
def test_db_is_locked_unlocked():
|
||||
|
||||
assert not sqlite_db_is_locked(DB_UNLOCKED_10_15)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user