wip: asnyc http (1)

This commit is contained in:
Yannik Schmidt
2025-01-08 19:39:23 +01:00
parent 0d35e9c095
commit f46fce1824
7 changed files with 181 additions and 184 deletions

View File

@@ -6,19 +6,9 @@ import ftplib
import tqdm
import ssl
import concurrent.futures
import statekeeper
import requests
class SESSION_REUSE_FTP_TLS(ftplib.FTP_TLS):
"""Explicit FTPS, with shared TLS session"""
def ntransfercmd(self, cmd, rest=None):
conn, size = ftplib.FTP.ntransfercmd(self, cmd, rest)
if self._prot_p:
conn = self.context.wrap_socket(
conn,
server_hostname=self.host,
session=self.sock.session) # this is the fix
return conn, size
class DataBackend:
@@ -36,7 +26,6 @@ class DataBackend:
self.progress_bar_wrapper = progress_bar_wrapper
self.root = tkinter_root
self.cache_dir = "./cache/"
self.ftp = None # ftp connection object
def get(self, path, return_content=False):
'''Return the contents of this path'''
@@ -102,70 +91,16 @@ class LocalFS(DataBackend):
meta_info_list.append(software.Software(meta_file, self, self.progress_bar_wrapper))
return list(filter(lambda x: not x.invalid, meta_info_list))
class FTP(DataBackend):
class HTTP(DataBackend):
paths_listed = {}
REMOTE_PATH = "/get-path"
def _connect(self, individual_connection=False):
def _get_url(self):
print(self.server + HTTP.REMOTE_PATH)
return self.server + HTTP.REMOTE_PATH
if self.ftp and not individual_connection:
try:
self.ftp.voidcmd("NOOP")
return self.ftp
except ssl.SSLError:
pass # reconnect
if self.server.startswith("ftp://"):
tls = False
elif self.server.startswith("ftps://"):
tls = True
else:
raise ValueError("FTP Server must start with ftp:// or ftps://")
# build connection parameters #
server = self.server.split("://")[1]
port = None
try:
server = server.split(":")[0]
except (IndexError, ValueError):
port = 0
# try extract server #
try:
server = server.split(":")[0]
except (IndexError, ValueError):
server = self.server
print("Connecting to:", server, "on port:", port, "ssl =", tls)
# connect #
if not tls:
ftp = ftplib.FTP()
else:
ftp = SESSION_REUSE_FTP_TLS()
ftp.ssl_version = ssl.PROTOCOL_TLSv1_2
ftp.connect(server, port=port or 0)
if self.user:
ftp.login(self.user, self.password)
else:
ftp.login()
# open a secure session for tls #
if tls:
ftp.prot_p()
# cache dir is automatically set #
self.cache_dir = None
if not individual_connection:
self.ftp = ftp
return ftp
def get(self, path, cache_dir=None, return_content=False, new_connection=False):
def get(self, path, cache_dir=None, return_content=False):
# check the load cache dir #
if cache_dir:
@@ -177,102 +112,56 @@ class FTP(DataBackend):
fullpath = path
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
fullpath = os.path.join(self.remote_root_dir, path)
#print(self.remote_root_dir, path, fullpath)
fullpath = fullpath.replace("\\", "/")
local_file = os.path.join(cache_dir, os.path.basename(path))
# print("Cachedir:", cache_dir, os.path.basename(path), local_file)
print("Requiring:", local_file)
if not os.path.isfile(local_file):
ftp = self._connect(individual_connection=True)
ftp.sendcmd('TYPE I')
if return_content:
# load the file on remote #
if not new_connection:
# the content is needed for the UI now and not cached, it's needs to be downloaded synchroniously #
# as there cannot be a meaningful UI-draw without it. #
r = requests.get(self._get_url(), params={ "path" : path, "as_string": True })
total_size = ftp.size(fullpath)
print("Total Size:", total_size)
self.progress_bar_wrapper.get_pb()["maximum"] = total_size
# cache the download imediatelly #
with open(local_file, encoding="utf-8", mode="w") as f:
f.write(r.json()["content"])
print(local_file, "not in cache, retriving..")
with open(local_file, "w") as f:
f.write(local_file)
with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
desc="Downloading",
total=total_size,
unit='B',
unit_scale=True
) as cmd_progress_bar:
# return the content #
return r.json()["content"]
# Define a callback function to update the progress bar #
def callback(data):
local_file_open.write(data)
if new_connection: # return if parralell
return
self.root.update_idletasks() # Update the GUI
current_total = self.progress_bar_wrapper.get_pb().get() + len(data)/total_size
self.progress_bar_wrapper.get_pb().set(current_total)
self.progress_bar_wrapper.set_text(
text="Downloading: {:.2f}%".format(current_total*100))
cmd_progress_bar.update(len(data))
# run with callback #
ftp.retrbinary('RETR ' + fullpath, callback)
else:
with open(local_file, 'wb') as fp:
ftp.retrbinary('RETR ' + fullpath, fp.write)
statekeeper.add_to_download_queue(self._get_url(), path, first=return_content)
if new_connection:
ftp.close()
if return_content:
elif return_content:
with open(local_file, encoding="utf-8") as fr:
return fr.read()
else:
return local_file
return local_file
def list(self, path, fullpaths=False):
def list(self, path, fullpaths=False, new_connection=False):
# prepend root dir if not given #
fullpath = path
if self.remote_root_dir and not path.startswith(self.remote_root_dir):
fullpath = os.path.join(self.remote_root_dir, path)
fullpath = fullpath.replace("\\", "/")
#print(fullpath)
# if not os.path.isdir(fullpath):
# return []
# retrieve session cached paths #
if fullpath in self.paths_listed:
paths = self.paths_listed[fullpath]
else:
try:
# retrieve session cached paths #
if fullpath in self.paths_listed:
paths = self.paths_listed[fullpath]
#print("Retrieved paths from cache:", fullpath, paths)
else:
ftp = self._connect(individual_connection=new_connection)
print("Listing previously unlisted path: {}".format(fullpath))
self.paths_listed.update({fullpath: []}) # in case dir does not exit
paths = ftp.nlst(fullpath)
self.paths_listed.update({fullpath: paths})
if new_connection: # close individual connections
ftp.close()
r = requests.get(self._get_url(), params={ "path" : path })
print(r, r.status_code, r.content)
paths = r.json()["contents"]
if not fullpaths:
return paths
return [ os.path.join(path, filename).replace("\\", "/") for filename in paths ]
except ftplib.error_perm as e:
if "550 No files found" in str(e):
print("No files in this directory: {}".format(fullpath))
return []
elif "550 No such file or directory" in str(e):
print("File or dir does not exist: {}".format(fullpath))
return []
else:
raise e
return [ os.path.join(path, filename).replace("\\", "/") for filename in paths ]
def find_all_metadata(self):
@@ -282,24 +171,21 @@ class FTP(DataBackend):
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
software_dir_contents = list(executor.map(
lambda s: self.list(s, fullpaths=True, new_connection=True), root_elements))
lambda s: self.list(s, fullpaths=True), root_elements))
# this caches the paths, done remove it #
cache_list = [os.path.join(s, "registry_files") for s in root_elements ]
cache_list += [os.path.join(s, "pictures") for s in root_elements ]
# THIS PRELOAD IMAGES-paths, DO NOT REMOVE IT #
picture_contents_async_cache = list(executor.map(
lambda s: self.list(s, fullpaths=True, new_connection=True), cache_list))
lambda s: self.list(s, fullpaths=True), cache_list))
for files in software_dir_contents:
#print(s)
#files = self.list(s, fullpaths=True)
print(files)
for f in files:
if f.endswith("meta.yaml"):
meta_file_content = self.get(f, cache_dir="cache", return_content=True)
#print(meta_file_content)
local_meta_file_list.append(f)
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
software_list = executor.map(lambda meta_file: software.Software(meta_file, self, self.progress_bar_wrapper), local_meta_file_list)
return list(filter(lambda x: not x.invalid, software_list))
return list(filter(lambda x: not x.invalid, software_list))