845 lines
26 KiB
Python
845 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
|
# ------------------------------------------------------------
|
|
# filetools
|
|
# File management with xbmcvfs / samba / local discrimination
|
|
# ------------------------------------------------------------
|
|
|
|
from __future__ import division
|
|
# from builtins import str
|
|
import io
|
|
|
|
from past.utils import old_div
|
|
import sys
|
|
# True when the interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3
if PY3:
    # Python 2 builtin names used by this module, mapped to their Python 3 equivalents.
    unicode = str
    unichr = chr
    long = int
|
|
|
|
import os
|
|
import traceback
|
|
|
|
from core import scrapertools
|
|
from platformcode import platformtools, logger
|
|
|
|
# Master switch for XbmcVFS: False to disable it, True to enable it.
xbmc_vfs = True
if xbmc_vfs:
    try:
        import xbmcvfs
        if not PY3:
            # Workaround, to be reviewed on migration to Python 3:
            # xbmcvfs changes the value of defaultencoding, so it is re-established here.
            reload(sys)
            sys.setdefaultencoding('utf-8')
        xbmc_vfs = True
    except:
        # xbmcvfs could not be set up: the module works without it.
        xbmc_vfs = False

# The samba library is only loaded when XbmcVFS is not in use.
samba = None
if not xbmc_vfs:
    try:
        from lib.sambatools import libsmb as samba
    except:
        samba = None
        # Python 2.4 is not compatible with the samba module, this has to be checked

# Windows is "mbcs"; linux, osx and android are "utf8".
# On Windows the value is left empty, which makes encode() skip the re-encoding step.
fs_encoding = "" if os.name == "nt" else "utf8"

# On android, with kodi 18, FileIO has to be used
# https://forum.kodi.tv/showthread.php?tid=330124
# On xbox, instead, open has to be used because _io is broken :(
# https://github.com/jellyfin/jellyfin-kodi/issues/115#issuecomment-538811017
fileIo = platformtools.xbmc.getCondVisibility('system.platform.linux') and platformtools.xbmc.getCondVisibility('system.platform.android')
|
|
|
|
|
|
def validate_path(path):
    """
    Eliminate illegal characters
    For URL-like paths (protocol://host/resource) only the resource part is cleaned; the protocol
    and the host (which may carry "user:password@") are kept untouched.
    For local paths a leading Windows drive ("C:\\") is kept untouched.
    @param path: string to validate
    @type path: str
    @rtype: str
    @return: returns the string without the characters not allowed
    """
    import re

    chars = ":*?<>|"

    def _strip_illegal(text):
        # Drops every character listed in chars
        return ''.join([c for c in text if c not in chars])

    url_parts = re.match(r'(\w+://)(.+?)/(.+)', path)
    if url_parts:
        protocolo, host, resource = url_parts.groups()
        return protocolo + host + "/" + _strip_illegal(resource)

    if re.match(r'\w+://', path):
        # URL without a resource part (e.g. "smb://server"): there is nothing to clean
        return path

    if path.find(":\\") == 1:
        # The drive prefix already contains the separator, so the rest starts after it;
        # otherwise the separator would be duplicated ("C:\\" + "\\dir")
        unidad = path[0:3]
        path = path[3:]
    else:
        unidad = ""

    return unidad + _strip_illegal(path)
|
|
|
|
|
|
def encode(path, _samba=False):
    """
    It encodes a path according to the operating system we are using.
    The path argument has to be encoded in utf-8
    @type path: unicode or str with utf-8 encoding
    @param path: parameter to encode
    @type _samba: bool
    @param _samba: if the path is samba or not
    @rtype: str
    @return: path as a native str: on Python 2 encoded in the system character set (utf-8 for
             samba / URL paths); on Python 3 always text
    """
    if not isinstance(path, unicode):
        path = unicode(path, "utf-8", "ignore")

    if scrapertools.find_single_match(path, r'(^\w+:\/\/)') or _samba:
        path = path.encode("utf-8", "ignore")
    elif fs_encoding and not PY3:
        path = path.encode(fs_encoding, "ignore")

    # On Python 3 the rest of the module (join, startswith("smb://"), regular expressions) works
    # with text, so the utf-8 bytes are turned back into str instead of being returned as bytes
    if PY3 and isinstance(path, bytes):
        path = path.decode("utf-8", "ignore")

    return path
