mirror of
https://github.com/FAUSheppy/homelab_gamevault
synced 2025-12-06 06:51:36 +01:00
feat: conncurrent ftp listings
This commit is contained in:
@@ -44,7 +44,7 @@ class DataBackend:
|
|||||||
def list(self, path):
|
def list(self, path):
|
||||||
'''List the contents of this path'''
|
'''List the contents of this path'''
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def find_all_metadata(self):
|
def find_all_metadata(self):
|
||||||
'''Return key-value map of { software : metadata-dict }'''
|
'''Return key-value map of { software : metadata-dict }'''
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
@@ -63,7 +63,7 @@ class LocalFS(DataBackend):
|
|||||||
fullpath = path
|
fullpath = path
|
||||||
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
||||||
fullpath = os.path.join(self.remote_root_dir, path)
|
fullpath = os.path.join(self.remote_root_dir, path)
|
||||||
|
|
||||||
# load the file on remote #
|
# load the file on remote #
|
||||||
with open(fullpath, "rb") as f:
|
with open(fullpath, "rb") as f:
|
||||||
target = os.path.join(cache_dir, os.path.basename(path))
|
target = os.path.join(cache_dir, os.path.basename(path))
|
||||||
@@ -74,9 +74,9 @@ class LocalFS(DataBackend):
|
|||||||
ft.write(f.read())
|
ft.write(f.read())
|
||||||
|
|
||||||
return target
|
return target
|
||||||
|
|
||||||
def list(self, path, fullpaths=False):
|
def list(self, path, fullpaths=False):
|
||||||
|
|
||||||
# prepend root dir if not given #
|
# prepend root dir if not given #
|
||||||
fullpath = path
|
fullpath = path
|
||||||
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
||||||
@@ -89,9 +89,9 @@ class LocalFS(DataBackend):
|
|||||||
return [ os.path.join(path, filename) for filename in os.listdir(fullpath)]
|
return [ os.path.join(path, filename) for filename in os.listdir(fullpath)]
|
||||||
else:
|
else:
|
||||||
return os.listdir(fullpath)
|
return os.listdir(fullpath)
|
||||||
|
|
||||||
def find_all_metadata(self):
|
def find_all_metadata(self):
|
||||||
|
|
||||||
meta_info_list = []
|
meta_info_list = []
|
||||||
for software_dir in glob.iglob(self.remote_root_dir + "/*"):
|
for software_dir in glob.iglob(self.remote_root_dir + "/*"):
|
||||||
meta_file = os.path.join(software_dir, "meta.yaml")
|
meta_file = os.path.join(software_dir, "meta.yaml")
|
||||||
@@ -99,17 +99,21 @@ class LocalFS(DataBackend):
|
|||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
meta_info_list.append(software.Software(meta_file, self, self.progress_bar_wrapper))
|
meta_info_list.append(software.Software(meta_file, self, self.progress_bar_wrapper))
|
||||||
|
|
||||||
return list(filter(lambda x: not x.invalid, meta_info_list))
|
return list(filter(lambda x: not x.invalid, meta_info_list))
|
||||||
|
|
||||||
class FTP(DataBackend):
|
class FTP(DataBackend):
|
||||||
|
|
||||||
paths_listed = {}
|
paths_listed = {}
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self, individual_connection=False):
|
||||||
|
|
||||||
if self.ftp:
|
if self.ftp and not individual_connection:
|
||||||
return self.ftp
|
try:
|
||||||
|
self.ftp.voidcmd("NOOP")
|
||||||
|
return self.ftp
|
||||||
|
except ssl.SSLError:
|
||||||
|
pass # reconnect
|
||||||
|
|
||||||
if self.server.startswith("ftp://"):
|
if self.server.startswith("ftp://"):
|
||||||
tls = False
|
tls = False
|
||||||
@@ -155,7 +159,8 @@ class FTP(DataBackend):
|
|||||||
# cache dir is automatically set #
|
# cache dir is automatically set #
|
||||||
self.cache_dir = None
|
self.cache_dir = None
|
||||||
|
|
||||||
self.ftp = ftp
|
if not individual_connection:
|
||||||
|
self.ftp = ftp
|
||||||
return ftp
|
return ftp
|
||||||
|
|
||||||
|
|
||||||
@@ -174,7 +179,7 @@ class FTP(DataBackend):
|
|||||||
#print(self.remote_root_dir, path, fullpath)
|
#print(self.remote_root_dir, path, fullpath)
|
||||||
fullpath = fullpath.replace("\\", "/")
|
fullpath = fullpath.replace("\\", "/")
|
||||||
local_file = os.path.join(cache_dir, os.path.basename(path))
|
local_file = os.path.join(cache_dir, os.path.basename(path))
|
||||||
|
|
||||||
# print("Cachedir:", cache_dir, os.path.basename(path), local_file)
|
# print("Cachedir:", cache_dir, os.path.basename(path), local_file)
|
||||||
|
|
||||||
if not os.path.isfile(local_file):
|
if not os.path.isfile(local_file):
|
||||||
@@ -190,9 +195,9 @@ class FTP(DataBackend):
|
|||||||
with open(local_file, "w") as f:
|
with open(local_file, "w") as f:
|
||||||
f.write(local_file)
|
f.write(local_file)
|
||||||
with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
|
with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
|
||||||
desc="Downloading",
|
desc="Downloading",
|
||||||
total=total_size,
|
total=total_size,
|
||||||
unit='B',
|
unit='B',
|
||||||
unit_scale=True
|
unit_scale=True
|
||||||
) as cmd_progress_bar:
|
) as cmd_progress_bar:
|
||||||
|
|
||||||
@@ -206,15 +211,15 @@ class FTP(DataBackend):
|
|||||||
|
|
||||||
# run with callback #
|
# run with callback #
|
||||||
ftp.retrbinary('RETR ' + fullpath, callback)
|
ftp.retrbinary('RETR ' + fullpath, callback)
|
||||||
|
|
||||||
if return_content:
|
if return_content:
|
||||||
with open(local_file, encoding="utf-8") as fr:
|
with open(local_file, encoding="utf-8") as fr:
|
||||||
return fr.read()
|
return fr.read()
|
||||||
|
|
||||||
return local_file
|
return local_file
|
||||||
|
|
||||||
def list(self, path, fullpaths=False):
|
def list(self, path, fullpaths=False, new_connection=False):
|
||||||
|
|
||||||
# prepend root dir if not given #
|
# prepend root dir if not given #
|
||||||
fullpath = path
|
fullpath = path
|
||||||
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
|
||||||
@@ -231,14 +236,20 @@ class FTP(DataBackend):
|
|||||||
paths = self.paths_listed[fullpath]
|
paths = self.paths_listed[fullpath]
|
||||||
#print("Retrieved paths from cache:", fullpath, paths)
|
#print("Retrieved paths from cache:", fullpath, paths)
|
||||||
else:
|
else:
|
||||||
ftp = self._connect()
|
ftp = self._connect(individual_connection=new_connection)
|
||||||
|
print("Listing previously unlisted path: {}".format(fullpath))
|
||||||
self.paths_listed.update({fullpath: []}) # in case dir does not exit
|
self.paths_listed.update({fullpath: []}) # in case dir does not exit
|
||||||
paths = ftp.nlst(fullpath)
|
paths = ftp.nlst(fullpath)
|
||||||
self.paths_listed.update({fullpath: paths})
|
self.paths_listed.update({fullpath: paths})
|
||||||
|
|
||||||
|
if new_connection: # close individual connections
|
||||||
|
ftp.close()
|
||||||
|
|
||||||
if not fullpaths:
|
if not fullpaths:
|
||||||
return paths
|
return paths
|
||||||
|
|
||||||
return [ os.path.join(path, filename).replace("\\", "/") for filename in paths ]
|
return [ os.path.join(path, filename).replace("\\", "/") for filename in paths ]
|
||||||
|
|
||||||
except ftplib.error_perm as e:
|
except ftplib.error_perm as e:
|
||||||
if "550 No files found" in str(e):
|
if "550 No files found" in str(e):
|
||||||
print("No files in this directory: {}".format(fullpath))
|
print("No files in this directory: {}".format(fullpath))
|
||||||
@@ -248,21 +259,34 @@ class FTP(DataBackend):
|
|||||||
return []
|
return []
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
def find_all_metadata(self):
|
def find_all_metadata(self):
|
||||||
|
|
||||||
local_meta_file_list = []
|
local_meta_file_list = []
|
||||||
|
|
||||||
root_elements = self.list(self.remote_root_dir)
|
root_elements = self.list(self.remote_root_dir)
|
||||||
for s in root_elements:
|
import concurrent.futures
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
|
||||||
|
|
||||||
|
software_dir_contents = list(executor.map(
|
||||||
|
lambda s: self.list(s, fullpaths=True, new_connection=True), root_elements))
|
||||||
|
|
||||||
|
# this caches the paths, done remove it #
|
||||||
|
cache_list = [os.path.join(s, "registry_files") for s in root_elements ]
|
||||||
|
cache_list += [os.path.join(s, "pictures") for s in root_elements ]
|
||||||
|
picture_contents_async_cache = list(executor.map(
|
||||||
|
lambda s: self.list(s, fullpaths=True, new_connection=True), cache_list))
|
||||||
|
|
||||||
|
for files in software_dir_contents:
|
||||||
#print(s)
|
#print(s)
|
||||||
files = self.list(s, fullpaths=True)
|
#files = self.list(s, fullpaths=True)
|
||||||
#print(files)
|
print(files)
|
||||||
for f in files:
|
for f in files:
|
||||||
if f.endswith("meta.yaml"):
|
if f.endswith("meta.yaml"):
|
||||||
meta_file_content = self.get(f, cache_dir="cache", return_content=True)
|
meta_file_content = self.get(f, cache_dir="cache", return_content=True)
|
||||||
#print(meta_file_content)
|
#print(meta_file_content)
|
||||||
local_meta_file_list.append(f)
|
local_meta_file_list.append(f)
|
||||||
|
|
||||||
return list(filter(lambda x: not x.invalid, [ software.Software(meta_file, self, self.progress_bar_wrapper)
|
return list(filter(lambda x: not x.invalid,
|
||||||
for meta_file in local_meta_file_list ]))
|
[ software.Software(meta_file, self, self.progress_bar_wrapper)
|
||||||
|
for meta_file in local_meta_file_list ]))
|
||||||
|
|||||||
Reference in New Issue
Block a user