migliorie test, possibile fix adesso in onda
This commit is contained in:
@@ -6,6 +6,8 @@
|
||||
|
||||
from __future__ import division
|
||||
# from builtins import str
|
||||
import io
|
||||
|
||||
from future.builtins import range
|
||||
from past.utils import old_div
|
||||
import sys
|
||||
@@ -43,6 +45,11 @@ if os.name == "nt":
|
||||
else:
|
||||
fs_encoding = "utf8"
|
||||
|
||||
# per android è necessario, su kodi 18, usare FileIO
|
||||
# https://forum.kodi.tv/showthread.php?tid=330124
|
||||
# per xbox invece, è necessario usare open perchè _io è rotto :(
|
||||
# https://github.com/jellyfin/jellyfin-kodi/issues/115#issuecomment-538811017
|
||||
fileIo = platformtools.xbmc.getCondVisibility('system.platform.linux') and platformtools.xbmc.getCondVisibility('system.platform.android')
|
||||
|
||||
|
||||
def validate_path(path):
|
||||
@@ -142,19 +149,20 @@ def read(path, linea_inicio=0, total_lineas=None, whence=0, silent=False, vfs=Tr
|
||||
total_lineas = None
|
||||
if xbmc_vfs and vfs:
|
||||
if not exists(path): return False
|
||||
f = xbmcvfs.File(path, "rb")
|
||||
f = xbmcvfs.File(path, "r")
|
||||
data = f.read()
|
||||
|
||||
if total_lineas == None:
|
||||
total_lineas = 9999999999
|
||||
if linea_inicio > 0:
|
||||
if not isinstance(whence, int):
|
||||
try:
|
||||
whence = int(whence)
|
||||
except:
|
||||
return False
|
||||
f.seek(linea_inicio, whence)
|
||||
logger.debug('POSITION of beginning of reading,, tell(): %s' % f.seek(0, 1))
|
||||
if total_lineas == None:
|
||||
total_lineas = 0
|
||||
data = f.read(total_lineas)
|
||||
return "".join(data)
|
||||
data = '\n'.join(data.split('\n')[linea_inicio:total_lineas])
|
||||
|
||||
return data
|
||||
elif path.lower().startswith("smb://"):
|
||||
f = samba.smb_open(path, "rb")
|
||||
else:
|
||||
@@ -179,7 +187,7 @@ def read(path, linea_inicio=0, total_lineas=None, whence=0, silent=False, vfs=Tr
|
||||
return unicode(b"".join(data))
|
||||
|
||||
|
||||
def write(path, data, mode="wb", silent=False, vfs=True):
|
||||
def write(path, data, mode="w", silent=False, vfs=True):
|
||||
"""
|
||||
Save the data to a file
|
||||
@param path: file path to save
|
||||
@@ -233,7 +241,10 @@ def file_open(path, mode="r", silent=False, vfs=True):
|
||||
elif path.lower().startswith("smb://"):
|
||||
return samba.smb_open(path, mode)
|
||||
else:
|
||||
return open(path, mode)
|
||||
if fileIo:
|
||||
return io.FileIO(path, mode)
|
||||
else:
|
||||
return io.open(path, mode)
|
||||
except:
|
||||
logger.error("ERROR when opening file: %s, %s" % (path, mode))
|
||||
if not silent:
|
||||
|
||||
@@ -326,7 +326,7 @@ def updateFromZip(message=config.get_localized_string(80050)):
|
||||
hash = fixZipGetHash(localfilename)
|
||||
logger.info(hash)
|
||||
|
||||
with zipfile.ZipFile(fOpen(localfilename, 'rb')) as zip:
|
||||
with zipfile.ZipFile(filetools.file_open(localfilename, 'rb', vfs=False)) as zip:
|
||||
size = sum([zinfo.file_size for zinfo in zip.filelist])
|
||||
cur_size = 0
|
||||
for member in zip.infolist():
|
||||
@@ -425,7 +425,7 @@ def rename(dir1, dir2):
|
||||
# https://stackoverflow.com/questions/3083235/unzipping-file-results-in-badzipfile-file-is-not-a-zip-file
|
||||
def fixZipGetHash(zipFile):
|
||||
hash = ''
|
||||
with fOpen(zipFile, 'r+b') as f:
|
||||
with filetools.file_open(zipFile, 'r+b', vfs=False) as f:
|
||||
data = f.read()
|
||||
pos = data.find(b'\x50\x4b\x05\x06') # End of central directory signature
|
||||
if pos > 0:
|
||||
@@ -438,17 +438,6 @@ def fixZipGetHash(zipFile):
|
||||
|
||||
return str(hash)
|
||||
|
||||
def fOpen(file, mode = 'r'):
|
||||
# per android è necessario, su kodi 18, usare FileIO
|
||||
# https://forum.kodi.tv/showthread.php?tid=330124
|
||||
# per xbox invece, è necessario usare open perchè _io è rotto :(
|
||||
# https://github.com/jellyfin/jellyfin-kodi/issues/115#issuecomment-538811017
|
||||
if xbmc.getCondVisibility('system.platform.linux') and xbmc.getCondVisibility('system.platform.android'):
|
||||
logger.info('android, uso FileIO per leggere')
|
||||
return io.FileIO(file, mode)
|
||||
else:
|
||||
return io.open(file, mode)
|
||||
|
||||
|
||||
def _pbhook(numblocks, blocksize, filesize, url, dp):
|
||||
try:
|
||||
|
||||
@@ -6,10 +6,10 @@ from platformcode import logger, config
|
||||
|
||||
def test_video_exists(page_url):
|
||||
logger.info("(page_url='%s')" % page_url)
|
||||
data = httptools.downloadpage(page_url)
|
||||
page = httptools.downloadpage(page_url)
|
||||
global data
|
||||
data = data.data
|
||||
if data.code == 404:
|
||||
data = page.data
|
||||
if page.code == 404:
|
||||
return False, config.get_localized_string(70449)
|
||||
return True, ""
|
||||
|
||||
|
||||
@@ -37,13 +37,13 @@ def download(item=None):
|
||||
|
||||
def extract():
|
||||
import zipfile
|
||||
from platformcode.updater import fixZipGetHash, fOpen
|
||||
from platformcode.updater import fixZipGetHash
|
||||
support.log('Estraggo Elementum in:', elementum_path)
|
||||
try:
|
||||
hash = fixZipGetHash(filename)
|
||||
support.log(hash)
|
||||
|
||||
with zipfile.ZipFile(fOpen(filename, 'rb')) as zip_ref:
|
||||
with zipfile.ZipFile(filetools.file_open(filename, 'rb', vfs=False)) as zip_ref:
|
||||
zip_ref.extractall(xbmc.translatePath(addon_path))
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -6,7 +6,6 @@ from datetime import datetime
|
||||
import glob, time, gzip, xbmc
|
||||
from core import filetools, downloadtools, support, scrapertools
|
||||
from core.item import Item
|
||||
from platformcode import logger
|
||||
|
||||
host = "http://epg-guide.com/kltv.gz"
|
||||
blacklisted_genres = ['attualita', 'scienza', 'religione', 'cucina', 'notiziario', 'altro', 'soap opera', 'viaggi', 'economia', 'tecnologia', 'magazine', 'show', 'reality show', 'lifestyle', 'societa', 'wrestling', 'azione', 'Musica', 'real life', 'real adventure', 'dplay original', 'natura', 'news', 'food', 'sport', 'moda', 'arte e cultura', 'crime', 'box set e serie tv', 'casa', 'storia', 'talk show', 'motori', 'attualit\xc3\xa0 e inchiesta', 'documentari', 'musica', 'spettacolo', 'medical', 'talent show', 'sex and love', 'beauty and style', 'news/current affairs', "children's/youth programmes", 'leisure hobbies', 'social/political issues/economics', 'education/science/factual topics', 'undefined content', 'show/game show', 'music/ballet/dance', 'sports', 'arts/culture', 'biografico', 'informazione', 'documentario']
|
||||
@@ -28,7 +27,7 @@ def getEpg():
|
||||
fileName = support.config.get_temp_file('guidatv-') + now.strftime('%Y %m %d')
|
||||
archiveName = fileName + '.gz'
|
||||
xmlName = fileName + '.xml'
|
||||
if not filetools.exists(archiveName):
|
||||
if not filetools.exists(xmlName):
|
||||
support.log('downloading epg')
|
||||
# cancello quelli vecchi
|
||||
for f in glob.glob(support.config.get_temp_file('guidatv-') + '*'):
|
||||
@@ -36,8 +35,8 @@ def getEpg():
|
||||
# inmemory = io.BytesIO(httptools.downloadpage(host).data)
|
||||
downloadtools.downloadfile(host, archiveName)
|
||||
support.log('opening gzip and writing xml')
|
||||
fStream = gzip.GzipFile(archiveName, mode='rb')
|
||||
guide = fStream.read().replace('\n', ' ').replace('><', '>\n<')
|
||||
with gzip.GzipFile(fileobj=filetools.file_open(archiveName, mode='rb', vfs=False)) as f:
|
||||
guide = f.read().replace('\n', ' ').replace('><', '>\n<')
|
||||
with open(xmlName, 'w') as f:
|
||||
f.write(guide)
|
||||
# else:
|
||||
|
||||
85
tests.py
85
tests.py
@@ -4,14 +4,16 @@ import sys
|
||||
import unittest
|
||||
import parameterized
|
||||
|
||||
from lib import requests
|
||||
from platformcode import config
|
||||
|
||||
config.set_setting('tmdb_active', False)
|
||||
|
||||
librerias = os.path.join(config.get_runtime_path(), 'lib')
|
||||
sys.path.insert(0, librerias)
|
||||
from core.support import typo
|
||||
from core.item import Item
|
||||
import channelselector
|
||||
from core import servertools
|
||||
import re
|
||||
|
||||
validUrlRegex = re.compile(
|
||||
@@ -104,7 +106,7 @@ chNumRis = {
|
||||
|
||||
|
||||
def getChannels():
|
||||
channel_list = channelselector.filterchannels("all")
|
||||
channel_list = channelselector.filterchannels("all")[0:2]
|
||||
ret = []
|
||||
for chItem in channel_list:
|
||||
ch = chItem.channel
|
||||
@@ -117,18 +119,13 @@ from specials import news
|
||||
|
||||
dictNewsChannels, any_active = news.get_channels_list()
|
||||
|
||||
srvLinkDict = {
|
||||
"wstream": ["https://wstream.video/video6zvimpy52/dvvwxyfs32ab"],
|
||||
"akvideo": ["https://akvideo.stream/video.php?file_code=23god95lrtqv"]
|
||||
}
|
||||
servers_found = []
|
||||
|
||||
|
||||
def getServers():
|
||||
server_list = servertools.get_servers_list()
|
||||
ret = []
|
||||
for srv in server_list:
|
||||
if srv in srvLinkDict:
|
||||
ret.append({'srv': srv})
|
||||
for srv in servers_found:
|
||||
ret.append({'item': srv})
|
||||
return ret
|
||||
|
||||
|
||||
@@ -144,24 +141,24 @@ class GenericChannelTest(unittest.TestCase):
|
||||
self.assertTrue(mainlist, 'channel ' + self.ch + ' has no menu')
|
||||
|
||||
for it in mainlist:
|
||||
it.title = it.title.decode('ascii', 'ignore')
|
||||
# it.title = it.title.decode('ascii', 'ignore')
|
||||
if it.action == 'channel_config':
|
||||
hasChannelConfig = True
|
||||
continue
|
||||
if it.action == 'search': # channel specific
|
||||
if it.action == 'search': # channel-specific
|
||||
continue
|
||||
itemlist = getattr(self.module, it.action)(it)
|
||||
self.assertTrue(itemlist, 'channel ' + self.ch + ' -> ' + it.title + ' is empty')
|
||||
if self.ch in chNumRis: # so a priori quanti risultati dovrebbe dare
|
||||
if self.ch in chNumRis: # i know how much results should be
|
||||
for content in chNumRis[self.ch]:
|
||||
if content in it.title:
|
||||
risNum = len([it for it in itemlist if not it.nextPage]) # not count nextpage
|
||||
risNum = len([i for i in itemlist if not i.nextPage]) # not count nextpage
|
||||
self.assertEqual(chNumRis[self.ch][content], risNum,
|
||||
'channel ' + self.ch + ' -> ' + it.title + ' returned wrong number of results')
|
||||
break
|
||||
|
||||
for resIt in itemlist:
|
||||
self.assertLess(len(resIt.fulltitle), 100,
|
||||
self.assertLess(len(resIt.fulltitle), 110,
|
||||
'channel ' + self.ch + ' -> ' + it.title + ' might contain wrong titles\n' + resIt.fulltitle)
|
||||
if resIt.url:
|
||||
self.assertIsNotNone(re.match(validUrlRegex, resIt.url),
|
||||
@@ -176,6 +173,38 @@ class GenericChannelTest(unittest.TestCase):
|
||||
nextPageItemlist = getattr(self.module, resIt.action)(resIt)
|
||||
self.assertTrue(nextPageItemlist,
|
||||
'channel ' + self.ch + ' -> ' + it.title + ' has nextpage not working')
|
||||
|
||||
# some sites might have no link inside, but if all results are without servers, there's something wrong
|
||||
servers = []
|
||||
for resIt in itemlist:
|
||||
servers = getattr(self.module, resIt.action)(resIt)
|
||||
if servers:
|
||||
break
|
||||
self.assertTrue(servers, 'channel ' + self.ch + ' -> ' + it.title + ' has no servers on all results')
|
||||
for server in servers:
|
||||
srv = server.server
|
||||
module = __import__('servers.%s' % srv, fromlist=["servers.%s" % srv])
|
||||
page_url = server.url
|
||||
print 'testing ' + page_url
|
||||
if module.test_video_exists(page_url)[0]:
|
||||
urls = module.get_video_url(page_url)
|
||||
print urls
|
||||
for u in urls:
|
||||
spl = u[1].split('|')
|
||||
if len(spl) == 2:
|
||||
directUrl, headersUrl = spl
|
||||
else:
|
||||
directUrl, headersUrl = spl[0], ''
|
||||
headers = {}
|
||||
if headersUrl:
|
||||
for name in headersUrl.split('&'):
|
||||
h, v = name.split('=')
|
||||
headers[h] = v
|
||||
print headers
|
||||
contentType = requests.head(directUrl, headers=headers, timeout=15).headers['Content-Type']
|
||||
self.assert_(contentType.startswith('video') or 'mpegurl' in contentType,
|
||||
srv + ' scraper did not return valid url for link ' + page_url)
|
||||
|
||||
self.assertTrue(hasChannelConfig, 'channel ' + self.ch + ' has no channel config')
|
||||
|
||||
def test_newest(self):
|
||||
@@ -186,31 +215,5 @@ class GenericChannelTest(unittest.TestCase):
|
||||
self.assertTrue(itemlist, 'channel ' + self.ch + ' returned no news for category ' + cat)
|
||||
break
|
||||
|
||||
|
||||
#
|
||||
# @parameterized.parameterized_class(getServers())
|
||||
# class GenericServerTest(unittest.TestCase):
|
||||
# def __init__(self, *args):
|
||||
# self.module = __import__('servers.%s' % self.srv, fromlist=["servers.%s" % self.srv])
|
||||
# super(GenericServerTest, self).__init__(*args)
|
||||
#
|
||||
# def test_resolve(self):
|
||||
# for link in srvLinkDict[self.srv]:
|
||||
# find = servertools.findvideosbyserver(link, self.srv)
|
||||
# self.assertTrue(find, 'link ' + link + ' not recognised')
|
||||
# page_url = find[0][1]
|
||||
# if self.module.test_video_exists(page_url)[0]:
|
||||
# urls = self.module.get_video_url(page_url)
|
||||
# print urls
|
||||
# for u in urls:
|
||||
# directUrl, headersUrl = u[1].split('|')
|
||||
# headers = {}
|
||||
# for name in headersUrl.split('&'):
|
||||
# h, v = name.split('=')
|
||||
# headers[h] = v
|
||||
# print headers
|
||||
# self.assertEqual(requests.head(directUrl, headers=headers, timeout=15).status_code, 200, self.srv + ' scraper did not return valid url for link ' + link)
|
||||
|
||||
if __name__ == '__main__':
|
||||
config.set_setting('tmdb_active', False)
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user