|
|
|
|
|
|
def decode(path):
    """
    Converts a text string to the utf-8 character set
    removing characters that are not allowed in utf-8
    @type path: str, unicode, list of str or unicode
    @param path: can be a path or a list with multiple paths (a list is converted in place)
    @rtype: str, or list of str when a list was given
    @return: path as a native str: encoded in UTF-8 on Python 2, text on Python 3
    """
    # fs_encoding is empty on Windows; an empty codec name makes .decode() raise LookupError,
    # so the encoding reported by the interpreter is used in that case
    source_encoding = fs_encoding or sys.getfilesystemencoding() or "utf-8"

    def _to_utf8(value):
        # Converts a single value
        if not isinstance(value, unicode):
            value = value.decode(source_encoding, "ignore")
        encoded = value.encode("utf-8", "ignore")
        # On Python 3 text is returned so the result can be used with join() and the like
        return encoded.decode("utf-8") if PY3 else encoded

    if isinstance(path, list):
        for x, item in enumerate(path):
            path[x] = _to_utf8(item)
        return path

    return _to_utf8(path)
|
|
|
|
|
|
def read(path, linea_inicio=0, total_lineas=None, whence=0, silent=False, vfs=True):
    """
    Read the contents of a file and return the data
    @param path: file path
    @type path: str
    @param linea_inicio: index of the first line to read from the file (0 is the first line)
    @type linea_inicio: positive int
    @param total_lineas: maximum number of lines to read. If it is None or greater than the remaining lines, the file will be read until the end.
    @type total_lineas: positive int
    @param whence: it is not used to position the read; with XbmcVFS and linea_inicio > 0 it only
                   has to be convertible to int, otherwise False is returned
    @type whence: int
    @param silent: if True, read errors are not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: str
    @return: data contained in the file, or False if it does not exist or it cannot be read
    """
    path = encode(path)
    try:
        if not isinstance(linea_inicio, int):
            try:
                linea_inicio = int(linea_inicio)
            except Exception:
                logger.error('Read: Start_line ERROR: %s' % str(linea_inicio))
                linea_inicio = 0
        if total_lineas is not None and not isinstance(total_lineas, int):
            try:
                total_lineas = int(total_lineas)
            except Exception:
                logger.error('Read: ERROR of total_lineas: %s' % str(total_lineas))
                total_lineas = None

        if xbmc_vfs and vfs:
            if not exists(path): return False
            f = xbmcvfs.File(path, "r")
            try:
                data = f.read()
            finally:
                # The handle is released even if the read fails
                f.close()

            if linea_inicio > 0 and not isinstance(whence, int):
                try:
                    whence = int(whence)
                except Exception:
                    return False

            if linea_inicio > 0 or total_lineas is not None:
                # total_lineas is a number of lines (as in the non-VFS branch), not an end index
                first = max(linea_inicio, 0)
                if total_lineas is None or total_lineas < 0:
                    last = None
                else:
                    last = first + total_lineas
                data = '\n'.join(data.split('\n')[first:last])

            return data

        if path.lower().startswith("smb://"):
            f = samba.smb_open(path, "rb")
        else:
            f = open(path, "rb")

        try:
            data = []
            for x, line in enumerate(f):
                if x < linea_inicio: continue
                if len(data) == total_lineas: break
                data.append(line)
        finally:
            f.close()
    except Exception:
        if not silent:
            logger.error("ERROR reading file: %s" % path)
            logger.error(traceback.format_exc())
        return False

    if not PY3:
        return unicode("".join(data))
    # str(bytes) would produce the "b'...'" representation, so the content is decoded explicitly
    return b"".join(data).decode("utf-8", "ignore")
