asr_eval.streaming¶
Construction and evaluation of streaming ASR pipelines.
Model¶
A streaming pipeline starts with subclassing a
StreamingASR model. The model runs in a
separate thread, accepts a stream of
InputChunk and emits a stream of
OutputChunk. Each chunk carries an
audio recording ID, which makes it possible to process multiple recordings in
parallel.
Input format. A model may accept different audio formats (float32,
WAV, etc.) and sample rates; these are defined by the
audio_type and
sampling_rate fields.
Output format. Each output chunk contains a
TranscriptionChunk which carries a
text and a chunk ID. Re-emitting a new chunk with the same ID
corrects a partial transcription emitted earlier.
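This correction mechanism can be illustrated with a small self-contained sketch. The join_chunks helper below is hypothetical; it only mirrors the behaviour that the library's TranscriptionChunk.join() provides:

```python
def join_chunks(chunks: list[tuple[object, str]]) -> str:
    """Join (uid, text) pairs; re-emitting a uid replaces the earlier text."""
    parts: dict[object, str] = {}  # dicts preserve insertion order
    for i, (uid, text) in enumerate(chunks):
        # a chunk emitted without a uid can never be corrected later
        key = uid if uid is not None else ("anon", i)
        parts[key] = text
    # an empty text effectively removes the chunk with that uid
    return " ".join(text for text in parts.values() if text)

history = [(None, "word1"), (1, "word2"), (1, "word2a word2b")]
print(join_chunks(history))  # word1 word2a word2b
```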
Sending and receiving¶
One can directly send and receive chunks from
StreamingASR. These methods can be
connected to web API routes in production.
For evaluation, several automation tools exist:
Sending. The make_sender()
tool automates sending. It creates a list of
Cutoff objects that define a schedule for
sending audio chunks, then instantiates a
StreamingSender.
Call sender.start_sending() to start sending.
Receiving. Receiving outputs can be automated via
receive_transcription().
Remapping¶
Remapping is an optional mechanism that eliminates time spans where both
the sender waits (due to its schedule) and the model waits (because it
already processed the chunk and waits for the next). This makes
evaluation faster than real time. The
remap_time() operation
corrects the timings as if the sending had happened in real time. Note,
however, that this is not applicable (it would work incorrectly) for a
StreamingASR that starts additional threads from its main thread
(where is_multithreaded
is True).
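The idea can be sketched with plain timestamps. This is an illustration only; the actual remap_time() operates on full input and output chunk histories:

```python
def remap(scheduled: list[float], taken: list[float]) -> list[float]:
    """Shift timestamps forward so that no chunk appears to be taken
    from the buffer before its scheduled send time.

    scheduled[i] is when chunk i should be put per the cutoff schedule;
    taken[i] is when the model actually took it (sent without delays).
    """
    shift = 0.0
    remapped = []
    for sched, t in zip(scheduled, taken):
        t += shift
        if t < sched:              # the model was idle, waiting for input:
            shift += sched - t     # insert an artificial delay here
            t = sched
        remapped.append(t)
    return remapped

# chunks scheduled 1 s apart but consumed almost instantly:
print(remap([0.0, 1.0, 2.0], [0.0, 0.1, 0.2]))  # [0.0, 1.0, 2.0]
```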
Evaluating¶
To evaluate, one needs audio samples with word timings. These can be
obtained via forced alignment using a
CTC model and the
fill_word_timings_inplace() function.
The evaluate_streaming() function
automates evaluation. It analyses the history of input and output chunks
(this history can also be serialized to JSON and loaded back). As a
result, we obtain a
StreamingEvaluationResults
object.
Further, we can draw various diagrams, such as:
partial_alignments_plot()
This package also features two wrappers that convert a non-streaming
model into a streaming one
(OfflineToStreaming) or back
(StreamingToOffline).
- class asr_eval.streaming.buffer.StreamingQueue(name='unnamed')[source]¶
Bases:
Generic
Similar to queue.Queue with the following differences:
- Typization. This is a generic class for any element type.
- Each element has an ID (not necessarily unique), and we can .get() the next element for a specific ID. For example, IDs can be audio recording IDs for each audio chunk, when transcribing multiple recordings in parallel.
- If an exception occurs, the producer thread can
.put() the exception into the queue instead of the next chunk. It will be raised in the consumer thread on the next .get() operation.
- Parameters:
name (str)
- put(data, id=0)[source]¶
Add a new element to the queue (non-blocking, thread-safe).
- Return type:
None- Parameters:
data (T)
id (int | str)
- get(id=None, timeout=None)[source]¶
Wait for an element to appear in the queue, then pop and return it (blocking, thread-safe).
- Parameters:
id (
int|str|None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.timeout (
float|None) – if set, will raiseTimeoutErrorif waiting takes longer thantimeoutseconds.
- Return type:
tuple[TypeVar(T),int|str]
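To make the ID-based semantics concrete, here is a minimal self-contained sketch of such a queue. IdQueue is a hypothetical stand-in, not the StreamingQueue implementation:

```python
import threading
from collections import deque

class IdQueue:
    """A minimal sketch of a queue whose elements carry an ID and can be
    popped per ID, loosely mirroring the documented StreamingQueue API."""

    def __init__(self):
        self._items = deque()               # (data, id) pairs
        self._cond = threading.Condition()

    def put(self, data, id=0):
        """Add an element (non-blocking, thread-safe). Putting an
        exception makes the next matching get() raise it."""
        with self._cond:
            self._items.append((data, id))
            self._cond.notify_all()

    def get(self, id=None, timeout=None):
        """Pop the next element (for the given ID, if specified)."""
        with self._cond:
            while True:
                for i, (data, item_id) in enumerate(self._items):
                    if id is None or item_id == id:
                        del self._items[i]
                        if isinstance(data, Exception):
                            raise data      # propagate producer errors
                        return data, item_id
                if not self._cond.wait(timeout):
                    raise TimeoutError(f"no element with id={id!r}")
```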
- asr_eval.streaming.caller.receive_transcription(asr, id)[source]¶
Blocks and waits until the full transcription (ended with
Signal.FINISH) is received for the given ID.- Return type:
Iterable[OutputChunk]- Parameters:
asr (StreamingASR)
id (int | str)
- asr_eval.streaming.caller.transcribe_parallel(asr, waveforms, n_threads, send_all_without_delays=False, real_time_interval_sec=1 / 25, speed_multiplier=1)[source]¶
Transcribes the waveforms in parallel, but with no more than
n_threadssimultaneous senders.Call
asr.start_thread()before calling this method, andasr.stop_thread()after.- Return type:
dict[int|str,list[OutputChunk]]- Parameters:
asr (StreamingASR)
waveforms (list[ndarray[tuple[int, ...], dtype[floating[Any]]]])
n_threads (int)
send_all_without_delays (bool)
real_time_interval_sec (float)
speed_multiplier (float)
- class asr_eval.streaming.evaluation.StreamingEvaluationResults(*, timed_transcription, waveform, cutoffs, input_chunks, output_chunks, partial_alignments)[source]¶
A container for evaluation results of streaming speech recognition on a single sample.
Usually a result of the
evaluate_streaming()function.- Parameters:
timed_transcription (Transcription)
waveform (ndarray[tuple[int, ...], dtype[floating[Any]]])
cutoffs (list[Cutoff])
input_chunks (list[InputChunk])
output_chunks (list[OutputChunk])
partial_alignments (list[PartialAlignment])
- timed_transcription: Transcription¶
The ground truth transcription for the whole audio with filled timings for each token.
- waveform: ndarray[tuple[int, ...], dtype[floating[Any]]]¶
A waveform in float32 dtype with sampling rate 16000.
- input_chunks: list[InputChunk]¶
The input chunks history. The fields
.put_timestampand.get_timestampare relative to the start time.
- output_chunks: list[OutputChunk]¶
The output chunks history. The fields
.put_timestampand.get_timestampare relative to the start time.
- partial_alignments: list[PartialAlignment]¶
Alignments of the partial transcriptions against starting parts of the ground truth. Each partial alignment keeps the
at_timefield that indicates a timestamp relative to the start time.
- property start_timestamp: float¶
The start time, when the first input chunk was put into the input buffer.Should always be zero, because all the timestamps in the
StreamingEvaluationResultsare relative to this moment.
- property finish_timestamp: float¶
The finish time, when the last output chunk was put into the output buffer. The timestamp is relative to the starting moment.
- asr_eval.streaming.evaluation.make_sender(waveform, asr, real_time_interval_sec=1 / 5, speed_multiplier=1, uid=None)[source]¶
An automation to make a sender that sends an audio recording into a
StreamingASR.After running
cutoffs, sender = make_sender(...), you typically need to runsender.start_sending()to start a thread that actually sends all the chunks.- Parameters:
waveform (
ndarray[tuple[int,...],dtype[floating[Any]]]) – The audio in float32 dtype with sampling rate 16000. Note that the streaming recognizer may accept a different sampling rate or dtype. A conversion to the required rate and dtype will be done on-the-fly inside this function.asr (
StreamingASR) – A streaming transcriber to send chunks into.real_time_interval_sec (
float) – How often, in real time, to send chunks.speed_multiplier (
float) – For example, ifspeed_multiplier=2, will send the audio at twice the normal speed; that is, a 10-second audio will be sent in 5 seconds.uid (
str|None) – Assign a UID to the recording (a random one is selected if omitted).
- Return type:
tuple[list[Cutoff],StreamingSender]- Returns:
- A sending schedule in the form of a list of cutoffs. See
get_uniform_cutoffs()for details.
A sender object that is ready to start sending. Call
start_sending()to start sending chunks.
- asr_eval.streaming.evaluation.evaluate_streaming(timed_transcription, waveform, cutoffs, input_chunks, output_chunks, partial_alignment_interval=0.25)[source]¶
An automation to evaluate streaming recognition results.
Aligns partial transcriptions against starting parts of the ground truth.
For each of the
timestamps, obtains the starting part of thetimed_transcriptionup to the specified timestamp, and aligns it against the partial transcription that was received up to the specified timestamp. If the timestamp is inside a word in the ground truth transcription, considers two partial true transcriptions - with and without this word - and selects the one with the best alignment score.- Parameters:
timed_transcription (
Transcription) – The ground truth transcription for the whole audio with filled timings for each token. Is typically obtained withfill_word_timings_inplace().waveform (
ndarray[tuple[int,...],dtype[floating[Any]]]) – A waveform in float32 dtype with sampling rate 16000. Note that the streaming recognizer may accept a different sampling rate or dtype. A conversion to the required rate and dtype is typically done on-the-fly inside themake_sender()function.cutoffs (
list[Cutoff]) – A schedule on which the input chunks were sent.input_chunks (
list[InputChunk]) – The input chunks history. Will create a copy of each chunk with relative timestamps instead of absolute. Will not modify the original chunks.output_chunks (
list[OutputChunk]) – The output chunks history. Will create a copy of each chunk with relative timestamps instead of absolute. Will not modify the original chunks.partial_alignment_interval (
float) – Time interval between consecutive alignments of the partial transcriptions against starting parts of the ground truth. Is in the real timescale: for example, if a 10-second audio is transcribed over 30 seconds, andpartial_alignment_interval=1, then we will get 30 partial alignments.
- Return type:
- Returns:
A
StreamingEvaluationResultsdataclass that stores the resulting partial alignments, as well as the input data.
- class asr_eval.streaming.evaluation.PartialAlignment(pred, alignment, at_time, audio_seconds_sent, audio_seconds_processed)[source]¶
An alignment between the ground truth up to the
audio_seconds_sentand the partial transcription.- Parameters:
pred (SingleVariantTranscription)
alignment (MatchesList)
at_time (float)
audio_seconds_sent (float)
audio_seconds_processed (float)
- pred: SingleVariantTranscription¶
A partial transcription from the streaming model. While the raw transcription is provided in the form of transcription chunks, this field represents the chunks joined with
join()to form a transcription as text, and then parsed into words.
- alignment: MatchesList¶
An alignment between the ground truth starting part, and the partial transcription.
- at_time: float¶
The timestamp at which the alignment was evaluated. Output chunks sent later than this timestamp are not included.
- audio_seconds_processed: float¶
How many seconds of the audio were processed by the time
at_time. This value is extracted from the output chunks (seeseconds_processed).
- get_error_positions()[source]¶
Categorizes each word match from
alignmentinto one of 5 types: (“correct”, “deletion”, “insertion”, “replacement”, “not_yet”). See theStreamingASRErrorPositiondocs for details.- Return type:
- class asr_eval.streaming.evaluation.StreamingASRErrorPosition(start_time, end_time, sent_time, processed_time, status)[source]¶
A word-level match in a
PartialAlignmentwith assignedstatus.- Parameters:
start_time (float)
end_time (float)
sent_time (float)
processed_time (float)
status (Literal['correct', 'deletion', 'insertion', 'replacement', 'not_yet'])
- start_time: float¶
Start time of the ground truth word. If the match is insertion, no ground truth word exists, and the start time is the end time of the previous ground truth word, or zero.
- end_time: float¶
End time of the ground truth word. If the match is insertion, no ground truth word exists, and the end time is the start time of the next ground truth word, or the processed time.
- sent_time: float¶
How many seconds of the input audio were sent at the time when the current partial alignment was calculated.
- processed_time: float¶
How many seconds of the input audio were processed at the time when the current partial alignment was calculated (see
seconds_processed).
- status: Literal['correct', 'deletion', 'insertion', 'replacement', 'not_yet']¶
(“correct”, “deletion”, “insertion”, “replacement”, “not_yet”). The first 4 statuses are explained in the
status. The status “not_yet” is a special status assigned to trailing deletions. We consider that a trailing deletion represents a word not transcribed yet. This may occur either due to long inference times which cause delays, or because a model refuses to transcribe until it accumulates enough context. The fieldprocessed_timemakes it possible to differentiate between these two reasons.- Type:
One of 5 statuses
- property center_time: float¶
The midpoint between the start time and the end time.
- asr_eval.streaming.evaluation.get_audio_seconds_sent(time, input_chunks)[source]¶
Given a full history of input chunks, and a
time, finds the last sent chunk with put timestamp before thetimeand returns its.end_time. If there is no such chunk, returns 0.- Return type:
float- Parameters:
time (float)
input_chunks (Sequence[InputChunk])
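A self-contained sketch of this lookup, with a simplified stand-in for InputChunk (whether the boundary comparison is strict or inclusive is an assumption here):

```python
from dataclasses import dataclass

@dataclass
class SentChunk:
    """A simplified, hypothetical stand-in for InputChunk."""
    end_time: float       # audio end time of the chunk, in seconds
    put_timestamp: float  # when the chunk was put into the buffer

def audio_seconds_sent(time: float, chunks: list[SentChunk]) -> float:
    """Return end_time of the last chunk put before `time`, or 0."""
    sent = [c for c in chunks if c.put_timestamp <= time]
    return sent[-1].end_time if sent else 0.0

history = [SentChunk(1.0, 0.1), SentChunk(2.0, 0.2), SentChunk(3.0, 0.3)]
print(audio_seconds_sent(0.25, history))  # 2.0
```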
- asr_eval.streaming.evaluation.get_audio_seconds_processed(time, output_chunks)[source]¶
Given a full history of output chunks, and a
time, finds the last sent chunk with put timestamp beforetimeand returns itsseconds_processed. If there is no such chunk, returns 0.- Return type:
float- Parameters:
time (float)
output_chunks (Sequence[OutputChunk])
- asr_eval.streaming.evaluation.get_partial_alignments(input_history, output_history, timed_transcription, timestamps=None, processes=1)[source]¶
Aligns partial transcriptions against starting parts of the ground truth.
For each of the
timestamps, obtains the starting part of thetimed_transcriptionup to the specified timestamp, and aligns it against the partial transcription that was received up to the specified timestamp. If the timestamp is inside a word in the ground truth transcription, considers two partial true transcriptions - with and without this word - and selects the one with the best alignment score.- Parameters:
input_history (
Sequence[InputChunk]) – The input chunks history.output_history (
Sequence[OutputChunk]) – The output chunks history.timed_transcription – The ground truth transcription for the whole audio with filled timings for each token. Is typically obtained with
fill_word_timings_inplace().timestamps (
list[float] |ndarray[tuple[int,...],dtype[floating[Any]]] |None) – A list of times when to evaluate partial results. If None, will evaluate after each of the output chunks, except the lastSignal.FINISHchunk if present.processes (
int) – If > 1, parallelizes using multiprocessing (we cannot use multithreading here because of the GIL, considering that the alignment function is written in pure Python).timed_transcription (Transcription)
- Return type:
list[PartialAlignment]
- asr_eval.streaming.evaluation.remap_time(cutoffs, input_chunks, output_chunks)[source]¶
Remapping is an optional mechanism that eliminates time spans where both the sender waits (due to its schedule) and the model waits (because it already processed the chunk and waits for the next). This makes evaluation faster than real time with the same results. Using remapping is meaningful when the input chunks were sent with
without_delays=True.Technically,
remap_timeadds artificial delays in some places, shifting put and get timestamps forward for both input and output chunks. More concretely, it iterates over the chunks from first to last and finds input chunks that were taken from the input buffer before they should have been placed into the buffer according to thecutoffsschedule. When such a situation is found, all the put and get timestamps starting from this time are shifted forward by the calculated time delta.In the end, this makes it possible to imitate a chunk history as it would have looked with
without_delays=Falsein the senders.Note
This is not applicable (would work incorrectly) for
StreamingASRthat starts additional threads from its main background thread (whereis_multithreadedis True).- Return type:
tuple[list[InputChunk],list[OutputChunk]]- Parameters:
cutoffs (list[Cutoff])
input_chunks (list[InputChunk])
output_chunks (list[OutputChunk])
- class asr_eval.streaming.model.Signal(*values)[source]¶
Bases:
EnumSignals to control StreamingASR thread. See
StreamingASRdocs for details.- FINISH = 0¶
Indicates that the stream has finished for the given audio recording.
- exception asr_eval.streaming.model.Exit[source]¶
Bases:
ExceptionA signal to terminate StreamingASR thread. See
StreamingASRdocs for details.
- asr_eval.streaming.model.AUDIO_CHUNK_TYPE = numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.floating[typing.Any]]] | numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.integer[typing.Any]]] | bytes¶
Audio stream that is chunkable using slices.
- class asr_eval.streaming.model.CHUNK_TYPE¶
Either input or output chunk.
alias of TypeVar(‘CHUNK_TYPE’, ~asr_eval.streaming.model.InputChunk, ~asr_eval.streaming.model.OutputChunk)
- class asr_eval.streaming.model.InputChunk(*, data, end_time, put_timestamp=nan, get_timestamp=nan)[source]¶
An input chunk for
StreamingASR. Input chunks can be sent byStreamingSenderor manually and received byStreamingASRthread.See
StreamingASRdocs for usage details.- Parameters:
data (ndarray[tuple[int, ...], dtype[floating[Any]]] | ndarray[tuple[int, ...], dtype[integer[Any]]] | bytes | Literal[Signal.FINISH])
end_time (float)
put_timestamp (float)
get_timestamp (float)
- data: ndarray[tuple[int, ...], dtype[floating[Any]]] | ndarray[tuple[int, ...], dtype[integer[Any]]] | bytes | Literal[Signal.FINISH]¶
Either a chunk of audio stream, or a
Signal.FINISH.
- end_time: float¶
A chunk end time (in seconds) in the audio timescale, where 0 means the beginning of the audio recording.
- put_timestamp: float = nan¶
Is filled automatically when the chunk is added to the input buffer.
- get_timestamp: float = nan¶
Is filled automatically when the
StreamingASRthread takes the chunk from the buffer.
- class asr_eval.streaming.model.OutputChunk(*, data, seconds_processed, put_timestamp=nan, get_timestamp=nan)[source]¶
An output chunk for
StreamingASR. Output chunks are sent byStreamingASRthread and received manually or byreceive_transcription().See
StreamingASRandreceive_transcription()docs for usage details.- Parameters:
data (TranscriptionChunk | Literal[Signal.FINISH])
seconds_processed (float)
put_timestamp (float)
get_timestamp (float)
- data: TranscriptionChunk | Literal[Signal.FINISH]¶
Either a part of transcription, or a
Signal.FINISH.
- seconds_processed: float¶
A total audio seconds processed before emitting the current chunk. Is filled by the transcriber.
- put_timestamp: float = nan¶
Is filled automatically when the chunk is added to the output buffer.
- get_timestamp: float = nan¶
Is filled automatically when the chunk is taken from the output buffer.
- asr_eval.streaming.model.check_consistency(input_chunks, output_chunks)[source]¶
Asserts that:
1. For each input and output chunk,
put_timestamp <= get_timestamp.
2. For each output chunk,
seconds_processedis not larger than the audio seconds taken from the input buffer by the time the output is put into the buffer.Failures indicate errors in the chunk processing pipeline (sender, buffer, or model).
- Raises:
AssertionError – On check failures.
- Parameters:
input_chunks (list[InputChunk])
output_chunks (list[OutputChunk])
- class asr_eval.streaming.model.ASRStreamingQueue(name='unnamed')[source]¶
Bases:
StreamingQueue[CHUNK_TYPE]An input or output buffer in
StreamingASR.This subclass extends the
StreamingQueue:1. It fills
put_timestamp,get_timestampfor input or output chunks. 2. It asserts that ifSignal.FINISHwas received, no more chunks are expected for this audio recording ID.- Parameters:
name (str)
- get(id=None, timeout=None)[source]¶
Wait for an element to appear in the queue, then pop and return it (blocking, thread-safe).
- Parameters:
id (
int|str|None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.timeout (
float|None) – if set, will raiseTimeoutErrorif waiting takes longer thantimeoutseconds.
- Return type:
tuple[TypeVar(CHUNK_TYPE,InputChunk,OutputChunk),int|str]
- put(data, id=0)[source]¶
Add a new element to the queue (non-blocking, thread-safe).
- Return type:
None- Parameters:
data (CHUNK_TYPE)
id (int | str)
- class asr_eval.streaming.model.InputBuffer(name='unnamed')[source]¶
Bases:
ASRStreamingQueue[InputChunk]An input buffer for
StreamingASR.This subclass adds a
get_with_rechunking()method. If it was called at least once, a rechunking mode is enabled and.get()cannot be called anymore.- Parameters:
name (str)
- get(id=None, timeout=None)[source]¶
Wait for an element to appear in the queue, then pop and return it (blocking, thread-safe).
- Parameters:
id (
int|str|None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.timeout (
float|None) – if set, will raiseTimeoutErrorif waiting takes longer thantimeoutseconds.
- Return type:
tuple[InputChunk,int|str]
- get_with_rechunking(size, id=None)[source]¶
Internally calls .get() as many times as needed and concatenates and/or slices the results to obtain the desired array size.
For example, let each input chunk contain 1000 audio frames, and we requested
size=2400. The.get()will be called 3 times, and the last chunk will be split into two parts, of sizes 400 and 600. An array of size 2400 will be returned, and 600 remaining elements will be kept in the rechunking buffer. If we then requestsize=100, an array of size 100 will be returned without a newget(), and the buffer will keep 500 remaining elements, and so on.The returned array can be smaller than requested only if
Signal.FINISHreached for the ID.- Return type:
tuple[int|str,ndarray[tuple[int,...],dtype[floating[Any]]] |ndarray[tuple[int,...],dtype[integer[Any]]] |bytes|None,bool,float]- Returns:
ID (equals the
idargument if it was specified, otherwise the first available ID).Audio chunk of the desired size (or less if
Signal.FINISHwas reached).A flag indicating whether
Signal.FINISHwas reached.The audio end time of the last received chunk (even if part of it is still in the rechunking buffer). TODO: maybe set the audio end time more correctly?
- Parameters:
size (int)
id (int | str | None)
- class asr_eval.streaming.model.OutputBuffer(name='unnamed')[source]¶
Bases:
ASRStreamingQueue[OutputChunk]An output buffer for StreamingASR.
- Parameters:
name (str)
- class asr_eval.streaming.model.StreamingASR(sampling_rate=16_000)[source]¶
Bases:
ABCAn abstract streaming transcriber that is able to process multiple audio recordings in parallel.
Accepts a stream of input chunks marked by recording ID and emits a stream of output chunks.
Definitions:
Audio chunk: a part of a waveform. Sampling rate is defined in the class constructor. For example, a 10 sec mono recording with rate 16_000 can be represented as 10 chunks, each with shape
(16_000,). Several channels can also be supported for some models. The chunk length is not restricted. NOTE: eachStreamingASRimplementation hasaudio_typeandsampling_ratefields that define a required type and sampling rate of audio chunks.TranscriptionChunk: a partial transcription that either adds new words to the transcription or edits the previous words. See details in the
TranscriptionChunkdocs.Recording ID: a unique int or string identifier for an audio recording. This is useful if several recordings are streamed simultaneously, and we should know which audio recording each chunk belongs to. IDs should be unique for
StreamingASRobject and should not be reused, or exception will be thrown.Signal.FINISH: a symbol that signals that a stream for a specific recording ID has ended. This can refer to either the input stream (audio chunks) or the output stream (:code:`TranscriptionChunk`s).
Exit: an exception that signals that all streams for all recording IDs have ended. This can refer to either the input stream or the output stream. After receiving
Exitfrom the input buffer and sendingExitto the output buffer,StreamingASRthread finishes.
Data model:
Each input chunk can be one of:
An
InputChunk(id=<Recording ID>, data=<Audio chunk>).An
InputChunk(id=<Recording ID>, data=Signal.FINISH)- indicates that the audio for the ID has been fully sent.
Each output chunk can be one of:
An
OutputChunk(id=<Recording ID>, data=<TranscriptionChunk>).An
OutputChunk(id=<Recording ID>, data=Signal.FINISH)- indicates that the FINISH input chunk was received for the given ID and the transcription is done.
Models may fill
.seconds_processedfield inOutputChunk- the audio seconds processed (for the current recording ID) before yielding the current output chunk. This may be useful because we could send 100 chunks (say, 10 sec in total) while the model performs slow calculations and has processed only 20 chunks (2 sec in total). Depending on the testing scenario, we can treat the result as a partial transcription of either the first 2 or the first 10 seconds of the audio signal.Sending and receiving:
After creating a
StreamingASRobject, we should start a thread that will process input chunks and emit output chunks. After this, new audio chunks can be sent using.input_buffer.put(...)(non-blocking), and the outputs can be received with.output_buffer.get(...)(blocks until output becomes available). Instead of manual sending, themake_sender()function can be helpful. It prepares a sender to send audio chunks with a delay between each chunk.Input and output buffers automatically fill the following fields:
InputChunk.put_timestamp- the time when the chunk was added to theStreamingASR.input_buffer.InputChunk.get_timestamp- the time when the chunk was received from theStreamingASR.input_buffer.OutputChunk.put_timestamp- the time when the chunk was added to theStreamingASR.output_buffer.OutputChunk.get_timestamp- the time when the chunk was received from theStreamingASR.output_buffer.
Points 1 and 4 happen in the caller thread; points 2 and 3 happen in the
StreamingASRthread.Terminating a StreamingASR thread:
An
Exitexception in the input buffer indicates that all audios have been fully sent. AnExitexception in the output buffer indicates thatExitwas received from the input buffer and theStreamingASRthread exited. This does not mean that all transcriptions are fully done.Exception handling:
Any exception raised from the
StreamingASRthread will set the output buffer into the error state. This will raise the exception when reading from the output buffer.Trying to write invalid data into the input buffer (including reusing previous IDs) may set it into the error state. This will raise the exception when reading from the input buffer in the
StreamingASRthread; then see pt. 1.Exceptions in the sender thread will set the input buffer into the error state; then see pt. 2.
Exitis a special exception type indicating that the input or output stream has been closed properly.
Implementing models:
To subclass a
StreamingASR, one should implement the_run()method and theaudio_typeproperty. Also, a subclass__init__method should callsuper().__init__, specifying the audio sampling rate.- Parameters:
sampling_rate (int)
- sampling_rate: int¶
Sampling rate for the input audio chunks.
TODO clarify what to set for bytes or WAV.
- start_thread()[source]¶
Start the background thread with
_run()that processes input chunks and emits output chunks.- Return type:
Self
- stop_thread()[source]¶
Stops the background thread started with
start_thread().- Return type:
None
- is_thread_started()[source]¶
Is the background thread started with
start_thread()running?- Return type:
bool
- abstractmethod _run()[source]¶
A background thread that processes input chunks and emits output chunks.
Is started with
start_thread()and should live forever, usually withwhile Trueloop. To get the next input chunk, we can useself.input_buffer.get()orget_with_rechunking()(both methods block until the next chunk is available). To emit a new output chunk, useself.output_buffer.put()(non-blocking).For example, if 16_000 floats/sec are streamed, and an exteral sender sends chunks of size 1600 10 times per second, but your model want to get 1s chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for any ID and return the result.
Normally on
stop_thread()anExitexception is raised when the_runmethod tries to read from the input buffer. It causes exit from_runand is handled in a wrapping method_run_and_send_exit. So, theExitexception should not be handled in_run.
- abstract property audio_type: Literal['float', 'int', 'bytes', 'wav']¶
The required input audio format. Together with
sampling_rateproperty, forms a specification of input audio.See also
convert_audio_format()for details about formats.
- property is_multithreaded: bool¶
Whether additional threads are started from the background thread
_run().False by default. If overridden with True, the evaluation protocol will not try to use
remap_time().
- class asr_eval.streaming.model.DummyASR(sampling_rate=16_000)[source]¶
Bases:
StreamingASRWill transcribe an N-second-long audio into “1 2 … N”.
- Parameters:
sampling_rate (int)
- _run()[source]¶
A background thread that processes input chunks and emits output chunks.
Is started with
start_thread()and should live forever, usually with awhile Trueloop. To get the next input chunk, we can useself.input_buffer.get()orget_with_rechunking()(both methods block until the next chunk is available). To emit a new output chunk, useself.output_buffer.put()(non-blocking).For example, if 16_000 floats/sec are streamed, and an external sender sends chunks of size 1600 ten times per second, but your model wants 1-second chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for any ID and return the result.
Normally on
stop_thread()anExitexception is raised when the_runmethod tries to read from the input buffer. It causes exit from_runand is handled in a wrapping method_run_and_send_exit. So, theExitexception should not be handled in_run.
- property audio_type: Literal['float', 'int', 'bytes']¶
The required input audio format. Together with
sampling_rateproperty, forms a specification of input audio.See also
convert_audio_format()for details about formats.
- class asr_eval.streaming.model.TranscriptionChunk(*, uid=<factory>, text)[source]¶
A chunk returned by a
StreamingASRmodel.Contains a text and an optional ID. If we want to override the previously emitted partial transcription, we should emit a new chunk with the same ID. It will be treated not as new text to append, but as a correction for the previous chunk with this ID.Note
Note
A
TranscriptionChunkid is not the same concept as audio recording ID.Example
>>> from asr_eval.streaming.model import TranscriptionChunk
>>> chunks = []
>>> # append a new chunk without an explicit uid
>>> # without an ID we cannot correct this chunk later
>>> chunks.append(TranscriptionChunk(text='word1'))
>>> # append a new chunk with id 1
>>> chunks.append(TranscriptionChunk(uid=1, text='word2'))
>>> # append a new chunk with id 2
>>> chunks.append(TranscriptionChunk(uid=2, text='word3'))
>>> print(TranscriptionChunk.join(chunks))
word1 word2 word3
>>> # correct the chunk with id 1
>>> chunks.append(TranscriptionChunk(uid=1, text='word2a word2b'))
>>> # remove the chunk with id 2
>>> chunks.append(TranscriptionChunk(uid=2, text=''))
>>> print(TranscriptionChunk.join(chunks))
word1 word2a word2b
- Parameters:
uid (int | str)
text (str)
- classmethod join(transcriptions)[source]¶
Join transcription chunks. If the
transcriptionsareOutputChunkinstances, extracts a transcription chunk from each output chunk.See example in the
TranscriptionChunkdocs.- Return type:
str- Parameters:
transcriptions (Sequence[TranscriptionChunk] | Sequence[OutputChunk | Literal[Signal.FINISH]])
- asr_eval.streaming.plots.partial_alignments_plot(eval, ax=None)[source]¶
Draw a partial alignment diagram.
See more details and examples in the user guide: Streaming evaluation.
- Parameters:
eval (StreamingEvaluationResults)
ax (Axes | None)
- asr_eval.streaming.plots.visualize_history(input_chunks, output_chunks=None, ax=None)[source]¶
Visualize the history of sending and receiving chunks.
See more details and examples in the user guide: Streaming evaluation.
- Parameters:
input_chunks (list[InputChunk])
output_chunks (list[OutputChunk] | None)
ax (Axes | None)
- asr_eval.streaming.plots.streaming_error_vs_latency_histogram(evals, ax=None, max_latency=10)[source]¶
Summarizes error percentage versus latency in a histogram, given evaluations for multiple samples.
See more details and examples in the user guide: Streaming evaluation.
- Parameters:
evals (list[StreamingEvaluationResults])
ax (Axes | None)
max_latency (float)
- asr_eval.streaming.plots.latency_plot(evals, ax=None)[source]¶
Summarizes latencies, given evaluations for multiple samples.
See more details and examples in the user guide: Streaming evaluation.
- Parameters:
evals (list[StreamingEvaluationResults])
ax (Axes | None)
- asr_eval.streaming.plots.show_last_alignments(evals, ax=None)[source]¶
Visualizes the last alignments (finalized transcriptions), given evaluations for multiple samples.
See more details and examples in the user guide: Streaming evaluation.
- Parameters:
evals (list[StreamingEvaluationResults])
ax (Axes | None)
- class asr_eval.streaming.sender.Cutoff(t_real, t_audio, arr_pos)[source]¶
A container for an audio position and a (relative) real-time moment. Used to schedule waveform sending into
StreamingASR. Suppose we have an audio waveform and two consecutive cutoffs
[c1, c2]: [Cutoff(tr1, ta1, pos1), Cutoff(tr2, ta2, pos2)]
This means that:
waveform[:c1.arr_pos] gives audio of length c1.t_audio. waveform[:c2.arr_pos] gives audio of length c2.t_audio.
waveform[c1.arr_pos:c2.arr_pos] should be sent at the time c2.t_real.
- Parameters:
t_real (float)
t_audio (float)
arr_pos (int)
- t_real: float¶
A real world time measured from the beginning of the sending process.
- t_audio: float¶
A time on the audio timescale. For example, if we send a 10-second audio in 5 seconds (at 2x speed), the
t_real will be 5 at the end, and t_audio will be 10.
- arr_pos: int¶
A position in the audio array. Calculated from
t_audio using the number of array elements per second.
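The relationship between the three fields can be illustrated with a minimal sketch (the concrete numbers here are invented for the example):

```python
# A cutoff at 2x sending speed: one second of audio is scheduled
# to be fully sent by t_real = 0.5 seconds of wall-clock time.
SAMPLING_RATE = 16_000
t_real = 0.5
t_audio = 1.0
arr_pos = int(t_audio * SAMPLING_RATE)  # position in the waveform array

# waveform[:arr_pos] is exactly t_audio seconds of audio
assert arr_pos == 16_000
```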
- asr_eval.streaming.sender.get_uniform_cutoffs(waveform, real_time_interval_sec=1 / 25, speed_multiplier=1.0, sampling_rate=16_000)[source]¶
Returns a uniform schedule for sending the audio.
- Parameters:
waveform (
ndarray[tuple[int,...],dtype[floating[Any]]]) – The audio in float32 dtype.
real_time_interval_sec (
float) – How often, in real time, to send chunks.
speed_multiplier (
float) – For example, if speed_multiplier=2, the audio is sent at twice the normal speed, that is, a 10-second audio is sent in 5 seconds.
sampling_rate (
int) – The sampling rate of the waveform.
- Return type:
list[Cutoff]
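A uniform schedule as described could be computed roughly like this. This is a simplified sketch of the idea, not the library implementation; here cutoffs are plain (t_real, t_audio, arr_pos) tuples rather than Cutoff instances:

```python
def uniform_cutoffs(n_samples: int,
                    real_time_interval_sec: float = 1 / 25,
                    speed_multiplier: float = 1.0,
                    sampling_rate: int = 16_000) -> list[tuple[float, float, int]]:
    # Audio samples consumed per real-time tick; at speed_multiplier=2
    # each tick covers twice as much audio.
    step = round(real_time_interval_sec * speed_multiplier * sampling_rate)
    cutoffs = []
    pos, tick = 0, 0
    while pos < n_samples:
        tick += 1
        pos = min(pos + step, n_samples)
        cutoffs.append((tick * real_time_interval_sec,  # t_real
                        pos / sampling_rate,            # t_audio
                        pos))                           # arr_pos
    return cutoffs
```

At speed_multiplier=2.0, a 10-second audio yields a schedule whose last t_real is about 5 seconds, matching the 2x-speed example above.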
- class asr_eval.streaming.sender.StreamingSender(cutoffs, waveform, asr, id=<factory>, verbose=False, sampling_rate=16000, _history=<factory>, _thread=None)[source]¶
Can be used to automate sending an audio stream to
StreamingASR.
- Parameters:
cutoffs (
list[Cutoff]) – A schedule for sending the audio.
waveform (
ndarray[tuple[int,...],dtype[floating[Any]]]) – A waveform in float32 dtype. The sampling rate information is encoded in the cutoffs, because they store both the audio time and the audio array position.
asr (
StreamingASR) – A streaming transcriber to send chunks into.
id (
int | str) – A recording ID to assign.
verbose (
bool) – Whether to print each chunk's info to stdout.
sampling_rate (int)
_history (list[InputChunk])
_thread (Thread | None)
Call
start_sending() to start the sending process.
Note
Keeps the history of all sent chunks for evaluation purposes (can be retrieved with
join_and_get_history()). To avoid running out of memory, ensure that senders are garbage-collected afterwards.
- start_sending(without_delays=False)[source]¶
If
without_delays=False (default), starts sending in a separate thread according to the schedule given in the constructor. If without_delays=True, sends all the chunks immediately. Non-blocking.
- Return type:
Self
- Parameters:
without_delays (bool)
- join_and_get_history()[source]¶
Wait for the sending process to finish and return the history of chunks sent.
- Return type:
list[InputChunk]
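The scheduled-sending loop can be sketched without the library. In this toy version, emit stands in for pushing an InputChunk into the model, and cutoffs are plain (t_real, t_audio, arr_pos) tuples; the names are illustrative, not the library API:

```python
import time


def send_by_schedule(waveform, cutoffs, emit):
    """cutoffs: (t_real, t_audio, arr_pos) triples; emit() consumes waveform slices."""
    start = time.monotonic()
    prev_pos = 0
    for t_real, _t_audio, arr_pos in cutoffs:
        # Sleep until this cutoff's scheduled wall-clock moment.
        delay = start + t_real - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        # Send the slice ending at this cutoff's array position.
        emit(waveform[prev_pos:arr_pos])
        prev_pos = arr_pos
```

Together the emitted slices cover the whole waveform exactly once, in order.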
- class asr_eval.streaming.wrappers.StreamingToOffline(streaming_model, start_thread=True)[source]¶
Bases:
Transcriber
A wrapper that turns a
StreamingASR into a Transcriber. Transcribes the full audio and returns the full transcription.
The
StreamingASR keeps running after .transcribe and waits for new input streams. You may want to stop it via .streaming_model.stop_thread() at the end.
- Parameters:
streaming_model (StreamingASR)
start_thread (bool)
- class asr_eval.streaming.wrappers.OfflineToStreaming(offline_model, interval=0.5)[source]¶
Bases:
StreamingASR
Converts a non-streaming (offline) ASR model into a streaming one. Calls the offline model at the given
interval (on the audio timescale). For example, suppose the audio is 3 seconds long, and
interval=1. Will call the offline model:On the waveform slice from 0 to 1 second (when enough data received)
On the waveform slice from 0 to 2 seconds (when enough data received)
On the waveform slice from 0 to 3 seconds (when enough data received)
Each time completely overwrites the old transcription with the new one (this is achieved by sending a new
TranscriptionChunk with the same id).
TODO support longform audios somehow (with or without VAD).
TODO support batching?
TODO set also a real-time minimal interval between model calls.
TODO add
keep=True arg to .get_with_rechunking() instead of making another buffer.
- Parameters:
offline_model (Transcriber)
interval (float)
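The call schedule from the example above (a 3-second audio with interval=1) can be sketched by computing the audio-time endpoints of the prefixes the offline model sees. This is a toy illustration of the schedule, not the wrapper's code:

```python
def offline_call_endpoints(audio_len_sec: float, interval: float) -> list[float]:
    """Each endpoint t means: run the offline model on the waveform slice [0:t]."""
    endpoints = []
    t = interval
    while t < audio_len_sec:
        endpoints.append(t)
        t += interval
    endpoints.append(audio_len_sec)  # the final call sees the whole audio
    return endpoints
```

For a 3-second audio with interval=1, this yields calls on the prefixes ending at 1, 2, and 3 seconds, matching the three calls listed above.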
- _run()[source]¶
A background thread that processes input chunks and emits output chunks.
Is started with
start_thread() and should live forever, usually with a while True loop. To get the next input chunk, use self.input_buffer.get() or get_with_rechunking() (both methods block until the next chunk is available). To emit a new output chunk, use self.output_buffer.put() (non-blocking).
For example, if 16_000 floats/sec are streamed, and an external sender sends chunks of size 1600 ten times per second, but your model wants 1-second chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for some ID and return the result.
Normally, on
stop_thread() an Exit exception is raised when the _run method tries to read from the input buffer. This causes an exit from _run and is handled in the wrapping method _run_and_send_exit. So, the Exit exception should not be handled in _run.
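The rechunking described in the example can be sketched with a small accumulating buffer. This is a single-recording toy version; the real get_with_rechunking also handles multiple recording IDs and blocks until enough data arrives:

```python
class RechunkingBuffer:
    """Accumulates incoming chunks and releases fixed-size chunks."""

    def __init__(self, size: int):
        self.size = size
        self.pending: list[float] = []

    def put(self, chunk: list[float]) -> list[list[float]]:
        # Returns zero or more complete chunks of exactly `size` samples;
        # any remainder stays pending until more data arrives.
        self.pending.extend(chunk)
        out = []
        while len(self.pending) >= self.size:
            out.append(self.pending[:self.size])
            self.pending = self.pending[self.size:]
        return out
```

For instance, feeding five 4-sample chunks into a size-10 buffer releases two 10-sample chunks and leaves nothing pending.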
- property audio_type: Literal['float']¶
The required input audio format. Together with
sampling_rate property, forms a specification of the input audio.
See also
convert_audio_format() for details about formats.