first commit

This commit is contained in:
Rhet Turnbull 2019-06-12 11:48:30 -07:00
commit 8b61d573ed
4 changed files with 2645 additions and 0 deletions

441
osxphotos/__init__.py Normal file
View File

@ -0,0 +1,441 @@
import platform
import os.path
from pathlib import Path
from plistlib import load as plistload
from datetime import datetime
import tempfile
import objc
import CoreFoundation
from Foundation import *
import urllib.parse
import sys
from shutil import copyfile
import pprint
import sqlite3
from loguru import logger
from . import _applescript
# TODO: replace verbose with loguru
# replace string formatting with fstrings
_debug = False
def _get_os_version():
# returns tuple containing OS version
# e.g. 10.13.6 = (10, 13, 6)
(ver, major, minor) = platform.mac_ver()[0].split(".")
return (ver, major, minor)
def _check_file_exists(filename):
# returns true if file exists and is not a directory
# otherwise returns false
filename = os.path.abspath(filename)
return os.path.exists(filename) and not os.path.isdir(filename)
class Photos:
def __init__(self, dbfile=None):
# Dict with information about all photos by uuid
self._dbphotos = {}
# Dict with information about all persons/photos by uuid
self._dbfaces_uuid = {}
# Dict with information about all persons/photos by person
self._dbfaces_person = {}
# Dict with information about all keywords/photos by uuid
self._dbkeywords_uuid = {}
# Dict with information about all keywords/photos by keyword
self._dbkeywords_keyword = {}
# Dict with information about all albums/photos by uuid
self._dbalbums_uuid = {}
# Dict with information about all albums/photos by album
self._dbalbums_album = {}
# Dict with information about all the volumes/photos by uuid
self._dbvolumes = {}
print(dbfile)
if dbfile is None:
library_path = self.get_photos_library_path()
print("library_path: " + library_path)
# TODO: verify library path not None
dbfile = os.path.join(library_path, "database/photos.db")
print(dbfile)
logger.debug("filename = %s" % dbfile)
# TODO: replace os.path with pathlib
# TODO: clean this up -- we'll already know library_path
library_path = os.path.dirname(dbfile)
(library_path, tmp) = os.path.split(library_path)
masters_path = os.path.join(library_path, "Masters")
logger.debug("library = %s, masters = %s" % (library_path, masters_path))
if not _check_file_exists(dbfile):
sys.exit("_dbfile %s does not exist" % (dbfile))
logger.info("database filename = %s" % dbfile)
self._dbfile = dbfile
self._setup_applescript()
self._process_database()
def keywords(self):
# return keywords as dict of keyword, count in reverse sorted order (descending)
keywords = {}
for k in self._dbkeywords_keyword.keys():
keywords[k] = len(self._dbkeywords_keyword[k])
keywords = dict(sorted(keywords.items(), key=lambda kv: kv[1], reverse=True))
return keywords
# Various AppleScripts we need
def _setup_applescript(self):
self._scpt_export = ""
self._scpt_launch = ""
self._scpt_quit = ""
# Compile apple script that exports one image
# self._scpt_export = _applescript.AppleScript('''
# on run {arg}
# set thepath to "%s"
# tell application "Photos"
# set theitem to media item id arg
# set thelist to {theitem}
# export thelist to POSIX file thepath
# end tell
# end run
# ''' % (tmppath))
#
# Compile apple script that launches Photos.App
self._scpt_launch = _applescript.AppleScript(
"""
on run
tell application "Photos"
activate
end tell
end run
"""
)
# Compile apple script that quits Photos.App
self._scpt_quit = _applescript.AppleScript(
"""
on run
tell application "Photos"
quit
end tell
end run
"""
)
def get_photos_library_path(self):
# return the path to the Photos library
plist_file = Path(
str(Path.home())
+ "/Library/Containers/com.apple.Photos/Data/Library/Preferences/com.apple.Photos.plist"
)
if plist_file.is_file():
with open(plist_file, "rb") as fp:
pl = plistload(fp)
else:
print("could not find plist file: " + str(plist_file), file=sys.stderr)
return None
# get the IPXDefaultLibraryURLBookmark from com.apple.Photos.plist
# this is a serialized CFData object
photosurlref = pl["IPXDefaultLibraryURLBookmark"]
if photosurlref != None:
# use CFURLCreateByResolvingBookmarkData to de-serialize bookmark data into a CFURLRef
photosurl = CoreFoundation.CFURLCreateByResolvingBookmarkData(
kCFAllocatorDefault, photosurlref, 0, None, None, None, None
)
# the CFURLRef we got is a sruct that python treats as an array
# I'd like to pass this to CFURLGetFileSystemRepresentation to get the path but
# CFURLGetFileSystemRepresentation barfs when it gets an array from python instead of expected struct
# first element is the path string in form:
# file:///Users/username/Pictures/Photos%20Library.photoslibrary/
photosurlstr = photosurl[0].absoluteString() if photosurl[0] else None
# now coerce the file URI back into an OS path
# surely there must be a better way
if photosurlstr is not None:
photospath = os.path.normpath(
urllib.parse.unquote(urllib.parse.urlparse(photosurlstr).path)
)
else:
print(
"Could not extract photos URL String from IPXDefaultLibraryURLBookmark",
file=sys.stderr,
)
return None
return photospath
else:
print("Could not get path to Photos database", file=sys.stderr)
return None
def _copy_db_file(self, fname):
# copies the sqlite database file to a temp file
# returns the name of the temp file
# required because python's sqlite3 implementation can't read a locked file
fd, tmp = tempfile.mkstemp(suffix=".db", prefix="photos")
logger.debug("copying " + fname + " to " + tmp)
try:
copyfile(fname, tmp)
except:
print("copying " + fname + " to " + tmp, file=sys.stderr)
sys.exit()
return tmp
def _open_sql_file(self, file):
fname = file
logger.debug("Trying to open database %s" % (fname))
try:
conn = sqlite3.connect("%s" % (fname))
c = conn.cursor()
except sqlite3.Error as e:
print("An error occurred: %s %s" % (e.args[0], fname))
sys.exit(3)
logger.debug("SQLite database is open")
return (conn, c)
def _process_database(self):
global _debug
fname = self._dbfile
# Epoch is Jan 1, 2001
td = (datetime(2001, 1, 1, 0, 0) - datetime(1970, 1, 1, 0, 0)).total_seconds()
# Ensure Photos.App is not running
self._scpt_quit.run()
tmp_db = self._copy_db_file(fname)
(conn, c) = self._open_sql_file(tmp_db)
logger.debug("Have connection with database")
# Look for all combinations of persons and pictures
logger.debug("Getting information about persons")
i = 0
c.execute(
"select count(*) from RKFace, RKPerson where RKFace.personID = RKperson.modelID"
)
# init_pbar_status("Faces", c.fetchone()[0])
# c.execute("select RKPerson.name, RKFace.imageID from RKFace, RKPerson where RKFace.personID = RKperson.modelID")
c.execute(
"select RKPerson.name, RKVersion.uuid from RKFace, RKPerson, RKVersion, RKMaster "
+ "where RKFace.personID = RKperson.modelID and RKVersion.modelId = RKFace.ImageModelId "
+ "and RKVersion.type = 2 and RKVersion.masterUuid = RKMaster.uuid and "
+ "RKVersion.filename not like '%.pdf'"
)
for person in c:
if person[0] == None:
logger.debug("skipping person = None %s" % person[1])
continue
if not person[1] in self._dbfaces_uuid:
self._dbfaces_uuid[person[1]] = []
if not person[0] in self._dbfaces_person:
self._dbfaces_person[person[0]] = []
self._dbfaces_uuid[person[1]].append(person[0])
self._dbfaces_person[person[0]].append(person[1])
# set_pbar_status(i)
i = i + 1
logger.debug("Finished walking through persons")
# close_pbar_status()
logger.debug("Getting information about albums")
i = 0
c.execute(
"select count(*) from RKAlbum, RKVersion, RKAlbumVersion where "
+ "RKAlbum.modelID = RKAlbumVersion.albumId and "
+ "RKAlbumVersion.versionID = RKVersion.modelId and "
+ "RKVersion.filename not like '%.pdf' and RKVersion.isInTrash = 0"
)
# init_pbar_status("Albums", c.fetchone()[0])
# c.execute("select RKPerson.name, RKFace.imageID from RKFace, RKPerson where RKFace.personID = RKperson.modelID")
c.execute(
"select RKAlbum.name, RKVersion.uuid from RKAlbum, RKVersion, RKAlbumVersion "
+ "where RKAlbum.modelID = RKAlbumVersion.albumId and "
+ "RKAlbumVersion.versionID = RKVersion.modelId and RKVersion.type = 2 and "
+ "RKVersion.filename not like '%.pdf' and RKVersion.isInTrash = 0"
)
for album in c:
# store by uuid in _dbalbums_uuid and by album in _dbalbums_album
if not album[1] in self._dbalbums_uuid:
self._dbalbums_uuid[album[1]] = []
if not album[0] in self._dbalbums_album:
self._dbalbums_album[album[0]] = []
self._dbalbums_uuid[album[1]].append(album[0])
self._dbalbums_album[album[0]].append(album[1])
logger.debug("%s %s" % (album[1], album[0]))
# set_pbar_status(i)
i = i + 1
logger.debug("Finished walking through albums")
# close_pbar_status()
logger.debug("Getting information about keywords")
c.execute(
"select count(*) from RKKeyword, RKKeywordForVersion,RKVersion, RKMaster "
+ "where RKKeyword.modelId = RKKeyWordForVersion.keywordID and "
+ "RKVersion.modelID = RKKeywordForVersion.versionID and RKMaster.uuid = "
+ "RKVersion.masterUuid and RKVersion.filename not like '%.pdf' and RKVersion.isInTrash = 0"
)
# init_pbar_status("Keywords", c.fetchone()[0])
c.execute(
"select RKKeyword.name, RKVersion.uuid, RKMaster.uuid from "
+ "RKKeyword, RKKeywordForVersion, RKVersion, RKMaster "
+ "where RKKeyword.modelId = RKKeyWordForVersion.keywordID and "
+ "RKVersion.modelID = RKKeywordForVersion.versionID "
+ "and RKMaster.uuid = RKVersion.masterUuid and RKVersion.type = 2 "
+ "and RKVersion.filename not like '%.pdf' and RKVersion.isInTrash = 0"
)
i = 0
for keyword in c:
if not keyword[1] in self._dbkeywords_uuid:
self._dbkeywords_uuid[keyword[1]] = []
if not keyword[0] in self._dbkeywords_keyword:
self._dbkeywords_keyword[keyword[0]] = []
self._dbkeywords_uuid[keyword[1]].append(keyword[0])
self._dbkeywords_keyword[keyword[0]].append(keyword[1])
logger.debug("%s %s" % (keyword[1], keyword[0]))
# set_pbar_status(i)
i = i + 1
logger.debug("Finished walking through keywords")
# close_pbar_status()
logger.debug("Getting information about volumes")
c.execute("select count(*) from RKVolume")
# init_pbar_status("Volumes", c.fetchone()[0])
c.execute("select RKVolume.modelId, RKVolume.name from RKVolume")
i = 0
for vol in c:
self._dbvolumes[vol[0]] = vol[1]
logger.debug("%s %s" % (vol[0], vol[1]))
# set_pbar_status(i)
i = i + 1
logger.debug("Finished walking through volumes")
# close_pbar_status()
logger.debug("Getting information about photos")
c.execute(
"select count(*) from RKVersion, RKMaster where RKVersion.isInTrash = 0 and "
+ "RKVersion.type = 2 and RKVersion.masterUuid = RKMaster.uuid and "
+ "RKVersion.filename not like '%.pdf'"
)
# init_pbar_status("Photos", c.fetchone()[0])
c.execute(
"select RKVersion.uuid, RKVersion.modelId, RKVersion.masterUuid, RKVersion.filename, "
+ "RKVersion.lastmodifieddate, RKVersion.imageDate, RKVersion.mainRating, "
+ "RKVersion.hasAdjustments, RKVersion.hasKeywords, RKVersion.imageTimeZoneOffsetSeconds, "
+ "RKMaster.volumeId, RKMaster.imagePath, RKVersion.extendedDescription, RKVersion.name, "
+ "RKMaster.isMissing "
+ "from RKVersion, RKMaster where RKVersion.isInTrash = 0 and RKVersion.type = 2 and "
+ "RKVersion.masterUuid = RKMaster.uuid and RKVersion.filename not like '%.pdf'"
)
i = 0
for row in c:
# set_pbar_status(i)
i = i + 1
uuid = row[0]
if _debug:
print("i = %d, uuid = '%s, master = '%s" % (i, uuid, row[2]))
self._dbphotos[uuid] = {}
self._dbphotos[uuid]["modelID"] = row[1]
self._dbphotos[uuid]["masterUuid"] = row[2]
self._dbphotos[uuid]["filename"] = row[3]
try:
self._dbphotos[uuid]["lastmodifieddate"] = datetime.fromtimestamp(
row[4] + td
)
except:
self._dbphotos[uuid]["lastmodifieddate"] = datetime.fromtimestamp(
row[5] + td
)
self._dbphotos[uuid]["imageDate"] = datetime.fromtimestamp(row[5] + td)
self._dbphotos[uuid]["mainRating"] = row[6]
self._dbphotos[uuid]["hasAdjustments"] = row[7]
self._dbphotos[uuid]["hasKeywords"] = row[8]
self._dbphotos[uuid]["imageTimeZoneOffsetSeconds"] = row[9]
self._dbphotos[uuid]["volumeId"] = row[10]
self._dbphotos[uuid]["imagePath"] = row[11]
self._dbphotos[uuid]["extendedDescription"] = row[12]
self._dbphotos[uuid]["name"] = row[13]
self._dbphotos[uuid]["isMissing"] = row[14]
logger.debug(
"Fetching data for photo %d %s %s %s %s %s: %s"
% (
i,
uuid,
self._dbphotos[uuid]["masterUuid"],
self._dbphotos[uuid]["volumeId"],
self._dbphotos[uuid]["filename"],
self._dbphotos[uuid]["extendedDescription"],
self._dbphotos[uuid]["imageDate"],
)
)
# close_pbar_status()
conn.close()
# add faces and keywords to photo data
for uuid in self._dbphotos:
# keywords
if self._dbphotos[uuid]["hasKeywords"] == 1:
self._dbphotos[uuid]["keywords"] = self._dbkeywords_uuid[uuid]
else:
self._dbphotos[uuid]["keywords"] = []
if uuid in self._dbfaces_uuid:
self._dbphotos[uuid]["hasPersons"] = 1
self._dbphotos[uuid]["persons"] = self._dbfaces_uuid[uuid]
else:
self._dbphotos[uuid]["hasPersons"] = 0
self._dbphotos[uuid]["persons"] = []
if uuid in self._dbalbums_uuid:
self._dbphotos[uuid]["albums"] = self._dbalbums_uuid[uuid]
self._dbphotos[uuid]["hasAlbums"] = 1
else:
self._dbphotos[uuid]["albums"] = []
self._dbphotos[uuid]["hasAlbums"] = 0
if self._dbphotos[uuid]["volumeId"] is not None:
self._dbphotos[uuid]["volume"] = self._dbvolumes[
self._dbphotos[uuid]["volumeId"]
]
else:
self._dbphotos[uuid]["volume"] = None
# remove temporary copy of the database
try:
logger.info("Removing temporary database file" + tmp_db)
os.remove(tmp_db)
except:
print("Could not remove temporary database: " + tmp_db, file=sys.stderr)
if _debug:
pp = pprint.PrettyPrinter(indent=4)
print("Faces:")
pp.pprint(self._dbfaces_uuid)
print("Keywords by uuid:")
pp.pprint(self._dbkeywords_uuid)
print("Keywords by keyword:")
pp.pprint(self._dbkeywords_keyword)
print("Albums by uuid:")
pp.pprint(self._dbalbums_uuid)
print("Albums by album:")
pp.pprint(self._dbalbums_album)
print("Volumes:")
pp.pprint(self._dbvolumes)
print("Photos:")
pp.pprint(self._dbphotos)
logger.debug(f"processed {len(self._dbphotos)} photos")

