diff --git a/core/tmdb.py b/core/tmdb.py index 60210b3e..9582b4e3 100644 --- a/core/tmdb.py +++ b/core/tmdb.py @@ -4,6 +4,7 @@ import copy import re import sqlite3 import time +import urllib import xbmcaddon @@ -156,9 +157,11 @@ def cache_response(fn): result = fn(*args) else: - conn = sqlite3.connect(fname) + conn = sqlite3.connect(fname, timeout=15) c = conn.cursor() - url_base64 = base64.b64encode(args[0]) + url = re.sub('&year=-', '', args[0]) + logger.error('la url %s' % url) + url_base64 = base64.b64encode(url) c.execute("SELECT response, added FROM tmdb_cache WHERE url=?", (url_base64,)) row = c.fetchone() @@ -189,7 +192,7 @@ def cache_response(fn): return wrapper -def set_infoLabels(source, seekTmdb=True, idioma_busqueda=def_lang): +def set_infoLabels(source, seekTmdb=True, idioma_busqueda=def_lang, forced=False): """ Dependiendo del tipo de dato de source obtiene y fija (item.infoLabels) los datos extras de una o varias series, capitulos o peliculas. @@ -205,6 +208,9 @@ def set_infoLabels(source, seekTmdb=True, idioma_busqueda=def_lang): @rtype: int, list """ + if not config.get_setting('tmdb_active') and not forced: + return + start_time = time.time() if type(source) == list: ret = set_infoLabels_itemlist(source, seekTmdb, idioma_busqueda) @@ -215,7 +221,7 @@ def set_infoLabels(source, seekTmdb=True, idioma_busqueda=def_lang): return ret -def set_infoLabels_itemlist(item_list, seekTmdb=False, idioma_busqueda=def_lang): +def set_infoLabels_itemlist(item_list, seekTmdb=False, idioma_busqueda=def_lang, forced=False): """ De manera concurrente, obtiene los datos de los items incluidos en la lista item_list. @@ -236,6 +242,8 @@ def set_infoLabels_itemlist(item_list, seekTmdb=False, idioma_busqueda=def_lang) negativo en caso contrario. @rtype: list """ + if not config.get_setting('tmdb_active') and not forced: + return import threading threads_num = config.get_setting("tmdb_threads", default=20) @@ -555,20 +563,24 @@ def completar_codigos(item): item.infoLabels['url_scraper'].append(url_scraper) -def discovery(item): +def discovery(item, dict_=False, cast=False): if item.search_type == 'discover': listado = Tmdb(discover={'url':'discover/%s' % item.type, 'with_genres':item.list_type, 'language':def_lang, 'page':item.page}) + if item.search_type == 'discover': + if dict_: + listado = Tmdb(discover=dict_, cast=cast) + elif item.search_type == 'discover': + listado = Tmdb(discover={'url': 'discover/%s' % item.type, 'with_genres': item.list_type, 'language': 'es', + 'page': item.page}) elif item.search_type == 'list': if item.page == '': item.page = '1' listado = Tmdb(list={'url': item.list_type, 'language':def_lang, 'page':item.page}) - logger.debug(listado.get_list_resultados()) - result = listado.get_list_resultados() - return result + return listado def get_genres(type): lang = def_lang @@ -788,6 +800,7 @@ class Tmdb(object): def __init__(self, **kwargs): self.page = kwargs.get('page', 1) self.index_results = 0 + self.cast = kwargs.get('cast', False) self.results = [] self.result = ResultDictDefault() self.total_pages = 0 @@ -804,7 +817,6 @@ class Tmdb(object): self.busqueda_year = kwargs.get('year', '') self.busqueda_filtro = kwargs.get('filtro', {}) self.discover = kwargs.get('discover', {}) - self.list = kwargs.get('list', {}) # Reellenar diccionario de generos si es necesario if (self.busqueda_tipo == 'movie' or self.busqueda_tipo == "tv") and \ @@ -836,9 +848,6 @@ class Tmdb(object): elif self.discover: self.__discover() - elif self.list: - self.__list() - else: logger.debug("Creado objeto vacio") @@ -847,20 +856,19 @@ class Tmdb(object): def get_json(url): try: - result = httptools.downloadpage(url, cookies=False) + result = httptools.downloadpage(url, cookies=False, ignore_response_code=True) res_headers = result.headers - # logger.debug("res_headers es %s" % res_headers) dict_data = jsontools.load(result.data) - # logger.debug("result_data es %s" % dict_data) + #logger.debug("result_data es %s" % dict_data) if "status_code" in dict_data: - logger.debug("\nError de tmdb: %s %s" % (dict_data["status_code"], dict_data["status_message"])) + #logger.debug("\nError de tmdb: %s %s" % (dict_data["status_code"], dict_data["status_message"])) if dict_data["status_code"] == 25: while "status_code" in dict_data and dict_data["status_code"] == 25: wait = int(res_headers['retry-after']) - logger.debug("Limite alcanzado, esperamos para volver a llamar en ...%s" % wait) + #logger.error("Limite alcanzado, esperamos para volver a llamar en ...%s" % wait) time.sleep(wait) # logger.debug("RE Llamada #%s" % d) result = httptools.downloadpage(url, cookies=False) @@ -941,6 +949,8 @@ class Tmdb(object): self.result = ResultDictDefault() results = [] total_results = 0 + text_simple = self.busqueda_texto.lower() + text_quote = urllib.quote(text_simple) total_pages = 0 buscando = "" @@ -948,7 +958,7 @@ class Tmdb(object): # http://api.themoviedb.org/3/search/movie?api_key=a1ab8b8669da03637a4b98fa39c39228&query=superman&language=es # &include_adult=false&page=1 url = ('http://api.themoviedb.org/3/search/%s?api_key=a1ab8b8669da03637a4b98fa39c39228&query=%s&language=%s' - '&include_adult=%s&page=%s' % (self.busqueda_tipo, self.busqueda_texto.replace(' ', '%20'), + '&include_adult=%s&page=%s' % (self.busqueda_tipo, text_quote.replace(' ', '%20'), self.busqueda_idioma, self.busqueda_include_adult, page)) if self.busqueda_year: @@ -993,64 +1003,6 @@ class Tmdb(object): logger.error(msg) return 0 - def __list(self, index_results=0): - self.result = ResultDictDefault() - results = [] - total_results = 0 - total_pages = 0 - - # Ejemplo self.discover: {'url': 'movie/', 'with_cast': '1'} - # url: Método de la api a ejecutar - # resto de claves: Parámetros de la búsqueda concatenados a la url - type_search = self.list.get('url', '') - if type_search: - params = [] - for key, value in self.list.items(): - if key != "url": - params.append("&"+key + "=" + str(value)) - # http://api.themoviedb.org/3/movie/popolar?api_key=a1ab8b8669da03637a4b98fa39c39228&&language=es - url = ('http://api.themoviedb.org/3/%s?api_key=a1ab8b8669da03637a4b98fa39c39228%s' - % (type_search, ''.join(params))) - - logger.info("[Tmdb.py] Buscando %s:\n%s" % (type_search, url)) - resultado = self.get_json(url) - - total_results = resultado.get("total_results", -1) - total_pages = resultado.get("total_pages", 1) - - if total_results > 0: - results = resultado["results"] - if self.busqueda_filtro and results: - # TODO documentar esta parte - for key, value in dict(self.busqueda_filtro).items(): - for r in results[:]: - if key not in r or r[key] != value: - results.remove(r) - total_results -= 1 - elif total_results == -1: - results = resultado - - if index_results >= len(results): - logger.error( - "La busqueda de '%s' no dio %s resultados" % (type_search, index_results)) - return 0 - - # Retornamos el numero de resultados de esta pagina - if results: - self.results = results - self.total_results = total_results - self.total_pages = total_pages - if total_results > 0: - self.result = ResultDictDefault(self.results[index_results]) - else: - self.result = results - return len(self.results) - else: - # No hay resultados de la busqueda - logger.error("La busqueda de '%s' no dio resultados" % type_search) - return 0 - - def __discover(self, index_results=0): self.result = ResultDictDefault() @@ -1077,8 +1029,12 @@ class Tmdb(object): total_results = resultado.get("total_results", -1) total_pages = resultado.get("total_pages", 1) - if total_results > 0: - results = resultado["results"] + if total_results > 0 or self.cast: + if self.cast: + results = resultado[self.cast] + total_results = len(results) + else: + results = resultado["results"] if self.busqueda_filtro and results: # TODO documentar esta parte for key, value in dict(self.busqueda_filtro).items(): @@ -1101,6 +1057,7 @@ class Tmdb(object): self.total_pages = total_pages if total_results > 0: self.result = ResultDictDefault(self.results[index_results]) + else: self.result = results return len(self.results) @@ -1147,8 +1104,10 @@ class Tmdb(object): result['thumbnail'] = self.get_poster(size="w300") result['fanart'] = self.get_backdrop() + res.append(result) cr += 1 + if cr >= num_result: return res except: @@ -1370,7 +1329,7 @@ class Tmdb(object): msg += "\nError de tmdb: %s %s" % ( self.temporada[numtemporada]["status_code"], self.temporada[numtemporada]["status_message"]) logger.debug(msg) - self.temporada[numtemporada] = {"episodes": {}} + self.temporada[numtemporada] = {} return self.temporada[numtemporada] diff --git a/lib/concurrent/__init__.py b/lib/concurrent/__init__.py new file mode 100644 index 00000000..b36383a6 --- /dev/null +++ b/lib/concurrent/__init__.py @@ -0,0 +1,3 @@ +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/lib/concurrent/futures/__init__.py b/lib/concurrent/futures/__init__.py new file mode 100644 index 00000000..428b14bd --- /dev/null +++ b/lib/concurrent/futures/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from concurrent.futures.thread import ThreadPoolExecutor + +try: + from concurrent.futures.process import ProcessPoolExecutor +except ImportError: + # some platforms don't have multiprocessing + pass diff --git a/lib/concurrent/futures/_base.py b/lib/concurrent/futures/_base.py new file mode 100644 index 00000000..510ffa53 --- /dev/null +++ b/lib/concurrent/futures/_base.py @@ -0,0 +1,667 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +import collections +import logging +import threading +import itertools +import time +import types + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. +LOGGER = logging.getLogger("concurrent.futures") + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super(_FirstCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + super(_FirstCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super(_FirstCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() + super(_AllCompletedWaiter, self).__init__() + + def _decrement_pending_calls(self): + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super(_AllCompletedWaiter, self).add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super(_AllCompletedWaiter, self).add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super(_AllCompletedWaiter, self).add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). If any given Futures are duplicated, they will be returned + once. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = set(fs) + total_futures = len(fs) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = fs - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + finished = list(finished) + try: + for f in _yield_finished_futures(finished, waiter, + ref_collect=(fs,)): + f = [f] + yield f.pop() + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), total_futures)) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + # reverse to keep finishing order + finished.reverse() + for f in _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)): + f = [f] + yield f.pop() + + finally: + # Remove waiter from unfinished futures + for f in fs: + with f._condition: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. + """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + with f._condition: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._traceback = None + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + except BaseException: + # Explicitly let all other new-style exceptions through so + # that we can catch all old-style exceptions with a simple + # "except:" clause below. + # + # All old-style exception objects are instances of + # types.InstanceType, but "except types.InstanceType:" does + # not catch old-style exceptions for some reason. Thus, the + # only way to catch all old-style exceptions without catching + # any new-style exceptions is to filter out the new-style + # exceptions, which all derive from BaseException. + raise + except: + # Because of the BaseException clause above, this handler only + # executes for old-style exception objects. + LOGGER.exception('exception calling callback for %r', self) + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<%s at %#x state=%s raised %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<%s at %#x state=%s returned %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<%s at %#x state=%s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True + + def cancelled(self): + """Return True if the future was cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + """Return True if the future is currently executing.""" + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + if isinstance(self._exception, types.InstanceType): + # The exception is an instance of an old-style class, which + # means type(self._exception) returns types.ClassType instead + # of the exception's actual class type. + exception_type = self._exception.__class__ + else: + exception_type = type(self._exception) + raise exception_type, self._exception, self._traceback + else: + return self._result + + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception_info(self, timeout=None): + """Return a tuple of (exception, traceback) raised by the call that the + future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + return self.exception_info(timeout)[0] + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. + """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self), + self._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception_info(self, exception, traceback): + """Sets the result of the future as being the given exception + and traceback. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._traceback = traceback + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + self.set_exception_info(exception, None) + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + raise NotImplementedError() + + def map(self, fn, *iterables, **kwargs): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + timeout = kwargs.get('timeout') + if timeout is not None: + end_time = timeout + time.time() + + fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + yield fs.pop().result() + else: + yield fs.pop().result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + return result_iterator() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. + """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False diff --git a/lib/concurrent/futures/_compat.py b/lib/concurrent/futures/_compat.py new file mode 100644 index 00000000..e77cf0e5 --- /dev/null +++ b/lib/concurrent/futures/_compat.py @@ -0,0 +1,111 @@ +from keyword import iskeyword as _iskeyword +from operator import itemgetter as _itemgetter +import sys as _sys + + +def namedtuple(typename, field_names): + """Returns a new subclass of tuple with named fields. + + >>> Point = namedtuple('Point', 'x y') + >>> Point.__doc__ # docstring for the new class + 'Point(x, y)' + >>> p = Point(11, y=22) # instantiate with positional args or keywords + >>> p[0] + p[1] # indexable like a plain tuple + 33 + >>> x, y = p # unpack like a regular tuple + >>> x, y + (11, 22) + >>> p.x + p.y # fields also accessable by name + 33 + >>> d = p._asdict() # convert to a dictionary + >>> d['x'] + 11 + >>> Point(**d) # convert from a dictionary + Point(x=11, y=22) + >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields + Point(x=100, y=22) + + """ + + # Parse and validate the field names. Validation serves two purposes, + # generating informative error messages and preventing template injection attacks. + if isinstance(field_names, basestring): + field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas + field_names = tuple(map(str, field_names)) + for name in (typename,) + field_names: + if not all(c.isalnum() or c=='_' for c in name): + raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) + if _iskeyword(name): + raise ValueError('Type names and field names cannot be a keyword: %r' % name) + if name[0].isdigit(): + raise ValueError('Type names and field names cannot start with a number: %r' % name) + seen_names = set() + for name in field_names: + if name.startswith('_'): + raise ValueError('Field names cannot start with an underscore: %r' % name) + if name in seen_names: + raise ValueError('Encountered duplicate field name: %r' % name) + seen_names.add(name) + + # Create and fill-in the class template + numfields = len(field_names) + argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes + reprtxt = ', '.join('%s=%%r' % name for name in field_names) + dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names)) + template = '''class %(typename)s(tuple): + '%(typename)s(%(argtxt)s)' \n + __slots__ = () \n + _fields = %(field_names)r \n + def __new__(_cls, %(argtxt)s): + return _tuple.__new__(_cls, (%(argtxt)s)) \n + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new %(typename)s object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != %(numfields)d: + raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) + return result \n + def __repr__(self): + return '%(typename)s(%(reprtxt)s)' %% self \n + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {%(dicttxt)s} \n + def _replace(_self, **kwds): + 'Return a new %(typename)s object replacing specified fields with new values' + result = _self._make(map(kwds.pop, %(field_names)r, _self)) + if kwds: + raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) + return result \n + def __getnewargs__(self): + return tuple(self) \n\n''' % locals() + for i, name in enumerate(field_names): + template += ' %s = _property(_itemgetter(%d))\n' % (name, i) + + # Execute the template string in a temporary namespace and + # support tracing utilities by setting a value for frame.f_globals['__name__'] + namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, + _property=property, _tuple=tuple) + try: + exec(template, namespace) + except SyntaxError: + e = _sys.exc_info()[1] + raise SyntaxError(e.message + ':\n' + template) + result = namespace[typename] + + # For pickling to work, the __module__ variable needs to be set to the frame + # where the named tuple is created. Bypass this step in enviroments where + # sys._getframe is not defined (Jython for example). + if hasattr(_sys, '_getframe'): + result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') + + return result + + +if _sys.version_info[0] < 3: + def reraise(exc, traceback): + locals_ = {'exc_type': type(exc), 'exc_value': exc, 'traceback': traceback} + exec('raise exc_type, exc_value, traceback', {}, locals_) +else: + def reraise(exc, traceback): + # Tracebacks are embedded in exceptions in Python 3 + raise exc diff --git a/lib/concurrent/futures/process.py b/lib/concurrent/futures/process.py new file mode 100644 index 00000000..fa5b96fd --- /dev/null +++ b/lib/concurrent/futures/process.py @@ -0,0 +1,363 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The follow diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | => | | => | Call Q | => | | +| | +----------+ | | +-----------+ | | +| | | ... | | | | ... | | | +| | | 6 | | | | 5, call() | | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Request Q" +""" + +import atexit +from concurrent.futures import _base +import Queue as queue +import multiprocessing +import threading +import weakref +import sys + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +# Workers are created as daemon threads and processes. This is done to allow the +# interpreter to exit when there are still idle processes in a +# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, +# allowing workers to die with the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads/processes finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) if _threads_queues else () + for t, q in items: + q.put(None) + for t, q in items: + t.join(sys.maxint) + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + +def _process_worker(call_queue, result_queue): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A multiprocessing.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A multiprocessing.Queue of _ResultItems that will written + to by the worker. + shutdown: A multiprocessing.Event that will be set as a signal to the + worker that it should exit when call_queue is empty. + """ + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + """Fills call_queue with _WorkItems from pending_work_items. + + This function never blocks. + + Args: + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids + are consumed and the corresponding _WorkItems from + pending_work_items are transformed into _CallItems and put in + call_queue. + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems. + """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the multiprocessing.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A multiprocessing.Queue of _ResultItems generated by the + process workers. + """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + result_item = result_queue.get(block=True) + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Delete references to object. See issue16284 + del work_item + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + while nb_shutdown_processes[0] < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + call_queue.close() + return + del executor + +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) + + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + """ + _check_system_limits() + + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() + else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + self._max_workers = max_workers + + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + self._call_queue = multiprocessing.Queue(self._max_workers + + EXTRA_QUEUED_CALLS) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue)) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + _threads_queues[self._queue_management_thread] = self._result_queue + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue)) + p.start() + self._processes.add(p) + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) + + self._start_queue_management_thread() + self._adjust_process_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown_thread = True + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: + self._queue_management_thread.join(sys.maxint) + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +atexit.register(_python_exit) diff --git a/lib/concurrent/futures/thread.py b/lib/concurrent/futures/thread.py new file mode 100644 index 00000000..b5f832ff --- /dev/null +++ b/lib/concurrent/futures/thread.py @@ -0,0 +1,170 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ThreadPoolExecutor.""" + +import atexit +from concurrent.futures import _base +import itertools +import Queue as queue +import threading +import weakref +import sys + +try: + from multiprocessing import cpu_count +except ImportError: + # some platforms don't have multiprocessing + def cpu_count(): + return None + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) if _threads_queues else () + for t, q in items: + q.put(None) + for t, q in items: + t.join(sys.maxint) + +atexit.register(_python_exit) + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except: + e, tb = sys.exc_info()[1:] + self.future.set_exception_info(e, tb) + else: + self.future.set_result(result) + +def _worker(executor_reference, work_queue): + try: + while True: + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + # Delete references to object. See issue16284 + del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor + except: + _base.LOGGER.critical('Exception in worker', exc_info=True) + + +class ThreadPoolExecutor(_base.Executor): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().next + + def __init__(self, max_workers=None, thread_name_prefix=''): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + """ + if max_workers is None: + # Use this number because ThreadPoolExecutor is often + # used to overlap I/O instead of CPU work. + max_workers = (cpu_count() or 1) * 5 + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + self._max_workers = max_workers + self._work_queue = queue.Queue() + self._idle_semaphore = threading.Semaphore(0) + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(False): + return + + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue)) + t.daemon = True + t.start() + self._threads.add(t) + _threads_queues[t] = self._work_queue + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown = True + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join(sys.maxint) + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/platformcode/platformtools.py b/platformcode/platformtools.py index fdf97ce8..567c9dbc 100644 --- a/platformcode/platformtools.py +++ b/platformcode/platformtools.py @@ -558,23 +558,27 @@ def set_context_commands(item, parent_item): # Buscar en otros canales if item.contentType in ['movie', 'tvshow'] and item.channel != 'search': + + # Buscar en otros canales if item.contentSerieName != '': item.wanted = item.contentSerieName else: item.wanted = item.contentTitle - context_commands.append((config.get_localized_string(60350), - "XBMC.Container.Update (%s?%s)" % (sys.argv[0], - item.clone(channel='search', - action="do_search", - from_channel=item.channel, - contextual=True).tourl()))) + if item.contentType == 'tvshow': mediatype = 'tv' else: mediatype = item.contentType + context_commands.append((config.get_localized_string(60350), + "XBMC.Container.Update (%s?%s)" % (sys.argv[0], + item.clone(channel='search', + action="from_context", + from_channel=item.channel, + contextual=True, + text=item.wanted).tourl()))) context_commands.append(("[B]%s[/B]" % config.get_localized_string(70561), "XBMC.Container.Update (%s?%s)" % ( - sys.argv[0], item.clone(channel='search', action='discover_list', search_type='list', page='1', + sys.argv[0], item.clone(channel='search', action='from_context', search_type='list', page='1', list_type='%s/%s/similar' % (mediatype,item.infoLabels['tmdb_id'])).tourl()))) # Definir como Pagina de inicio diff --git a/resources/media/themes/default/thumb_search_generic.png b/resources/media/themes/default/thumb_search_generic.png new file mode 100644 index 00000000..6cc4ee98 Binary files /dev/null and b/resources/media/themes/default/thumb_search_generic.png differ diff --git a/resources/media/themes/default/thumb_search_more.png b/resources/media/themes/default/thumb_search_more.png new file mode 100644 index 00000000..6ab8b575 Binary files /dev/null and b/resources/media/themes/default/thumb_search_more.png differ diff --git a/resources/media/themes/default/thumb_search_tvshow.png b/resources/media/themes/default/thumb_search_tvshow.png new file mode 100644 index 00000000..062cb435 Binary files /dev/null and b/resources/media/themes/default/thumb_search_tvshow.png differ diff --git a/resources/media/themes/default/thumb_years.png b/resources/media/themes/default/thumb_years.png new file mode 100644 index 00000000..750a8055 Binary files /dev/null and b/resources/media/themes/default/thumb_years.png differ diff --git a/specials/search.json b/specials/search.json index fc48e3c8..e752457d 100644 --- a/specials/search.json +++ b/specials/search.json @@ -1,44 +1,9 @@ { - "id": "search", - "name": "@60672", - "active": false, - "adult": false, - "language": ["ita"], - "categories": ["movie"], - "settings": [ - { - "id": "multithread", - "type": "bool", - "label": "@60673", - "default": true, - "enabled": true, - "visible": true - }, - { - "id": "result_mode", - "type": "list", - "label": "@60674", - "default": 0, - "enabled": true, - "visible": true, - "lvalues": ["@60675","@60676"] - }, - { - "id": "saved_searches_limit", - "type": "list", - "label": "@60677", - "default": 0, - "enabled": true, - "visible": true, - "lvalues": ["10","20","30","40"] - }, - { - "id": "last_search", - "type": "bool", - "label": "@60678", - "default": true, - "enabled": true, - "visible": true - } - ] + "id": "search", + "name": "search", + "active": false, + "adult": false, + "thumbnail": "", + "banner": "", + "categories": [] } \ No newline at end of file