|
|
|
|
|
|
def write(path, data, mode="w", silent=False, vfs=True):
    """
    Save the data to a file
    @param path: file path to save
    @type path: str
    @param data: data to save
    @type data: str
    @param mode: mode used to open the file
    @type mode: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns True if it was written correctly or False if it gave an error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            f = xbmcvfs.File(path, mode)
            try:
                result = f.write(data)
            finally:
                # The handle is closed even if the write fails
                f.close()
            return bool(result)

        if path.lower().startswith("smb://"):
            f = samba.smb_open(path, mode)
        else:
            f = open(path, mode)

        try:
            f.write(data)
        finally:
            f.close()
    except Exception:
        logger.error("ERROR saving file: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False

    return True
|
|
|
|
|
|
def file_open(path, mode="r", silent=False, vfs=True):
    """
    Open a file
    With XbmcVFS the read/update ("r+") and append ("a") modes are converted to "w" before opening.
    @param path: path
    @type path: str
    @param mode: opening mode
    @type mode: str
    @param silent: if True, neither the traceback is logged nor the notification shown on error
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: file object
    @return: the opened file object, or False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            wants_update = 'r' in mode and '+' in mode
            if wants_update:
                mode = mode.replace('r', 'w').replace('+', '')
                logger.debug('Open MODE changed to: %s' % mode)
            if 'a' in mode:
                mode = mode.replace('a', 'w').replace('+', '')
                logger.debug('Open MODE changed to: %s' % mode)
            return xbmcvfs.File(path, mode)

        if path.lower().startswith("smb://"):
            return samba.smb_open(path, mode)

        # Local file: FileIO on the platforms flagged by fileIo, the builtin open everywhere else
        if fileIo:
            return io.FileIO(path, mode)
        return open(path, mode)
    except:
        logger.error("ERROR when opening file: %s, %s" % (path, mode))
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("Error Opening", path)
        return False
