mirror of
https://github.com/FAUSheppy/ths-speech
synced 2025-12-06 14:11:35 +01:00
92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
import base64
|
|
import os.path
|
|
import audiosegment_wrapper as AudioSegment
|
|
from configparse_wrapper.cpwrap import CFG
|
|
|
|
audiofiles = []
|
|
FTP_DIR = CFG("shareDir")
|
|
|
|
def save_audio(filename, base64_string):
|
|
global audiofiles
|
|
audiofiles += [filename[:-4]]
|
|
decoded = None
|
|
orig_filename = filename[:-4]+"_orig"
|
|
try:
|
|
decoded = base64.b64decode(base64_string)
|
|
except TypeError:
|
|
return b"ERROR_INVALID_ENCODING_64"
|
|
if os.path.isfile(filename):
|
|
return b"ERROR_FILE_EXISTS"
|
|
write_file(orig_filename, decoded)
|
|
|
|
seg = AudioSegment.from_file(orig_filename)
|
|
seg = seg.resample(sample_rate_Hz=32000, sample_width=2, channels=1)
|
|
|
|
seg.export(filename,format="wav")
|
|
seg.export(FTP_DIR + filename,format="wav")
|
|
|
|
return b"SUCCESS"
|
|
|
|
def save_audio_chain(file_str_tupels):
|
|
completeAudio = None
|
|
for fname, base64_string in file_str_tupels:
|
|
print("Filename: {}".format(fname))
|
|
decoded = None
|
|
orig_filename = fname[:-4]+"_orig"
|
|
try:
|
|
decoded = base64.b64decode(base64_string)
|
|
except TypeError:
|
|
return b"ERROR_INVALID_ENCODING_64"
|
|
with open(orig_filename,"wb") as f:
|
|
f.write(decoded)
|
|
if completeAudio == None:
|
|
completeAudio = AudioSegment.from_file(orig_filename)
|
|
else:
|
|
completeAudio += AudioSegment.from_file(orig_filename)
|
|
if not completeAudio:
|
|
return b"ERROR_AUDIO_CONCAT_FAILED"
|
|
else:
|
|
completeAudio = completeAudio.resample(sample_rate_Hz=32000, sample_width=2, channels=1)
|
|
completeAudio.export(file_str_tupels[0][0],format="wav")
|
|
completeAudio.export(FTP_DIR + file_str_tupels[0][0],format="wav")
|
|
return b"SUCCESS"
|
|
|
|
def save_transcript(filename, transcript):
|
|
global audiofiles
|
|
try:
|
|
audiofiles.remove(filename[:4])
|
|
except:
|
|
print("Audiofile not in working list")
|
|
if os.path.isfile(filename):
|
|
pass
|
|
with open(filename + "_transcript","w") as f:
|
|
f.write(transcript)
|
|
with open(FTP_DIR + filename + "_transcript","w") as f:
|
|
f.write(transcript)
|
|
|
|
def get_transcript(filename):
|
|
global audiofiles
|
|
try:
|
|
with open("data/" + filename + ".wav_transcript","r") as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
if filename in audiofiles:
|
|
return "ERROR_FILE_STILL_IN_WORK"
|
|
return "ERROR_FILE_NOT_AVAILIABLE"
|
|
|
|
def filelist():
|
|
return ""
|
|
|
|
def fileinfo(filename):
|
|
return ""
|
|
|
|
def copy_to_output(filename):
|
|
return ""
|
|
|
|
def write_file(filename, data):
|
|
global FTP_DIR
|
|
with open(FTP_DIR + filename,"wb") as f:
|
|
f.write(data)
|
|
with open(filename,"wb") as f:
|
|
f.write(data)
|