"""
This module simply exposes a wrapper of a pydub.AudioSegment object.
"""
from __future__ import division
from __future__ import print_function

import collections
import functools
import itertools
import math
import numpy as np
import pickle
import platform
import pydub
import os
import random
import scipy.signal as signal
import string
import subprocess
import sys
import tempfile
import warnings
import webrtcvad

MS_PER_S = 1000
S_PER_MIN = 60
MS_PER_MIN = MS_PER_S * S_PER_MIN

def deprecated(func):
    """
    Deprecator decorator.
    """
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return new_func

class AudioSegment:
    """
    This class is a wrapper for a pydub.AudioSegment that provides additional methods.
    """

    def __init__(self, pydubseg, name):
        self.seg = pydubseg
        self.name = name

    def __getattr__(self, attr):
        orig_attr = self.seg.__getattribute__(attr)
        if callable(orig_attr):
            def hooked(*args, **kwargs):
                result = orig_attr(*args, **kwargs)
                if result == self.seg:
                    return self
                elif type(result) == pydub.AudioSegment:
                    return AudioSegment(result, self.name)
                else:
                    return result
            return hooked
        else:
            return orig_attr

    def __len__(self):
        return len(self.seg)

    def __eq__(self, other):
        return self.seg == other

    def __ne__(self, other):
        return self.seg != other

    def __iter__(self):
        return (x for x in self.seg)

    def __getitem__(self, millisecond):
        return AudioSegment(self.seg[millisecond], self.name)

    def __add__(self, arg):
        if type(arg) == AudioSegment:
            self.seg._data = self.seg._data + arg.seg._data
        else:
            self.seg = self.seg + arg
        return self

    def __radd__(self, rarg):
        return self.seg.__radd__(rarg)

    def __repr__(self):
        return str(self)

    def __str__(self):
        s = "%s: %s channels, %s bit, sampled @ %s kHz, %.3fs long" %\
            (self.name, str(self.channels), str(self.sample_width * 8),
             str(self.frame_rate / 1000.0), self.duration_seconds)
        return s

    def __sub__(self, arg):
        if type(arg) == AudioSegment:
            self.seg = self.seg - arg.seg
        else:
            self.seg = self.seg - arg
        return self

    def __mul__(self, arg):
        if type(arg) == AudioSegment:
            self.seg = self.seg * arg.seg
        else:
            self.seg = self.seg * arg
        return self

    @property
    def spl(self):
        """
        Sound Pressure Level - defined as 20 * log10(abs(value + 1E-9)).

        Returns a numpy array of SPL dB values.
        """
        return 20.0 * np.log10(np.abs(self.to_numpy_array() + 1E-9))

    @staticmethod
    def _bandpass_filter(data, low, high, fs, order=5):
        """
        :param data: The data (numpy array) to be filtered.
        :param low: The low cutoff in Hz.
        :param high: The high cutoff in Hz.
        :param fs: The sample rate (in Hz) of the data.
        :param order: The order of the filter. The higher the order, the tighter the roll-off.
        :returns: Filtered data (numpy array).
        """
        nyq = 0.5 * fs
        low = low / nyq
        high = high / nyq
        b, a = signal.butter(order, [low, high], btype='band')
        y = signal.lfilter(b, a, data)
        return y

    @staticmethod
    def lowpass_filter(data, cutoff, fs, order=5):
        """
        :param data: The data (numpy array) to be filtered.
        :param cutoff: The high cutoff in Hz.
        :param fs: The sample rate in Hz of the data.
        :param order: The order of the filter. The higher the order, the tighter the roll-off.
        :returns: Filtered data (numpy array).
        """
        nyq = 0.5 * fs
        normal_cutoff = cutoff / nyq
        b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)
        y = signal.lfilter(b, a, data)
        return y

    def auditory_scene_analysis(self):
        """
        Algorithm based on the paper: Auditory Segmentation Based on Onset and Offset Analysis,
        by Hu and Wang, 2007.
        """
        import matplotlib.pyplot as plt

        def visualize_time_domain(seg, title=""):
            plt.plot(seg)
            plt.title(title)
            plt.show()
            plt.clf()

        def visualize(spect, frequencies, title=""):
            i = 0
            for freq, (index, row) in zip(frequencies[::-1], enumerate(spect[::-1, :])):
                plt.subplot(spect.shape[0], 1, index + 1)
                if i == 0:
                    plt.title(title)
                i += 1
                plt.ylabel("{0:.0f}".format(freq))
                plt.plot(row)
            plt.show()
            plt.clf()

        # Normalize self into 25dB average SPL
        normalized = self.normalize_spl_by_average(db=25)
        visualize_time_domain(normalized.to_numpy_array(), "Normalized")

        # Do a band-pass filter in each frequency
        data = normalized.to_numpy_array()
        start_frequency = 50
        stop_frequency = 8000
        start = np.log10(start_frequency)
        stop = np.log10(stop_frequency)
        frequencies = np.logspace(start, stop, num=10, endpoint=True, base=10.0)
        print("Dealing with the following frequencies:", frequencies)
        rows = [AudioSegment._bandpass_filter(data, freq*0.8, freq*1.2, self.frame_rate) for freq in frequencies]
        rows = np.array(rows)
        spect = np.vstack(rows)
        visualize(spect, frequencies, "After bandpass filtering (cochlear model)")

        # Half-wave rectify each frequency channel
        spect[spect < 0] = 0
        visualize(spect, frequencies, "After half-wave rectification in each frequency")

        # Low-pass filter each frequency channel
        spect = np.apply_along_axis(AudioSegment.lowpass_filter, 1, spect, 30, self.frame_rate, 6)
        visualize(spect, frequencies, "After low-pass filtering in each frequency")

        # Downsample each frequency to 400 Hz
        downsample_freq_hz = 400
        if self.frame_rate > downsample_freq_hz:
            step = int(round(self.frame_rate / downsample_freq_hz))
            spect = spect[:, ::step]
        visualize(spect, frequencies, "After downsampling in each frequency")

        # Now you have the temporal envelope of each frequency channel

        # Smoothing
        scales = [(6, 1/4), (6, 1/14), (1/2, 1/14)]
        thetas = [0.95, 0.95, 0.85]
        ## For each (sc, st) scale, smooth across time using st, then across frequency using sc
        gaussian = lambda x, mu, sig: np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0)))
        gaussian_kernel = lambda sig: gaussian(np.linspace(-10, 10, len(frequencies) // 2), 0, sig)
        spectrograms = []
        for sc, st in scales:
            time_smoothed = np.apply_along_axis(AudioSegment.lowpass_filter, 1, spect, 1/st, downsample_freq_hz, 6)
            visualize(time_smoothed, frequencies, "After time smoothing with scale: " + str(st))
            freq_smoothed = np.apply_along_axis(np.convolve, 0, spect, gaussian_kernel(sc))
            spectrograms.append(freq_smoothed)
            visualize(freq_smoothed, frequencies, "After time and frequency smoothing with scales (freq) " + str(sc) + " and (time) " + str(st))
        ## Now we have a set of scale-space spectrograms of different scales (sc, st)

        # Onset/Offset Detection and Matching
        def theta_on(spect):
            return np.nanmean(spect) + np.nanstd(spect)

        def compute_peaks_or_valleys_of_first_derivative(s, do_peaks=True):
            """
            Takes a spectrogram and returns a 2D array of the form:

            0 0 0 1 0 0 1 0 0 0 1 <-- Frequency 0
            0 0 1 0 0 0 0 0 0 1 0 <-- Frequency 1
            0 0 0 0 0 0 1 0 1 0 0 <-- Frequency 2
            *** Time axis *******

            Where a 1 means that the value in that time bin in the spectrogram corresponds to
            a peak/valley in the first derivative.
            """
            gradient = np.nan_to_num(np.apply_along_axis(np.gradient, 1, s), copy=False)
            half_window = 4
            if do_peaks:
                indexes = [signal.argrelextrema(gradient[i, :], np.greater, order=half_window) for i in range(gradient.shape[0])]
            else:
                indexes = [signal.argrelextrema(gradient[i, :], np.less, order=half_window) for i in range(gradient.shape[0])]
            extrema = np.zeros(s.shape)
            for row_index, index_array in enumerate(indexes):
                # Each index_array is a list of indexes corresponding to all the extrema in a given row
                for col_index in index_array:
                    extrema[row_index, col_index] = 1
            return extrema

        for spect, (sc, st) in zip(spectrograms, scales):
            # Compute sudden upward changes in spect, these are onsets of events
            onsets = compute_peaks_or_valleys_of_first_derivative(spect)
            # Compute sudden downward changes in spect, these are offsets of events
            offsets = compute_peaks_or_valleys_of_first_derivative(spect, do_peaks=False)
            print("TOTAL ONSETS:", np.sum(onsets, axis=1))
            print("TOTAL OFFSETS:", np.sum(offsets, axis=1))
            exit()

            # onsets and offsets are 2D arrays

            ## Determine the offset time for each onset:
            ### If t_on[c, i] represents the time of the ith onset in frequency channel c, the corresponding offset
            ### must occur between t_on[c, i] and t_on[c, i+1].
            ### If there is more than one offset candidate in this range, choose the one with the largest intensity decrease.
            ## Create onset/offset fronts by connecting onsets across frequency channels (connect two onsets
            ## if they occur within 20ms of each other). Start over whenever a frequency band does not contain an offset
            ## in this range. Do the same procedure for offsets. Now you have onset and offset fronts.
            ## Now hook up the onsets with the offsets to form segments:
            ## For each onset front, (t_on[c, i1], t_on[c + 1, i2], ..., t_on[c + m - 1, im]):
            ##   matching_offsets = (t_off[c, i1], t_off[c + 1, i2], ..., t_off[c + m - 1, im])
            ##   Get all offset fronts which contain at least one of the offset times found in matching_offsets.
            ##   Among these offset fronts, the one that crosses the most of matching_offsets is chosen
            ##   - call this offset front: matching_offset_front
            ##   Update all t_offs in matching_offsets whose 'c's are in matching_offset_front to be 'matched', and
            ##   - update their times to the corresponding channel offset in matching_offset_front.
            ##   If all t_offs in matching_offsets are 'matched', continue to the next onset front.
            ## Now go through all the segments you have created and break them up along frequencies if the temporal
            ## envelopes don't match well enough. That is, if we have two adjacent channels c and c+1, and they
            ## are part of the same segment as determined above, break this segment into two along these lines
            ## if the correlation between them is below theta_c. Theta_c is thetas[i] where i depends on the scale.

        # Multiscale Integration
        ##
        ## TODO

    def detect_voice(self, prob_detect_voice=0.5):
        """
        Returns self as a list of tuples:
        [('v', voiced segment), ('u', unvoiced segment), (etc.)]

        The overall order of the AudioSegment is preserved.
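
        A minimal usage sketch (``speech.wav`` is only an illustrative placeholder,
        not a file shipped with this module):

        .. code-block:: python

            import audiosegment

            # detect_voice() expects 8/16/32/48 kHz, 16 bit, mono audio,
            # so resample first (this requires sox to be installed)
            seg = audiosegment.from_file("speech.wav").resample(sample_rate_Hz=16000,
                                                                sample_width=2,
                                                                channels=1)
            results = seg.detect_voice(prob_detect_voice=0.5)
            voiced = [piece for label, piece in results if label == 'v']
            unvoiced = [piece for label, piece in results if label == 'u']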

        :param prob_detect_voice: The raw probability that any random 20ms window of the audio file
                                  contains voice.
        :returns: The described list.
        """
        assert self.frame_rate in (48000, 32000, 16000, 8000), "Try resampling to one of the allowed frame rates."
        assert self.sample_width == 2, "Try resampling to 16 bit."
        assert self.channels == 1, "Try resampling to one channel."

        class model_class:
            def __init__(self, aggressiveness):
                self.v = webrtcvad.Vad(int(aggressiveness))

            def predict(self, vector):
                if self.v.is_speech(vector.raw_data, vector.frame_rate):
                    return 1
                else:
                    return 0

        model = model_class(aggressiveness=2)
        pyesno = 0.3  # Probability of the next 20 ms being unvoiced given that this 20 ms was voiced
        pnoyes = 0.2  # Probability of the next 20 ms being voiced given that this 20 ms was unvoiced
        p_realyes_outputyes = 0.4  # WebRTCVAD has a very high FP rate - just because it says yes, doesn't mean much
        p_realyes_outputno = 0.05  # If it says no, we can be very certain that it really is a no
        p_yes_raw = prob_detect_voice
        filtered = self.detect_event(model=model,
                                     ms_per_input=20,
                                     transition_matrix=(pyesno, pnoyes),
                                     model_stats=(p_realyes_outputyes, p_realyes_outputno),
                                     event_length_s=0.25,
                                     prob_raw_yes=p_yes_raw)
        ret = []
        for tup in filtered:
            t = ('v', tup[1]) if tup[0] == 'y' else ('u', tup[1])
            ret.append(t)
        return ret

    def dice(self, seconds, zero_pad=False):
        """
        Cuts the AudioSegment into segments that are at most `seconds` seconds long. For example,
        if seconds=10, this will return a list of AudioSegments, in order, where each one is at most
        10 seconds long. If `zero_pad` is True, the last AudioSegment will be zero-padded so that it
        is exactly `seconds` seconds long.
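
        A small usage sketch (``recording.wav`` is a hypothetical input file used only
        for illustration):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("recording.wav")
            # Cut into 10 second pieces; pad the last piece with silence to a full 10 seconds
            pieces = seg.dice(seconds=10, zero_pad=True)
            # Or specify each piece's length explicitly - the list must sum to the segment's duration
            first, remainder = seg.dice(seconds=[5.0, seg.duration_seconds - 5.0])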

        :param seconds: The length of each segment in seconds. Can be either a float/int, in which case
                        `self.duration_seconds` / `seconds` segments are made, each of `seconds` length, or a
                        list-like can be given, in which case the given list must sum to
                        `self.duration_seconds` and each segment is specified by the list - e.g.
                        the 9th AudioSegment in the returned list will be `seconds[8]` seconds long.
        :param zero_pad: Whether to zero_pad the final segment if necessary. Ignored if `seconds` is
                         a list-like.
        :returns: A list of AudioSegments, each of which is the appropriate number of seconds long.
        :raises: ValueError if a list-like is given for `seconds` and the list's durations do not sum
                 to `self.duration_seconds`.
        """
        try:
            total_s = sum(seconds)
            if not (self.duration_seconds <= total_s + 1 and self.duration_seconds >= total_s - 1):
                raise ValueError("`seconds` does not sum to within one second of the duration of this AudioSegment."
                                 " Given total seconds: %s and self.duration_seconds: %s" % (total_s, self.duration_seconds))
            starts = []
            stops = []
            time_ms = 0
            for dur in seconds:
                starts.append(time_ms)
                time_ms += dur * MS_PER_S
                stops.append(time_ms)
            zero_pad = False
        except TypeError:
            # `seconds` is not a list
            starts = range(0, int(round(self.duration_seconds * MS_PER_S)), int(round(seconds * MS_PER_S)))
            stops = (min(self.duration_seconds * MS_PER_S, start + seconds * MS_PER_S) for start in starts)
        outs = [self[start:stop] for start, stop in zip(starts, stops)]
        out_lens = [out.duration_seconds for out in outs]
        # Check if our last slice is within one ms of the expected length - if so, we don't need to zero pad
        if zero_pad and not (out_lens[-1] * MS_PER_S <= seconds * MS_PER_S + 1 and out_lens[-1] * MS_PER_S >= seconds * MS_PER_S - 1):
            num_zeros = self.frame_rate * (seconds - out_lens[-1])
            outs[-1] = outs[-1].zero_extend(num_samples=num_zeros)
        return outs

    def detect_event(self, model, ms_per_input, transition_matrix, model_stats, event_length_s,
                     start_as_yes=False, prob_raw_yes=0.5):
        """
        A list of tuples of the form [('n', AudioSegment), ('y', AudioSegment), etc.] is returned, where tuples
        of the form ('n', AudioSegment) are the segments of sound where the event was not detected,
        while ('y', AudioSegment) tuples are the segments of sound where the event was detected.

        .. code-block:: python

            # Example usage
            import audiosegment
            import keras
            import keras.models
            import numpy as np
            import sys

            class Model:
                def __init__(self, modelpath):
                    self.model = keras.models.load_model(modelpath)

                def predict(self, seg):
                    _bins, fft_vals = seg.fft()
                    fft_vals = np.abs(fft_vals) / len(fft_vals)
                    predicted_np_form = self.model.predict(np.array([fft_vals]), batch_size=1)
                    prediction_as_int = int(round(predicted_np_form[0][0]))
                    return prediction_as_int

            modelpath = sys.argv[1]
            wavpath = sys.argv[2]
            model = Model(modelpath)
            seg = audiosegment.from_file(wavpath).resample(sample_rate_Hz=32000, sample_width=2, channels=1)
            pyes_to_no = 0.3  # The probability of one 30 ms sample being an event, and the next one not
            pno_to_yes = 0.2  # The probability of one 30 ms sample not being an event, and the next one yes
            ptrue_pos_rate = 0.8  # The true positive rate (probability of a predicted yes being right)
            pfalse_neg_rate = 0.3  # The false negative rate (probability of a predicted no being wrong)
            raw_prob = 0.7  # The raw probability of seeing the event in any random 30 ms slice of this file
            events = seg.detect_event(model, ms_per_input=30, transition_matrix=[pyes_to_no, pno_to_yes],
                                      model_stats=[ptrue_pos_rate, pfalse_neg_rate], event_length_s=0.25,
                                      prob_raw_yes=raw_prob)
            nos = [event[1] for event in events if event[0] == 'n']
            yeses = [event[1] for event in events if event[0] == 'y']
            if len(nos) > 1:
                notdetected = nos[0].reduce(nos[1:])
                notdetected.export("notdetected.wav", format="WAV")
            if len(yeses) > 1:
                detected = yeses[0].reduce(yeses[1:])
                detected.export("detected.wav", format="WAV")

        :param model: The model. The model must have a predict() function which takes an AudioSegment
                      of `ms_per_input` number of ms and which outputs 1 if the audio event is detected
                      in that input, and 0 if not. Make sure to resample the AudioSegment to the right
                      values before calling this function on it.

        :param ms_per_input: The number of ms of AudioSegment to be fed into the model at a time. If this does not
                             come out even, the last AudioSegment will be zero-padded.

        :param transition_matrix: An iterable of the form: [p(yes->no), p(no->yes)]. That is, the probability of moving
                                  from a 'yes' state to a 'no' state and the probability of vice versa.

        :param model_stats: An iterable of the form: [p(reality=1|output=1), p(reality=1|output=0)]. That is,
                            the probability of the ground truth really being a 1, given that the model output a 1,
                            and the probability of the ground truth being a 1, given that the model output a 0.

        :param event_length_s: The typical duration of the event you are looking for in seconds (can be a float).

        :param start_as_yes: If True, the first `ms_per_input` will be in the 'y' category. Otherwise it will be
                             in the 'n' category.

        :param prob_raw_yes: The raw probability of finding the event in any given `ms_per_input` vector.

        :returns: A list of tuples of the form [('n', AudioSegment), ('y', AudioSegment), etc.],
                  where over the course of the list, the AudioSegment in tuple 3 picks up
                  where the one in tuple 2 left off.

        :raises: ValueError if `ms_per_input` is negative or larger than the number of ms in this
                 AudioSegment; if `transition_matrix` or `model_stats` do not have a __len__ attribute
                 or are not length 2; or if the values in `transition_matrix` or `model_stats` are not
                 in the closed interval [0.0, 1.0].
        """
        if ms_per_input < 0 or ms_per_input / MS_PER_S > self.duration_seconds:
            raise ValueError("ms_per_input cannot be negative and cannot be longer than the duration of the AudioSegment."
                             " The given value was " + str(ms_per_input))
        elif not hasattr(transition_matrix, "__len__") or len(transition_matrix) != 2:
            raise ValueError("transition_matrix must be an iterable of length 2.")
        elif not hasattr(model_stats, "__len__") or len(model_stats) != 2:
            raise ValueError("model_stats must be an iterable of length 2.")
        elif any([True for prob in transition_matrix if prob > 1.0 or prob < 0.0]):
            raise ValueError("Values in transition_matrix are probabilities, and so must be in the range [0.0, 1.0].")
        elif any([True for prob in model_stats if prob > 1.0 or prob < 0.0]):
            raise ValueError("Values in model_stats are probabilities, and so must be in the range [0.0, 1.0].")
        elif prob_raw_yes > 1.0 or prob_raw_yes < 0.0:
            raise ValueError("`prob_raw_yes` is a probability, and so must be in the range [0.0, 1.0]")

        # Get the yeses or nos for when the filter is triggered (when the event is on/off)
        filter_indices = [yes_or_no for yes_or_no in self._get_filter_indices(start_as_yes,
                                                                              prob_raw_yes,
                                                                              ms_per_input,
                                                                              model,
                                                                              transition_matrix,
                                                                              model_stats)]
        # Run a homogeneity filter over the values to make local regions more self-similar (reduce noise)
        ret = self._homogeneity_filter(filter_indices, window_size=int(round(0.25 * MS_PER_S / ms_per_input)))
        # Group the consecutive ones together
        ret = self._group_filter_values(ret, ms_per_input)
        # Take the groups and turn them into AudioSegment objects
        real_ret = self._reduce_filtered_segments(ret)

        return real_ret

    def _get_filter_indices(self, start_as_yes, prob_raw_yes, ms_per_input, model, transition_matrix, model_stats):
        """
        This has been broken out of the `filter` function to reduce cognitive load.
        """
        filter_triggered = 1 if start_as_yes else 0
        prob_raw_no = 1.0 - prob_raw_yes
        for segment, _timestamp in self.generate_frames_as_segments(ms_per_input):
            yield filter_triggered
            observation = int(round(model.predict(segment)))
            assert observation == 1 or observation == 0, "The given model did not output a 1 or a 0, output: "\
                                                         + str(observation)
            prob_hyp_yes_given_last_hyp = 1.0 - transition_matrix[0] if filter_triggered else transition_matrix[1]
            prob_hyp_no_given_last_hyp = transition_matrix[0] if filter_triggered else 1.0 - transition_matrix[1]
            prob_hyp_yes_given_data = model_stats[0] if observation == 1 else model_stats[1]
            prob_hyp_no_given_data = 1.0 - model_stats[0] if observation == 1 else 1.0 - model_stats[1]
            hypothesis_yes = prob_raw_yes * prob_hyp_yes_given_last_hyp * prob_hyp_yes_given_data
            hypothesis_no = prob_raw_no * prob_hyp_no_given_last_hyp * prob_hyp_no_given_data
            # Make a list of ints, each 0 or 1. The number of 1s is hypothesis_yes * 100
            # and the number of 0s is hypothesis_no * 100.
            distribution = [1 for i in range(int(round(hypothesis_yes * 100)))]
            distribution.extend([0 for i in range(int(round(hypothesis_no * 100)))])
            # Shuffle and draw to decide whether the filter is triggered for the next frame
            random.shuffle(distribution)
            filter_triggered = random.choice(distribution)

    def _group_filter_values(self, filter_indices, ms_per_input):
        """
        This has been broken out of the `filter` function to reduce cognitive load.
        """
        ret = []
        for filter_value, (_segment, timestamp) in zip(filter_indices, self.generate_frames_as_segments(ms_per_input)):
            if filter_value == 1:
                if len(ret) > 0 and ret[-1][0] == 'n':
                    ret.append(['y', timestamp])  # The last one was different, so we create a new one
                elif len(ret) > 0 and ret[-1][0] == 'y':
                    ret[-1][1] = timestamp  # The last one was the same as this one, so just update the timestamp
                else:
                    ret.append(['y', timestamp])  # This is the first one
            else:
                if len(ret) > 0 and ret[-1][0] == 'n':
                    ret[-1][1] = timestamp
                elif len(ret) > 0 and ret[-1][0] == 'y':
                    ret.append(['n', timestamp])
                else:
                    ret.append(['n', timestamp])
        return ret

    def _homogeneity_filter(self, ls, window_size):
        """
        This has been broken out of the `filter` function to reduce cognitive load.

        ls is a list of 1s or 0s for when the filter is on or off.
        """
        k = window_size
        i = k
        while i <= len(ls) - k:
            # Get a window of k items
            window = [ls[i + j] for j in range(k)]
            # Change the items in the window to be more like the mode of that window
            mode = 1 if sum(window) >= k / 2 else 0
            for j in range(k):
                ls[i + j] = mode
            i += k
        return ls

    def _reduce_filtered_segments(self, ret):
        """
        This has been broken out of the `filter` function to reduce cognitive load.
        """
        real_ret = []
        for i, (this_yesno, next_timestamp) in enumerate(ret):
            if i > 0:
                _prev_yesno, timestamp = ret[i - 1]
            else:
                timestamp = 0

            data = self[timestamp * MS_PER_S:next_timestamp * MS_PER_S].raw_data
            seg = AudioSegment(pydub.AudioSegment(data=data, sample_width=self.sample_width,
                                                  frame_rate=self.frame_rate, channels=self.channels), self.name)
            real_ret.append((this_yesno, seg))
        return real_ret

    def _execute_sox_cmd(self, cmd, console_output=False):
        """
        Executes a sox command in a platform-independent manner.

        `cmd` must be a format string that includes {inputfile} and {outputfile}.
        """
        on_windows = platform.system().lower() == "windows"

        # On Windows, a temporary file cannot be shared outside the process that creates it,
        # so we need to create a "permanent" file that we will use and delete afterwards
        def _get_random_tmp_file():
            if on_windows:
                rand_string = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
                tmp = self.name + "_" + rand_string
                WinTempFile = collections.namedtuple("WinTempFile", "name")
                tmp = WinTempFile(tmp)
            else:
                tmp = tempfile.NamedTemporaryFile()
            return tmp

        # Get a temp file to put our data in and a temp file to store the result
        tmp = _get_random_tmp_file()
        othertmp = _get_random_tmp_file()

        # Store our data in the temp file
        self.export(tmp.name, format="WAV")

        # Write the command to sox
        stdout = stderr = subprocess.PIPE if console_output else subprocess.DEVNULL
        command = cmd.format(inputfile=tmp.name, outputfile=othertmp.name)
        res = subprocess.run(command.split(' '), stdout=stdout, stderr=stderr)
        assert res.returncode == 0, "Sox did not work as intended, or perhaps you don't have Sox installed?"

        # Create a new AudioSegment from the other temp file (where sox put the result)
        other = AudioSegment(pydub.AudioSegment.from_wav(othertmp.name), self.name)

        # Clean up the temp files
        if on_windows:
            os.remove(tmp.name)
            os.remove(othertmp.name)
        else:
            tmp.close()
            othertmp.close()

        return other

    def filter_silence(self, duration_s=1, threshold_percentage=1, console_output=False):
        """
        Returns a copy of this AudioSegment, but whose silence has been removed.

        .. note:: This method requires that you have the program 'sox' installed.

        .. warning:: This method uses the program 'sox' to perform the task. While this is very fast for a single
                     function call, the IO may add up for large numbers of AudioSegment objects.
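
        A brief usage sketch (``interview.wav`` is just a hypothetical file name used for
        illustration; sox must be installed for this to run):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("interview.wav")
            # Strip any run of silence longer than 0.5 seconds, where "silence" is
            # anything below 1% of the segment's peak amplitude
            trimmed = seg.filter_silence(duration_s=0.5, threshold_percentage=1)
            trimmed.export("interview_trimmed.wav", format="WAV")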

        :param duration_s: The number of seconds of "silence" that must be present in a row to
                           be stripped.
        :param threshold_percentage: Silence is defined as any samples whose absolute value is below
                                     `threshold_percentage * max(abs(samples in this segment))`.
        :param console_output: If True, will pipe all sox output to the console.
        :returns: A copy of this AudioSegment, but whose silence has been removed.
        """
        command = "sox {inputfile} -t wav {outputfile} silence -l 1 0.1 "\
                  + str(threshold_percentage) + "% -1 " + str(float(duration_s)) + " " + str(threshold_percentage) + "%"
        return self._execute_sox_cmd(command)

    def fft(self, start_s=None, duration_s=None, start_sample=None, num_samples=None, zero_pad=False):
        """
        Transforms the indicated slice of the AudioSegment into the frequency domain and returns the bins
        and the values.

        If neither `start_s` nor `start_sample` is specified, the first sample of the slice will be the first sample
        of the AudioSegment.

        If neither `duration_s` nor `num_samples` is specified, the slice will be from the specified start
        to the end of the segment.

        .. code-block:: python

            # Example for plotting the FFT using this function
            import matplotlib.pyplot as plt
            import numpy as np

            seg = audiosegment.from_file("furelise.wav")
            # Just take the first 3 seconds
            hist_bins, hist_vals = seg[1:3000].fft()
            hist_vals_real_normed = np.abs(hist_vals) / len(hist_vals)
            plt.plot(hist_bins / 1000, hist_vals_real_normed)
            plt.xlabel("kHz")
            plt.ylabel("dB")
            plt.show()

        .. image:: images/fft.png

        :param start_s: The start time in seconds. If this is specified, you cannot specify `start_sample`.
        :param duration_s: The duration of the slice in seconds. If this is specified, you cannot specify `num_samples`.
        :param start_sample: The zero-based index of the first sample to include in the slice.
                             If this is specified, you cannot specify `start_s`.
        :param num_samples: The number of samples to include in the slice. If this is specified, you cannot
                            specify `duration_s`.
        :param zero_pad: If True and the combination of start and duration result in running off the end of
                         the AudioSegment, the end is zero padded to prevent this.
        :returns: np.ndarray of frequencies, np.ndarray of amount of each frequency
        :raises: ValueError if `start_s` and `start_sample` are both specified and/or if both `duration_s` and
                 `num_samples` are specified.
        """
        if start_s is not None and start_sample is not None:
            raise ValueError("Only one of start_s and start_sample can be specified.")
        if duration_s is not None and num_samples is not None:
            raise ValueError("Only one of duration_s and num_samples can be specified.")
        if start_s is None and start_sample is None:
            start_sample = 0
        if duration_s is None and num_samples is None:
            num_samples = len(self.get_array_of_samples()) - int(start_sample)

        if duration_s is not None:
            num_samples = int(round(duration_s * self.frame_rate))
        if start_s is not None:
            start_sample = int(round(start_s * self.frame_rate))

        end_sample = start_sample + num_samples  # end_sample is excluded
        if end_sample > len(self.get_array_of_samples()) and not zero_pad:
            raise ValueError("The combination of start and duration will run off the end of the AudioSegment object.")
        elif end_sample > len(self.get_array_of_samples()) and zero_pad:
            arr = np.array(self.get_array_of_samples())
            zeros = np.zeros(end_sample - len(arr))
            arr = np.append(arr, zeros)
        else:
            arr = np.array(self.get_array_of_samples())

        audioslice = np.array(arr[start_sample:end_sample])
        fft_result = np.fft.fft(audioslice)[range(int(round(num_samples/2)) + 1)]
        step_size = self.frame_rate / num_samples
        bins = np.arange(0, int(round(num_samples/2)) + 1, 1.0) * step_size
        return bins, fft_result

    def generate_frames(self, frame_duration_ms, zero_pad=True):
        """
        Yields self's data in chunks of frame_duration_ms.

        This function was adapted from pywebrtc's example [https://github.com/wiseman/py-webrtcvad/blob/master/example.py].
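
        A small usage sketch (``speech.wav`` is a hypothetical file used only for
        illustration):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("speech.wav")
            # Walk over the audio in 30 ms frames
            for frame in seg.generate_frames(frame_duration_ms=30):
                print(frame.timestamp, frame.duration, len(frame.bytes))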

        :param frame_duration_ms: The length of each frame in ms.
        :param zero_pad: Whether or not to zero pad the end of the AudioSegment object to get all
                         the audio data out as frames. If not, there may be a part at the end
                         of the Segment that is cut off (the part will be <= `frame_duration_ms` in length).
        :returns: A generator that yields Frame objects, each with properties 'bytes' (the data),
                  'timestamp' (start time), and 'duration'.
        """
        Frame = collections.namedtuple("Frame", "bytes timestamp duration")

        # (samples/sec) * (seconds in a frame) * (bytes/sample)
        bytes_per_frame = int(self.frame_rate * (frame_duration_ms / 1000) * self.sample_width)
        offset = 0  # where we are so far in self's data (in bytes)
        timestamp = 0.0  # where we are so far in self (in seconds)
        # (bytes/frame) * (sample/bytes) * (sec/samples)
        frame_duration_s = (bytes_per_frame / self.frame_rate) / self.sample_width
        while offset + bytes_per_frame < len(self.raw_data):
            yield Frame(self.raw_data[offset:offset + bytes_per_frame], timestamp, frame_duration_s)
            timestamp += frame_duration_s
            offset += bytes_per_frame

        if zero_pad:
            rest = self.raw_data[offset:]
            zeros = bytes(bytes_per_frame - len(rest))
            yield Frame(rest + zeros, timestamp, frame_duration_s)

    def generate_frames_as_segments(self, frame_duration_ms, zero_pad=True):
        """
        Does the same thing as `generate_frames`, but yields tuples of (AudioSegment, timestamp) instead of Frames.
        """
        for frame in self.generate_frames(frame_duration_ms, zero_pad=zero_pad):
            seg = AudioSegment(pydub.AudioSegment(data=frame.bytes, sample_width=self.sample_width,
                                                  frame_rate=self.frame_rate, channels=self.channels), self.name)
            yield seg, frame.timestamp

    def normalize_spl_by_average(self, db):
        """
        Normalize the values in the AudioSegment so that its average dB value
        is `db`.

        The dB of a value is calculated as 20 * log10(abs(value + 1E-9)).
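
        A minimal usage sketch (``quiet.wav`` is a hypothetical input file used only
        for illustration):

        .. code-block:: python

            import audiosegment
            import numpy as np

            seg = audiosegment.from_file("quiet.wav")
            louder = seg.normalize_spl_by_average(db=40)
            print(np.mean(louder.spl))  # should be roughly 40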

        :param db: The decibels to normalize the average to.
        :returns: A new AudioSegment object whose values are changed so that their
                  average is `db`.
        """
        def inverse_spl(val):
            """Calculates the (positive) 'PCM' value for the given SPL val"""
            return 10 ** (val / 20.0)

        # Convert dB into 'PCM'
        db_pcm = inverse_spl(db)
        # Calculate the current 'PCM' average
        curavg = np.abs(np.mean(self.to_numpy_array()))
        # Calculate the ratio of db_pcm / curavg_pcm
        ratio = db_pcm / curavg
        # Multiply all values by the ratio
        dtype_dict = {1: np.int8, 2: np.int16, 4: np.int32}
        dtype = dtype_dict[self.sample_width]
        new_seg = from_numpy_array(np.array(self.to_numpy_array() * ratio, dtype=dtype), self.frame_rate)
        # Check the SPL average to see if we are right
        #assert math.isclose(np.mean(new_seg.spl), db), "new = " + str(np.mean(new_seg.spl)) + " != " + str(db)
        return new_seg

    def reduce(self, others):
        """
        Reduces others into this one by concatenating all the others onto this one and
        returning the result. Does not modify self; instead, it makes a copy and returns that.

        :param others: The other AudioSegment objects to append to this one.
        :returns: The concatenated result.
        """
        ret = AudioSegment(self.seg, self.name)
        selfdata = [self.seg._data]
        otherdata = [o.seg._data for o in others]
        ret.seg._data = b''.join(selfdata + otherdata)

        return ret

    def resample(self, sample_rate_Hz=None, sample_width=None, channels=None, console_output=False):
        """
        Returns a new AudioSegment whose data is the same as this one, but which has been resampled to the
        specified characteristics. Any parameter left None will be unchanged.

        .. note:: This method requires that you have the program 'sox' installed.

        .. warning:: This method uses the program 'sox' to perform the task. While this is very fast for a single
                     function call, the IO may add up for large numbers of AudioSegment objects.
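
        A short usage sketch (``music.wav`` is only an illustrative placeholder; sox must
        be installed):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("music.wav")
            # Convert to 16 kHz, 16 bit, mono - the format expected by detect_voice(), for example
            seg16k = seg.resample(sample_rate_Hz=16000, sample_width=2, channels=1)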

        :param sample_rate_Hz: The new sample rate in Hz.
        :param sample_width: The new sample width in bytes, so sample_width=2 would correspond to 16 bit (2 byte) width.
        :param channels: The new number of channels.
        :param console_output: Will print the output of sox to the console if True.
        :returns: The newly sampled AudioSegment.
        """
        if sample_rate_Hz is None:
            sample_rate_Hz = self.frame_rate
        if sample_width is None:
            sample_width = self.sample_width
        if channels is None:
            channels = self.channels

        command = "sox {inputfile} -b " + str(sample_width * 8) + " -r " + str(sample_rate_Hz) \
                  + " -t wav {outputfile} channels " + str(channels)

        return self._execute_sox_cmd(command, console_output=console_output)

    def __getstate__(self):
        """
        Serializes into a dict for the pickle protocol.

        :returns: The dict to pickle.
        """
        return {'name': self.name, 'seg': self.seg}

    def __setstate__(self, d):
        """
        Deserializes from a dict for the pickle protocol.

        :param d: The dict to unpickle from.
        """
        self.__dict__.update(d)

    def serialize(self):
        """
        Serializes into a bytestring.
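
        A round-trip sketch using this method together with the module-level
        ``deserialize`` function (``anything.wav`` is a hypothetical input file):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("anything.wav")
            raw = seg.serialize()
            restored = audiosegment.deserialize(raw)
            assert restored.frame_rate == seg.frame_rate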

        :returns: An object of type bytes.
        """
        d = self.__getstate__()
        return pickle.dumps({
            'name': d['name'],
            'seg': pickle.dumps(d['seg'], protocol=-1),
        }, protocol=-1)

    def spectrogram(self, start_s=None, duration_s=None, start_sample=None, num_samples=None,
                    window_length_s=None, window_length_samples=None, overlap=0.5):
        """
        Does a series of FFTs from `start_s` or `start_sample` for `duration_s` or `num_samples`.
        Effectively, transforms a slice of the AudioSegment into the frequency domain across different
        time bins.

        .. code-block:: python

            # Example for plotting a spectrogram using this function
            import audiosegment
            import matplotlib.pyplot as plt

            #...
            seg = audiosegment.from_file("somebodytalking.wav")
            freqs, times, amplitudes = seg.spectrogram(window_length_s=0.03, overlap=0.5)
            amplitudes = 10 * np.log10(amplitudes + 1e-9)

            # Plot
            plt.pcolormesh(times, freqs, amplitudes)
            plt.xlabel("Time in Seconds")
            plt.ylabel("Frequency in Hz")
            plt.show()

        .. image:: images/spectrogram.png

        :param start_s: The start time. Starts at the beginning if neither this nor `start_sample` is specified.
        :param duration_s: The duration of the spectrogram in seconds. Goes to the end if neither this nor
                           `num_samples` is specified.
        :param start_sample: The index of the first sample to use. Starts at the beginning if neither this nor
                             `start_s` is specified.
        :param num_samples: The number of samples in the spectrogram. Goes to the end if neither this nor
                            `duration_s` is specified.
        :param window_length_s: The length of each FFT in seconds. If the total number of samples in the spectrogram
                                is not a multiple of the window length in samples, the last window will be zero-padded.
        :param window_length_samples: The length of each FFT in number of samples. If the total number of samples in the
                                      spectrogram is not a multiple of the window length in samples, the last window will
                                      be zero-padded.
        :param overlap: The fraction of each window to overlap.
        :returns: Three np.ndarrays: the frequency values in Hz (the y-axis in a spectrogram), the time values of
                  each window (the x-axis in a spectrogram), and the magnitude of each time/frequency bin as a
                  2D array of shape [len(frequency values), len(time values)].
        :raises ValueError: If `start_s` and `start_sample` are both specified, if `duration_s` and `num_samples` are both
                            specified, if the first window's duration plus start time lead to running off the end
                            of the AudioSegment, or if `window_length_s` and `window_length_samples` are either
                            both specified or both left unspecified.
        """
        if start_s is not None and start_sample is not None:
            raise ValueError("Only one of start_s and start_sample may be specified.")
        if duration_s is not None and num_samples is not None:
            raise ValueError("Only one of duration_s and num_samples may be specified.")
        if window_length_s is not None and window_length_samples is not None:
            raise ValueError("Only one of window_length_s and window_length_samples may be specified.")
        if window_length_s is None and window_length_samples is None:
            raise ValueError("You must specify a window length, either in window_length_s or in window_length_samples.")

        if start_s is None and start_sample is None:
            start_sample = 0
        if duration_s is None and num_samples is None:
            num_samples = len(self.get_array_of_samples()) - int(start_sample)

        if duration_s is not None:
            num_samples = int(round(duration_s * self.frame_rate))
        if start_s is not None:
            start_sample = int(round(start_s * self.frame_rate))

        if window_length_s is not None:
            window_length_samples = int(round(window_length_s * self.frame_rate))

        if start_sample + num_samples > len(self.get_array_of_samples()):
            raise ValueError("The combination of start and duration will run off the end of the AudioSegment object.")

        f, t, sxx = signal.spectrogram(self.to_numpy_array(), self.frame_rate, scaling='spectrum', nperseg=window_length_samples,
                                       noverlap=int(round(overlap * window_length_samples)),
                                       mode='magnitude')
        return f, t, sxx

    def to_numpy_array(self):
        """
        Convenience function for `np.array(self.get_array_of_samples())` while
        keeping the appropriate dtype.
        """
        dtype_dict = {
            1: np.int8,
            2: np.int16,
            4: np.int32
        }
        dtype = dtype_dict[self.sample_width]
        return np.array(self.get_array_of_samples(), dtype=dtype)

    @deprecated
    def trim_to_minutes(self, strip_last_seconds=False):
        """
        Returns a list of minute-long (at most) Segment objects.

        .. note:: This function has been deprecated. Use the `dice` function instead.

        :param strip_last_seconds: If True, this method will return minute-long segments,
                                   but the last three seconds of this AudioSegment won't be returned.
                                   This is useful for removing the microphone artifact at the end of the recording.
        :returns: A list of AudioSegment objects, each of which is one minute long at most
                  (and only the last one - if any - will be less than one minute).
        """
        outs = self.dice(seconds=60, zero_pad=False)

        # Now cut out the last three seconds of the last item in outs (it will just be microphone artifact)
        # or, if the last item is less than three seconds, just get rid of it
        if strip_last_seconds:
            if outs[-1].duration_seconds > 3:
                outs[-1] = outs[-1][:-MS_PER_S * 3]
            else:
                outs = outs[:-1]

        return outs

    def zero_extend(self, duration_s=None, num_samples=None):
        """
        Adds a number of zeros (digital silence) to the AudioSegment (returning a new one).
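
        A tiny usage sketch (``clip.wav`` is a hypothetical file used only for illustration):

        .. code-block:: python

            import audiosegment

            seg = audiosegment.from_file("clip.wav")
            # Append one second's worth of digital silence
            padded = seg.zero_extend(duration_s=1.0)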

        :param duration_s: The number of seconds of zeros to add. If this is specified, `num_samples` must be None.
        :param num_samples: The number of zeros to add. If this is specified, `duration_s` must be None.
        :returns: A new AudioSegment object that has been zero extended.
        :raises: ValueError if duration_s and num_samples are both specified.
        """
        if duration_s is not None and num_samples is not None:
            raise ValueError("`duration_s` and `num_samples` cannot both be specified.")
        elif duration_s is not None:
            num_samples = self.frame_rate * duration_s
        seg = AudioSegment(self.seg, self.name)
        zeros = silent(duration=num_samples / self.frame_rate, frame_rate=self.frame_rate)
        return zeros.overlay(seg)


def deserialize(bstr):
    """
    Attempts to deserialize a bytestring into an audiosegment.

    :param bstr: The bytestring serialized via an audiosegment's serialize() method.
    :returns: An AudioSegment object deserialized from `bstr`.
    """
    d = pickle.loads(bstr)
    seg = pickle.loads(d['seg'])
    return AudioSegment(seg, d['name'])


def empty():
    """
    Creates a zero-duration AudioSegment object.

    :returns: An empty AudioSegment object.
    """
    dubseg = pydub.AudioSegment.empty()
    return AudioSegment(dubseg, "")


def from_file(path):
    """
    Returns an AudioSegment object from the given file based on its file extension.
    If the extension is wrong, this will throw some sort of error.

    :param path: The path to the file, including the file extension.
    :returns: An AudioSegment instance from the file.
    """
    _name, ext = os.path.splitext(path)
    ext = ext.lower()[1:]
    if "m4a" in path:
        ext = "m4a"
    elif "wav" in path:
        ext = "wav"
    seg = pydub.AudioSegment.from_file(path, format=ext)
    return AudioSegment(seg, path)


def from_mono_audiosegments(*args):
    """
    Creates a multi-channel AudioSegment out of multiple mono AudioSegments (two or more). Each mono
    AudioSegment passed in should have exactly the same number of samples.
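
    A small sketch of combining two mono segments into a stereo segment (the file names
    are hypothetical placeholders):

    .. code-block:: python

        import audiosegment

        # Both inputs must have the same number of samples
        left = audiosegment.from_file("left_channel.wav")
        right = audiosegment.from_file("right_channel.wav")
        stereo = audiosegment.from_mono_audiosegments(left, right)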

    :returns: An AudioSegment of multiple channels formed from the given mono AudioSegments.
    """
    return AudioSegment(pydub.AudioSegment.from_mono_audiosegments(*args), "")


def from_numpy_array(nparr, framerate):
    """
    Returns an AudioSegment created from the given numpy array.

    The numpy array must have shape = (num_samples, num_channels).
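
    A minimal sketch of building one second of a 440 Hz sine tone from a mono numpy
    array (the sample rate and tone are arbitrary illustrative choices):

    .. code-block:: python

        import audiosegment
        import numpy as np

        sr = 16000  # sample rate in Hz
        t = np.arange(sr) / sr
        tone = (0.25 * np.iinfo(np.int16).max * np.sin(2 * np.pi * 440 * t)).astype(np.int16)
        seg = audiosegment.from_numpy_array(tone, framerate=sr)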

    :param nparr: The numpy array to create an AudioSegment from.
    :returns: An AudioSegment created from the given array.
    """
    # Interleave the audio across all channels and collapse
    if nparr.dtype.itemsize not in (1, 2, 4):
        raise ValueError("Numpy Array must contain 8, 16, or 32 bit values.")
    if len(nparr.shape) == 1:
        arrays = [nparr]
    elif len(nparr.shape) == 2:
        arrays = [nparr[i, :] for i in range(nparr.shape[0])]
    else:
        raise ValueError("Numpy Array must be one or two dimensional. Shape must be: (num_samples, num_channels).")
    interleaved = np.vstack(arrays).reshape((-1,), order='F')
    dubseg = pydub.AudioSegment(interleaved.tobytes(),
                                frame_rate=framerate,
                                sample_width=interleaved.dtype.itemsize,
                                channels=len(arrays)
                                )
    return AudioSegment(dubseg, "")


def silent(duration=1000, frame_rate=11025):
    """
    Creates an AudioSegment object of the specified duration/frame_rate filled with digital silence.

    :param duration: The duration of the returned object in ms.
    :param frame_rate: The samples per second of the returned object.
    :returns: AudioSegment object filled with pure digital silence.
    """
    seg = pydub.AudioSegment.silent(duration=duration, frame_rate=frame_rate)
    return AudioSegment(seg, "")