mirror of
https://github.com/FAUSheppy/homelab_gamevault
synced 2025-12-06 06:51:36 +01:00
239 lines
8.5 KiB
Python
239 lines
8.5 KiB
Python
import yaml
|
|
import tkinter
|
|
import os
|
|
import localaction
|
|
import zipfile
|
|
import shutil
|
|
import pathlib
|
|
import tqdm
|
|
import webbrowser
|
|
import jinja_helper
|
|
import threading
|
|
import sys
|
|
import tkinter
|
|
import statekeeper
|
|
from tkinter import messagebox
|
|
|
|
class Software:
|
|
|
|
def __init__(self, meta_file, backend, progress_bar_wrapper):
|
|
|
|
self.meta_file = meta_file
|
|
self.directory = os.path.dirname(meta_file)
|
|
self.backend = backend
|
|
self.run_button = None
|
|
print("Software Directory:", self.directory)
|
|
|
|
self.cache_dir = backend.cache_dir or os.path.join("cache", self.directory.lstrip("/").lstrip("\\"))
|
|
|
|
# return None instead of the object if yaml failed #
|
|
try:
|
|
self.invalid = False
|
|
self._load_from_yaml()
|
|
except ValueError as e:
|
|
print(e)
|
|
raise e
|
|
self.invalid = True
|
|
|
|
# if not progress_bar_wrapper:
|
|
# raise AssertionError()
|
|
|
|
# self.progress_bar_wrapper = progress_bar_wrapper
|
|
|
|
def _load_from_yaml(self):
|
|
|
|
content = self.backend.get(self.meta_file, self.cache_dir, return_content=True)
|
|
# print("Meta-Content:", content)
|
|
|
|
meta = yaml.safe_load(content)
|
|
if not meta:
|
|
raise ValueError("Empty Meta File")
|
|
|
|
self.title = meta.get("title")
|
|
self.genre = meta.get("genre")
|
|
self.description = meta.get("description")
|
|
self.dependencies = meta.get("dependencies")
|
|
self.link_only = meta.get("link_only")
|
|
self.link = meta.get("link")
|
|
self.extra_files = meta.get("extra_files")
|
|
self.run_exe = meta.get("run_exe")
|
|
self.installer = meta.get("installer")
|
|
self.installer_no_admin = meta.get("installer_no_admin")
|
|
self.age_limit = meta.get("age_limit") or 20
|
|
|
|
self.pictures = [ self.backend.get(pp, self.cache_dir) for pp in
|
|
self.backend.list(os.path.join(self.directory, "pictures"), fullpaths=True) ]
|
|
|
|
if any([x is None for x in self.pictures]):
|
|
raise AssertionError("None Entries in self.pictures: " + str(self.pictures))
|
|
|
|
self.reg_files = self.backend.list(os.path.join(self.directory, "registry_files"), fullpaths=True)
|
|
|
|
print("Finished Init for", self.title)
|
|
|
|
|
|
def get_thumbnail(self):
|
|
'''Return the thumbnail for this software'''
|
|
|
|
if not self.pictures:
|
|
return None
|
|
|
|
return self.pictures[0]
|
|
|
|
def _extract_to_target(self, cache_src, target):
|
|
'''Extract a cached, downloaded zip to the target location'''
|
|
|
|
software_path = os.path.join(target, self.title)
|
|
|
|
if os.path.isdir(software_path):
|
|
overwrite = messagebox.askyesno(
|
|
"Overwrite Existing Directory",
|
|
f"The directory '{software_path}' already exists.\nDo you want to overwrite it?"
|
|
)
|
|
if not overwrite:
|
|
print("Skipping install as instructed by user...")
|
|
return
|
|
|
|
os.makedirs(software_path, exist_ok=True)
|
|
|
|
# beginn progress tracking #
|
|
with zipfile.ZipFile(cache_src, 'r') as zip_ref:
|
|
|
|
statekeeper.log_begin_download(local_path=cache_src, path=cache_src, url=None, type="extraction", start_size=len(zip_ref.infolist()))
|
|
total_count = zip_ref.infolist()
|
|
count = 0
|
|
for member in tqdm.tqdm(total_count, desc='Extracting '):
|
|
try:
|
|
zip_ref.extract(member, software_path)
|
|
count += 1
|
|
|
|
# update progress #
|
|
statekeeper.set_extraction_status(cache_src, count)
|
|
|
|
|
|
except zipfile.error as e:
|
|
print(e)
|
|
pass # TODO ???
|
|
|
|
# finish extraction tracking #
|
|
statekeeper.log_end_download(cache_src, type="extraction")
|
|
|
|
def install_async(self):
|
|
|
|
thread = threading.Thread(target=self.install)
|
|
thread.start()
|
|
|
|
def install(self):
|
|
'''Install this software from the backend'''
|
|
|
|
# things to execute #
|
|
admin_run_list = []
|
|
|
|
print("Installing:", self.title, self.directory)
|
|
|
|
# handle link-only software #
|
|
if self.link_only:
|
|
webbrowser.open(self.link_only)
|
|
return
|
|
|
|
#self.progress_bar_wrapper.set_text(text="Please wait..")
|
|
#self.progress_bar_wrapper.tk_parent.update_idletasks()
|
|
path = os.path.join(self.directory, "main_dir")
|
|
|
|
try:
|
|
remote_file = self.backend.list(path, fullpaths=True)[0]
|
|
except IndexError:
|
|
print("No main_dir:", path)
|
|
raise AssertionError("No main_dir for this software")
|
|
|
|
statekeeper.log_begin_download(remote_file, self.backend.get_local_target(remote_file), self.backend._get_url())
|
|
local_file = self.backend.get(remote_file, self.cache_dir, wait=True)
|
|
statekeeper.log_end_download(remote_file)
|
|
|
|
print("Deciding on installer...")
|
|
|
|
# execute or unpack #
|
|
if local_file.endswith(".exe"):
|
|
print("Target is an executable.. running as installer.")
|
|
if os.name != "nt" and not os.path.isabs(local_file):
|
|
# need abs path for wine #
|
|
local_file = os.path.join(os.getcwd(), local_file)
|
|
localaction.run_exe(local_file)
|
|
elif local_file.endswith(".zip"):
|
|
print("Target is a zip.. unpacking first.")
|
|
self._extract_to_target(local_file, self.backend.install_dir)
|
|
|
|
# download & install registry files #
|
|
for rf in self.reg_files:
|
|
path = self.backend.get(rf, cache_dir=self.cache_dir, wait=True)
|
|
if path.endswith(".j2"):
|
|
target_install_dir = os.path.join(self.backend.install_dir, self.title)
|
|
print("Install dir Registry:", target_install_dir)
|
|
path = jinja_helper.render_path(path, target_install_dir, self.directory)
|
|
|
|
if sys.platform == "win32":
|
|
admin_run_list.append(path)
|
|
else:
|
|
localaction.install_registry_file(path)
|
|
|
|
# install dependencies #
|
|
if self.dependencies:
|
|
avail_software = self.backend.find_all_metadata()
|
|
for s in avail_software:
|
|
if s.title in self.dependencies:
|
|
s.install()
|
|
|
|
# run installer if set #
|
|
print("self.installer", self.installer)
|
|
if self.installer:
|
|
installer_path = os.path.join(self.backend.install_dir, self.title, self.installer)
|
|
print("installer path in if", installer_path)
|
|
if os.name != "nt" and not os.path.isabs(installer_path):
|
|
# need abs path for wine #
|
|
installer_path = os.path.join(os.getcwd(), installer_path)
|
|
|
|
|
|
print("Running installer:", installer_path)
|
|
if not self.installer_no_admin:
|
|
admin_run_list.append(installer_path)
|
|
else:
|
|
localaction.run_exe(installer_path)
|
|
|
|
if admin_run_list:
|
|
print("admin list", admin_run_list)
|
|
localaction.run_exe(admin_run_list)
|
|
|
|
# install gamefiles #
|
|
if self.extra_files:
|
|
for src, dest in self.extra_files.items():
|
|
tmp = self.backend.get(os.path.join(self.directory, "extra_files", src), self.cache_dir, wait=True)
|
|
dest_dir = os.path.expandvars(dest)
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
shutil.copy(tmp, dest_dir)
|
|
|
|
#self.progress_bar_wrapper.set_text(text="")
|
|
try:
|
|
if self.run_button:
|
|
self.run_button.configure(state=tkinter.NORMAL)
|
|
self.run_button.configure(fg_color="green")
|
|
except tkinter.TclError:
|
|
print("INFO: No longer in installation view - no button to update")
|
|
|
|
def run(self):
|
|
'''Run the configured exe for this software'''
|
|
|
|
print(self.run_exe, self.link_only)
|
|
if self.run_exe == "steam" and "steam://" in self.link_only:
|
|
print("steam://runid/{}".format(self.link_only.split("/")[-1]))
|
|
webbrowser.open("steam://rungameid/{}".format(self.link_only.split("/")[-1]))
|
|
return
|
|
|
|
if self.run_exe:
|
|
if os.name == "nt" or not ".lnk" in self.run_exe:
|
|
if self.run_exe.startswith("%") or self.run_exe[1] == ":":
|
|
localaction.run_exe(self.run_exe)
|
|
else:
|
|
localaction.run_exe(os.path.join(self.backend.install_dir, self.title, self.run_exe))
|
|
else:
|
|
localaction.run_exe(self.run_exe)
|