From 6b26a1c2eca1aa33ae72c07310175362c16d9e70 Mon Sep 17 00:00:00 2001 From: Yannik Schmidt Date: Sat, 25 Nov 2023 14:10:04 +0100 Subject: [PATCH] feat: accept tm2020 replays --- server.py | 71 ++++++++++++++------------- tm2020parser.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 32 deletions(-) create mode 100644 tm2020parser.py diff --git a/server.py b/server.py index ff5ff73..8da2862 100755 --- a/server.py +++ b/server.py @@ -9,6 +9,7 @@ import json import datetime from pygbx import Gbx, GbxType +import tm2020parser import sqlalchemy from sqlalchemy import Column, Integer, String, Boolean, or_, and_, asc, desc @@ -92,16 +93,15 @@ class ParsedReplay(db.Model): login = Column(String) cp_times = Column(String) + login_uid_tm2020 = Column(String) + game = Column(String) + def clean_login(self): if "/" in self.login: return self.login.split("/")[0] else: return self.login - def guess_map(self): - base = os.path.basename(self.filepath) - return base.split("_")[1].split(".Replay")[0] - def get_human_readable_time(self): t = datetime.timedelta(microseconds=self.race_time*1000) t_string = str(t) @@ -114,7 +114,7 @@ class ParsedReplay(db.Model): def __repr__(self): return "{time} on {map_n} by {login}".format( time=self.get_human_readable_time(), - map_n=self.guess_map(), login=self.login) + map_n=self.map_uid, login=self.login) def to_dict(self): d = dict() @@ -232,40 +232,47 @@ class DataTable(): return self.__build(results, total, filtered) -def replay_from_path(fullpath, uploader=None): - - if not fullpath.lower().endswith(".gbx"): - raise ValueError("Path must be a .gbx file") - - - g = Gbx(fullpath) - ghost = g.get_class_by_id(GbxType.CTN_GHOST) - if not ghost: - raise ValueError("No ghost found in GBX file") - - f_hash = None - mapname_from_filename = os.path.basename(fullpath).split("_")[1].split(".Replay")[0] +def _extracted_login_from_file(fullpath): + '''Extract a login from a tmnf 2020 replay manually''' + + # TODO fix underscores in filenames # + if "its_a_sheppy" in fullpath: + login_from_filename = "its_a_sheppy" + else: + login_from_filename = os.path.basename(fullpath).split("_")[0] with open(fullpath, "rb") as f: content = f.read() decoded_string = content.decode("ascii", errors="ignore") - if mapname_from_filename not in decoded_string: - raise ValueError("Mapname indicated by filename does not match map in file") - f_hash = hashlib.sha512(content).hexdigest() + if login_from_filename not in decoded_string: + raise ValueError("Login indicated by filename does not match login in file") + return login_from_filename - replay = ParsedReplay(filehash=f_hash, - race_time=ghost.race_time, - uploader=uploader, - filepath=fullpath, - map_uid=mapname_from_filename, - ghost_id=ghost.id, - login=ghost.login, - upload_dt=datetime.datetime.now().isoformat(), - cp_times=",".join(map(str, ghost.cp_times))) - m = Map(map_uid=replay.map_uid, mapname=replay.guess_map()) +def replay_from_path(fullpath, uploader=None): + '''Load a replay from uploaded path''' + + # use ghost wrapper to parse both tmnf and tm2020 # + ghost = tm2020parser.GhostWrapper(fullpath, uploader) + + # build a database replay from ghost wrapper # + replay = ParsedReplay(filehash=ghost.filehash, + race_time=ghost.race_time, + uploader=ghost.uploader, + filepath=ghost.fullpath, + map_uid=ghost.map_uid, + ghost_id=ghost.ghost_id, + login=ghost.login, + login_uid_tm2020=ghost.login_uid_tm2020, + upload_dt=ghost.upload_dt, + cp_times=ghost.cp_times, + game=ghost.game) + + # build database map object from replay # + m = Map(map_uid=replay.map_uid, mapname=replay.map_uid) + + # merge the map & commit and return the replay # db.session.merge(m) db.session.commit() - return replay def get_number_of_rank_x(rank): diff --git a/tm2020parser.py b/tm2020parser.py new file mode 100644 index 0000000..0715da1 --- /dev/null +++ b/tm2020parser.py @@ -0,0 +1,124 @@ +import re +import os +import datetime +import hashlib +import pygbx +import xmltodict + +class GhostWrapper(): + + def __init__(self, fullpath, uploader): + + # set parameters as attributes # + self.fullpath = fullpath + self.uploader = uploader + + # sanity check filename # + if not fullpath.lower().endswith(".gbx"): + raise ValueError("Path must be a .gbx file") + + # parse with normal GBX-parser + g = pygbx.Gbx(fullpath) + ghost = g.get_class_by_id(pygbx.GbxType.CTN_GHOST) + if not ghost: + raise ValueError("No ghost found in GBX file") + + # compute mapname # + if ghost.game_version.startswith("TmForever"): + mapname_from_filename = self._compute_map_from_filename() + else: + mapname_from_filename = None + + # compute filehash # + f_hash = None + content = None + with open(fullpath, "rb") as f: + + # read file & compute # + content = f.read() + decoded_string = content.decode("ascii", errors="ignore") + f_hash = hashlib.sha512(content).hexdigest() + + # general variables # + self.filehash = f_hash + self.ghost_id = ghost.id + self.login = ghost.login + self.race_time = ghost.race_time + self.cp_times = ",".join(map(str, ghost.cp_times)) + self.upload_dt = datetime.datetime.now().isoformat() + + # game version # + if ghost.game_version.startswith("TmForever"): + + # set version and map # + self.game = "tmnf" + self.map_uid = mapname_from_filename + self.login_uid_tm2020 = None + + # sanity check mapname for tmnf # + if mapname_from_filename not in decoded_string: + raise ValueError("Mapname indicated by filename does not match map in file") + + else: + + # set gameversion and compute from xml # + self.game = "tm2020" + self._set_from_2020() + + + def _compute_map_from_filename(self): + '''Compute the mapname from the filename if possible''' + + try: + underscore_count = len(self.fullpath.split("_")) + if underscore_count > 3 or underscore_count < 1: + error_msg = "Filename unexpected number of '_' ({})".format(underscore_count) + error_msg += ", does your (map-)name contain underscores? If yes remove them." + raise ValueError(error_msg) + mapname_from_filename = os.path.basename(self.fullpath).split("_")[1] + if ".Replay" in mapname_from_filename: + mapname_from_filename = mapname_from_filename.split(".Replay")[0] + except IndexError: + raise ValueError("Unexpected filename format. (IndexError)") + return mapname_from_filename + + def _set_from_2020(self): + '''Extract the XML-Header from TM2020-replays to set missing variables''' + + # Specify the pattern to match the XML-like string + pattern = re.compile(rb'', re.DOTALL) + + # Extract XML-like strings from the binary file + xml_strings = [] + with open(self.fullpath, 'rb') as binary_file: + binary_data = binary_file.read() + matches = re.findall(pattern, binary_data) + xml_strings = [match.decode('utf-8') for match in matches] + + # set vars # + xml_string = xml_strings[0] + xml_dict = xmltodict.parse(xml_string) + + self.map_uid = xml_dict["header"]["map"]["@name"] + print(self.map_uid) + + # load the name # + with open(self.fullpath, "rb") as f: + content = f.read() + result = content.split(b"\0")[:100] + + # filter out empty bytes # + result = list(filter(lambda x: x, result)) + + # find the uid # + uid_index = -1 + for i, el in enumerate(result): + if self.login in el.decode("ascii", errors="ignore"): + uid_index = i + break + + if uid_index < 1: + raise ValueError("Can't find user UID in replay file.") + else: + self.login_uid_tm2020 = self.login + self.login = result[uid_index-1].strip(b"\x16").decode("utf-8")