diff --git a/data_backend.py b/data_backend.py
index 45ab1fa..c4e4081 100644
--- a/data_backend.py
+++ b/data_backend.py
@@ -44,7 +44,7 @@ class DataBackend:
     def list(self, path):
         '''List the contents of this path'''
         raise NotImplementedError()
-    
+
     def find_all_metadata(self):
         '''Return key-value map of { software : metadata-dict }'''
         raise NotImplementedError
@@ -63,7 +63,7 @@ class LocalFS(DataBackend):
         fullpath = path
         if self.remote_root_dir and not path.startswith(self.remote_root_dir):
             fullpath = os.path.join(self.remote_root_dir, path)
-        
+
         # load the file on remote #
         with open(fullpath, "rb") as f:
             target = os.path.join(cache_dir, os.path.basename(path))
@@ -74,9 +74,9 @@ class LocalFS(DataBackend):
                 ft.write(f.read())
 
         return target
-    
+
     def list(self, path, fullpaths=False):
-        
+
         # prepend root dir if not given #
         fullpath = path
         if self.remote_root_dir and not path.startswith(self.remote_root_dir):
@@ -89,9 +89,9 @@ class LocalFS(DataBackend):
             return [ os.path.join(path, filename) for filename in os.listdir(fullpath)]
         else:
             return os.listdir(fullpath)
-    
+
     def find_all_metadata(self):
-        
+
         meta_info_list = []
         for software_dir in glob.iglob(self.remote_root_dir + "/*"):
             meta_file = os.path.join(software_dir, "meta.yaml")
@@ -99,17 +99,21 @@ class LocalFS(DataBackend):
                 continue
             else:
                 meta_info_list.append(software.Software(meta_file, self, self.progress_bar_wrapper))
-        
+
         return list(filter(lambda x: not x.invalid, meta_info_list))
 
 
 class FTP(DataBackend):
 
     paths_listed = {}
 
-    def _connect(self):
+    def _connect(self, individual_connection=False):
 
-        if self.ftp:
-            return self.ftp
+        if self.ftp and not individual_connection:
+            try:
+                self.ftp.voidcmd("NOOP")
+                return self.ftp
+            except ssl.SSLError:
+                pass # reconnect
 
         if self.server.startswith("ftp://"):
             tls = False
@@ -155,7 +159,8 @@ class FTP(DataBackend):
         # cache dir is automatically set #
         self.cache_dir = None
 
-        self.ftp = ftp
+        if not individual_connection:
+            self.ftp = ftp
 
         return ftp
 
@@ -174,7 +179,7 @@ class FTP(DataBackend):
         #print(self.remote_root_dir, path, fullpath)
         fullpath = fullpath.replace("\\", "/")
         local_file = os.path.join(cache_dir, os.path.basename(path))
-        
+
         # print("Cachedir:", cache_dir, os.path.basename(path), local_file)
 
         if not os.path.isfile(local_file):
@@ -190,9 +195,9 @@ class FTP(DataBackend):
                 with open(local_file, "w") as f:
                     f.write(local_file)
                 with open(local_file, 'wb') as local_file_open, tqdm.tqdm(
-                        desc="Downloading", 
-                        total=total_size, 
-                        unit='B', 
+                        desc="Downloading",
+                        total=total_size,
+                        unit='B',
                         unit_scale=True
                     ) as cmd_progress_bar:
 
@@ -206,15 +211,15 @@ class FTP(DataBackend):
 
         # run with callback #
         ftp.retrbinary('RETR ' + fullpath, callback)
-        
+
         if return_content:
             with open(local_file, encoding="utf-8") as fr:
                 return fr.read()
 
         return local_file
-    
-    def list(self, path, fullpaths=False):
-        
+
+    def list(self, path, fullpaths=False, new_connection=False):
+
         # prepend root dir if not given #
         fullpath = path
         if self.remote_root_dir and not path.startswith(self.remote_root_dir):
@@ -231,14 +236,20 @@ class FTP(DataBackend):
                 paths = self.paths_listed[fullpath]
                 #print("Retrieved paths from cache:", fullpath, paths)
             else:
-                ftp = self._connect()
+                ftp = self._connect(individual_connection=new_connection)
+                print("Listing previously unlisted path: {}".format(fullpath))
                 self.paths_listed.update({fullpath: []}) # in case dir does not exit
                 paths = ftp.nlst(fullpath)
                 self.paths_listed.update({fullpath: paths})
 
+                if new_connection: # close individual connections
+                    ftp.close()
+
             if not fullpaths:
                 return paths
+
             return [ os.path.join(path, filename).replace("\\", "/") for filename in paths ]
+
         except ftplib.error_perm as e:
             if "550 No files found" in str(e):
                 print("No files in this directory: {}".format(fullpath))
@@ -248,21 +259,34 @@ class FTP(DataBackend):
                 return []
             else:
                 raise e
-    
+
     def find_all_metadata(self):
-        
+
         local_meta_file_list = []
         root_elements = self.list(self.remote_root_dir)
 
-        for s in root_elements:
+        import concurrent.futures
+        with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()*5) as executor:
+
+            software_dir_contents = list(executor.map(
+                lambda s: self.list(s, fullpaths=True, new_connection=True), root_elements))
+
+            # this caches the paths, don't remove it #
+            cache_list = [os.path.join(s, "registry_files") for s in root_elements ]
+            cache_list += [os.path.join(s, "pictures") for s in root_elements ]
+            picture_contents_async_cache = list(executor.map(
+                lambda s: self.list(s, fullpaths=True, new_connection=True), cache_list))
+
+        for files in software_dir_contents:
             #print(s)
-            files = self.list(s, fullpaths=True)
-            #print(files)
+            #files = self.list(s, fullpaths=True)
+            print(files)
             for f in files:
                 if f.endswith("meta.yaml"):
                     meta_file_content = self.get(f, cache_dir="cache", return_content=True)
                     #print(meta_file_content)
                     local_meta_file_list.append(f)
-        
-        return list(filter(lambda x: not x.invalid, [ software.Software(meta_file, self, self.progress_bar_wrapper) 
-            for meta_file in local_meta_file_list ]))
+
+        return list(filter(lambda x: not x.invalid,
+            [ software.Software(meta_file, self, self.progress_bar_wrapper)
+                for meta_file in local_meta_file_list ]))
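The reworked FTP.find_all_metadata() opens one short-lived connection per worker thread (via new_connection=True) because a single ftplib.FTP session cannot safely be shared across threads. Below is a minimal, self-contained sketch of that per-thread-connection pattern, not code from data_backend.py: the host name and the helpers list_dir / list_dirs_parallel are illustrative placeholders, and anonymous FTP login without TLS is assumed.

# Minimal sketch of the per-thread-connection pattern used in the patch.
# Assumptions: anonymous login, no TLS; "ftp.example.com", list_dir and
# list_dirs_parallel are illustrative names, not part of data_backend.py.
import concurrent.futures
import ftplib
import os


def list_dir(server, path):
    """List one remote directory over its own short-lived FTP connection."""
    with ftplib.FTP(server) as ftp:   # one connection per call, hence per thread
        ftp.login()                   # anonymous login (assumption)
        try:
            return ftp.nlst(path)
        except ftplib.error_perm as e:
            if "550" in str(e):       # empty or missing directory
                return []
            raise


def list_dirs_parallel(server, paths):
    """Fan the listings out over a thread pool, mirroring find_all_metadata()."""
    workers = (os.cpu_count() or 1) * 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda p: list_dir(server, p), paths))


# Example: list_dirs_parallel("ftp.example.com", ["pub/a", "pub/b"])

Closing each connection inside the worker keeps the server's concurrent-session count bounded by the pool size, while the long-lived self.ftp session (now health-checked with NOOP) is reserved for sequential calls.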