feat: make software-init threadsafe & concurrent

This commit is contained in:
Yannik Schmidt
2024-05-31 15:49:11 +02:00
parent 9008df2f3f
commit bcc3d39e31
2 changed files with 40 additions and 31 deletions

View File

@@ -5,6 +5,7 @@ import software
import ftplib
import tqdm
import ssl
import concurrent.futures
class SESSION_REUSE_FTP_TLS(ftplib.FTP_TLS):
"""Explicit FTPS, with shared TLS session"""
@@ -164,7 +165,7 @@ class FTP(DataBackend):
return ftp
def get(self, path, cache_dir=None, return_content=False):
def get(self, path, cache_dir=None, return_content=False, new_connection=False):
# check the load cache dir #
if cache_dir:
@@ -183,34 +184,43 @@ class FTP(DataBackend):
# print("Cachedir:", cache_dir, os.path.basename(path), local_file)
if not os.path.isfile(local_file):
ftp = self._connect()
ftp = self._connect(individual_connection=True)
ftp.sendcmd('TYPE I')
# load the file on remote #
total_size = ftp.size(fullpath)
print(total_size)
self.progress_bar_wrapper.get_pb()["maximum"] = total_size
if not new_connection:
total_size = ftp.size(fullpath)
print(total_size)
self.progress_bar_wrapper.get_pb()["maximum"] = total_size
print(local_file, "not in cache, retriving..")
with open(local_file, "w") as f:
f.write(local_file)
with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
desc="Downloading",
total=total_size,
unit='B',
unit_scale=True
) as cmd_progress_bar:
print(local_file, "not in cache, retriving..")
with open(local_file, "w") as f:
f.write(local_file)
with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
desc="Downloading",
total=total_size,
unit='B',
unit_scale=True
) as cmd_progress_bar:
# Define a callback function to update the progress bar #
def callback(data):
local_file_open.write(data)
self.root.update_idletasks() # Update the GUI
self.progress_bar_wrapper.get_pb().set(
self.progress_bar_wrapper.get_pb().get() + len(data)/total_size)
cmd_progress_bar.update(len(data))
# Define a callback function to update the progress bar #
def callback(data):
local_file_open.write(data)
if new_connection: # return if parallel
return
self.root.update_idletasks() # Update the GUI
self.progress_bar_wrapper.get_pb().set(
self.progress_bar_wrapper.get_pb().get() + len(data)/total_size)
cmd_progress_bar.update(len(data))
# run with callback #
ftp.retrbinary('RETR ' + fullpath, callback)
# run with callback #
ftp.retrbinary('RETR ' + fullpath, callback)
else:
with open(local_file, 'wb') as fp:
ftp.retrbinary('RETR ' + fullpath, fp.write)
if new_connection:
ftp.close()
if return_content:
with open(local_file, encoding="utf-8") as fr:
@@ -265,7 +275,6 @@ class FTP(DataBackend):
local_meta_file_list = []
root_elements = self.list(self.remote_root_dir)
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
software_dir_contents = list(executor.map(
@@ -287,6 +296,6 @@ class FTP(DataBackend):
#print(meta_file_content)
local_meta_file_list.append(f)
return list(filter(lambda x: not x.invalid,
[ software.Software(meta_file, self, self.progress_bar_wrapper)
for meta_file in local_meta_file_list ]))
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
software_list = executor.map(lambda meta_file: software.Software(meta_file, self, self.progress_bar_wrapper), local_meta_file_list)
return list(filter(lambda x: not x.invalid, software_list))