test db with sqlitedict
@@ -13,51 +13,17 @@ except:
 # Connect to database
 from . import filetools
 from platformcode import config
-import sqlite3, threading
-
-db_name = filetools.join(config.get_data_path(), "kod_db.sqlite")
-db_semaphore = threading.Semaphore()
-
-
-class safeConn(sqlite3.Connection):
-    """thread-safe sqlite3.Connection"""
-    def commit(self):
-        db_semaphore.acquire()
-        try:
-            super(safeConn, self).commit()
-        finally:
-            db_semaphore.release()
-
-
-class safeCur(sqlite3.Cursor):
-    """thread-safe sqlite3.Cursor"""
-    def execute(self, *args, **kwargs):
-        db_semaphore.acquire()
-        try:
-            super(safeCur, self).execute(*args, **kwargs)
-        finally:
-            db_semaphore.release()
-
-    def executescript(self, *args, **kwargs):
-        db_semaphore.acquire()
-        try:
-            super(safeCur, self).executescript(*args, **kwargs)
-        finally:
-            db_semaphore.release()
-
-    def executemany(self, *args, **kwargs):
-        db_semaphore.acquire()
-        try:
-            super(safeCur, self).executemany(*args, **kwargs)
-        finally:
-            db_semaphore.release()
-
-
-db_conn = sqlite3.connect(db_name, factory=safeConn, timeout=15, check_same_thread=False)
-db = db_conn.cursor(safeCur)
-
-# Create tables if not already exists
-db.execute('CREATE TABLE IF NOT EXISTS tmdb_cache (url TEXT PRIMARY KEY, response TEXT, added TEXT);')
-db.execute('CREATE TABLE IF NOT EXISTS viewed (tmdb_id TEXT PRIMARY KEY, season INT, episode INT, played_time REAL);')
-db.execute('CREATE TABLE IF NOT EXISTS dnscache(domain TEXT NOT NULL UNIQUE, ip TEXT NOT NULL, PRIMARY KEY(domain));')
-db_conn.commit()
+from collections import defaultdict
+from lib.sqlitedict import SqliteDict
+
+
+class nested_dict_sqlite(defaultdict):
+    'like defaultdict but default_factory receives the key'
+
+    def __missing__(self, key):
+        self[key] = value = self.default_factory(key)
+        return value
+
+
+db_name = filetools.join(config.get_data_path(), "db.sqlite")
+db = nested_dict_sqlite(lambda table: SqliteDict(db_name, table, 'c', True))
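
For clarity, a minimal standalone sketch of what the new nested_dict_sqlite gives the module: each table is opened lazily, one SqliteDict per key, and then behaves like a persistent dict (True is SqliteDict's autocommit positional argument). The demo below substitutes plain dicts for SqliteDict so it runs anywhere; names are illustrative.

from collections import defaultdict


class nested_dict_sqlite(defaultdict):
    'like defaultdict but default_factory receives the key'

    def __missing__(self, key):
        # a plain defaultdict calls default_factory() with no arguments;
        # here the missing key itself is passed, so the factory can use it
        self[key] = value = self.default_factory(key)
        return value


db = nested_dict_sqlite(lambda table: {'_table': table})
print(db['dnscache'])   # {'_table': 'dnscache'} -- created on first access
print('viewed' in db)   # False -- no table is opened until it is touched
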
@@ -15,7 +15,7 @@ from lib import doh
 from platformcode import logger
 import requests
 from core import scrapertools
-from core import db, db_conn
+from core import db

 if 'PROTOCOL_TLS' in ssl.__dict__:
     protocol = ssl.PROTOCOL_TLS
@@ -50,18 +50,13 @@ class CipherSuiteAdapter(host_header_ssl.HostHeaderSSLAdapter):
         super(CipherSuiteAdapter, self).__init__(**kwargs)

     def flushDns(self, request, domain, **kwargs):
-        db.execute('delete from dnscache where domain=?', (domain,))
-        db_conn.commit()
+        del db['dnscache'][domain]
         return self.send(request, flushedDns=True, **kwargs)

     def getIp(self, domain):
-        ip = None
-        try:
-            db.execute('select ip from dnscache where domain=?', (domain,))
-            ip = db.fetchall()[0][0]
-            logger.info('Cache DNS: ' + domain + ' = ' + str(ip))
-        except:
-            pass
+        ip = db['dnscache'].get(domain, None)
+        logger.info('Cache DNS: ' + domain + ' = ' + str(ip))

         if not ip:  # not cached
             try:
                 ip = doh.query(domain)[0]
@@ -74,11 +69,7 @@ class CipherSuiteAdapter(host_header_ssl.HostHeaderSSLAdapter):
         return ip

     def writeToCache(self, domain, ip):
-        try:
-            db.execute('insert into dnscache values(?,?)', (domain, ip))
-            db_conn.commit()
-        except:
-            pass
+        db['dnscache'][domain] = ip

     def init_poolmanager(self, *args, **kwargs):
         kwargs['ssl_context'] = self.ssl_context
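
The adapter's DNS cache now reduces to three dict operations. A runnable sketch of the same read-through pattern, with a stub resolver standing in for doh.query and a plain dict standing in for db['dnscache']:

cache = {}  # stands in for db['dnscache'], a persistent SqliteDict table

def resolve(domain, query=lambda d: ['93.184.216.34']):
    # read-through: try the cache, fall back to the resolver, then store
    ip = cache.get(domain, None)
    if not ip:
        ip = query(domain)[0]
        cache[domain] = ip
    return ip

print(resolve('example.com'))  # resolver hit, result cached
print(resolve('example.com'))  # served from the cache
del cache['example.com']       # the flushDns equivalent
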
32 core/tmdb.py
@@ -3,6 +3,7 @@
 # from future import standard_library
 # standard_library.install_aliases()
 # from builtins import str
+import datetime
 import sys, requests
 PY3 = False
 if sys.version_info[0] >= 3: PY3 = True; unicode = str; unichr = chr; long = int
@@ -15,7 +16,7 @@ else:
     from future.builtins import range
     from future.builtins import object

-import ast, copy, re, sqlite3, time, xbmcaddon
+import ast, copy, re, time

 from core import filetools, httptools, jsontools, scrapertools
 from core.item import InfoLabels
@@ -62,7 +63,7 @@ def_lang = info_language[config.get_setting("info_language", "videolibrary")]
 # ---------------------------------------------------------------------------------------------------------------

 otmdb_global = None
-from core import db, db_conn
+from core import db


 # The function name is the name of the decorator and receives the function that decorates.
@@ -73,17 +74,11 @@ def cache_response(fn):
    # start_time = time.time()

    def wrapper(*args, **kwargs):
-        import base64
-
-        def check_expired(ts):
-            import datetime
-
+        def check_expired(saved_date):
            valided = False

            cache_expire = config.get_setting("tmdb_cache_expire", default=0)
-
-            saved_date = datetime.datetime.fromtimestamp(ts)
-            current_date = datetime.datetime.fromtimestamp(time.time())
+            current_date = datetime.datetime.now()
            elapsed = current_date - saved_date

            # 1 day
@@ -130,23 +125,16 @@ def cache_response(fn):

        url = re.sub('&year=-', '', args[0])
-        if PY3: url = str.encode(url)
-        url_base64 = base64.b64encode(url)
-        db.execute("SELECT response, added FROM tmdb_cache WHERE url=?", (url_base64,))
-        row = db.fetchone()
-
-        if row and check_expired(float(row[1])):
-            result = eval(base64.b64decode(row[0]))
+        row = db['tmdb_cache'].get(url)
+
+        if row and check_expired(row[1]):
+            result = row[0]

        # if no information was obtained, call the function
        if not result:
            result = fn(*args)
-            result = str(result)
-            if PY3: result = str.encode(result)
-            result_base64 = base64.b64encode(result)
-            db.execute("INSERT OR REPLACE INTO tmdb_cache (url, response, added) VALUES (?, ?, ?)",
-                       (url_base64, result_base64, time.time()))
-            db_conn.commit()
+            db['tmdb_cache'][url] = [result, datetime.datetime.now()]

        # elapsed_time = time.time() - start_time
        # logger.debug("TARDADO %s" % elapsed_time)
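
The decorator now amounts to dict-backed memoization with a timestamp check: SqliteDict pickles the [result, datetime] pair, so the old base64/eval round-trip disappears. A self-contained sketch of the same pattern (names and the fixed 1-day expiry are illustrative; the real code keys on the TMDB URL and persists via db['tmdb_cache']):

import datetime

cache = {}  # stands in for db['tmdb_cache']
CACHE_EXPIRE = datetime.timedelta(days=1)

def cache_response(fn):
    def wrapper(*args):
        row = cache.get(args[0])
        if row and datetime.datetime.now() - row[1] < CACHE_EXPIRE:
            return row[0]  # fresh enough: reuse the stored response
        result = fn(*args)
        cache[args[0]] = [result, datetime.datetime.now()]
        return result
    return wrapper

@cache_response
def fetch(url):
    return 'response for ' + url

print(fetch('https://api.themoviedb.org/3/...'))  # computed
print(fetch('https://api.themoviedb.org/3/...'))  # served from the cache
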
601 lib/sqlitedict.py Normal file
@@ -0,0 +1,601 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This code is distributed under the terms and conditions
# of the Apache License, Version 2.0
#
# http://opensource.org/licenses/apache2.0.php
#
# This code was inspired by:
#  * http://code.activestate.com/recipes/576638-draft-for-an-sqlite3-based-dbm/
#  * http://code.activestate.com/recipes/526618/

"""
A lightweight wrapper around Python's sqlite3 database, with a dict-like interface
and multi-thread access support::

>>> mydict = SqliteDict('some.db', autocommit=True) # the mapping will be persisted to file `some.db`
>>> mydict['some_key'] = any_picklable_object
>>> print mydict['some_key']
>>> print len(mydict) # etc... all dict functions work

Pickle is used internally to serialize the values. Keys are strings.

If you don't use autocommit (the default is no autocommit, for performance), then
don't forget to call `mydict.commit()` when done with a transaction.

"""
import sqlite3
import os
import sys
import tempfile
import logging
import time
import traceback

from threading import Thread

__version__ = '1.7.0.dev0'

major_version = sys.version_info[0]
if major_version < 3:  # py <= 2.x
    if sys.version_info[1] < 5:  # py <= 2.4
        raise ImportError("sqlitedict requires python 2.5 or higher (python 3.3 or higher supported)")

    # necessary to use exec_() as this would be a SyntaxError in python3.
    # this is an exact port of six.reraise():
    def exec_(_code_, _globs_=None, _locs_=None):
        """Execute code in a namespace."""
        if _globs_ is None:
            frame = sys._getframe(1)
            _globs_ = frame.f_globals
            if _locs_ is None:
                _locs_ = frame.f_locals
            del frame
        elif _locs_ is None:
            _locs_ = _globs_
        exec("""exec _code_ in _globs_, _locs_""")

    class TimeoutError(OSError):
        pass

    exec_("def reraise(tp, value, tb=None):\n"
          "    raise tp, value, tb\n")
else:
    def reraise(tp, value, tb=None):
        if value is None:
            value = tp()
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value

try:
    from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
    from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL

# some Python 3 vs 2 imports
try:
    from collections import UserDict as DictClass
except ImportError:
    from UserDict import DictMixin as DictClass

try:
    from queue import Queue
except ImportError:
    from Queue import Queue


logger = logging.getLogger(__name__)
def open(*args, **kwargs):
    """See documentation of the SqliteDict class."""
    return SqliteDict(*args, **kwargs)


def encode(obj):
    """Serialize an object using pickle to a binary format accepted by SQLite."""
    return sqlite3.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
    """Deserialize objects retrieved from SQLite."""
    return loads(bytes(obj))
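
Since encode/decode are injectable through the constructor below, a table can use any serialization, not just pickle. A hedged sketch swapping in JSON (only works for JSON-serializable values; the commented SqliteDict call mirrors the constructor documented below and assumes lib.sqlitedict is importable):

import json
import sqlite3

def json_encode(obj):
    # store values as UTF-8 JSON blobs instead of pickle
    return sqlite3.Binary(json.dumps(obj).encode('utf-8'))

def json_decode(blob):
    return json.loads(bytes(blob).decode('utf-8'))

# usage:
# d = SqliteDict('some.db', 'jsontable', encode=json_encode, decode=json_decode)
# d['k'] = {'a': 1}
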
class SqliteDict(DictClass):
    VALID_FLAGS = ['c', 'r', 'w', 'n']

    def __init__(self, filename=None, tablename='unnamed', flag='c',
                 autocommit=False, journal_mode="DELETE", encode=encode, decode=decode, timeout=5):
        """
        Initialize a thread-safe sqlite-backed dictionary. The dictionary will
        be a table `tablename` in database file `filename`. A single file (=database)
        may contain multiple tables.

        If no `filename` is given, a random file in temp will be used (and deleted
        from temp once the dict is closed/deleted).

        If you enable `autocommit`, changes will be committed after each operation
        (less efficient but safer). Otherwise, changes are committed on `self.commit()`,
        `self.clear()` and `self.close()`.

        Set `journal_mode` to 'OFF' if you're experiencing sqlite I/O problems
        or if you need performance and don't care about crash-consistency.

        The `flag` parameter must be exactly one of:
          'c': default mode, open for read/write, creating the db/table if necessary.
          'w': open for r/w, but drop `tablename` contents first (start with an empty table)
          'r': open as read-only
          'n': create a new database (erasing any existing tables, not just `tablename`!).

        The `encode` and `decode` parameters are used to customize how the values
        are serialized and deserialized.
        The `encode` parameter must be a function that takes a single Python
        object and returns a serialized representation.
        The `decode` parameter must be a function that takes the serialized
        representation produced by `encode` and returns a deserialized Python
        object.
        The default is to use pickle.

        `timeout` defines the maximum time (in seconds) to wait for initial Thread startup.

        """
        self.in_temp = filename is None
        if self.in_temp:
            fd, filename = tempfile.mkstemp(prefix='sqldict')
            os.close(fd)

        if flag not in SqliteDict.VALID_FLAGS:
            raise RuntimeError("Unrecognized flag: %s" % flag)
        self.flag = flag

        if flag == 'n':
            if os.path.exists(filename):
                os.remove(filename)

        dirname = os.path.dirname(filename)
        if dirname:
            if not os.path.exists(dirname):
                raise RuntimeError('Error! The directory does not exist, %s' % dirname)

        self.filename = filename

        # Use standard SQL escaping of double quote characters in identifiers, by doubling them.
        # See https://github.com/RaRe-Technologies/sqlitedict/pull/113
        self.tablename = tablename.replace('"', '""')

        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.encode = encode
        self.decode = decode
        self.timeout = timeout

        logger.info("opening Sqlite table %r in %r" % (tablename, filename))
        self.conn = self._new_conn()
        if self.flag == 'r':
            if self.tablename not in SqliteDict.get_tablenames(self.filename):
                msg = 'Refusing to create a new table "%s" in read-only DB mode' % tablename
                raise RuntimeError(msg)
        else:
            MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
            self.conn.execute(MAKE_TABLE)
            self.conn.commit()
        if flag == 'w':
            self.clear()
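
A quick sketch of the four flag modes in sequence (file and table names illustrative; note that 'n' erases every table in the file, not just the one being opened):

from lib.sqlitedict import SqliteDict  # path as vendored in this commit

d = SqliteDict('demo.db', 'cache', flag='c')  # create db/table if missing
d['k'] = 1; d.commit(); d.close()

d = SqliteDict('demo.db', 'cache', flag='r')  # read-only; writes raise RuntimeError
assert d['k'] == 1; d.close()

d = SqliteDict('demo.db', 'cache', flag='w')  # same table, emptied on open
assert len(d) == 0; d.close()

# flag='n' would recreate demo.db from scratch, dropping all tables
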
    def _new_conn(self):
        return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode,
                                 timeout=self.timeout)

    def __enter__(self):
        if not hasattr(self, 'conn') or self.conn is None:
            self.conn = self._new_conn()
        return self

    def __exit__(self, *exc_info):
        self.close()

    def __str__(self):
        return "SqliteDict(%s)" % (self.filename)

    def __repr__(self):
        return str(self)  # no need of something complex

    def __len__(self):
        # `select count (*)` is super slow in sqlite (does a linear scan!!)
        # As a result, len() is very slow too once the table size grows beyond trivial.
        # We could keep the total count of rows ourselves, by means of triggers,
        # but that seems too complicated and would slow down normal operation
        # (insert/delete etc).
        GET_LEN = 'SELECT COUNT(*) FROM "%s"' % self.tablename
        rows = self.conn.select_one(GET_LEN)[0]
        return rows if rows is not None else 0

    def __bool__(self):
        # No elements is False, otherwise True
        GET_MAX = 'SELECT MAX(ROWID) FROM "%s"' % self.tablename
        m = self.conn.select_one(GET_MAX)[0]
        # Explicit better than implicit and bla bla
        return True if m is not None else False

    def iterkeys(self):
        GET_KEYS = 'SELECT key FROM "%s" ORDER BY rowid' % self.tablename
        for key in self.conn.select(GET_KEYS):
            yield key[0]

    def itervalues(self):
        GET_VALUES = 'SELECT value FROM "%s" ORDER BY rowid' % self.tablename
        for value in self.conn.select(GET_VALUES):
            yield self.decode(value[0])

    def iteritems(self):
        GET_ITEMS = 'SELECT key, value FROM "%s" ORDER BY rowid' % self.tablename
        for key, value in self.conn.select(GET_ITEMS):
            yield key, self.decode(value)

    def keys(self):
        return self.iterkeys() if major_version > 2 else list(self.iterkeys())

    def values(self):
        return self.itervalues() if major_version > 2 else list(self.itervalues())

    def items(self):
        return self.iteritems() if major_version > 2 else list(self.iteritems())

    def __contains__(self, key):
        HAS_ITEM = 'SELECT 1 FROM "%s" WHERE key = ?' % self.tablename
        return self.conn.select_one(HAS_ITEM, (key,)) is not None

    def __getitem__(self, key):
        GET_ITEM = 'SELECT value FROM "%s" WHERE key = ?' % self.tablename
        item = self.conn.select_one(GET_ITEM, (key,))
        if item is None:
            raise KeyError(key)
        return self.decode(item[0])

    def __setitem__(self, key, value):
        if self.flag == 'r':
            raise RuntimeError('Refusing to write to read-only SqliteDict')

        ADD_ITEM = 'REPLACE INTO "%s" (key, value) VALUES (?,?)' % self.tablename
        self.conn.execute(ADD_ITEM, (key, self.encode(value)))
        if self.autocommit:
            self.commit()

    def __delitem__(self, key):
        if self.flag == 'r':
            raise RuntimeError('Refusing to delete from read-only SqliteDict')

        if key not in self:
            raise KeyError(key)
        DEL_ITEM = 'DELETE FROM "%s" WHERE key = ?' % self.tablename
        self.conn.execute(DEL_ITEM, (key,))
        if self.autocommit:
            self.commit()

    def update(self, items=(), **kwds):
        if self.flag == 'r':
            raise RuntimeError('Refusing to update read-only SqliteDict')

        try:
            items = items.items()
        except AttributeError:
            pass
        items = [(k, self.encode(v)) for k, v in items]

        UPDATE_ITEMS = 'REPLACE INTO "%s" (key, value) VALUES (?, ?)' % self.tablename
        self.conn.executemany(UPDATE_ITEMS, items)
        if kwds:
            self.update(kwds)
        if self.autocommit:
            self.commit()

    def __iter__(self):
        return self.iterkeys()

    def clear(self):
        if self.flag == 'r':
            raise RuntimeError('Refusing to clear read-only SqliteDict')

        # avoid VACUUM, as it gives "OperationalError: database schema has changed"
        CLEAR_ALL = 'DELETE FROM "%s";' % self.tablename
        self.conn.commit()
        self.conn.execute(CLEAR_ALL)
        self.conn.commit()

    @staticmethod
    def get_tablenames(filename):
        """get the names of the tables in an sqlite db as a list"""
        if not os.path.isfile(filename):
            raise IOError('file %s does not exist' % (filename))
        GET_TABLENAMES = 'SELECT name FROM sqlite_master WHERE type="table"'
        with sqlite3.connect(filename) as conn:
            cursor = conn.execute(GET_TABLENAMES)
            res = cursor.fetchall()

        return [name[0] for name in res]

    def commit(self, blocking=True):
        """
        Persist all data to disk.

        When `blocking` is False, the commit command is queued, but the data is
        not guaranteed to be persisted (the default implication when autocommit=True).
        """
        if self.conn is not None:
            self.conn.commit(blocking)
    sync = commit
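
Without autocommit, writes sit in the worker thread's queue until an explicit commit; a sketch of the resulting batch pattern (file name illustrative):

from lib.sqlitedict import SqliteDict  # path as vendored in this commit

with SqliteDict('batch.db', 'stats') as d:  # autocommit=False by default
    for i in range(1000):
        d[str(i)] = i * i   # queued in the worker, not yet durable
    d.commit()              # one blocking commit persists the whole batch
# close() is implicit at the end of the with-block
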
    def close(self, do_log=True, force=False):
        if do_log:
            logger.debug("closing %s" % self)
        if hasattr(self, 'conn') and self.conn is not None:
            if self.conn.autocommit and not force:
                # typically calls to commit are non-blocking when autocommit is
                # used. However, we need to block on close() to ensure any
                # awaiting exceptions are handled and that all data is
                # persisted to disk before returning.
                self.conn.commit(blocking=True)
            self.conn.close(force=force)
            self.conn = None
        if self.in_temp:
            try:
                os.remove(self.filename)
            except Exception:
                pass

    def terminate(self):
        """Delete the underlying database file. Use with care."""
        if self.flag == 'r':
            raise RuntimeError('Refusing to terminate read-only SqliteDict')

        self.close()

        if self.filename == ':memory:':
            return

        logger.info("deleting %s" % self.filename)
        try:
            if os.path.isfile(self.filename):
                os.remove(self.filename)
        except (OSError, IOError):
            logger.exception("failed to delete %s" % (self.filename))

    def __del__(self):
        # like close(), but assume globals are gone by now (do not log!)
        try:
            self.close(do_log=False, force=True)
        except Exception:
            # prevent error log flood in case of multiple SqliteDicts
            # closed after connection lost (exceptions are always ignored
            # in the __del__ method).
            pass


# Adding extra methods for python 2 compatibility (at import time)
if major_version == 2:
    SqliteDict.__nonzero__ = SqliteDict.__bool__
    del SqliteDict.__bool__  # not needed and confusing
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.

    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).

    """
    def __init__(self, filename, autocommit, journal_mode, timeout):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        # use request queue of unlimited size
        self.reqs = Queue()
        self.setDaemon(True)  # python2.5-compatible
        self.exception = None
        self._sqlitedict_thread_initialized = None
        self.timeout = timeout
        self.log = logging.getLogger('sqlitedict.SqliteMultithread')
        self.start()

    def run(self):
        try:
            if self.autocommit:
                conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
            else:
                conn = sqlite3.connect(self.filename, check_same_thread=False)
        except Exception:
            self.log.exception("Failed to initialize connection for filename: %s" % self.filename)
            self.exception = sys.exc_info()
            raise

        try:
            conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
            conn.text_factory = str
            cursor = conn.cursor()
            conn.commit()
            cursor.execute('PRAGMA synchronous=OFF')
        except Exception:
            self.log.exception("Failed to execute PRAGMA statements.")
            self.exception = sys.exc_info()
            raise

        self._sqlitedict_thread_initialized = True

        res = None
        while True:
            req, arg, res, outer_stack = self.reqs.get()
            if req == '--close--':
                assert res, ('--close-- without return queue', res)
                break
            elif req == '--commit--':
                conn.commit()
                if res:
                    res.put('--no more--')
            else:
                try:
                    cursor.execute(req, arg)
                except Exception:
                    self.exception = (e_type, e_value, e_tb) = sys.exc_info()
                    inner_stack = traceback.extract_stack()

                    # An exception occurred in our thread, but we may not
                    # immediately be able to throw it in our calling thread,
                    # if it has no return `res` queue: log as level ERROR
                    # both the inner and outer exception immediately.
                    #
                    # Any iteration of res.get() or any next call will detect the
                    # inner exception and re-raise it in the calling Thread; though
                    # it may be confusing to see an exception for an unrelated
                    # statement, an ERROR log statement from the 'sqlitedict.*'
                    # namespace contains the original outer stack location.
                    self.log.error('Inner exception:')
                    for item in traceback.format_list(inner_stack):
                        self.log.error(item)
                    self.log.error('')  # delineate traceback & exception w/blank line
                    for item in traceback.format_exception_only(e_type, e_value):
                        self.log.error(item)

                    self.log.error('')  # exception & outer stack w/blank line
                    self.log.error('Outer stack:')
                    for item in traceback.format_list(outer_stack):
                        self.log.error(item)
                    self.log.error('Exception will be re-raised at next call.')

                if res:
                    for rec in cursor:
                        res.put(rec)
                    res.put('--no more--')

                if self.autocommit:
                    conn.commit()

        self.log.debug('received: %s, send: --no more--', req)
        conn.close()
        res.put('--no more--')

    def check_raise_error(self):
        """
        Check for and raise exception for any previous sqlite query.

        For the `execute*` family of method calls, such calls are non-blocking and any
        exception raised in the thread cannot be handled by the calling Thread (usually
        MainThread). This method is called on `close`, and prior to any subsequent
        calls to the `execute*` methods to check for and raise an exception in a
        previous call to the MainThread.
        """
        if self.exception:
            e_type, e_value, e_tb = self.exception

            # clear self.exception, if the caller decides to handle such
            # exception, we should not repeatedly re-raise it.
            self.exception = None

            self.log.error('An exception occurred from a previous statement, view '
                           'the logging namespace "sqlitedict" for outer stack.')

            # The third argument to raise is the traceback object, and it is
            # substituted instead of the current location as the place where
            # the exception occurred, this is so that when using debuggers such
            # as `pdb', or simply evaluating the naturally raised traceback, we
            # retain the original (inner) location of where the exception
            # occurred.
            reraise(e_type, e_value, e_tb)

    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self._wait_for_initialization()
        self.check_raise_error()

        # NOTE: This might be a lot of information to pump into an input
        # queue, affecting performance. I've also seen earlier versions of
        # jython take a severe performance impact for throwing exceptions
        # so often.
        stack = traceback.extract_stack()[:-1]
        self.reqs.put((req, arg or tuple(), res, stack))

    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
        self.check_raise_error()

    def select(self, req, arg=None):
        """
        Unlike sqlite's native select, this select doesn't handle iteration efficiently.

        The result of `select` starts filling up with values as soon as the
        request is dequeued, and although you can iterate over the result normally
        (`for res in self.select(): ...`), the entire result will be in memory.
        """
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            self.check_raise_error()
            if rec == '--no more--':
                break
            yield rec

    def select_one(self, req, arg=None):
        """Return only the first row of the SELECT, or None if there are no matching rows."""
        try:
            return next(iter(self.select(req, arg)))
        except StopIteration:
            return None

    def commit(self, blocking=True):
        if blocking:
            # by default, we await completion of commit() unless
            # blocking=False. This ensures any available exceptions for any
            # previous statement are thrown before returning, and that the
            # data has actually persisted to disk!
            self.select_one('--commit--')
        else:
            # otherwise, we fire and forget as usual.
            self.execute('--commit--')

    def close(self, force=False):
        if force:
            # If a SqliteDict is being killed or garbage-collected, then select_one()
            # could hang forever because run() might already have exited and therefore
            # can't process the request. Instead, push the close command to the requests
            # queue directly. If run() is still alive, it will exit gracefully. If not,
            # then there's nothing we can do anyway.
            self.reqs.put(('--close--', None, Queue(), None))
        else:
            # we abuse 'select' to "iter" over a "--close--" statement so that we
            # can confirm the completion of close before joining the thread and
            # returning (by semaphore '--no more--')
            self.select_one('--close--')
            self.join()

    def _wait_for_initialization(self):
        """
        Polls the 'initialized' flag to be set by the started Thread in run().
        """
        # A race condition may occur without waiting for initialization:
        # __init__() finishes with the start() call, but the Thread needs some time to actually start working.
        # If opening the database file fails in run(), an exception will occur and self.exception will be set.
        # But if we run check_raise_error() before run() had a chance to set self.exception, it will report
        # a false negative: an exception occurred and the thread terminates, but self.exception is unset.
        # This leads to a deadlock while waiting for the results of execute().
        # By waiting for the Thread to set the initialized flag, we can ensure the thread has successfully
        # opened the file - and possibly set self.exception to be detected by check_raise_error().

        start_time = time.time()
        while time.time() - start_time < self.timeout:
            if self._sqlitedict_thread_initialized or self.exception:
                return
            time.sleep(0.1)
        raise TimeoutError("SqliteMultithread failed to flag initialization within %0.0f seconds." % self.timeout)


if __name__ == '__main__':
    print(__version__)
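
A sketch of what the worker-thread design buys the add-on: several threads can write to the same SqliteDict safely, because every statement funnels through SqliteMultithread's single request queue (file and table names illustrative):

import threading
from lib.sqlitedict import SqliteDict  # path as vendored in this commit

d = SqliteDict('mt.db', 'counters', autocommit=True)

def writer(prefix):
    for i in range(100):
        d[prefix + str(i)] = i  # serialized through the worker's queue

threads = [threading.Thread(target=writer, args=('t%d-' % n,)) for n in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(len(d))  # 400
d.close()
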
@@ -1414,75 +1414,47 @@ def get_played_time(item):

     if not item.infoLabels:
         return 0
-    ID = item.infoLabels.get('tmdb_id','')
+    ID = item.infoLabels.get('tmdb_id', '')
     if not ID:
         return 0

-    S = item.infoLabels.get('season')
+    S = item.infoLabels.get('season', 0)
     E = item.infoLabels.get('episode')
     result = None

     if item.contentType == 'movie':
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=?", (ID,))
-    elif S and E:
-        S = item.infoLabels['season']
-        E = item.infoLabels['episode']
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND season=? AND episode=?", (ID, S, E))
+        result = db['viewed'].get(ID)
     elif E:
-        E = item.infoLabels['episode']
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND episode=?", (ID, E))
-
-    result = db.fetchone()
+        result = db['viewed'].get(ID, {}).get(str(S)+'x'+str(E))

     if not result: played_time = 0
-    else: played_time = result[0]
+    else: played_time = result

     return played_time


 def set_played_time(item):
     logger.debug()
-    from core import db, db_conn
+    from core import db

     played_time = item.played_time
     if not item.infoLabels:
         return

-    ID = item.infoLabels.get('tmdb_id','')
+    ID = item.infoLabels.get('tmdb_id', '')
     if not ID:
         return

-    S = item.infoLabels.get('season')
+    S = item.infoLabels.get('season', 0)
     E = item.infoLabels.get('episode')


     if item.contentType == 'movie':
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=?", (ID,))
-        result = db.fetchone()
-        if result:
-            if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=?", (played_time, ID))
-            else: db.execute("DELETE from viewed WHERE tmdb_id=?", (ID,))
-        else: db.execute("INSERT INTO viewed (tmdb_id, played_time) VALUES (?, ?)", (ID, played_time))
-
-    elif S and E:
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND season = ? AND episode=?", (ID, S, E))
-        result = db.fetchone()
-        if result:
-            if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=? AND season=? AND episode=?", (played_time, ID, S, E))
-            else: db.execute("DELETE from viewed WHERE tmdb_id=? AND season=? AND episode=?", (ID, S, E))
-        else: db.execute("INSERT INTO viewed (tmdb_id, season, episode, played_time) VALUES (?, ?, ?, ?)", (ID, S, E, played_time))
-
+        db['viewed'][ID] = played_time
     elif E:
-        E = item.infoLabels['episode']
-        db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND episode=?", (ID, E))
-        result = db.fetchone()
-        if result:
-            if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=? AND episode=?", (played_time, ID, E))
-            else: db.execute("DELETE from viewed WHERE tmdb_id=? AND episode=?", (ID, E))
-        else: db.execute("INSERT INTO viewed (tmdb_id, episode, played_time) VALUES (?, ?, ?)", (ID, E, played_time))
-
-    db_conn.commit()
+        newDict = db['viewed'].get(ID, {})
+        newDict[str(S) + 'x' + str(E)] = played_time
+        db['viewed'][ID] = newDict


 def prevent_busy(item):
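
The net effect on the viewed table: movies store a bare played-time float, while episodes store a nested dict keyed 'SxE' (with the season defaulting to 0 when absent). A dry sketch with a plain dict in place of db['viewed']; the IDs are illustrative:

viewed = {}  # stands in for db['viewed'], a SqliteDict table

# movie: the value is the played time itself
viewed['603'] = 512.3

# episode: nested dict keyed by 'SxE'; season defaults to 0 when absent
entry = viewed.get('94605', {})
entry['1x3'] = 1041.0
viewed['94605'] = entry

print(viewed['94605'].get('1x3'))  # 1041.0
print(viewed['94605'].get('2x1'))  # None -> treated as played_time 0
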