207
osxphotos/_applescript.py Normal file
View File

@ -0,0 +1,207 @@
""" applescript -- Easy-to-use Python wrapper for NSAppleScript """
"""
This code is from py-applescript, a public domain package available at:
https://github.com/rdhyee/py-applescript
I've included the whole thing here for simplicity as there is more than one
applescript packge on PyPi so there's ambiguity as to which one "import applescript"
would use if user had installed another library.
This package is used instead of the others because it uses a native PyObjC
bridge and is thus much faster than others which use osascript.
"""
import sys
from Foundation import (
NSAppleScript,
NSAppleEventDescriptor,
NSURL,
NSAppleScriptErrorMessage,
NSAppleScriptErrorBriefMessage,
NSAppleScriptErrorNumber,
NSAppleScriptErrorAppName,
NSAppleScriptErrorRange,
)
from .aecodecs import Codecs, fourcharcode, AEType, AEEnum
from . import kae
__all__ = ["AppleScript", "ScriptError", "AEType", "AEEnum", "kMissingValue", "kae"]
######################################################################
class AppleScript:
""" Represents a compiled AppleScript. The script object is persistent; its handlers may be called multiple times and its top-level properties will retain current state until the script object's disposal.
"""
_codecs = Codecs()
def __init__(self, source=None, path=None):
"""
source : str | None -- AppleScript source code
path : str | None -- full path to .scpt/.applescript file
Notes:
- Either the path or the source argument must be provided.
- If the script cannot be read/compiled, a ScriptError is raised.
"""
if path:
url = NSURL.fileURLWithPath_(path)
self._script, errorinfo = NSAppleScript.alloc().initWithContentsOfURL_error_(
url, None
)
if errorinfo:
raise ScriptError(errorinfo)
elif source:
self._script = NSAppleScript.alloc().initWithSource_(source)
else:
raise ValueError("Missing source or path argument.")
if not self._script.isCompiled():
errorinfo = self._script.compileAndReturnError_(None)[1]
if errorinfo:
raise ScriptError(errorinfo)
def __repr__(self):
s = self.source
return "AppleScript({})".format(
repr(s) if len(s) < 100 else "{}...{}".format(repr(s)[:80], repr(s)[-17:])
)
##
def _newevent(self, suite, code, args):
evt = NSAppleEventDescriptor.appleEventWithEventClass_eventID_targetDescriptor_returnID_transactionID_(
fourcharcode(suite),
fourcharcode(code),
NSAppleEventDescriptor.nullDescriptor(),
0,
0,
)
evt.setDescriptor_forKeyword_(
self._codecs.pack(args), fourcharcode(kae.keyDirectObject)
)
return evt
def _unpackresult(self, result, errorinfo):
if not result:
raise ScriptError(errorinfo)
return self._codecs.unpack(result)
##
source = property(
lambda self: str(self._script.source()), doc="str -- the script's source code"
)
def run(self, *args):
""" Run the script, optionally passing arguments to its run handler.
args : anything -- arguments to pass to script, if any; see also supported type mappings documentation
Result : anything | None -- the script's return value, if any
Notes:
- The run handler must be explicitly declared in order to pass arguments.
- AppleScript will ignore excess arguments. Passing insufficient arguments will result in an error.
- If execution fails, a ScriptError is raised.
"""
if args:
evt = self._newevent(kae.kCoreEventClass, kae.kAEOpenApplication, args)
return self._unpackresult(*self._script.executeAppleEvent_error_(evt, None))
else:
return self._unpackresult(*self._script.executeAndReturnError_(None))
def call(self, name, *args):
""" Call the specified user-defined handler.
name : str -- the handler's name (case-sensitive)
args : anything -- arguments to pass to script, if any; see documentation for supported types
Result : anything | None -- the script's return value, if any
Notes:
- The handler's name must be a user-defined identifier, not an AppleScript keyword; e.g. 'myCount' is acceptable; 'count' is not.
- AppleScript will ignore excess arguments. Passing insufficient arguments will result in an error.
- If execution fails, a ScriptError is raised.
"""
evt = self._newevent(
kae.kASAppleScriptSuite, kae.kASPrepositionalSubroutine, args
)
evt.setDescriptor_forKeyword_(
NSAppleEventDescriptor.descriptorWithString_(name),
fourcharcode(kae.keyASSubroutineName),
)
return self._unpackresult(*self._script.executeAppleEvent_error_(evt, None))
##
class ScriptError(Exception):
""" Indicates an AppleScript compilation/execution error. """
def __init__(self, errorinfo):
self._errorinfo = dict(errorinfo)
def __repr__(self):
return "ScriptError({})".format(self._errorinfo)
@property
def message(self):
""" str -- the error message """
msg = self._errorinfo.get(NSAppleScriptErrorMessage)
if not msg:
msg = self._errorinfo.get(NSAppleScriptErrorBriefMessage, "Script Error")
return msg
number = property(
lambda self: self._errorinfo.get(NSAppleScriptErrorNumber),
doc="int | None -- the error number, if given",
)
appname = property(
lambda self: self._errorinfo.get(NSAppleScriptErrorAppName),
doc="str | None -- the name of the application that reported the error, where relevant",
)
@property
def range(self):
""" (int, int) -- the start and end points (1-indexed) within the source code where the error occurred """
range = self._errorinfo.get(NSAppleScriptErrorRange)
if range:
start = range.rangeValue().location
end = start + range.rangeValue().length
return (start, end)
else:
return None
def __str__(self):
msg = self.message
for s, v in [
(" ({})", self.number),
(" app={!r}", self.appname),
(" range={0[0]}-{0[1]}", self.range),
]:
if v is not None:
msg += s.format(v)
return (
msg.encode("ascii", "replace") if sys.version_info.major < 3 else msg
) # 2.7 compatibility
##
kMissingValue = AEType(kae.cMissingValue) # convenience constant

