mirror of
https://github.com/FAUSheppy/open-web-leaderboard.git
synced 2025-12-06 15:11:35 +01:00
114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
#!/usr/bin/python3
|
|
import json
|
|
import sqlite3
|
|
import player
|
|
import os
|
|
|
|
DATABASE_PLAYERS = "players.sqlite"
|
|
DATABASE_ROUNDS = "rounds.sqlite"
|
|
DATABASE_HISTORICAL = "players.sqlite"
|
|
|
|
class DatabaseConnection:
|
|
def __init__(self, basepath):
|
|
self.dbFormatString = "file:{}?mode=ro"
|
|
self.databasePlayers = self.dbFormatString.format(os.path.join(basepath, DATABASE_PLAYERS))
|
|
self.databaseRounds = self.dbFormatString.format(os.path.join(basepath, DATABASE_ROUNDS))
|
|
self.databaseHistorical = self.dbFormatString.format(
|
|
os.path.join(basepath, DATABASE_HISTORICAL))
|
|
|
|
self.connPlayers = sqlite3.connect(self.databasePlayers, uri=True)
|
|
self.connRounds = sqlite3.connect(self.databaseRounds, uri=True)
|
|
self.connHistorical = sqlite3.connect(self.databaseHistorical, uri=True)
|
|
|
|
def __del__(self):
|
|
self.connPlayers.close();
|
|
self.connRounds.close();
|
|
self.connHistorical.close();
|
|
|
|
def getTotalPlayers(self):
|
|
'''Get the total number of players in the database'''
|
|
|
|
cursor = self.connPlayers.cursor()
|
|
cursor.execute("SELECT Count(*) FROM players")
|
|
count = cursor.fetchone()[0]
|
|
return count
|
|
|
|
def getHistoricalForPlayerId(self, playerId):
|
|
'''Get historical data for a player'''
|
|
|
|
cursor = self.connHistorical.cursor()
|
|
cursor.execute("SELECT * FROM playerHistoricalData where id = ?", (playerId,))
|
|
rows = cursor.fetchall()
|
|
|
|
PLAYER_ID = 0
|
|
TIMESTAMP = 1
|
|
MU = 2
|
|
SIGMA = 3
|
|
|
|
playerIdDict = dict()
|
|
for r in rows:
|
|
timestampDict = dict()
|
|
timestampDict.update({ "mu" : r[MU] })
|
|
timestampDict.update({ "sigma" : r[SIGMA]})
|
|
playerIdDict.update({ r[TIMESTAMP] : timestampDict })
|
|
|
|
retDict = { rows[0][PLAYER_ID] : playerIdDict }
|
|
return retDict
|
|
|
|
def getPlayerById(self, playerId):
|
|
'''Get a player by his id'''
|
|
|
|
cursor = self.connPlayers.cursor()
|
|
cursor.execute("SELECT * FROM players where playerId = ?", (playerId,))
|
|
row = cursor.fetchone()
|
|
|
|
if(row):
|
|
playerInLeaderboard = player.PlayerInLeaderboard(playerRow)
|
|
else:
|
|
playerInLeaderboard = None
|
|
|
|
return row
|
|
|
|
def getRankRange(self, start, end):
|
|
'''Get a range of players by rank'''
|
|
|
|
cursor = self.connPlayers.cursor()
|
|
limit = end - start
|
|
sqlQuery = "Select * FROM players ORDER BY (mu - 2*sigma) DESC LIMIT ? OFFSET ?"
|
|
cursor.execute(sqlQuery, (limit, start))
|
|
rows = cursor.fetchall()
|
|
playerList = []
|
|
for row in rows:
|
|
playerList += [player.PlayerInLeaderboard(row)]
|
|
return playerList
|
|
|
|
def findPlayerByName(self, playerName):
|
|
'''Find a player by his name (prefer fullmatch)'''
|
|
|
|
cursor = self.connPlayers.cursor()
|
|
playerNamePrepared = "%{}%".format(playerName.replace("%", "%%"))
|
|
cursor.execute("SELECT * FROM players WHERE name == ?", (playerName,))
|
|
row = cursor.fetchone()
|
|
|
|
playerRow = None
|
|
if not playerRow:
|
|
cursor.execute("SELECT * FROM players WHERE name LIKE ?", (playerNamePrepared,))
|
|
playerRow = cursor.fetchone()
|
|
if not playerRow:
|
|
conn.close()
|
|
return (None, None)
|
|
|
|
playerInLeaderboard = player.PlayerInLeaderboard(playerRow)
|
|
player.rank = getPlayerRank(playerInLeaderboard)
|
|
return playerInLeaderboard
|
|
|
|
def getPlayerRank(self, player):
|
|
'''Calculate player rank - a player rank may change rapidly and
|
|
can't and shouldn't be used to identify a player'''
|
|
|
|
cursor = connPlayers.cursor()
|
|
cursor.execute("SELECT COUNT(*) from players where (mu-2*sigma) > (?-2*?);",
|
|
(playerInLeaderboard.mu, playerInLeaderboard.sigma))
|
|
rank = cursor.fetchone()[0]
|
|
return rank
|