From c07558e545f2a5251107ed671c0b796730ad61b2 Mon Sep 17 00:00:00 2001 From: mac12m99 <10120390+mac12m99@users.noreply.github.com> Date: Tue, 2 Feb 2021 22:41:10 +0100 Subject: [PATCH] test db con sqlitedict --- core/__init__.py | 54 +-- core/resolverdns.py | 21 +- core/tmdb.py | 32 +- lib/sqlitedict.py | 601 ++++++++++++++++++++++++++++++++++ platformcode/platformtools.py | 54 +-- 5 files changed, 640 insertions(+), 122 deletions(-) create mode 100644 lib/sqlitedict.py diff --git a/core/__init__.py b/core/__init__.py index 33193ae1..7f4a185d 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -13,51 +13,17 @@ except: # Connect to database from . import filetools from platformcode import config -import sqlite3, threading - -db_name = filetools.join(config.get_data_path(), "kod_db.sqlite") -db_semaphore = threading.Semaphore() +from collections import defaultdict +from lib.sqlitedict import SqliteDict -class safeConn(sqlite3.Connection): - """thread-safe sqlite3.Connection""" - def commit(self): - db_semaphore.acquire() - try: - super(safeConn, self).commit() - finally: - db_semaphore.release() +class nested_dict_sqlite(defaultdict): + 'like defaultdict but default_factory receives the key' + + def __missing__(self, key): + self[key] = value = self.default_factory(key) + return value -class safeCur(sqlite3.Cursor): - """thread-safe sqlite3.Cursor""" - def execute(self, *args, **kwargs): - db_semaphore.acquire() - try: - super(safeCur, self).execute(*args, **kwargs) - finally: - db_semaphore.release() - - def executescript(self, *args, **kwargs): - db_semaphore.acquire() - try: - super(safeCur, self).executescript(*args, **kwargs) - finally: - db_semaphore.release() - - def executemany(self, *args, **kwargs): - db_semaphore.acquire() - try: - super(safeCur, self).executemany(*args, **kwargs) - finally: - db_semaphore.release() - - -db_conn = sqlite3.connect(db_name, factory=safeConn, timeout=15, check_same_thread=False) -db = db_conn.cursor(safeCur) - -# Create tables if not already exists -db.execute('CREATE TABLE IF NOT EXISTS tmdb_cache (url TEXT PRIMARY KEY, response TEXT, added TEXT);') -db.execute('CREATE TABLE IF NOT EXISTS viewed (tmdb_id TEXT PRIMARY KEY, season INT, episode INT, played_time REAL);') -db.execute('CREATE TABLE IF NOT EXISTS dnscache(domain TEXT NOT NULL UNIQUE, ip TEXT NOT NULL, PRIMARY KEY(domain));') -db_conn.commit() +db_name = filetools.join(config.get_data_path(), "db.sqlite") +db = nested_dict_sqlite(lambda table: SqliteDict(db_name, table, 'c', True)) diff --git a/core/resolverdns.py b/core/resolverdns.py index d2aa01ca..8e87ecd2 100644 --- a/core/resolverdns.py +++ b/core/resolverdns.py @@ -15,7 +15,7 @@ from lib import doh from platformcode import logger import requests from core import scrapertools -from core import db, db_conn +from core import db if 'PROTOCOL_TLS' in ssl.__dict__: protocol = ssl.PROTOCOL_TLS @@ -50,18 +50,13 @@ class CipherSuiteAdapter(host_header_ssl.HostHeaderSSLAdapter): super(CipherSuiteAdapter, self).__init__(**kwargs) def flushDns(self, request, domain, **kwargs): - db.execute('delete from dnscache where domain=?', (domain,)) - db_conn.commit() + del db['dnscache'][domain] return self.send(request, flushedDns=True, **kwargs) def getIp(self, domain): - ip = None - try: - db.execute('select ip from dnscache where domain=?', (domain,)) - ip = db.fetchall()[0][0] - logger.info('Cache DNS: ' + domain + ' = ' + str(ip)) - except: - pass + ip = db['dnscache'].get(domain, None) + logger.info('Cache DNS: ' + domain + ' = ' + str(ip)) + if not ip: # not cached try: ip = doh.query(domain)[0] @@ -74,11 +69,7 @@ class CipherSuiteAdapter(host_header_ssl.HostHeaderSSLAdapter): return ip def writeToCache(self, domain, ip): - try: - db.execute('insert into dnscache values(?,?)', (domain, ip)) - db_conn.commit() - except: - pass + db['dnscache'][domain] = ip def init_poolmanager(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context diff --git a/core/tmdb.py b/core/tmdb.py index 00b7e67d..95388910 100644 --- a/core/tmdb.py +++ b/core/tmdb.py @@ -3,6 +3,7 @@ # from future import standard_library # standard_library.install_aliases() # from builtins import str +import datetime import sys, requests PY3 = False if sys.version_info[0] >= 3: PY3 = True; unicode = str; unichr = chr; long = int @@ -15,7 +16,7 @@ else: from future.builtins import range from future.builtins import object -import ast, copy, re, sqlite3, time, xbmcaddon +import ast, copy, re, time from core import filetools, httptools, jsontools, scrapertools from core.item import InfoLabels @@ -62,7 +63,7 @@ def_lang = info_language[config.get_setting("info_language", "videolibrary")] # ------------------------------------------------- -------------------------------------------------- ----------- otmdb_global = None -from core import db, db_conn +from core import db # The function name is the name of the decorator and receives the function that decorates. @@ -73,17 +74,11 @@ def cache_response(fn): # start_time = time.time() def wrapper(*args, **kwargs): - import base64 - - def check_expired(ts): - import datetime - + def check_expired(saved_date): valided = False cache_expire = config.get_setting("tmdb_cache_expire", default=0) - - saved_date = datetime.datetime.fromtimestamp(ts) - current_date = datetime.datetime.fromtimestamp(time.time()) + current_date = datetime.datetime.now() elapsed = current_date - saved_date # 1 day @@ -130,23 +125,16 @@ def cache_response(fn): url = re.sub('&year=-', '', args[0]) if PY3: url = str.encode(url) - url_base64 = base64.b64encode(url) - db.execute("SELECT response, added FROM tmdb_cache WHERE url=?", (url_base64,)) - row = db.fetchone() - if row and check_expired(float(row[1])): - result = eval(base64.b64decode(row[0])) + row = db['tmdb_cache'].get(url) + + if row and check_expired(row[1]): + result = row[0] # si no se ha obtenido información, llamamos a la funcion if not result: result = fn(*args) - result = str(result) - if PY3: result = str.encode(result) - result_base64 = base64.b64encode(result) - db.execute("INSERT OR REPLACE INTO tmdb_cache (url, response, added) VALUES (?, ?, ?)", - (url_base64, result_base64, time.time())) - - db_conn.commit() + db['tmdb_cache'][url] = [result, datetime.datetime.now()] # elapsed_time = time.time() - start_time # logger.debug("TARDADO %s" % elapsed_time) diff --git a/lib/sqlitedict.py b/lib/sqlitedict.py new file mode 100644 index 00000000..93884a4e --- /dev/null +++ b/lib/sqlitedict.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# This code is distributed under the terms and conditions +# from the Apache License, Version 2.0 +# +# http://opensource.org/licenses/apache2.0.php +# +# This code was inspired by: +# * http://code.activestate.com/recipes/576638-draft-for-an-sqlite3-based-dbm/ +# * http://code.activestate.com/recipes/526618/ + +""" +A lightweight wrapper around Python's sqlite3 database, with a dict-like interface +and multi-thread access support:: + +>>> mydict = SqliteDict('some.db', autocommit=True) # the mapping will be persisted to file `some.db` +>>> mydict['some_key'] = any_picklable_object +>>> print mydict['some_key'] +>>> print len(mydict) # etc... all dict functions work + +Pickle is used internally to serialize the values. Keys are strings. + +If you don't use autocommit (default is no autocommit for performance), then +don't forget to call `mydict.commit()` when done with a transaction. + +""" + +import sqlite3 +import os +import sys +import tempfile +import logging +import time +import traceback + +from threading import Thread + +__version__ = '1.7.0.dev0' + +major_version = sys.version_info[0] +if major_version < 3: # py <= 2.x + if sys.version_info[1] < 5: # py <= 2.4 + raise ImportError("sqlitedict requires python 2.5 or higher (python 3.3 or higher supported)") + + # necessary to use exec()_ as this would be a SyntaxError in python3. + # this is an exact port of six.reraise(): + def exec_(_code_, _globs_=None, _locs_=None): + """Execute code in a namespace.""" + if _globs_ is None: + frame = sys._getframe(1) + _globs_ = frame.f_globals + if _locs_ is None: + _locs_ = frame.f_locals + del frame + elif _locs_ is None: + _locs_ = _globs_ + exec("""exec _code_ in _globs_, _locs_""") + + class TimeoutError(OSError): + pass + + exec_("def reraise(tp, value, tb=None):\n" + " raise tp, value, tb\n") +else: + def reraise(tp, value, tb=None): + if value is None: + value = tp() + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value + +try: + from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL +except ImportError: + from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL + +# some Python 3 vs 2 imports +try: + from collections import UserDict as DictClass +except ImportError: + from UserDict import DictMixin as DictClass + +try: + from queue import Queue +except ImportError: + from Queue import Queue + + +logger = logging.getLogger(__name__) + + +def open(*args, **kwargs): + """See documentation of the SqliteDict class.""" + return SqliteDict(*args, **kwargs) + + +def encode(obj): + """Serialize an object using pickle to a binary format accepted by SQLite.""" + return sqlite3.Binary(dumps(obj, protocol=PICKLE_PROTOCOL)) + + +def decode(obj): + """Deserialize objects retrieved from SQLite.""" + return loads(bytes(obj)) + + +class SqliteDict(DictClass): + VALID_FLAGS = ['c', 'r', 'w', 'n'] + + def __init__(self, filename=None, tablename='unnamed', flag='c', + autocommit=False, journal_mode="DELETE", encode=encode, decode=decode, timeout=5): + """ + Initialize a thread-safe sqlite-backed dictionary. The dictionary will + be a table `tablename` in database file `filename`. A single file (=database) + may contain multiple tables. + + If no `filename` is given, a random file in temp will be used (and deleted + from temp once the dict is closed/deleted). + + If you enable `autocommit`, changes will be committed after each operation + (more inefficient but safer). Otherwise, changes are committed on `self.commit()`, + `self.clear()` and `self.close()`. + + Set `journal_mode` to 'OFF' if you're experiencing sqlite I/O problems + or if you need performance and don't care about crash-consistency. + + The `flag` parameter. Exactly one of: + 'c': default mode, open for read/write, creating the db/table if necessary. + 'w': open for r/w, but drop `tablename` contents first (start with empty table) + 'r': open as read-only + 'n': create a new database (erasing any existing tables, not just `tablename`!). + + The `encode` and `decode` parameters are used to customize how the values + are serialized and deserialized. + The `encode` parameter must be a function that takes a single Python + object and returns a serialized representation. + The `decode` function must be a function that takes the serialized + representation produced by `encode` and returns a deserialized Python + object. + The default is to use pickle. + + The `timeout` defines the maximum time (in seconds) to wait for initial Thread startup. + + """ + self.in_temp = filename is None + if self.in_temp: + fd, filename = tempfile.mkstemp(prefix='sqldict') + os.close(fd) + + if flag not in SqliteDict.VALID_FLAGS: + raise RuntimeError("Unrecognized flag: %s" % flag) + self.flag = flag + + if flag == 'n': + if os.path.exists(filename): + os.remove(filename) + + dirname = os.path.dirname(filename) + if dirname: + if not os.path.exists(dirname): + raise RuntimeError('Error! The directory does not exist, %s' % dirname) + + self.filename = filename + + # Use standard SQL escaping of double quote characters in identifiers, by doubling them. + # See https://github.com/RaRe-Technologies/sqlitedict/pull/113 + self.tablename = tablename.replace('"', '""') + + self.autocommit = autocommit + self.journal_mode = journal_mode + self.encode = encode + self.decode = decode + self.timeout = timeout + + logger.info("opening Sqlite table %r in %r" % (tablename, filename)) + self.conn = self._new_conn() + if self.flag == 'r': + if self.tablename not in SqliteDict.get_tablenames(self.filename): + msg = 'Refusing to create a new table "%s" in read-only DB mode' % tablename + raise RuntimeError(msg) + else: + MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename + self.conn.execute(MAKE_TABLE) + self.conn.commit() + if flag == 'w': + self.clear() + + def _new_conn(self): + return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode, + timeout=self.timeout) + + def __enter__(self): + if not hasattr(self, 'conn') or self.conn is None: + self.conn = self._new_conn() + return self + + def __exit__(self, *exc_info): + self.close() + + def __str__(self): + return "SqliteDict(%s)" % (self.filename) + + def __repr__(self): + return str(self) # no need of something complex + + def __len__(self): + # `select count (*)` is super slow in sqlite (does a linear scan!!) + # As a result, len() is very slow too once the table size grows beyond trivial. + # We could keep the total count of rows ourselves, by means of triggers, + # but that seems too complicated and would slow down normal operation + # (insert/delete etc). + GET_LEN = 'SELECT COUNT(*) FROM "%s"' % self.tablename + rows = self.conn.select_one(GET_LEN)[0] + return rows if rows is not None else 0 + + def __bool__(self): + # No elements is False, otherwise True + GET_MAX = 'SELECT MAX(ROWID) FROM "%s"' % self.tablename + m = self.conn.select_one(GET_MAX)[0] + # Explicit better than implicit and bla bla + return True if m is not None else False + + def iterkeys(self): + GET_KEYS = 'SELECT key FROM "%s" ORDER BY rowid' % self.tablename + for key in self.conn.select(GET_KEYS): + yield key[0] + + def itervalues(self): + GET_VALUES = 'SELECT value FROM "%s" ORDER BY rowid' % self.tablename + for value in self.conn.select(GET_VALUES): + yield self.decode(value[0]) + + def iteritems(self): + GET_ITEMS = 'SELECT key, value FROM "%s" ORDER BY rowid' % self.tablename + for key, value in self.conn.select(GET_ITEMS): + yield key, self.decode(value) + + def keys(self): + return self.iterkeys() if major_version > 2 else list(self.iterkeys()) + + def values(self): + return self.itervalues() if major_version > 2 else list(self.itervalues()) + + def items(self): + return self.iteritems() if major_version > 2 else list(self.iteritems()) + + def __contains__(self, key): + HAS_ITEM = 'SELECT 1 FROM "%s" WHERE key = ?' % self.tablename + return self.conn.select_one(HAS_ITEM, (key,)) is not None + + def __getitem__(self, key): + GET_ITEM = 'SELECT value FROM "%s" WHERE key = ?' % self.tablename + item = self.conn.select_one(GET_ITEM, (key,)) + if item is None: + raise KeyError(key) + return self.decode(item[0]) + + def __setitem__(self, key, value): + if self.flag == 'r': + raise RuntimeError('Refusing to write to read-only SqliteDict') + + ADD_ITEM = 'REPLACE INTO "%s" (key, value) VALUES (?,?)' % self.tablename + self.conn.execute(ADD_ITEM, (key, self.encode(value))) + if self.autocommit: + self.commit() + + def __delitem__(self, key): + if self.flag == 'r': + raise RuntimeError('Refusing to delete from read-only SqliteDict') + + if key not in self: + raise KeyError(key) + DEL_ITEM = 'DELETE FROM "%s" WHERE key = ?' % self.tablename + self.conn.execute(DEL_ITEM, (key,)) + if self.autocommit: + self.commit() + + def update(self, items=(), **kwds): + if self.flag == 'r': + raise RuntimeError('Refusing to update read-only SqliteDict') + + try: + items = items.items() + except AttributeError: + pass + items = [(k, self.encode(v)) for k, v in items] + + UPDATE_ITEMS = 'REPLACE INTO "%s" (key, value) VALUES (?, ?)' % self.tablename + self.conn.executemany(UPDATE_ITEMS, items) + if kwds: + self.update(kwds) + if self.autocommit: + self.commit() + + def __iter__(self): + return self.iterkeys() + + def clear(self): + if self.flag == 'r': + raise RuntimeError('Refusing to clear read-only SqliteDict') + + # avoid VACUUM, as it gives "OperationalError: database schema has changed" + CLEAR_ALL = 'DELETE FROM "%s";' % self.tablename + self.conn.commit() + self.conn.execute(CLEAR_ALL) + self.conn.commit() + + @staticmethod + def get_tablenames(filename): + """get the names of the tables in an sqlite db as a list""" + if not os.path.isfile(filename): + raise IOError('file %s does not exist' % (filename)) + GET_TABLENAMES = 'SELECT name FROM sqlite_master WHERE type="table"' + with sqlite3.connect(filename) as conn: + cursor = conn.execute(GET_TABLENAMES) + res = cursor.fetchall() + + return [name[0] for name in res] + + def commit(self, blocking=True): + """ + Persist all data to disk. + + When `blocking` is False, the commit command is queued, but the data is + not guaranteed persisted (default implication when autocommit=True). + """ + if self.conn is not None: + self.conn.commit(blocking) + sync = commit + + def close(self, do_log=True, force=False): + if do_log: + logger.debug("closing %s" % self) + if hasattr(self, 'conn') and self.conn is not None: + if self.conn.autocommit and not force: + # typically calls to commit are non-blocking when autocommit is + # used. However, we need to block on close() to ensure any + # awaiting exceptions are handled and that all data is + # persisted to disk before returning. + self.conn.commit(blocking=True) + self.conn.close(force=force) + self.conn = None + if self.in_temp: + try: + os.remove(self.filename) + except Exception: + pass + + def terminate(self): + """Delete the underlying database file. Use with care.""" + if self.flag == 'r': + raise RuntimeError('Refusing to terminate read-only SqliteDict') + + self.close() + + if self.filename == ':memory:': + return + + logger.info("deleting %s" % self.filename) + try: + if os.path.isfile(self.filename): + os.remove(self.filename) + except (OSError, IOError): + logger.exception("failed to delete %s" % (self.filename)) + + def __del__(self): + # like close(), but assume globals are gone by now (do not log!) + try: + self.close(do_log=False, force=True) + except Exception: + # prevent error log flood in case of multiple SqliteDicts + # closed after connection lost (exceptions are always ignored + # in __del__ method. + pass + + +# Adding extra methods for python 2 compatibility (at import time) +if major_version == 2: + SqliteDict.__nonzero__ = SqliteDict.__bool__ + del SqliteDict.__bool__ # not needed and confusing + + +class SqliteMultithread(Thread): + """ + Wrap sqlite connection in a way that allows concurrent requests from multiple threads. + + This is done by internally queueing the requests and processing them sequentially + in a separate thread (in the same order they arrived). + + """ + def __init__(self, filename, autocommit, journal_mode, timeout): + super(SqliteMultithread, self).__init__() + self.filename = filename + self.autocommit = autocommit + self.journal_mode = journal_mode + # use request queue of unlimited size + self.reqs = Queue() + self.setDaemon(True) # python2.5-compatible + self.exception = None + self._sqlitedict_thread_initialized = None + self.timeout = timeout + self.log = logging.getLogger('sqlitedict.SqliteMultithread') + self.start() + + def run(self): + try: + if self.autocommit: + conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False) + else: + conn = sqlite3.connect(self.filename, check_same_thread=False) + except Exception: + self.log.exception("Failed to initialize connection for filename: %s" % self.filename) + self.exception = sys.exc_info() + raise + + try: + conn.execute('PRAGMA journal_mode = %s' % self.journal_mode) + conn.text_factory = str + cursor = conn.cursor() + conn.commit() + cursor.execute('PRAGMA synchronous=OFF') + except Exception: + self.log.exception("Failed to execute PRAGMA statements.") + self.exception = sys.exc_info() + raise + + self._sqlitedict_thread_initialized = True + + res = None + while True: + req, arg, res, outer_stack = self.reqs.get() + if req == '--close--': + assert res, ('--close-- without return queue', res) + break + elif req == '--commit--': + conn.commit() + if res: + res.put('--no more--') + else: + try: + cursor.execute(req, arg) + except Exception: + self.exception = (e_type, e_value, e_tb) = sys.exc_info() + inner_stack = traceback.extract_stack() + + # An exception occurred in our thread, but we may not + # immediately able to throw it in our calling thread, if it has + # no return `res` queue: log as level ERROR both the inner and + # outer exception immediately. + # + # Any iteration of res.get() or any next call will detect the + # inner exception and re-raise it in the calling Thread; though + # it may be confusing to see an exception for an unrelated + # statement, an ERROR log statement from the 'sqlitedict.*' + # namespace contains the original outer stack location. + self.log.error('Inner exception:') + for item in traceback.format_list(inner_stack): + self.log.error(item) + self.log.error('') # deliniate traceback & exception w/blank line + for item in traceback.format_exception_only(e_type, e_value): + self.log.error(item) + + self.log.error('') # exception & outer stack w/blank line + self.log.error('Outer stack:') + for item in traceback.format_list(outer_stack): + self.log.error(item) + self.log.error('Exception will be re-raised at next call.') + + if res: + for rec in cursor: + res.put(rec) + res.put('--no more--') + + if self.autocommit: + conn.commit() + + self.log.debug('received: %s, send: --no more--', req) + conn.close() + res.put('--no more--') + + def check_raise_error(self): + """ + Check for and raise exception for any previous sqlite query. + + For the `execute*` family of method calls, such calls are non-blocking and any + exception raised in the thread cannot be handled by the calling Thread (usually + MainThread). This method is called on `close`, and prior to any subsequent + calls to the `execute*` methods to check for and raise an exception in a + previous call to the MainThread. + """ + if self.exception: + e_type, e_value, e_tb = self.exception + + # clear self.exception, if the caller decides to handle such + # exception, we should not repeatedly re-raise it. + self.exception = None + + self.log.error('An exception occurred from a previous statement, view ' + 'the logging namespace "sqlitedict" for outer stack.') + + # The third argument to raise is the traceback object, and it is + # substituted instead of the current location as the place where + # the exception occurred, this is so that when using debuggers such + # as `pdb', or simply evaluating the naturally raised traceback, we + # retain the original (inner) location of where the exception + # occurred. + reraise(e_type, e_value, e_tb) + + def execute(self, req, arg=None, res=None): + """ + `execute` calls are non-blocking: just queue up the request and return immediately. + """ + self._wait_for_initialization() + self.check_raise_error() + + # NOTE: This might be a lot of information to pump into an input + # queue, affecting performance. I've also seen earlier versions of + # jython take a severe performance impact for throwing exceptions + # so often. + stack = traceback.extract_stack()[:-1] + self.reqs.put((req, arg or tuple(), res, stack)) + + def executemany(self, req, items): + for item in items: + self.execute(req, item) + self.check_raise_error() + + def select(self, req, arg=None): + """ + Unlike sqlite's native select, this select doesn't handle iteration efficiently. + + The result of `select` starts filling up with values as soon as the + request is dequeued, and although you can iterate over the result normally + (`for res in self.select(): ...`), the entire result will be in memory. + """ + res = Queue() # results of the select will appear as items in this queue + self.execute(req, arg, res) + while True: + rec = res.get() + self.check_raise_error() + if rec == '--no more--': + break + yield rec + + def select_one(self, req, arg=None): + """Return only the first row of the SELECT, or None if there are no matching rows.""" + try: + return next(iter(self.select(req, arg))) + except StopIteration: + return None + + def commit(self, blocking=True): + if blocking: + # by default, we await completion of commit() unless + # blocking=False. This ensures any available exceptions for any + # previous statement are thrown before returning, and that the + # data has actually persisted to disk! + self.select_one('--commit--') + else: + # otherwise, we fire and forget as usual. + self.execute('--commit--') + + def close(self, force=False): + if force: + # If a SqliteDict is being killed or garbage-collected, then select_one() + # could hang forever because run() might already have exited and therefore + # can't process the request. Instead, push the close command to the requests + # queue directly. If run() is still alive, it will exit gracefully. If not, + # then there's nothing we can do anyway. + self.reqs.put(('--close--', None, Queue(), None)) + else: + # we abuse 'select' to "iter" over a "--close--" statement so that we + # can confirm the completion of close before joining the thread and + # returning (by semaphore '--no more--' + self.select_one('--close--') + self.join() + + def _wait_for_initialization(self): + """ + Polls the 'initialized' flag to be set by the started Thread in run(). + """ + # A race condition may occur without waiting for initialization: + # __init__() finishes with the start() call, but the Thread needs some time to actually start working. + # If opening the database file fails in run(), an exception will occur and self.exception will be set. + # But if we run check_raise_error() before run() had a chance to set self.exception, it will report + # a false negative: An exception occured and the thread terminates but self.exception is unset. + # This leads to a deadlock while waiting for the results of execute(). + # By waiting for the Thread to set the initialized flag, we can ensure the thread has successfully + # opened the file - and possibly set self.exception to be detected by check_raise_error(). + + start_time = time.time() + while time.time() - start_time < self.timeout: + if self._sqlitedict_thread_initialized or self.exception: + return + time.sleep(0.1) + raise TimeoutError("SqliteMultithread failed to flag initialization withing %0.0f seconds." % self.timeout) + + +if __name__ == '__main__': + print(__version__) \ No newline at end of file diff --git a/platformcode/platformtools.py b/platformcode/platformtools.py index 18559e2c..f03fbd4a 100644 --- a/platformcode/platformtools.py +++ b/platformcode/platformtools.py @@ -1414,75 +1414,47 @@ def get_played_time(item): if not item.infoLabels: return 0 - ID = item.infoLabels.get('tmdb_id','') + ID = item.infoLabels.get('tmdb_id', '') if not ID: return 0 - S = item.infoLabels.get('season') + S = item.infoLabels.get('season', 0) E = item.infoLabels.get('episode') + result = None if item.contentType == 'movie': - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=?", (ID,)) - + result = db['viewed'].get(ID) elif S and E: - S = item.infoLabels['season'] - E = item.infoLabels['episode'] - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND season=? AND episode=?", (ID, S, E)) - - elif E: - E = item.infoLabels['episode'] - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND episode=?", (ID, E)) - - result = db.fetchone() + result = db['viewed'].get(ID, {}).get(str(S)+'x'+str(E)) if not result: played_time = 0 - else: played_time = result[0] + else: played_time = result return played_time def set_played_time(item): logger.debug() - from core import db, db_conn + from core import db played_time = item.played_time if not item.infoLabels: return - ID = item.infoLabels.get('tmdb_id','') + ID = item.infoLabels.get('tmdb_id', '') if not ID: return - S = item.infoLabels.get('season') + S = item.infoLabels.get('season', 0) E = item.infoLabels.get('episode') if item.contentType == 'movie': - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=?", (ID,)) - result = db.fetchone() - if result: - if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=?", (played_time, ID)) - else: db.execute("DELETE from viewed WHERE tmdb_id=?", (ID,)) - else: db.execute("INSERT INTO viewed (tmdb_id, played_time) VALUES (?, ?)", (ID, played_time)) - - elif S and E: - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND season = ? AND episode=?", (ID, S, E)) - result = db.fetchone() - if result: - if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=? AND season=? AND episode=?", (played_time, ID, S, E)) - else: db.execute("DELETE from viewed WHERE tmdb_id=? AND season=? AND episode=?", (ID, S, E)) - else: db.execute("INSERT INTO viewed (tmdb_id, season, episode, played_time) VALUES (?, ?, ?, ?)", (ID, S, E, played_time)) - + db['viewed'][ID] = played_time elif E: - E = item.infoLabels['episode'] - db.execute("SELECT played_time FROM viewed WHERE tmdb_id=? AND episode=?", (ID, E)) - result = db.fetchone() - if result: - if played_time > 0: db.execute("UPDATE viewed SET played_time=? WHERE tmdb_id=? AND episode=?", (played_time, ID, E)) - else: db.execute("DELETE from viewed WHERE tmdb_id=? AND episode=?", (ID, E)) - else: db.execute("INSERT INTO viewed (tmdb_id, episode, played_time) VALUES (?, ?, ?)", (ID, E, played_time)) - - db_conn.commit() + newDict = db['viewed'].get(ID, {}) + newDict[str(S) + 'x' + str(E)] = played_time + db['viewed'][ID] = newDict def prevent_busy(item):