294
osxphotos/aecodecs.py Normal file
View File

@ -0,0 +1,294 @@
""" aecodecs -- Convert from common Python types to Apple Event Manager types and vice-versa. """
import datetime, struct, sys
from Foundation import NSAppleEventDescriptor, NSURL
from . import kae
__all__ = ["Codecs", "AEType", "AEEnum"]
######################################################################
def fourcharcode(code):
""" Convert four-char code for use in NSAppleEventDescriptor methods.
code : bytes -- four-char code, e.g. b'utxt'
Result : int -- OSType, e.g. 1970567284
"""
return struct.unpack(">I", code)[0]
#######
class Codecs:
""" Implements mappings for common Python types with direct AppleScript equivalents. Used by AppleScript class. """
kMacEpoch = datetime.datetime(1904, 1, 1)
kUSRF = fourcharcode(kae.keyASUserRecordFields)
def __init__(self):
# Clients may add/remove/replace encoder and decoder items:
self.encoders = {
NSAppleEventDescriptor.class__(): self.packdesc,
type(None): self.packnone,
bool: self.packbool,
int: self.packint,
float: self.packfloat,
bytes: self.packbytes,
str: self.packstr,
list: self.packlist,
tuple: self.packlist,
dict: self.packdict,
datetime.datetime: self.packdatetime,
AEType: self.packtype,
AEEnum: self.packenum,
}
if sys.version_info.major < 3: # 2.7 compatibility
self.encoders[unicode] = self.packstr
self.decoders = {
fourcharcode(k): v
for k, v in {
kae.typeNull: self.unpacknull,
kae.typeBoolean: self.unpackboolean,
kae.typeFalse: self.unpackboolean,
kae.typeTrue: self.unpackboolean,
kae.typeSInt32: self.unpacksint32,
kae.typeIEEE64BitFloatingPoint: self.unpackfloat64,
kae.typeUTF8Text: self.unpackunicodetext,
kae.typeUTF16ExternalRepresentation: self.unpackunicodetext,
kae.typeUnicodeText: self.unpackunicodetext,
kae.typeLongDateTime: self.unpacklongdatetime,
kae.typeAEList: self.unpackaelist,
kae.typeAERecord: self.unpackaerecord,
kae.typeAlias: self.unpackfile,
kae.typeFSS: self.unpackfile,
kae.typeFSRef: self.unpackfile,
kae.typeFileURL: self.unpackfile,
kae.typeType: self.unpacktype,
kae.typeEnumeration: self.unpackenumeration,
}.items()
}
def pack(self, data):
"""Pack Python data.
data : anything -- a Python value
Result : NSAppleEventDescriptor -- an AE descriptor, or error if no encoder exists for this type of data
"""
try:
return self.encoders[data.__class__](data) # quick lookup by type/class
except (KeyError, AttributeError) as e:
for (
type,
encoder,
) in (
self.encoders.items()
): # slower but more thorough lookup that can handle subtypes/subclasses
if isinstance(data, type):
return encoder(data)
raise TypeError(
"Can't pack data into an AEDesc (unsupported type): {!r}".format(data)
)
def unpack(self, desc):
"""Unpack an Apple event descriptor.
desc : NSAppleEventDescriptor
Result : anything -- a Python value, or the original NSAppleEventDescriptor if no decoder is found
"""
decoder = self.decoders.get(desc.descriptorType())
# unpack known type
if decoder:
return decoder(desc)
# if it's a record-like desc, unpack as dict with an extra AEType(b'pcls') key containing the desc type
rec = desc.coerceToDescriptorType_(fourcharcode(kae.typeAERecord))
if rec:
rec = self.unpackaerecord(rec)
rec[AEType(kae.pClass)] = AEType(struct.pack(">I", desc.descriptorType()))
return rec
# return as-is
return desc
##
def _packbytes(self, desctype, data):
return NSAppleEventDescriptor.descriptorWithDescriptorType_bytes_length_(
fourcharcode(desctype), data, len(data)
)
def packdesc(self, val):
return val
def packnone(self, val):
return NSAppleEventDescriptor.nullDescriptor()
def packbool(self, val):
return NSAppleEventDescriptor.descriptorWithBoolean_(int(val))
def packint(self, val):
if (-2 ** 31) <= val < (2 ** 31):
return NSAppleEventDescriptor.descriptorWithInt32_(val)
else:
return self.pack(float(val))
def packfloat(self, val):
return self._packbytes(kae.typeFloat, struct.pack("d", val))
def packbytes(self, val):
return self._packbytes(kae.typeData, val)
def packstr(self, val):
return NSAppleEventDescriptor.descriptorWithString_(val)
def packdatetime(self, val):
delta = val - self.kMacEpoch
sec = delta.days * 3600 * 24 + delta.seconds
return self._packbytes(kae.typeLongDateTime, struct.pack("q", sec))
def packlist(self, val):
lst = NSAppleEventDescriptor.listDescriptor()
for item in val:
lst.insertDescriptor_atIndex_(self.pack(item), 0)
return lst
def packdict(self, val):
record = NSAppleEventDescriptor.recordDescriptor()
usrf = desctype = None
for key, value in val.items():
if isinstance(key, AEType):
if key.code == kae.pClass and isinstance(
value, AEType
): # AS packs records that contain a 'class' property by coercing the packed record to the descriptor type specified by the property's value (assuming it's an AEType)
desctype = value
else:
record.setDescriptor_forKeyword_(
self.pack(value), fourcharcode(key.code)
)
else:
if not usrf:
usrf = NSAppleEventDescriptor.listDescriptor()
usrf.insertDescriptor_atIndex_(self.pack(key), 0)
usrf.insertDescriptor_atIndex_(self.pack(value), 0)
if usrf:
record.setDescriptor_forKeyword_(usrf, self.kUSRF)
if desctype:
newrecord = record.coerceToDescriptorType_(fourcharcode(desctype.code))
if newrecord:
record = newrecord
else: # coercion failed for some reason, so pack as normal key-value pair
record.setDescriptor_forKeyword_(
self.pack(desctype), fourcharcode(key.code)
)
return record
def packtype(self, val):
return NSAppleEventDescriptor.descriptorWithTypeCode_(fourcharcode(val.code))
def packenum(self, val):
return NSAppleEventDescriptor.descriptorWithEnumCode_(fourcharcode(val.code))
#######
def unpacknull(self, desc):
return None
def unpackboolean(self, desc):
return desc.booleanValue()
def unpacksint32(self, desc):
return desc.int32Value()
def unpackfloat64(self, desc):
return struct.unpack("d", bytes(desc.data()))[0]
def unpackunicodetext(self, desc):
return desc.stringValue()
def unpacklongdatetime(self, desc):
return self.kMacEpoch + datetime.timedelta(
seconds=struct.unpack("q", bytes(desc.data()))[0]
)
def unpackaelist(self, desc):
return [
self.unpack(desc.descriptorAtIndex_(i + 1))
for i in range(desc.numberOfItems())
]
def unpackaerecord(self, desc):
dct = {}
for i in range(desc.numberOfItems()):
key = desc.keywordForDescriptorAtIndex_(i + 1)
value = desc.descriptorForKeyword_(key)
if key == self.kUSRF:
lst = self.unpackaelist(value)
for i in range(0, len(lst), 2):
dct[lst[i]] = lst[i + 1]
else:
dct[AEType(struct.pack(">I", key))] = self.unpack(value)
return dct
def unpacktype(self, desc):
return AEType(struct.pack(">I", desc.typeCodeValue()))
def unpackenumeration(self, desc):
return AEEnum(struct.pack(">I", desc.enumCodeValue()))
def unpackfile(self, desc):
url = bytes(
desc.coerceToDescriptorType_(fourcharcode(kae.typeFileURL)).data()
).decode("utf8")
return NSURL.URLWithString_(url).path()
#######
class AETypeBase:
""" Base class for AEType and AEEnum.
Notes:
- Hashable and comparable, so may be used as keys in dictionaries that map to AE records.
"""
def __init__(self, code):
"""
code : bytes -- four-char code, e.g. b'utxt'
"""
if not isinstance(code, bytes):
raise TypeError("invalid code (not a bytes object): {!r}".format(code))
elif len(code) != 4:
raise ValueError("invalid code (not four bytes long): {!r}".format(code))
self._code = code
code = property(
lambda self: self._code, doc="bytes -- four-char code, e.g. b'utxt'"
)
def __hash__(self):
return hash(self._code)
def __eq__(self, val):
return val.__class__ == self.__class__ and val.code == self._code
def __ne__(self, val):
return not self == val
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self._code)
##
class AEType(AETypeBase):
"""An AE type. Maps to an AppleScript type class, e.g. AEType(b'utxt') <=> 'unicode text'."""
class AEEnum(AETypeBase):
"""An AE enumeration. Maps to an AppleScript constant, e.g. AEEnum(b'yes ') <=> 'yes'."""

1703
osxphotos/kae.py Normal file

File diff suppressed because it is too large Load Diff