|
|
|
|
|
|
def file_stat(path, silent=False, vfs=True):
    """
    Stat of a file
    It is only supported through XbmcVFS.
    @param path: path
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used (and the stat is therefore not supported)
    @type vfs: bool
    @rtype: xbmcvfs.Stat
    @return: the Stat object of the path, or False if it does not exist, it is not supported or it gave an error
    """
    path = encode(path)

    if not (xbmc_vfs and vfs):
        # There is no samba / local implementation: it is reported explicitly instead of
        # forcing an exception with a bare raise
        logger.error("File_Stat not supported: %s" % path)
        return False

    try:
        if not exists(path): return False
        return xbmcvfs.Stat(path)
    except Exception:
        logger.error("File_Stat not supported: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
|
|
|
|
|
|
def rename(path, new_name, silent=False, strict=False, vfs=True):
    """
    Rename a file or folder
    @param path: path of the file or folder to rename
    @type path: str
    @param new_name: new name
    @type new_name: str
    @param silent: if True, no dialogs are shown and the traceback of an error is not logged
    @type silent: bool
    @param strict: if True, a failed XbmcVFS rename is not retried by copying and deleting
    @type strict: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            path_end = path
            if path_end.endswith('/') or path_end.endswith('\\'):
                path_end = path_end[:-1]
            dest = encode(join(dirname(path_end), new_name))
            result = xbmcvfs.rename(path, dest)
            if not result and not strict:
                logger.error("ERROR RENAME file: %s. Copying and deleting" % path)
                dialogo = None
                if not silent:
                    dialogo = platformtools.dialog_progress("Copying file", "")
                try:
                    result = xbmcvfs.copy(path, dest)
                finally:
                    # The progress dialog must not stay open once the copy has finished
                    if dialogo is not None:
                        dialogo.close()
                if not result:
                    return False
                xbmcvfs.delete(path)
            return bool(result)

        if path.lower().startswith("smb://"):
            new_name = encode(new_name, True)
            samba.rename(path, join(dirname(path), new_name))
        else:
            new_name = encode(new_name, False)
            os.rename(path, os.path.join(os.path.dirname(path), new_name))
    except Exception:
        logger.error("ERROR when renaming the file: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("Error renaming", path)
        return False

    return True
|
|
|
|
|
|
def move(path, dest, silent=False, strict=False, vfs=True):
    """
    Move a file
    @param path: path of the file to move
    @type path: str
    @param dest: path where to move
    @type dest: str
    @param silent: if True, no dialogs are shown and the traceback of an error is not logged
    @type silent: bool
    @param strict: if True, a failed XbmcVFS rename is not retried by copying and deleting
    @type strict: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error
    """
    try:
        if xbmc_vfs and vfs:
            if not exists(path): return False
            path = encode(path)
            dest = encode(dest)
            result = xbmcvfs.rename(path, dest)
            if not result and not strict:
                logger.error("ERROR when MOVING the file: %s. Copying and deleting" % path)
                dialogo = None
                if not silent:
                    dialogo = platformtools.dialog_progress("Copying file", "")
                try:
                    result = xbmcvfs.copy(path, dest)
                finally:
                    # The progress dialog must not stay open once the copy has finished
                    if dialogo is not None:
                        dialogo.close()
                if not result:
                    return False
                xbmcvfs.delete(path)
            return bool(result)

        source_is_smb = path.lower().startswith("smb://")
        dest_is_smb = dest.lower().startswith("smb://")

        # samba/samba
        if source_is_smb and dest_is_smb:
            dest = encode(dest, True)
            path = encode(path, True)
            samba.rename(path, dest)

        # local/local
        elif not source_is_smb and not dest_is_smb:
            dest = encode(dest)
            path = encode(path)
            os.rename(path, dest)

        # mixed: in this case the file is copied and then the source file is deleted.
        # copy() shows its own progress dialog, so none is created here, and the silent
        # flag is handed over so that a silent move stays silent
        else:
            return copy(path, dest, silent=silent) == True and remove(path, silent=silent) == True
    except Exception:
        logger.error("ERROR when moving file: %s to %s" % (path, dest))
        if not silent:
            logger.error(traceback.format_exc())
        return False

    return True
|
|
|
|
|
|
def copy(path, dest, silent=False, vfs=True):
    """
    Copy a file
    @param path: path of the file to copy
    @type path: str
    @param dest: path to copy
    @type dest: str
    @param silent: the dialog box is displayed or not
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used for the copy and the file is copied in blocks
    @type vfs: bool
    @rtype: bool
    @return: returns False on error, if any of the files cannot be opened or if the user cancels
    """
    dialogo = None
    fo = None
    fd = None
    try:
        if xbmc_vfs and vfs:
            path = encode(path)
            dest = encode(dest)
            if not silent:
                dialogo = platformtools.dialog_progress("Copying file", "")
            return bool(xbmcvfs.copy(path, dest))

        fo = file_open(path, "rb")
        fd = file_open(dest, "wb")
        if not (fo and fd):
            # file_open returns False on failure: nothing has been copied
            return False

        if not silent:
            dialogo = platformtools.dialog_progress("Copying file", "")
        size = getsize(path)
        copiado = 0
        while True:
            if dialogo is not None:
                # An empty file has size 0: the percentage cannot be computed by dividing
                percent = copiado * 100 // size if size else 0
                dialogo.update(percent, basename(path))
            buf = fo.read(1024 * 1024)
            if not buf:
                break
            if dialogo is not None and dialogo.iscanceled():
                return False
            fd.write(buf)
            copiado += len(buf)
    except Exception:
        logger.error("ERROR when copying the file: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
    finally:
        # Both files and the progress dialog are always released; closing the destination
        # is also what guarantees that the written data reaches the file
        for resource in (fo, fd, dialogo):
            if resource is None or resource is False:
                continue
            try:
                resource.close()
            except Exception:
                logger.error(traceback.format_exc())

    return True
|
|
|
|
|
|
def exists(path, silent=False, vfs=True):
    """
    Check if there is a folder or file
    @param path: path
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: Returns True if the path exists, whether it is a folder or a file
    """
    path = encode(path)
    try:
        if not (xbmc_vfs and vfs):
            if path.lower().startswith("smb://"):
                return samba.exists(path)
            return os.path.exists(path)

        found = bool(xbmcvfs.exists(path))
        if found or path.endswith('/') or path.endswith('\\'):
            return found
        # Second attempt with a trailing separator appended to the path
        with_separator = join(path, ' ').rstrip()
        return bool(xbmcvfs.exists(with_separator))
    except:
        logger.error("ERROR when checking the path: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
|
|
|
|
|
|
def isfile(path, silent=False, vfs=True):
    """
    Check if the path is a file
    @param path: path
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: Returns True if the path exists and is a file
    """
    path = encode(path)
    try:
        if not (xbmc_vfs and vfs):
            if path.lower().startswith("smb://"):
                return samba.isfile(path)
            return os.path.isfile(path)

        # Paths without protocol are checked directly on the local file system
        if not scrapertools.find_single_match(path, r'(^\w+:\/\/)'):
            return os.path.isfile(path)

        if path.endswith('/') or path.endswith('\\'):
            path = path[:-1]
        # The name is looked up among the files listed in the parent folder
        folder_names, file_names = xbmcvfs.listdir(dirname(path))
        wanted = basename(path)
        return any(wanted == entry for entry in file_names)
    except:
        logger.error("ERROR when checking file: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
|
|
|
|
|
|
def isdir(path, silent=False, vfs=True):
    """
    Check if the path is a directory
    @param path: path
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: Returns True if the path exists and is a directory
    """
    path = encode(path)
    try:
        if not (xbmc_vfs and vfs):
            if path.lower().startswith("smb://"):
                return samba.isdir(path)
            return os.path.isdir(path)

        # Paths without protocol are checked directly on the local file system
        if not scrapertools.find_single_match(path, r'(^\w+:\/\/)'):
            return os.path.isdir(path)

        if path.endswith('/') or path.endswith('\\'):
            path = path[:-1]
        # The name is looked up among the folders listed in the parent folder
        folder_names, file_names = xbmcvfs.listdir(dirname(path))
        wanted = basename(path)
        return any(wanted == entry for entry in folder_names)
    except:
        logger.error("ERROR when checking the directory: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
|
|
|
|
|
|
def getsize(path, silent=False, vfs=True):
    """
    Gets the size of a file
    @param path: file path
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: int
    @return: file size, 0 if the file does not exist (XbmcVFS) or on error
    """
    path = encode(path)
    try:
        if not (xbmc_vfs and vfs):
            if path.lower().startswith("smb://"):
                return long(samba.get_attributes(path).file_size)
            return os.path.getsize(path)

        if not exists(path):
            return long(0)
        handle = xbmcvfs.File(path)
        size = handle.size()
        handle.close()
        return size
    except:
        logger.error("ERROR when getting the size: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return long(0)
|
|
|
|
|
|
def remove(path, silent=False, vfs=True):
    """
    Delete a file
    @param path: path of the file to delete
    @type path: str
    @param silent: if True, neither the traceback is logged nor the notification shown on error
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            return bool(xbmcvfs.delete(path))

        is_samba = path.lower().startswith("smb://")
        if is_samba:
            samba.remove(path)
        else:
            os.remove(path)
    except:
        logger.error("ERROR deleting file: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("ERROR deleting file", path)
        return False

    return True
|
|
|
|
|
|
def rmdirtree(path, silent=False, vfs=True):
    """
    Delete a directory and its contents
    @param path: path to remove
    @type path: str
    @param silent: if True, neither the traceback is logged nor the notification shown on error
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error or if the path still exists after the deletion
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            if not exists(path):
                return True
            if not path.endswith('/') and not path.endswith('\\'):
                path = join(path, ' ').rstrip()
            # Bottom-up walk: the content of every folder is deleted before the folder itself
            for root, folders, files in walk(path, topdown=False):
                for file_name in files:
                    xbmcvfs.delete(join(root, file_name))
                for folder_name in folders:
                    xbmcvfs.rmdir(join(root, folder_name))
            xbmcvfs.rmdir(path)
        elif path.lower().startswith("smb://"):
            for root, folders, files in samba.walk(path, topdown=False):
                for file_name in files:
                    samba.remove(join(decode(root), decode(file_name)))
                for folder_name in folders:
                    samba.rmdir(join(decode(root), decode(folder_name)))
            samba.rmdir(path)
        else:
            import shutil
            shutil.rmtree(path, ignore_errors=True)
    except:
        logger.error("ERROR deleting directory: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("ERROR deleting directory", path)
        return False

    # The deletion is confirmed by checking that the path is gone
    return not exists(path)
|
|
|
|
|
|
def rmdir(path, silent=False, vfs=True):
    """
    Delete a directory
    @param path: path to remove
    @type path: str
    @param silent: if True, neither the traceback is logged nor the notification shown on error
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            has_separator = path.endswith('/') or path.endswith('\\')
            if not has_separator:
                # XbmcVFS receives the folder with a trailing separator
                path = join(path, ' ').rstrip()
            return bool(xbmcvfs.rmdir(path))

        if path.lower().startswith("smb://"):
            samba.rmdir(path)
        else:
            os.rmdir(path)
    except:
        logger.error("ERROR deleting directory: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("ERROR deleting directory", path)
        return False

    return True
|
|
|
|
|
|
def mkdir(path, silent=False, vfs=True):
    """
    Create a directory
    @param path: path to create
    @type path: str
    @param silent: if True, neither the traceback is logged nor the notification shown on error
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: bool
    @return: returns False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            has_separator = path.endswith('/') or path.endswith('\\')
            if not has_separator:
                # XbmcVFS receives the folder with a trailing separator
                path = join(path, ' ').rstrip()
            created = bool(xbmcvfs.mkdirs(path))
            if not created:
                # mkdirs reported a failure: after a short wait the folder is looked for
                import time
                time.sleep(0.1)
                created = exists(path)
            return created

        if path.lower().startswith("smb://"):
            samba.mkdir(path)
        else:
            os.mkdir(path)
    except:
        logger.error("ERROR when creating directory: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
            platformtools.dialog_notification("ERROR when creating directory", path)
        return False

    return True
|
|
|
|
|
|
def walk(top, topdown=True, onerror=None, vfs=True):
    """
    List a directory recursively
    @param top: Directory to list, must be a str "UTF-8"
    @type top: str
    @param topdown: scanned from top to bottom
    @type topdown: bool
    @param onerror: handed over unchanged to the walk function of the back-end in use
    @type onerror: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    ***The followlinks parameter, which by default is True, is not used here, since in samba it does not discriminate links
    """
    top = encode(top)

    if xbmc_vfs and vfs:
        for root, folders, files in walk_vfs(top, topdown, onerror):
            # list(folders) hands out a copy of the folder listing, so that entering folders
            # with special characters recursively does not give an error
            yield root, list(folders), files
        return

    # Samba and local walks only differ in the function that produces the listing
    walker = samba.walk if top.lower().startswith("smb://") else os.walk
    for root, folders, files in walker(top, topdown, onerror):
        # list(folders) hands out a copy of the folder listing (see above)
        yield decode(root), decode(list(folders)), decode(files)
|
|
|
|
|
|
def walk_vfs(top, topdown=True, onerror=None):
    """
    List a directory recursively
    Since xbmcvfs does not have this function, the logic of libsmb (samba) is copied to carry out the pre-Walk
    @param top: Directory to list
    @type top: str
    @param topdown: if True every folder is yielded before its subfolders, otherwise after them
    @type topdown: bool
    @param onerror: only handed over to the recursive calls
    @type onerror: bool
    """
    top = encode(top)
    dirs, nondirs = xbmcvfs.listdir(top)

    if topdown:
        yield top, dirs, nondirs

    for name in dirs:
        # The folder name is normalised before building the path of the subfolder
        is_text = isinstance(name, unicode)
        if PY3:
            if is_text:
                name = name.encode("utf8").decode("utf8")
            elif isinstance(name, bytes):
                name = name.decode("utf8")
        elif is_text:
            name = name.encode("utf8")
        else:
            name = unicode(name, "utf8")

        new_path = "/".join(top.split("/") + [name])
        for entry in walk_vfs(new_path, topdown, onerror):
            yield entry

    if not topdown:
        yield top, dirs, nondirs
|
|
|
|
|
|
def listdir(path, silent=False, vfs=True):
    """
    List a directory
    @param path: Directory to list, must be a str "UTF-8"
    @type path: str
    @param silent: if True, the traceback of an error is not written to the log
    @type silent: bool
    @param vfs: if False, XbmcVFS is not used even when it is available
    @type vfs: bool
    @rtype: list
    @return: content of a directory (folders and files), or False on error
    """
    path = encode(path)
    try:
        if xbmc_vfs and vfs:
            folder_names, file_names = xbmcvfs.listdir(path)
            return folder_names + file_names

        if path.lower().startswith("smb://"):
            entries = samba.listdir(path)
        else:
            entries = os.listdir(path)
        return decode(entries)
    except:
        logger.error("ERROR when reading the directory: %s" % path)
        if not silent:
            logger.error(traceback.format_exc())
        return False
|
|
|
|
|
|
def join(*paths):
    """
    Join several directories
    Correct the bars "/" or "\\" according to the operating system and whether or not it is a URL-like path (samba, ...)
    @param paths: the pieces to join; the first one decides the separator and whether the result starts with it
    @type paths: str
    @rtype: str
    @return: the concatenated path
    """
    # A first piece starting with "/" keeps its leading separator in the result
    segments = [""] if paths[0].startswith("/") else []

    for piece in paths:
        if not piece:
            continue
        if xbmc_vfs and type(piece) != str:
            piece = encode(piece)
        segments.extend(piece.replace("\\", "/").strip("/").split("/"))

    # URL-like paths always use "/", local paths use the separator of the operating system
    is_url = scrapertools.find_single_match(paths[0], r'(^\w+:\/\/)')
    separator = "/" if is_url else os.sep
    return str(separator.join(segments))
|
|
|
|
|
|
def split(path, vfs=True):
    """
    Returns a pair consisting of the directory and filename of a path
    @param path: path
    @type path: str
    @param vfs: not used
    @type vfs: bool
    @return: (dirname, basename); a two-element list for URL-like paths, the tuple given by
             os.path.split for the rest
    @rtype: list or tuple
    """
    protocol = scrapertools.find_single_match(path, r'(^\w+:\/\/)')
    if not protocol:
        return os.path.split(path)

    # When there is nothing but the host after the protocol, a "/" is inserted so that the
    # directory part is the protocol itself. The length of the detected protocol is used
    # instead of a fixed 6, which is only right for "smb://"-sized protocols
    if '/' not in path[len(protocol):]:
        path = path.replace(protocol, protocol + "/", 1)
    return path.rsplit('/', 1)
|
|
|
|
|
|
def basename(path, vfs=True):
    """
    Returns the file name of a path
    @param path: path
    @type path: str
    @param vfs: not used
    @type vfs: bool
    @return: last component of the path, as given by split()
    @rtype: str
    """
    parts = split(path)
    return parts[1]
|
|
|
|
|
|
def dirname(path, vfs=True):
    """
    Returns the directory of a path
    @param path: path
    @type path: str
    @param vfs: not used
    @type vfs: bool
    @return: the path without its last component, as given by split()
    @rtype: str
    """
    parts = split(path)
    return parts[0]
|
|
|
|
|
|
def is_relative(path):
    """
    Check if a path is relative
    @param path: path
    @type path: str
    @rtype: bool
    @return: True if the path has no protocol ("://"), does not start with "/" and has no drive separator (":\\")
    """
    looks_absolute = "://" in path or path.startswith("/") or ":\\" in path
    return not looks_absolute
|
|
|
|
|
|
def remove_tags(title):
    """
    Returns the title without the color tags
    @type title: str
    @param title: title
    @rtype: str
    @return: the text enclosed by the [color ...][/color] tags, or the title unchanged if there are none
    """
    logger.debug()

    inner_text = scrapertools.find_single_match(title, r'\[color .+?\](.+)\[\/color\]')
    return inner_text if inner_text else title
|
|
|
|
|
|
def remove_smb_credential(path):
    """
    Returns the path without password / user for SMB paths
    @param path: path
    @type path: str
    @return: path without credentials; paths without protocol are returned unchanged
    @rtype: str
    """
    logger.debug()

    protocol = scrapertools.find_single_match(path, r'(^\w+:\/\/)')
    if not protocol:
        return path

    # Everything after the optional "domain;", "user:" / "user@" and "password@" prefixes
    remainder = scrapertools.find_single_match(path, r'^\w+:\/\/(?:[^;\n]+;)?(?:[^:@\n]+[:|@])?(?:[^@\n]+@)?(.*?$)')
    if not remainder:
        return path
    return (protocol + remainder)
|