mirror of
https://github.com/FAUSheppy/tmnf-replay-server.git
synced 2025-12-07 23:51:36 +01:00
feat: accept tm2020 replays
This commit is contained in:
69
server.py
69
server.py
@@ -9,6 +9,7 @@ import json
|
|||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from pygbx import Gbx, GbxType
|
from pygbx import Gbx, GbxType
|
||||||
|
import tm2020parser
|
||||||
|
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from sqlalchemy import Column, Integer, String, Boolean, or_, and_, asc, desc
|
from sqlalchemy import Column, Integer, String, Boolean, or_, and_, asc, desc
|
||||||
@@ -92,16 +93,15 @@ class ParsedReplay(db.Model):
|
|||||||
login = Column(String)
|
login = Column(String)
|
||||||
cp_times = Column(String)
|
cp_times = Column(String)
|
||||||
|
|
||||||
|
login_uid_tm2020 = Column(String)
|
||||||
|
game = Column(String)
|
||||||
|
|
||||||
def clean_login(self):
|
def clean_login(self):
|
||||||
if "/" in self.login:
|
if "/" in self.login:
|
||||||
return self.login.split("/")[0]
|
return self.login.split("/")[0]
|
||||||
else:
|
else:
|
||||||
return self.login
|
return self.login
|
||||||
|
|
||||||
def guess_map(self):
|
|
||||||
base = os.path.basename(self.filepath)
|
|
||||||
return base.split("_")[1].split(".Replay")[0]
|
|
||||||
|
|
||||||
def get_human_readable_time(self):
|
def get_human_readable_time(self):
|
||||||
t = datetime.timedelta(microseconds=self.race_time*1000)
|
t = datetime.timedelta(microseconds=self.race_time*1000)
|
||||||
t_string = str(t)
|
t_string = str(t)
|
||||||
@@ -114,7 +114,7 @@ class ParsedReplay(db.Model):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "{time} on {map_n} by {login}".format(
|
return "{time} on {map_n} by {login}".format(
|
||||||
time=self.get_human_readable_time(),
|
time=self.get_human_readable_time(),
|
||||||
map_n=self.guess_map(), login=self.login)
|
map_n=self.map_uid, login=self.login)
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
d = dict()
|
d = dict()
|
||||||
@@ -232,40 +232,47 @@ class DataTable():
|
|||||||
|
|
||||||
return self.__build(results, total, filtered)
|
return self.__build(results, total, filtered)
|
||||||
|
|
||||||
def replay_from_path(fullpath, uploader=None):
|
def _extracted_login_from_file(fullpath):
|
||||||
|
'''Extract a login from a tmnf 2020 replay manually'''
|
||||||
|
|
||||||
if not fullpath.lower().endswith(".gbx"):
|
# TODO fix underscores in filenames #
|
||||||
raise ValueError("Path must be a .gbx file")
|
if "its_a_sheppy" in fullpath:
|
||||||
|
login_from_filename = "its_a_sheppy"
|
||||||
|
else:
|
||||||
g = Gbx(fullpath)
|
login_from_filename = os.path.basename(fullpath).split("_")[0]
|
||||||
ghost = g.get_class_by_id(GbxType.CTN_GHOST)
|
|
||||||
if not ghost:
|
|
||||||
raise ValueError("No ghost found in GBX file")
|
|
||||||
|
|
||||||
f_hash = None
|
|
||||||
mapname_from_filename = os.path.basename(fullpath).split("_")[1].split(".Replay")[0]
|
|
||||||
with open(fullpath, "rb") as f:
|
with open(fullpath, "rb") as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
decoded_string = content.decode("ascii", errors="ignore")
|
decoded_string = content.decode("ascii", errors="ignore")
|
||||||
if mapname_from_filename not in decoded_string:
|
if login_from_filename not in decoded_string:
|
||||||
raise ValueError("Mapname indicated by filename does not match map in file")
|
raise ValueError("Login indicated by filename does not match login in file")
|
||||||
f_hash = hashlib.sha512(content).hexdigest()
|
return login_from_filename
|
||||||
|
|
||||||
replay = ParsedReplay(filehash=f_hash,
|
|
||||||
race_time=ghost.race_time,
|
|
||||||
uploader=uploader,
|
|
||||||
filepath=fullpath,
|
|
||||||
map_uid=mapname_from_filename,
|
|
||||||
ghost_id=ghost.id,
|
|
||||||
login=ghost.login,
|
|
||||||
upload_dt=datetime.datetime.now().isoformat(),
|
|
||||||
cp_times=",".join(map(str, ghost.cp_times)))
|
|
||||||
|
|
||||||
m = Map(map_uid=replay.map_uid, mapname=replay.guess_map())
|
def replay_from_path(fullpath, uploader=None):
|
||||||
|
'''Load a replay from uploaded path'''
|
||||||
|
|
||||||
|
# use ghost wrapper to parse both tmnf and tm2020 #
|
||||||
|
ghost = tm2020parser.GhostWrapper(fullpath, uploader)
|
||||||
|
|
||||||
|
# build a database replay from ghost wrapper #
|
||||||
|
replay = ParsedReplay(filehash=ghost.filehash,
|
||||||
|
race_time=ghost.race_time,
|
||||||
|
uploader=ghost.uploader,
|
||||||
|
filepath=ghost.fullpath,
|
||||||
|
map_uid=ghost.map_uid,
|
||||||
|
ghost_id=ghost.ghost_id,
|
||||||
|
login=ghost.login,
|
||||||
|
login_uid_tm2020=ghost.login_uid_tm2020,
|
||||||
|
upload_dt=ghost.upload_dt,
|
||||||
|
cp_times=ghost.cp_times,
|
||||||
|
game=ghost.game)
|
||||||
|
|
||||||
|
# build database map object from replay #
|
||||||
|
m = Map(map_uid=replay.map_uid, mapname=replay.map_uid)
|
||||||
|
|
||||||
|
# merge the map & commit and return the replay #
|
||||||
db.session.merge(m)
|
db.session.merge(m)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
return replay
|
return replay
|
||||||
|
|
||||||
def get_number_of_rank_x(rank):
|
def get_number_of_rank_x(rank):
|
||||||
|
|||||||
124
tm2020parser.py
Normal file
124
tm2020parser.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
import re
|
||||||
|
import os
|
||||||
|
import datetime
|
||||||
|
import hashlib
|
||||||
|
import pygbx
|
||||||
|
import xmltodict
|
||||||
|
|
||||||
|
class GhostWrapper():
|
||||||
|
|
||||||
|
def __init__(self, fullpath, uploader):
|
||||||
|
|
||||||
|
# set parameters as attributes #
|
||||||
|
self.fullpath = fullpath
|
||||||
|
self.uploader = uploader
|
||||||
|
|
||||||
|
# sanity check filename #
|
||||||
|
if not fullpath.lower().endswith(".gbx"):
|
||||||
|
raise ValueError("Path must be a .gbx file")
|
||||||
|
|
||||||
|
# parse with normal GBX-parser
|
||||||
|
g = pygbx.Gbx(fullpath)
|
||||||
|
ghost = g.get_class_by_id(pygbx.GbxType.CTN_GHOST)
|
||||||
|
if not ghost:
|
||||||
|
raise ValueError("No ghost found in GBX file")
|
||||||
|
|
||||||
|
# compute mapname #
|
||||||
|
if ghost.game_version.startswith("TmForever"):
|
||||||
|
mapname_from_filename = self._compute_map_from_filename()
|
||||||
|
else:
|
||||||
|
mapname_from_filename = None
|
||||||
|
|
||||||
|
# compute filehash #
|
||||||
|
f_hash = None
|
||||||
|
content = None
|
||||||
|
with open(fullpath, "rb") as f:
|
||||||
|
|
||||||
|
# read file & compute #
|
||||||
|
content = f.read()
|
||||||
|
decoded_string = content.decode("ascii", errors="ignore")
|
||||||
|
f_hash = hashlib.sha512(content).hexdigest()
|
||||||
|
|
||||||
|
# general variables #
|
||||||
|
self.filehash = f_hash
|
||||||
|
self.ghost_id = ghost.id
|
||||||
|
self.login = ghost.login
|
||||||
|
self.race_time = ghost.race_time
|
||||||
|
self.cp_times = ",".join(map(str, ghost.cp_times))
|
||||||
|
self.upload_dt = datetime.datetime.now().isoformat()
|
||||||
|
|
||||||
|
# game version #
|
||||||
|
if ghost.game_version.startswith("TmForever"):
|
||||||
|
|
||||||
|
# set version and map #
|
||||||
|
self.game = "tmnf"
|
||||||
|
self.map_uid = mapname_from_filename
|
||||||
|
self.login_uid_tm2020 = None
|
||||||
|
|
||||||
|
# sanity check mapname for tmnf #
|
||||||
|
if mapname_from_filename not in decoded_string:
|
||||||
|
raise ValueError("Mapname indicated by filename does not match map in file")
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
# set gameversion and compute from xml #
|
||||||
|
self.game = "tm2020"
|
||||||
|
self._set_from_2020()
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_map_from_filename(self):
|
||||||
|
'''Compute the mapname from the filename if possible'''
|
||||||
|
|
||||||
|
try:
|
||||||
|
underscore_count = len(self.fullpath.split("_"))
|
||||||
|
if underscore_count > 3 or underscore_count < 1:
|
||||||
|
error_msg = "Filename unexpected number of '_' ({})".format(underscore_count)
|
||||||
|
error_msg += ", does your (map-)name contain underscores? If yes remove them."
|
||||||
|
raise ValueError(error_msg)
|
||||||
|
mapname_from_filename = os.path.basename(self.fullpath).split("_")[1]
|
||||||
|
if ".Replay" in mapname_from_filename:
|
||||||
|
mapname_from_filename = mapname_from_filename.split(".Replay")[0]
|
||||||
|
except IndexError:
|
||||||
|
raise ValueError("Unexpected filename format. (IndexError)")
|
||||||
|
return mapname_from_filename
|
||||||
|
|
||||||
|
def _set_from_2020(self):
|
||||||
|
'''Extract the XML-Header from TM2020-replays to set missing variables'''
|
||||||
|
|
||||||
|
# Specify the pattern to match the XML-like string
|
||||||
|
pattern = re.compile(rb'<header.*?</header>', re.DOTALL)
|
||||||
|
|
||||||
|
# Extract XML-like strings from the binary file
|
||||||
|
xml_strings = []
|
||||||
|
with open(self.fullpath, 'rb') as binary_file:
|
||||||
|
binary_data = binary_file.read()
|
||||||
|
matches = re.findall(pattern, binary_data)
|
||||||
|
xml_strings = [match.decode('utf-8') for match in matches]
|
||||||
|
|
||||||
|
# set vars #
|
||||||
|
xml_string = xml_strings[0]
|
||||||
|
xml_dict = xmltodict.parse(xml_string)
|
||||||
|
|
||||||
|
self.map_uid = xml_dict["header"]["map"]["@name"]
|
||||||
|
print(self.map_uid)
|
||||||
|
|
||||||
|
# load the name #
|
||||||
|
with open(self.fullpath, "rb") as f:
|
||||||
|
content = f.read()
|
||||||
|
result = content.split(b"\0")[:100]
|
||||||
|
|
||||||
|
# filter out empty bytes #
|
||||||
|
result = list(filter(lambda x: x, result))
|
||||||
|
|
||||||
|
# find the uid #
|
||||||
|
uid_index = -1
|
||||||
|
for i, el in enumerate(result):
|
||||||
|
if self.login in el.decode("ascii", errors="ignore"):
|
||||||
|
uid_index = i
|
||||||
|
break
|
||||||
|
|
||||||
|
if uid_index < 1:
|
||||||
|
raise ValueError("Can't find user UID in replay file.")
|
||||||
|
else:
|
||||||
|
self.login_uid_tm2020 = self.login
|
||||||
|
self.login = result[uid_index-1].strip(b"\x16").decode("utf-8")
|
||||||
Reference in New Issue
Block a user