Files
ths-speech/python-server/audiosegment_wrapper.py
2018-08-27 13:45:07 +02:00

1061 lines
50 KiB
Python

"""
This module simply exposes a wrapper of a pydub.AudioSegment object.
"""
from __future__ import division
from __future__ import print_function
import collections
import functools
import itertools
import math
import numpy as np
import pickle
import platform
import pydub
import os
import random
import scipy.signal as signal
import string
import subprocess
import sys
import tempfile
import warnings
import webrtcvad
MS_PER_S = 1000
S_PER_MIN = 60
MS_PER_MIN = MS_PER_S * S_PER_MIN
def deprecated(func):
"""
Deprecator decorator.
"""
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
return new_func
class AudioSegment:
"""
This class is a wrapper for a pydub.AudioSegment that provides additional methods.
"""
def __init__(self, pydubseg, name):
self.seg = pydubseg
self.name = name
def __getattr__(self, attr):
orig_attr = self.seg.__getattribute__(attr)
if callable(orig_attr):
def hooked(*args, **kwargs):
result = orig_attr(*args, **kwargs)
if result == self.seg:
return self
elif type(result) == pydub.AudioSegment:
return AudioSegment(result, self.name)
else:
return result
return hooked
else:
return orig_attr
def __len__(self):
return len(self.seg)
def __eq__(self, other):
return self.seg == other
def __ne__(self, other):
return self.seg != other
def __iter__(self):
return (x for x in self.seg)
def __getitem__(self, millisecond):
return AudioSegment(self.seg[millisecond], self.name)
def __add__(self, arg):
if type(arg) == AudioSegment:
self.seg._data = self.seg._data + arg.seg._data
else:
self.seg = self.seg + arg
return self
def __radd__(self, rarg):
return self.seg.__radd__(rarg)
def __repr__(self):
return str(self)
def __str__(self):
s = "%s: %s channels, %s bit, sampled @ %s kHz, %.3fs long" %\
(self.name, str(self.channels), str(self.sample_width * 8),\
str(self.frame_rate / 1000.0), self.duration_seconds)
return s
def __sub__(self, arg):
if type(arg) == AudioSegment:
self.seg = self.seg - arg.seg
else:
self.seg = self.seg - arg
return self
def __mul__(self, arg):
if type(arg) == AudioSegment:
self.seg = self.seg * arg.seg
else:
self.seg = self.seg * arg
return self
@property
def spl(self):
"""
Sound Pressure Level - defined as 20 * log10(abs(value)).
Returns a numpy array of SPL dB values.
"""
return 20.0 * np.log10(np.abs(self.to_numpy_array() + 1E-9))
@staticmethod
def _bandpass_filter(data, low, high, fs, order=5):
"""
:param data: The data (numpy array) to be filtered.
:param low: The low cutoff in Hz.
:param high: The high cutoff in Hz.
:param fs: The sample rate (in Hz) of the data.
:param order: The order of the filter. The higher the order, the tighter the roll-off.
:returns: Filtered data (numpy array).
"""
nyq = 0.5 * fs
low = low / nyq
high = high / nyq
b, a = signal.butter(order, [low, high], btype='band')
y = signal.lfilter(b, a, data)
return y
@staticmethod
def lowpass_filter(data, cutoff, fs, order=5):
"""
:param data: The data (numpy array) to be filtered.
:param cutoff: The high cutoff in Hz.
:param fs: The sample rate in Hz of the data.
:param order: The order of the filter. The higher the order, the tighter the roll-off.
:returns: Filtered data (numpy array).
"""
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)
y = signal.lfilter(b, a, data)
return y
def auditory_scene_analysis(self):
"""
Algorithm based on paper: Auditory Segmentation Based on Onset and Offset Analysis,
by Hu and Wang, 2007.
"""
import matplotlib.pyplot as plt
def visualize_time_domain(seg, title=""):
plt.plot(seg)
plt.title(title)
plt.show()
plt.clf()
def visualize(spect, frequencies, title=""):
i = 0
for freq, (index, row) in zip(frequencies[::-1], enumerate(spect[::-1, :])):
plt.subplot(spect.shape[0], 1, index + 1)
if i == 0:
plt.title(title)
i += 1
plt.ylabel("{0:.0f}".format(freq))
plt.plot(row)
plt.show()
plt.clf()
# Normalize self into 25dB average SPL
normalized = self.normalize_spl_by_average(db=25)
visualize_time_domain(normalized.to_numpy_array(), "Normalized")
# Do a band-pass filter in each frequency
data = normalized.to_numpy_array()
start_frequency = 50
stop_frequency = 8000
start = np.log10(start_frequency)
stop = np.log10(stop_frequency)
frequencies = np.logspace(start, stop, num=10, endpoint=True, base=10.0)
print("Dealing with the following frequencies:", frequencies)
rows = [AudioSegment._bandpass_filter(data, freq*0.8, freq*1.2, self.frame_rate) for freq in frequencies]
rows = np.array(rows)
spect = np.vstack(rows)
visualize(spect, frequencies, "After bandpass filtering (cochlear model)")
# Half-wave rectify each frequency channel
spect[spect < 0] = 0
visualize(spect, frequencies, "After half-wave rectification in each frequency")
# Low-pass filter each frequency channel
spect = np.apply_along_axis(AudioSegment.lowpass_filter, 1, spect, 30, self.frame_rate, 6)
visualize(spect, frequencies, "After low-pass filtering in each frequency")
# Downsample each frequency to 400 Hz
downsample_freq_hz = 400
if self.frame_rate > downsample_freq_hz:
step = int(round(self.frame_rate / downsample_freq_hz))
spect = spect[:, ::step]
visualize(spect, frequencies, "After downsampling in each frequency")
# Now you have the temporal envelope of each frequency channel
# Smoothing
scales = [(6, 1/4), (6, 1/14), (1/2, 1/14)]
thetas = [0.95, 0.95, 0.85]
## For each (sc, st) scale, smooth across time using st, then across frequency using sc
gaussian = lambda x, mu, sig: np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0)))
gaussian_kernel = lambda sig: gaussian(np.linspace(-10, 10, len(frequencies) / 2), 0, sig)
spectrograms = []
for sc, st in scales:
time_smoothed = np.apply_along_axis(AudioSegment.lowpass_filter, 1, spect, 1/st, downsample_freq_hz, 6)
visualize(time_smoothed, frequencies, "After time smoothing with scale: " + str(st))
freq_smoothed = np.apply_along_axis(np.convolve, 0, spect, gaussian_kernel(sc))
spectrograms.append(freq_smoothed)
visualize(freq_smoothed, frequencies, "After time and frequency smoothing with scales (freq) " + str(sc) + " and (time) " + str(st))
## Now we have a set of scale-space spectrograms of different scales (sc, st)
# Onset/Offset Detection and Matching
def theta_on(spect):
return np.nanmean(spect) + np.nanstd(spect)
def compute_peaks_or_valleys_of_first_derivative(s, do_peaks=True):
"""
Takes a spectrogram and returns a 2D array of the form:
0 0 0 1 0 0 1 0 0 0 1 <-- Frequency 0
0 0 1 0 0 0 0 0 0 1 0 <-- Frequency 1
0 0 0 0 0 0 1 0 1 0 0 <-- Frequency 2
*** Time axis *******
Where a 1 means that the value in that time bin in the spectrogram corresponds to
a peak/valley in the first derivative.
"""
gradient = np.nan_to_num(np.apply_along_axis(np.gradient, 1, s), copy=False)
half_window = 4
if do_peaks:
indexes = [signal.argrelextrema(gradient[i, :], np.greater, order=half_window) for i in range(gradient.shape[0])]
else:
indexes = [signal.argrelextrema(gradient[i, :], np.less, order=half_window) for i in range(gradient.shape[0])]
extrema = np.zeros(s.shape)
for row_index, index_array in enumerate(indexes):
# Each index_array is a list of indexes corresponding to all the extrema in a given row
for col_index in index_array:
extrema[row_index, col_index] = 1
return extrema
for spect, (sc, st) in zip(spectrograms, scales):
# Compute sudden upward changes in spect, these are onsets of events
onsets = compute_peaks_or_valleys_of_first_derivative(spect)
# Compute sudden downward changes in spect, these are offsets of events
offsets = compute_peaks_or_valleys_of_first_derivative(spect, do_peaks=False)
print("TOTAL ONSETS:", np.sum(onsets, axis=1))
print("TOTAL OFFSETS:", np.sum(offsets, axis=1))
exit()
# onsets and offsets are 2D arrays
## Determine the offset time for each onset:
### If t_on[c, i] represents the time of the ith onset in frequency channel c, the corresponding offset
### must occur between t_on[c, i] and t_on[c, i+1]
### If there are more than one offsets candidates in this range, choose the one with largest intensity decrease.
## Create onset/offset fronts by connecting onsets across frequency channels (connect two onsets
## if they occur within 20ms of each other). Start over whenever a frequency band does not contain an offset
## in this range. Do the same procedure for offsets. Now you have onset and offset fronts.
## Now hook up the onsets with the offsets to form segments:
## For each onset front, (t_on[c, i1, t_on[c + 1, i2], ..., t_on[c + m - 1, im]):
## matching_offsets = (t_off[c, i1], t_off[c + 1, i2], ..., t_off[c + m - 1, im])
## Get all offset fronts which contain at least one of offset time found in matching_offsets
## Among these offset fronts, the one that crosses the most of matching_offsets is chosen,
## - call this offset front: matching_offset_front
## Update all t_offs in matching_offsets whose 'c's are in matching_offset_front to be 'matched', and
## - update their times to the corresponding channel offset in matching_offset_front.
## If all t_offs in matching_offsets are 'matched', continue to next onset front
## Now go through all the segments you have created and break them up along frequencies if the temporal
## envelopes don't match well enough. That is, if we have two adjacent channels c and c+1, and they
## are part of the same segment as determined above, break this segment into two along these lines
## if the correlation between them is below theta_c. Theta_c is thetas[i] where i depends on the scale.
# Multiscale Integration
##
## TODO
def detect_voice(self, prob_detect_voice=0.5):
"""
Returns self as a list of tuples:
[('v', voiced segment), ('u', unvoiced segment), (etc.)]
The overall order of the AudioSegment is preserved.
:param prob_detect_voice: The raw probability that any random 20ms window of the audio file
contains voice.
:returns: The described list.
"""
assert self.frame_rate in (48000, 32000, 16000, 8000), "Try resampling to one of the allowed frame rates."
assert self.sample_width == 2, "Try resampling to 16 bit."
assert self.channels == 1, "Try resampling to one channel."
class model_class:
def __init__(self, aggressiveness):
self.v = webrtcvad.Vad(int(aggressiveness))
def predict(self, vector):
if self.v.is_speech(vector.raw_data, vector.frame_rate):
return 1
else:
return 0
model = model_class(aggressiveness=2)
pyesno = 0.3 # Probability of the next 20 ms being unvoiced given that this 20 ms was voiced
pnoyes = 0.2 # Probability of the next 20 ms being voiced given that this 20 ms was unvoiced
p_realyes_outputyes = 0.4 # WebRTCVAD has a very high FP rate - just because it says yes, doesn't mean much
p_realyes_outputno = 0.05 # If it says no, we can be very certain that it really is a no
p_yes_raw = prob_detect_voice
filtered = self.detect_event(model=model,
ms_per_input=20,
transition_matrix=(pyesno, pnoyes),
model_stats=(p_realyes_outputyes, p_realyes_outputno),
event_length_s=0.25,
prob_raw_yes=p_yes_raw)
ret = []
for tup in filtered:
t = ('v', tup[1]) if tup[0] == 'y' else ('u', tup[1])
ret.append(t)
return ret
def dice(self, seconds, zero_pad=False):
"""
Cuts the AudioSegment into `seconds` segments (at most). So for example, if seconds=10,
this will return a list of AudioSegments, in order, where each one is at most 10 seconds
long. If `zero_pad` is True, the last item AudioSegment object will be zero padded to result
in `seconds` seconds.
:param seconds: The length of each segment in seconds. Can be either a float/int, in which case
`self.duration_seconds` / `seconds` are made, each of `seconds` length, or a
list-like can be given, in which case the given list must sum to
`self.duration_seconds` and each segment is specified by the list - e.g.
the 9th AudioSegment in the returned list will be `seconds[8]` seconds long.
:param zero_pad: Whether to zero_pad the final segment if necessary. Ignored if `seconds` is
a list-like.
:returns: A list of AudioSegments, each of which is the appropriate number of seconds long.
:raises: ValueError if a list-like is given for `seconds` and the list's durations do not sum
to `self.duration_seconds`.
"""
try:
total_s = sum(seconds)
if not (self.duration_seconds <= total_s + 1 and self.duration_seconds >= total_s - 1):
raise ValueError("`seconds` does not sum to within one second of the duration of this AudioSegment.\
given total seconds: %s and self.duration_seconds: %s" % (total_s, self.duration_seconds))
starts = []
stops = []
time_ms = 0
for dur in seconds:
starts.append(time_ms)
time_ms += dur * MS_PER_S
stops.append(time_ms)
zero_pad = False
except TypeError:
# `seconds` is not a list
starts = range(0, int(round(self.duration_seconds * MS_PER_S)), int(round(seconds * MS_PER_S)))
stops = (min(self.duration_seconds * MS_PER_S, start + seconds * MS_PER_S) for start in starts)
outs = [self[start:stop] for start, stop in zip(starts, stops)]
out_lens = [out.duration_seconds for out in outs]
# Check if our last slice is within one ms of expected - if so, we don't need to zero pad
if zero_pad and not (out_lens[-1] <= seconds * MS_PER_S + 1 and out_lens[-1] >= seconds * MS_PER_S - 1):
num_zeros = self.frame_rate * (seconds * MS_PER_S - out_lens[-1])
outs[-1] = outs[-1].zero_extend(num_samples=num_zeros)
return outs
def detect_event(self, model, ms_per_input, transition_matrix, model_stats, event_length_s,
start_as_yes=False, prob_raw_yes=0.5):
"""
A list of tuples of the form [('n', AudioSegment), ('y', AudioSegment), etc.] is returned, where tuples
of the form ('n', AudioSegment) are the segments of sound where the event was not detected,
while ('y', AudioSegment) tuples were the segments of sound where the event was detected.
.. code-block:: python
# Example usage
import audiosegment
import keras
import keras.models
import numpy as np
import sys
class Model:
def __init__(self, modelpath):
self.model = keras.models.load_model(modelpath)
def predict(self, seg):
_bins, fft_vals = seg.fft()
fft_vals = np.abs(fft_vals) / len(fft_vals)
predicted_np_form = self.model.predict(np.array([fft_vals]), batch_size=1)
prediction_as_int = int(round(predicted_np_form[0][0]))
return prediction_as_int
modelpath = sys.argv[1]
wavpath = sys.argv[2]
model = Model(modelpath)
seg = audiosegment.from_file(wavpath).resample(sample_rate_Hz=32000, sample_width=2, channels=1)
pyes_to_no = 0.3 # The probability of one 30 ms sample being an event, and the next one not
pno_to_yes = 0.2 # The probability of one 30 ms sample not being an event, and the next one yes
ptrue_pos_rate = 0.8 # The true positive rate (probability of a predicted yes being right)
pfalse_neg_rate = 0.3 # The false negative rate (probability of a predicted no being wrong)
raw_prob = 0.7 # The raw probability of seeing the event in any random 30 ms slice of this file
events = seg.detect_event(model, ms_per_input=30, transition_matrix=[pyes_to_no, pno_to_yes],
model_stats=[ptrue_pos_rate, pfalse_neg_rate], event_length_s=0.25,
prob_raw_yes=raw_prob)
nos = [event[1] for event in events if event[0] == 'n']
yeses = [event[1] for event in events if event[0] == 'y']
if len(nos) > 1:
notdetected = nos[0].reduce(nos[1:])
notdetected.export("notdetected.wav", format="WAV")
if len(yeses) > 1:
detected = yeses[0].reduce(yeses[1:])
detected.export("detected.wav", format="WAV")
:param model: The model. The model must have a predict() function which takes an AudioSegment
of `ms_per_input` number of ms and which outputs 1 if the audio event is detected
in that input, and 0 if not. Make sure to resample the AudioSegment to the right
values before calling this function on it.
:param ms_per_input: The number of ms of AudioSegment to be fed into the model at a time. If this does not
come out even, the last AudioSegment will be zero-padded.
:param transition_matrix: An iterable of the form: [p(yes->no), p(no->yes)]. That is, the probability of moving
from a 'yes' state to a 'no' state and the probability of vice versa.
:param model_stats: An iterable of the form: [p(reality=1|output=1), p(reality=1|output=0)]. That is,
the probability of the ground truth really being a 1, given that the model output a 1,
and the probability of the ground truth being a 1, given that the model output a 0.
:param event_length_s: The typical duration of the event you are looking for in seconds (can be a float).
:param start_as_yes: If True, the first `ms_per_input` will be in the 'y' category. Otherwise it will be
in the 'n' category.
:param prob_raw_yes: The raw probability of finding the event in any given `ms_per_input` vector.
:returns: A list of tuples of the form [('n', AudioSegment), ('y', AudioSegment), etc.],
where over the course of the list, the AudioSegment in tuple 3 picks up
where the one in tuple 2 left off.
:raises: ValueError if `ms_per_input` is negative or larger than the number of ms in this
AudioSegment; if `transition_matrix` or `model_stats` do not have a __len__ attribute
or are not length 2; if the values in `transition_matrix` or `model_stats` are not
in the closed interval [0.0, 1.0].
"""
if ms_per_input < 0 or ms_per_input / MS_PER_S > self.duration_seconds:
raise ValueError("ms_per_input cannot be negative and cannot be longer than the duration of the AudioSegment."\
" The given value was " + str(ms_per_input))
elif not hasattr(transition_matrix, "__len__") or len(transition_matrix) != 2:
raise ValueError("transition_matrix must be an iterable of length 2.")
elif not hasattr(model_stats, "__len__") or len(model_stats) != 2:
raise ValueError("model_stats must be an iterable of length 2.")
elif any([True for prob in transition_matrix if prob > 1.0 or prob < 0.0]):
raise ValueError("Values in transition_matrix are probabilities, and so must be in the range [0.0, 1.0].")
elif any([True for prob in model_stats if prob > 1.0 or prob < 0.0]):
raise ValueError("Values in model_stats are probabilities, and so must be in the range [0.0, 1.0].")
elif prob_raw_yes > 1.0 or prob_raw_yes < 0.0:
raise ValueError("`prob_raw_yes` is a probability, and so must be in the range [0.0, 1.0]")
# Get the yeses or nos for when the filter is triggered (when the event is on/off)
filter_indices = [yes_or_no for yes_or_no in self._get_filter_indices(start_as_yes,
prob_raw_yes,
ms_per_input,
model,
transition_matrix,
model_stats)]
# Run a homogeneity filter over the values to make local regions more self-similar (reduce noise)
ret = self._homogeneity_filter(filter_indices, window_size=int(round(0.25 * MS_PER_S / ms_per_input)))
# Group the consecutive ones together
ret = self._group_filter_values(ret, ms_per_input)
# Take the groups and turn them into AudioSegment objects
real_ret = self._reduce_filtered_segments(ret)
return real_ret
def _get_filter_indices(self, start_as_yes, prob_raw_yes, ms_per_input, model, transition_matrix, model_stats):
"""
This has been broken out of the `filter` function to reduce cognitive load.
"""
filter_triggered = 1 if start_as_yes else 0
prob_raw_no = 1.0 - prob_raw_yes
for segment, _timestamp in self.generate_frames_as_segments(ms_per_input):
yield filter_triggered
observation = int(round(model.predict(segment)))
assert observation == 1 or observation == 0, "The given model did not output a 1 or a 0, output: "\
+ str(observation)
prob_hyp_yes_given_last_hyp = 1.0 - transition_matrix[0] if filter_triggered else transition_matrix[1]
prob_hyp_no_given_last_hyp = transition_matrix[0] if filter_triggered else 1.0 - transition_matrix[1]
prob_hyp_yes_given_data = model_stats[0] if observation == 1 else model_stats[1]
prob_hyp_no_given_data = 1.0 - model_stats[0] if observation == 1 else 1.0 - model_stats[1]
hypothesis_yes = prob_raw_yes * prob_hyp_yes_given_last_hyp * prob_hyp_yes_given_data
hypothesis_no = prob_raw_no * prob_hyp_no_given_last_hyp * prob_hyp_no_given_data
# make a list of ints - each is 0 or 1. The number of 1s is hypotheis_yes * 100
# the number of 0s is hypothesis_no * 100
distribution = [1 for i in range(int(round(hypothesis_yes * 100)))]
distribution.extend([0 for i in range(int(round(hypothesis_no * 100)))])
# shuffle
random.shuffle(distribution)
filter_triggered = random.choice(distribution)
def _group_filter_values(self, filter_indices, ms_per_input):
"""
This has been broken out of the `filter` function to reduce cognitive load.
"""
ret = []
for filter_value, (_segment, timestamp) in zip(filter_indices, self.generate_frames_as_segments(ms_per_input)):
if filter_value == 1:
if len(ret) > 0 and ret[-1][0] == 'n':
ret.append(['y', timestamp]) # The last one was different, so we create a new one
elif len(ret) > 0 and ret[-1][0] == 'y':
ret[-1][1] = timestamp # The last one was the same as this one, so just update the timestamp
else:
ret.append(['y', timestamp]) # This is the first one
else:
if len(ret) > 0 and ret[-1][0] == 'n':
ret[-1][1] = timestamp
elif len(ret) > 0 and ret[-1][0] == 'y':
ret.append(['n', timestamp])
else:
ret.append(['n', timestamp])
return ret
def _homogeneity_filter(self, ls, window_size):
"""
This has been broken out of the `filter` function to reduce cognitive load.
ls is a list of 1s or 0s for when the filter is on or off
"""
k = window_size
i = k
while i <= len(ls) - k:
# Get a window of k items
window = [ls[i + j] for j in range(k)]
# Change the items in the window to be more like the mode of that window
mode = 1 if sum(window) >= k / 2 else 0
for j in range(k):
ls[i+j] = mode
i += k
return ls
def _reduce_filtered_segments(self, ret):
"""
This has been broken out of the `filter` function to reduce cognitive load.
"""
real_ret = []
for i, (this_yesno, next_timestamp) in enumerate(ret):
if i > 0:
_next_yesno, timestamp = ret[i - 1]
else:
timestamp = 0
data = self[timestamp * MS_PER_S:next_timestamp * MS_PER_S].raw_data
seg = AudioSegment(pydub.AudioSegment(data=data, sample_width=self.sample_width,
frame_rate=self.frame_rate, channels=self.channels), self.name)
real_ret.append((this_yesno, seg))
return real_ret
def _execute_sox_cmd(self, cmd, console_output=False):
"""
Executes a Sox command in a platform-independent manner.
`cmd` must be a format string that includes {inputfile} and {outputfile}.
"""
on_windows = platform.system().lower() == "windows"
# On Windows, a temporary file cannot be shared outside the process that creates it
# so we need to create a "permanent" file that we will use and delete afterwards
def _get_random_tmp_file():
if on_windows:
rand_string = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
tmp = self.name + "_" + rand_string
WinTempFile = collections.namedtuple("WinTempFile", "name")
tmp = WinTempFile(tmp)
else:
tmp = tempfile.NamedTemporaryFile()
return tmp
# Get a temp file to put our data and a temp file to store the result
tmp = _get_random_tmp_file()
othertmp = _get_random_tmp_file()
# Store our data in the temp file
self.export(tmp.name, format="WAV")
# Write the command to sox
stdout = stderr = subprocess.PIPE if console_output else subprocess.DEVNULL
command = cmd.format(inputfile=tmp.name, outputfile=othertmp.name)
res = subprocess.run(command.split(' '), stdout=stdout, stderr=stderr)
assert res.returncode == 0, "Sox did not work as intended, or perhaps you don't have Sox installed?"
# Create a new AudioSegment from the other temp file (where Sox put the result)
other = AudioSegment(pydub.AudioSegment.from_wav(othertmp.name), self.name)
# Clean up the temp files
if on_windows:
os.remove(tmp.name)
os.remove(othertmp.name)
else:
tmp.close()
othertmp.close()
return other
def filter_silence(self, duration_s=1, threshold_percentage=1, console_output=False):
"""
Returns a copy of this AudioSegment, but whose silence has been removed.
.. note:: This method requires that you have the program 'sox' installed.
.. warning:: This method uses the program 'sox' to perform the task. While this is very fast for a single
function call, the IO may add up for large numbers of AudioSegment objects.
:param duration_s: The number of seconds of "silence" that must be present in a row to
be stripped.
:param threshold_percentage: Silence is defined as any samples whose absolute value is below
`threshold_percentage * max(abs(samples in this segment))`.
:param console_output: If True, will pipe all sox output to the console.
:returns: A copy of this AudioSegment, but whose silence has been removed.
"""
command = "sox {inputfile} -t wav {outputfile} silence -l 1 0.1 "\
+ str(threshold_percentage) + "% -1 " + str(float(duration_s)) + " " + str(threshold_percentage) + "%"
return self._execute_sox_cmd(command)
def fft(self, start_s=None, duration_s=None, start_sample=None, num_samples=None, zero_pad=False):
"""
Transforms the indicated slice of the AudioSegment into the frequency domain and returns the bins
and the values.
If neither `start_s` or `start_sample` is specified, the first sample of the slice will be the first sample
of the AudioSegment.
If neither `duration_s` or `num_samples` is specified, the slice will be from the specified start
to the end of the segment.
.. code-block:: python
# Example for plotting the FFT using this function
import matplotlib.pyplot as plt
import numpy as np
seg = audiosegment.from_file("furelise.wav")
# Just take the first 3 seconds
hist_bins, hist_vals = seg[1:3000].fft()
hist_vals_real_normed = np.abs(hist_vals) / len(hist_vals)
plt.plot(hist_bins / 1000, hist_vals_real_normed)
plt.xlabel("kHz")
plt.ylabel("dB")
plt.show()
.. image:: images/fft.png
:param start_s: The start time in seconds. If this is specified, you cannot specify `start_sample`.
:param duration_s: The duration of the slice in seconds. If this is specified, you cannot specify `num_samples`.
:param start_sample: The zero-based index of the first sample to include in the slice.
If this is specified, you cannot specify `start_s`.
:param num_samples: The number of samples to include in the slice. If this is specified, you cannot
specify `duration_s`.
:param zero_pad: If True and the combination of start and duration result in running off the end of
the AudioSegment, the end is zero padded to prevent this.
:returns: np.ndarray of frequencies, np.ndarray of amount of each frequency
:raises: ValueError If `start_s` and `start_sample` are both specified and/or if both `duration_s` and
`num_samples` are specified.
"""
if start_s is not None and start_sample is not None:
raise ValueError("Only one of start_s and start_sample can be specified.")
if duration_s is not None and num_samples is not None:
raise ValueError("Only one of duration_s and num_samples can be specified.")
if start_s is None and start_sample is None:
start_sample = 0
if duration_s is None and num_samples is None:
num_samples = len(self.get_array_of_samples()) - int(start_sample)
if duration_s is not None:
num_samples = int(round(duration_s * self.frame_rate))
if start_s is not None:
start_sample = int(round(start_s * self.frame_rate))
end_sample = start_sample + num_samples # end_sample is excluded
if end_sample > len(self.get_array_of_samples()) and not zero_pad:
raise ValueError("The combination of start and duration will run off the end of the AudioSegment object.")
elif end_sample > len(self.get_array_of_samples()) and zero_pad:
arr = np.array(self.get_array_of_samples())
zeros = np.zeros(end_sample - len(arr))
arr = np.append(arr, zeros)
else:
arr = np.array(self.get_array_of_samples())
audioslice = np.array(arr[start_sample:end_sample])
fft_result = np.fft.fft(audioslice)[range(int(round(num_samples/2)) + 1)]
step_size = self.frame_rate / num_samples
bins = np.arange(0, int(round(num_samples/2)) + 1, 1.0) * step_size
return bins, fft_result
def generate_frames(self, frame_duration_ms, zero_pad=True):
"""
Yields self's data in chunks of frame_duration_ms.
This function adapted from pywebrtc's example [https://github.com/wiseman/py-webrtcvad/blob/master/example.py].
:param frame_duration_ms: The length of each frame in ms.
:param zero_pad: Whether or not to zero pad the end of the AudioSegment object to get all
the audio data out as frames. If not, there may be a part at the end
of the Segment that is cut off (the part will be <= `frame_duration_ms` in length).
:returns: A Frame object with properties 'bytes (the data)', 'timestamp (start time)', and 'duration'.
"""
Frame = collections.namedtuple("Frame", "bytes timestamp duration")
# (samples/sec) * (seconds in a frame) * (bytes/sample)
bytes_per_frame = int(self.frame_rate * (frame_duration_ms / 1000) * self.sample_width)
offset = 0 # where we are so far in self's data (in bytes)
timestamp = 0.0 # where we are so far in self (in seconds)
# (bytes/frame) * (sample/bytes) * (sec/samples)
frame_duration_s = (bytes_per_frame / self.frame_rate) / self.sample_width
while offset + bytes_per_frame < len(self.raw_data):
yield Frame(self.raw_data[offset:offset + bytes_per_frame], timestamp, frame_duration_s)
timestamp += frame_duration_s
offset += bytes_per_frame
if zero_pad:
rest = self.raw_data[offset:]
zeros = bytes(bytes_per_frame - len(rest))
yield Frame(rest + zeros, timestamp, frame_duration_s)
def generate_frames_as_segments(self, frame_duration_ms, zero_pad=True):
"""
Does the same thing as `generate_frames`, but yields tuples of (AudioSegment, timestamp) instead of Frames.
"""
for frame in self.generate_frames(frame_duration_ms, zero_pad=zero_pad):
seg = AudioSegment(pydub.AudioSegment(data=frame.bytes, sample_width=self.sample_width,
frame_rate=self.frame_rate, channels=self.channels), self.name)
yield seg, frame.timestamp
def normalize_spl_by_average(self, db):
"""
Normalize the values in the AudioSegment so that its average dB value
is `db`.
The dB of a value is calculated as 20 * log10(abs(value + 1E-9)).
:param db: The decibels to normalize average to.
:returns: A new AudioSegment object whose values are changed so that their
average is `db`.
"""
def inverse_spl(val):
"""Calculates the (positive) 'PCM' value for the given SPl val"""
return 10 ** (val / 20.0)
# Convert dB into 'PCM'
db_pcm = inverse_spl(db)
# Calculate current 'PCM' average
curavg = np.abs(np.mean(self.to_numpy_array()))
# Calculate ratio of dB_pcm / curavg_pcm
ratio = db_pcm / curavg
# Multiply all values by ratio
dtype_dict = {1: np.int8, 2: np.int16, 4: np.int32}
dtype = dtype_dict[self.sample_width]
new_seg = from_numpy_array(np.array(self.to_numpy_array() * ratio, dtype=dtype), self.frame_rate)
# Check SPL average to see if we are right
#assert math.isclose(np.mean(new_seg.spl), db), "new = " + str(np.mean(new_seg.spl)) + " != " + str(db)
return new_seg
def reduce(self, others):
"""
Reduces others into this one by concatenating all the others onto this one and
returning the result. Does not modify self, instead, makes a copy and returns that.
:param others: The other AudioSegment objects to append to this one.
:returns: The concatenated result.
"""
ret = AudioSegment(self.seg, self.name)
selfdata = [self.seg._data]
otherdata = [o.seg._data for o in others]
ret.seg._data = b''.join(selfdata + otherdata)
return ret
def resample(self, sample_rate_Hz=None, sample_width=None, channels=None, console_output=False):
"""
Returns a new AudioSegment whose data is the same as this one, but which has been resampled to the
specified characteristics. Any parameter left None will be unchanged.
.. note:: This method requires that you have the program 'sox' installed.
.. warning:: This method uses the program 'sox' to perform the task. While this is very fast for a single
function call, the IO may add up for large numbers of AudioSegment objects.
:param sample_rate_Hz: The new sample rate in Hz.
:param sample_width: The new sample width in bytes, so sample_width=2 would correspond to 16 bit (2 byte) width.
:param channels: The new number of channels.
:param console_output: Will print the output of sox to the console if True.
:returns: The newly sampled AudioSegment.
"""
if sample_rate_Hz is None:
sample_rate_Hz = self.frame_rate
if sample_width is None:
sample_width = self.sample_width
if channels is None:
channels = self.channels
command = "sox {inputfile} -b " + str(sample_width * 8) + " -r " + str(sample_rate_Hz) \
+ " -t wav {outputfile} channels " + str(channels)
return self._execute_sox_cmd(command, console_output=console_output)
def __getstate__(self):
"""
Serializes into a dict for the pickle protocol.
:returns: The dict to pickle.
"""
return {'name': self.name, 'seg': self.seg}
def __setstate__(self, d):
"""
Deserializes from a dict for the pickle protocol.
:param d: The dict to unpickle from.
"""
self.__dict__.update(d)
def serialize(self):
"""
Serializes into a bytestring.
:returns: An object of type Bytes.
"""
d = self.__getstate__()
return pickle.dumps({
'name': d['name'],
'seg': pickle.dumps(d['seg'], protocol=-1),
}, protocol=-1)
def spectrogram(self, start_s=None, duration_s=None, start_sample=None, num_samples=None,
window_length_s=None, window_length_samples=None, overlap=0.5):
"""
Does a series of FFTs from `start_s` or `start_sample` for `duration_s` or `num_samples`.
Effectively, transforms a slice of the AudioSegment into the frequency domain across different
time bins.
.. code-block:: python
# Example for plotting a spectrogram using this function
import audiosegment
import matplotlib.pyplot as plt
#...
seg = audiosegment.from_file("somebodytalking.wav")
freqs, times, amplitudes = seg.spectrogram(window_length_s=0.03, overlap=0.5)
amplitudes = 10 * np.log10(amplitudes + 1e-9)
# Plot
plt.pcolormesh(times, freqs, amplitudes)
plt.xlabel("Time in Seconds")
plt.ylabel("Frequency in Hz")
plt.show()
.. image:: images/spectrogram.png
:param start_s: The start time. Starts at the beginning if neither this nor `start_sample` is specified.
:param duration_s: The duration of the spectrogram in seconds. Goes to the end if neither this nor
`num_samples` is specified.
:param start_sample: The index of the first sample to use. Starts at the beginning if neither this nor
`start_s` is specified.
:param num_samples: The number of samples in the spectrogram. Goes to the end if neither this nor
`duration_s` is specified.
:param window_length_s: The length of each FFT in seconds. If the total number of samples in the spectrogram
is not a multiple of the window length in samples, the last window will be zero-padded.
:param window_length_samples: The length of each FFT in number of samples. If the total number of samples in the
spectrogram is not a multiple of the window length in samples, the last window will
be zero-padded.
:param overlap: The fraction of each window to overlap.
:returns: Three np.ndarrays: The frequency values in Hz (the y-axis in a spectrogram), the time values starting
at start time and then increasing by `duration_s` each step (the x-axis in a spectrogram), and
the dB of each time/frequency bin as a 2D array of shape [len(frequency values), len(duration)].
:raises ValueError: If `start_s` and `start_sample` are both specified, if `duration_s` and `num_samples` are both
specified, if the first window's duration plus start time lead to running off the end
of the AudioSegment, or if `window_length_s` and `window_length_samples` are either
both specified or if they are both not specified.
"""
if start_s is not None and start_sample is not None:
raise ValueError("Only one of start_s and start_sample may be specified.")
if duration_s is not None and num_samples is not None:
raise ValueError("Only one of duration_s and num_samples may be specified.")
if window_length_s is not None and window_length_samples is not None:
raise ValueError("Only one of window_length_s and window_length_samples may be specified.")
if window_length_s is None and window_length_samples is None:
raise ValueError("You must specify a window length, either in window_length_s or in window_length_samples.")
if start_s is None and start_sample is None:
start_sample = 0
if duration_s is None and num_samples is None:
num_samples = len(self.get_array_of_samples()) - int(start_sample)
if duration_s is not None:
num_samples = int(round(duration_s * self.frame_rate))
if start_s is not None:
start_sample = int(round(start_s * self.frame_rate))
if window_length_s is not None:
window_length_samples = int(round(window_length_s * self.frame_rate))
if start_sample + num_samples > len(self.get_array_of_samples()):
raise ValueError("The combination of start and duration will run off the end of the AudioSegment object.")
f, t, sxx = signal.spectrogram(self.to_numpy_array(), self.frame_rate, scaling='spectrum', nperseg=window_length_samples,
noverlap=int(round(overlap * window_length_samples)),
mode='magnitude')
return f, t, sxx
def to_numpy_array(self):
"""
Convenience function for `np.array(self.get_array_of_samples())` while
keeping the appropriate dtype.
"""
dtype_dict = {
1: np.int8,
2: np.int16,
4: np.int32
}
dtype = dtype_dict[self.sample_width]
return np.array(self.get_array_of_samples(), dtype=dtype)
@deprecated
def trim_to_minutes(self, strip_last_seconds=False):
"""
Returns a list of minute-long (at most) Segment objects.
.. note:: This function has been deprecated. Use the `dice` function instead.
:param strip_last_seconds: If True, this method will return minute-long segments,
but the last three seconds of this AudioSegment won't be returned.
This is useful for removing the microphone artifact at the end of the recording.
:returns: A list of AudioSegment objects, each of which is one minute long at most
(and only the last one - if any - will be less than one minute).
"""
outs = self.dice(seconds=60, zero_pad=False)
# Now cut out the last three seconds of the last item in outs (it will just be microphone artifact)
# or, if the last item is less than three seconds, just get rid of it
if strip_last_seconds:
if outs[-1].duration_seconds > 3:
outs[-1] = outs[-1][:-MS_PER_S * 3]
else:
outs = outs[:-1]
return outs
def zero_extend(self, duration_s=None, num_samples=None):
"""
Adds a number of zeros (digital silence) to the AudioSegment (returning a new one).
:param duration_s: The number of seconds of zeros to add. If this is specified, `num_samples` must be None.
:param num_samples: The number of zeros to add. If this is specified, `duration_s` must be None.
:returns: A new AudioSegment object that has been zero extended.
:raises: ValueError if duration_s and num_samples are both specified.
"""
if duration_s is not None and num_samples is not None:
raise ValueError("`duration_s` and `num_samples` cannot both be specified.")
elif duration_s is not None:
num_samples = self.frame_rate * duration_s
seg = AudioSegment(self.seg, self.name)
zeros = silent(duration=num_samples / self.frame_rate, frame_rate=self.frame_rate)
return zeros.overlay(seg)
def deserialize(bstr):
"""
Attempts to deserialize a bytestring into an audiosegment.
:param bstr: The bytestring serialized via an audiosegment's serialize() method.
:returns: An AudioSegment object deserialized from `bstr`.
"""
d = pickle.loads(bstr)
seg = pickle.loads(d['seg'])
return AudioSegment(seg, d['name'])
def empty():
"""
Creates a zero-duration AudioSegment object.
:returns: An empty AudioSegment object.
"""
dubseg = pydub.AudioSegment.empty()
return AudioSegment(dubseg, "")
def from_file(path):
"""
Returns an AudioSegment object from the given file based on its file extension.
If the extension is wrong, this will throw some sort of error.
:param path: The path to the file, including the file extension.
:returns: An AudioSegment instance from the file.
"""
_name, ext = os.path.splitext(path)
ext = ext.lower()[1:]
if "m4a" in path:
ext="m4a"
elif "wav" in path:
ext="wav"
seg = pydub.AudioSegment.from_file(path, format=ext)
return AudioSegment(seg, path)
def from_mono_audiosegments(*args):
"""
Creates a multi-channel AudioSegment out of multiple mono AudioSegments (two or more). Each mono
AudioSegment passed in should be exactly the same number of samples.
:returns: An AudioSegment of multiple channels formed from the given mono AudioSegments.
"""
return AudioSegment(pydub.AudioSegment.from_mono_audiosegments(*args), "")
def from_numpy_array(nparr, framerate):
"""
Returns an AudioSegment created from the given numpy array.
The numpy array must have shape = (num_samples, num_channels).
:param nparr: The numpy array to create an AudioSegment from.
:returns: An AudioSegment created from the given array.
"""
# interleave the audio across all channels and collapse
if nparr.dtype.itemsize not in (1, 2, 4):
raise ValueError("Numpy Array must contain 8, 16, or 32 bit values.")
if len(nparr.shape) == 1:
arrays = [nparr]
elif len(nparr.shape) == 2:
arrays = [nparr[i,:] for i in range(nparr.shape[0])]
else:
raise ValueError("Numpy Array must be one or two dimensional. Shape must be: (num_samples, num_channels).")
interleaved = np.vstack(arrays).reshape((-1,), order='F')
dubseg = pydub.AudioSegment(interleaved.tobytes(),
frame_rate=framerate,
sample_width=interleaved.dtype.itemsize,
channels=len(interleaved.shape)
)
return AudioSegment(dubseg, "")
def silent(duration=1000, frame_rate=11025):
"""
Creates an AudioSegment object of the specified duration/frame_rate filled with digital silence.
:param duration: The duration of the returned object in ms.
:param frame_rate: The samples per second of the returned object.
:returns: AudioSegment object filled with pure digital silence.
"""
seg = pydub.AudioSegment.silent(duration=duration, frame_rate=frame_rate)
return AudioSegment(seg, "")