asr_eval.streaming

Construction and evaluation of streaming ASR pipelines.

Model

A streaming pipeline starts with subclassing StreamingASR. The model runs in a separate thread, accepts a stream of InputChunk objects and emits a stream of OutputChunk objects. Each chunk carries an audio recording ID, which allows processing multiple recordings in parallel.

Input format. A model may accept different audio formats (float32, WAV, etc.) and sampling rates; these are defined in the audio_type and sampling_rate fields.

Output format. Each output chunk contains a TranscriptionChunk, which carries a text and a chunk ID. Re-emitting a new chunk with the same ID allows correcting a partial transcription emitted earlier.

Sending and receiving

One can send and receive chunks directly via StreamingASR methods. These methods can be connected to web API routes in production.

To simplify evaluation, several automation tools exist:

Sending. The make_sender() tool automates sending. It creates a list of Cutoff objects that define a schedule for sending audio chunks and instantiates a StreamingSender. You can then call sender.start_sending() to start sending.

Receiving. Receiving outputs can be automated via receive_transcription().

Remapping

Remapping is an optional mechanism that eliminates time spans where both the sender waits (due to its schedule) and the model waits (because it has already processed the chunk and is waiting for the next one). This makes evaluation faster than real time. The remap_time() operation corrects the timings as if the sending had happened in real time. Note, however, that this is not applicable (it would work incorrectly) for a StreamingASR that starts additional threads from its main thread (where is_multithreaded is True).

Evaluating

To evaluate, one needs audio samples with word timings. These can be obtained via forced alignment using a CTC model and the fill_word_timings_inplace() function.

The evaluate_streaming() function automates evaluation. It analyzes a history of input and output chunks (this history can also be serialized to JSON and loaded back). As a result, we obtain a StreamingEvaluationResults object.

Further, we can draw various diagrams, such as partial alignment plots, chunk history visualizations, and latency plots (see asr_eval.streaming.plots).

This package also features two wrappers that convert a non-streaming model into a streaming one (OfflineToStreaming) and back (StreamingToOffline).

class asr_eval.streaming.buffer.StreamingQueue(name='unnamed')[source]

Bases: Generic

Similar to queue.Queue with the following differences:

  • Typing: this is a generic class for any element type.

  • Each element has an ID (not necessarily unique), and we can .get() the next element for a specific ID. For example, IDs can be audio recording IDs for each audio chunk when transcribing multiple recordings in parallel.

  • If an exception occurs, the producer thread can .put() the exception into the queue instead of the next chunk. It will be raised in the consumer thread on the next .get() operation.

Parameters:

name (str)

put(data, id=0)[source]

Add a new element into the queue (non-blocking, thread-safe).

Return type:

None

Parameters:
  • data (T)

  • id (int | str)

get(id=None, timeout=None)[source]

Wait for an element to appear in the queue, pop it, and return it (blocking, thread-safe).

Parameters:
  • id (int | str | None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.

  • timeout (float | None) – if set, will raise TimeoutError if waiting takes longer than timeout seconds.

Return type:

tuple[TypeVar(T), int | str]

put_error(error)[source]

Set the queue into the error state.

Any consumer that tries to .get() from the queue will receive this exception wrapped into a RuntimeError. This allows propagating exceptions from the sender thread to the consumer thread to terminate the program.

Parameters:

error (BaseException)
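The mechanics described above can be illustrated with a minimal self-contained sketch (a simplified stand-in for illustration, not the library's implementation): a thread-safe queue whose elements carry IDs, with per-ID get() and producer-to-consumer error propagation.

```python
import threading

class MiniStreamingQueue:
    """A simplified stand-in for StreamingQueue (illustration only)."""

    def __init__(self):
        self._items = []                 # pending (data, id) pairs
        self._error = None               # set by put_error()
        self._cond = threading.Condition()

    def put(self, data, id=0):
        # Non-blocking, thread-safe: append and wake up waiting consumers.
        with self._cond:
            self._items.append((data, id))
            self._cond.notify_all()

    def put_error(self, error):
        # Put the queue into the error state; consumers see it on get().
        with self._cond:
            self._error = error
            self._cond.notify_all()

    def get(self, id=None, timeout=None):
        # Blocking, thread-safe: pop the next element (optionally for an ID).
        with self._cond:
            while True:
                if self._error is not None:
                    raise RuntimeError("queue is in error state") from self._error
                for i, (data, item_id) in enumerate(self._items):
                    if id is None or item_id == id:
                        del self._items[i]
                        return data, item_id
                if not self._cond.wait(timeout=timeout):
                    raise TimeoutError("no element appeared in time")
```

A Condition variable is the natural primitive here: put() and put_error() notify, while get() re-scans the pending items after each wake-up.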

asr_eval.streaming.caller.receive_transcription(asr, id)[source]

Blocks and waits until the full transcription (ending with Signal.FINISH) is received for the given ID.

Return type:

Iterable[OutputChunk]

Parameters:
  • asr (StreamingASR)

  • id (int | str)
asr_eval.streaming.caller.transcribe_parallel(asr, waveforms, n_threads, send_all_without_delays=False, real_time_interval_sec=1 / 25, speed_multiplier=1)[source]

Transcribes the waveforms in parallel, with no more than n_threads simultaneous senders.

Call asr.start_thread() before calling this method, and asr.stop_thread() after.

Return type:

dict[int | str, list[OutputChunk]]

Parameters:
  • asr (StreamingASR)

  • waveforms (list[ndarray[tuple[int, ...], dtype[floating[Any]]]])

  • n_threads (int)

  • send_all_without_delays (bool)

  • real_time_interval_sec (float)

  • speed_multiplier (float)

class asr_eval.streaming.evaluation.StreamingEvaluationResults(*, timed_transcription, waveform, cutoffs, input_chunks, output_chunks, partial_alignments)[source]

A container for evaluation results for a streaming speech recognition on a single sample.

Usually a result of the evaluate_streaming() function.

Parameters:
timed_transcription: Transcription

The ground truth transcription for the whole audio with filled timings for each token.

waveform: ndarray[tuple[int, ...], dtype[floating[Any]]]

A waveform in float32 dtype with sampling rate 16000.

cutoffs: list[Cutoff]

A schedule on which the input chunks were sent.

input_chunks: list[InputChunk]

The input chunks history. The fields .put_timestamp and .get_timestamp are relative to the start time.

output_chunks: list[OutputChunk]

The output chunks history. The fields .put_timestamp and .get_timestamp are relative to the start time.

partial_alignments: list[PartialAlignment]

Alignments of the partial transcriptions against starting parts of the ground truth. Each partial alignment keeps an at_time field that indicates a timestamp relative to the start time.

property start_timestamp: float

A start time, when the first input chunk was put into the input buffer. Should always be zero, because all the timestamps in the StreamingEvaluationResults are relative to this moment.

property finish_timestamp: float

A finish time, when the last output chunk was put into the output buffer. The timestamp is relative to the starting moment.

asr_eval.streaming.evaluation.make_sender(waveform, asr, real_time_interval_sec=1 / 5, speed_multiplier=1, uid=None)[source]

An automation to make a sender that sends an audio recording into a StreamingASR.

After running cutoffs, sender = make_sender(...), you typically need to call sender.start_sending() to start a thread that actually sends all the chunks.

Parameters:
  • waveform (ndarray[tuple[int, ...], dtype[floating[Any]]]) – The audio in float32 dtype with sampling rate 16000. Note that the streaming recognizer may accept a different sampling rate or dtype. A conversion to the required rate and dtype will be done on-the-fly inside this function.

  • asr (StreamingASR) – A streaming transcriber to send chunks into.

  • real_time_interval_sec (float) – How often (in real time) to send chunks.

  • speed_multiplier (float) – For example, if speed_multiplier=2, the audio will be sent at twice the normal speed; that is, a 10-second audio will be sent in 5 seconds.

  • uid (str | None) – Assign a UID to the recording (a random one is selected if omitted).

Return type:

tuple[list[Cutoff], StreamingSender]

Returns:

  • A sending schedule in the form of a list of cutoffs. See get_uniform_cutoffs() for details.

  • A sender object that is ready to start sending. Call start_sending() to start sending chunks.

asr_eval.streaming.evaluation.evaluate_streaming(timed_transcription, waveform, cutoffs, input_chunks, output_chunks, partial_alignment_interval=0.25)[source]

An automation to evaluate streaming recognition results.

Aligns partial transcriptions against starting parts of the ground truth.

For each of the timestamps, obtains the starting part of timed_transcription up to the specified timestamp and aligns it against the partial transcription received up to that timestamp. If the timestamp falls inside a word in the ground truth transcription, considers two partial true transcriptions, with and without this word, and selects the one with the better alignment score.
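The "with and without the boundary word" candidate construction can be sketched as follows (a conceptual illustration using a simplified (word, start, end) representation, not the library's actual code):

```python
def candidate_truth_prefixes(timed_words, t):
    """Return the candidate ground-truth prefixes at timestamp t.

    timed_words is a simplified representation: a list of (word, start, end)
    tuples sorted by time. If t falls inside a word, both prefixes (with and
    without that word) are returned; the caller would align each candidate
    against the partial transcription and keep the better-scoring one.
    """
    finished = [w for w, start, end in timed_words if end <= t]
    inside = [w for w, start, end in timed_words if start < t < end]
    candidates = [finished]
    if inside:
        candidates.append(finished + inside)
    return candidates
```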

Parameters:
  • timed_transcription (Transcription) – The ground truth transcription for the whole audio with filled timings for each token. Is typically obtained with fill_word_timings_inplace().

  • waveform (ndarray[tuple[int, ...], dtype[floating[Any]]]) – A waveform in float32 dtype with sampling rate 16000. Note that the streaming recognizer may accept a different sampling rate or dtype. A conversion to the required rate and dtype is typically done on-the-fly inside the make_sender() function.

  • cutoffs (list[Cutoff]) – A schedule on which the input chunks were sent.

  • input_chunks (list[InputChunk]) – The input chunks history. Will create a copy of each chunk with relative timestamps instead of absolute. Will not modify the original chunks.

  • output_chunks (list[OutputChunk]) – The output chunks history. Will create a copy of each chunk with relative timestamps instead of absolute. Will not modify the original chunks.

  • partial_alignment_interval (float) – Time interval between consecutive alignments of the partial transcriptions against starting parts of the ground truth. The interval is on the real timescale: for example, if a 10-second audio is transcribed over 30 seconds and partial_alignment_interval=1, then we will get 30 partial alignments.

Return type:

StreamingEvaluationResults

Returns:

A StreamingEvaluationResults dataclass that stores the resulting partial alignments, as well as the input data.

class asr_eval.streaming.evaluation.PartialAlignment(pred, alignment, at_time, audio_seconds_sent, audio_seconds_processed)[source]

An alignment between the ground truth up to the audio_seconds_sent and the partial transcription.

Parameters:
pred: SingleVariantTranscription

A partial transcription from the streaming model. While the raw transcription is provided in the form of transcription chunks, this field represents the chunks joined with join() into a transcription text and then parsed into words.

alignment: MatchesList

An alignment between the starting part of the ground truth and the partial transcription.

at_time: float

The timestamp at which the alignment was evaluated. Output chunks sent later than this timestamp are not included.

audio_seconds_sent: float

How many seconds of the audio were sent by the time at_time.

audio_seconds_processed: float

How many seconds of the audio were processed by the time at_time. This value is extracted from the output chunks (see seconds_processed).

get_error_positions()[source]

Categorizes each word match from alignment into one of 5 types: (“correct”, “deletion”, “insertion”, “replacement”, “not_yet”). See the StreamingASRErrorPosition docs for details.

Return type:

list[StreamingASRErrorPosition]

class asr_eval.streaming.evaluation.StreamingASRErrorPosition(start_time, end_time, sent_time, processed_time, status)[source]

A word-level match in a PartialAlignment with assigned status.

Parameters:
  • start_time (float)

  • end_time (float)

  • sent_time (float)

  • processed_time (float)

  • status (Literal['correct', 'deletion', 'insertion', 'replacement', 'not_yet'])

start_time: float

Start time of the ground truth word. If the match is an insertion, no ground truth word exists, and the start time is the end time of the previous ground truth word, or zero.

end_time: float

End time of the ground truth word. If the match is an insertion, no ground truth word exists, and the end time is the start time of the next ground truth word, or the processed time.

sent_time: float

How many seconds of the input audio were sent at the time when the current partial alignment was calculated.

processed_time: float

How many seconds of the input audio were processed at the time when the current partial alignment was calculated (see seconds_processed).

status: Literal['correct', 'deletion', 'insertion', 'replacement', 'not_yet']

("correct", "deletion", "insertion", "replacement", "not_yet"). The first 4 statuses are the standard word alignment statuses. The status "not_yet" is a special status assigned to trailing deletions. We consider that if a deletion is trailing, it represents a word not transcribed yet. This may occur either due to long inference times, which cause delays, or because a model refuses to transcribe until it accumulates enough context. The processed_time field allows differentiating between these two reasons.

Type:

One of 5 statuses
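The "not_yet" relabeling of trailing deletions can be sketched as follows (an illustration of the rule described above, not the library's code):

```python
def relabel_trailing_deletions(statuses):
    """Relabel trailing 'deletion' statuses as 'not_yet': a trailing
    deletion represents a word that has not been transcribed yet."""
    out = list(statuses)
    i = len(out)
    while i > 0 and out[i - 1] == "deletion":
        out[i - 1] = "not_yet"
        i -= 1
    return out
```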

property center_time: float

The midpoint between the start and end times.

asr_eval.streaming.evaluation.get_audio_seconds_sent(time, input_chunks)[source]

Given a full history of input chunks and a time, finds the last sent chunk with a put timestamp before the time and returns its .end_time. If there is no such chunk, returns 0.

Return type:

float

Parameters:
  • time (float)

  • input_chunks (Sequence[InputChunk])
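The lookup can be sketched as follows (conceptual; chunks are simplified to (put_timestamp, end_time) pairs, and whether the comparison is strict is an assumption):

```python
def audio_seconds_sent(time, input_chunks):
    """input_chunks: (put_timestamp, end_time) pairs sorted by put_timestamp.
    Return the end_time of the last chunk put no later than `time`, else 0."""
    sent = 0.0
    for put_timestamp, end_time in input_chunks:
        if put_timestamp <= time:
            sent = end_time
    return sent
```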

asr_eval.streaming.evaluation.get_audio_seconds_processed(time, output_chunks)[source]

Given a full history of output chunks and a time, finds the last sent chunk with a put timestamp before the time and returns its seconds_processed. If there is no such chunk, returns 0.

Return type:

float

Parameters:
  • time (float)

  • output_chunks (Sequence[OutputChunk])
asr_eval.streaming.evaluation.get_partial_alignments(input_history, output_history, timed_transcription, timestamps=None, processes=1)[source]

Aligns partial transcriptions against starting parts of the ground truth.

For each of the timestamps, obtains the starting part of timed_transcription up to the specified timestamp and aligns it against the partial transcription received up to that timestamp. If the timestamp falls inside a word in the ground truth transcription, considers two partial true transcriptions, with and without this word, and selects the one with the better alignment score.

Parameters:
  • input_history (Sequence[InputChunk]) – The input chunks history.

  • output_history (Sequence[OutputChunk]) – The output chunks history.

  • timed_transcription (Transcription) – The ground truth transcription for the whole audio with filled timings for each token. Is typically obtained with fill_word_timings_inplace().

  • timestamps (list[float] | ndarray[tuple[int, ...], dtype[floating[Any]]] | None) – A list of times at which to evaluate partial results. If None, will evaluate after each of the output chunks, except the last Signal.FINISH chunk if present.

  • processes (int) – If > 1, parallelizes using multiprocessing (we cannot use multithreading here because of the GIL, considering that the alignment function is written in pure Python).

Return type:

list[PartialAlignment]

asr_eval.streaming.evaluation.remap_time(cutoffs, input_chunks, output_chunks)[source]

Remapping is an optional mechanism that eliminates time spans where both the sender waits (due to its schedule) and the model waits (because it has already processed the chunk and is waiting for the next one). This makes evaluation faster than real time with the same results. Using remapping is meaningful when the input chunks were sent with without_delays=True.

Technically, remap_time adds artificial delays in some places, shifting put and get timestamps forward for both input and output chunks. More concretely, it iterates over the chunks from the first to the last and finds input chunks that were taken from the input buffer before they should have been placed into the buffer according to the cutoffs schedule. When such a situation is found, all the put and get timestamps starting from this time are shifted forward by the calculated time delta.

In the end, this allows imitating a chunk history as it would have looked if without_delays=False had been used in the senders.
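The shifting logic can be sketched on put timestamps alone (a conceptual simplification of remap_time, which also shifts get timestamps and output chunks):

```python
def remap_put_timestamps(scheduled, actual):
    """scheduled: per-chunk send times from the cutoffs schedule.
    actual: per-chunk put timestamps recorded without delays.
    Whenever a chunk lands earlier than its scheduled time, shift it (and
    everything after it) forward so the history looks like real-time sending."""
    shift = 0.0
    remapped = []
    for sched, act in zip(scheduled, actual):
        t = act + shift
        if t < sched:              # consumed before its scheduled send time
            shift += sched - t     # all later timestamps move by this delta
            t = sched
        remapped.append(t)
    return remapped
```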

Note

This is not applicable (it would work incorrectly) for a StreamingASR that starts additional threads from its main background thread (where is_multithreaded is True).

Return type:

tuple[list[InputChunk], list[OutputChunk]]

Parameters:
  • cutoffs (list[Cutoff])

  • input_chunks (list[InputChunk])

  • output_chunks (list[OutputChunk])
class asr_eval.streaming.model.Signal(*values)[source]

Bases: Enum

Signals to control StreamingASR thread. See StreamingASR docs for details.

FINISH = 0

Indicates that the stream finishes for the given audio recording.

exception asr_eval.streaming.model.Exit[source]

Bases: Exception

A signal to terminate StreamingASR thread. See StreamingASR docs for details.

asr_eval.streaming.model.AUDIO_CHUNK_TYPE = numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.floating[typing.Any]]] | numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.integer[typing.Any]]] | bytes

Audio stream that is chunkable using slices.

class asr_eval.streaming.model.CHUNK_TYPE

Either an input or an output chunk.

alias of TypeVar(‘CHUNK_TYPE’, ~asr_eval.streaming.model.InputChunk, ~asr_eval.streaming.model.OutputChunk)

class asr_eval.streaming.model.InputChunk(*, data, end_time, put_timestamp=nan, get_timestamp=nan)[source]

An input chunk for StreamingASR. Input chunks can be sent by StreamingSender or manually and received by StreamingASR thread.

See StreamingASR docs for usage details.

Parameters:
  • data (ndarray[tuple[int, ...], dtype[floating[Any]]] | ndarray[tuple[int, ...], dtype[integer[Any]]] | bytes | Literal[Signal.FINISH])

  • end_time (float)

  • put_timestamp (float)

  • get_timestamp (float)

data: ndarray[tuple[int, ...], dtype[floating[Any]]] | ndarray[tuple[int, ...], dtype[integer[Any]]] | bytes | Literal[Signal.FINISH]

Either a chunk of audio stream, or a Signal.FINISH.

end_time: float

A chunk end time (in seconds) in the audio timescale, where 0 means the beginning of the audio recording.

put_timestamp: float = nan

Is filled automatically when the chunk is added to the input buffer.

get_timestamp: float = nan

Is filled automatically when the StreamingASR thread takes the chunk from the buffer.

class asr_eval.streaming.model.OutputChunk(*, data, seconds_processed, put_timestamp=nan, get_timestamp=nan)[source]

An output chunk for StreamingASR. Output chunks are sent by StreamingASR thread and received manually or by receive_transcription().

See StreamingASR and receive_transcription() docs for usage details.

Parameters:
  • data (TranscriptionChunk | Literal[Signal.FINISH])

  • seconds_processed (float)

  • put_timestamp (float)

  • get_timestamp (float)

data: TranscriptionChunk | Literal[Signal.FINISH]

Either a part of transcription, or a Signal.FINISH.

seconds_processed: float

The total audio seconds processed before emitting the current chunk. Is filled by the transcriber.

put_timestamp: float = nan

Is filled automatically when the chunk is added to the output buffer.

get_timestamp: float = nan

Is filled automatically when the chunk is taken from the output buffer.

asr_eval.streaming.model.check_consistency(input_chunks, output_chunks)[source]

Asserts that:

  1. For each input and output chunk, put_timestamp <= get_timestamp.

  2. For each output chunk, seconds_processed is not larger than the audio seconds taken from the input buffer by the time the output is put into the buffer.

Failures indicate errors in the chunk processing pipeline (sender, buffer, or model).

Raises:

AssertionError – On check failures.
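The two checks can be sketched as follows (conceptual; chunks are simplified to plain dicts rather than the library's InputChunk/OutputChunk):

```python
def check_history(input_chunks, output_chunks):
    # Check 1: a chunk cannot be taken from a buffer before it was put there.
    for chunk in list(input_chunks) + list(output_chunks):
        assert chunk["put_timestamp"] <= chunk["get_timestamp"]
    # Check 2: seconds_processed cannot exceed the audio seconds already
    # taken from the input buffer when the output chunk was emitted.
    for out in output_chunks:
        taken = max(
            (c["end_time"] for c in input_chunks
             if c["get_timestamp"] <= out["put_timestamp"]),
            default=0.0,
        )
        assert out["seconds_processed"] <= taken
```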

Parameters:
  • input_chunks (Sequence[InputChunk])

  • output_chunks (Sequence[OutputChunk])
class asr_eval.streaming.model.ASRStreamingQueue(name='unnamed')[source]

Bases: StreamingQueue[CHUNK_TYPE]

An input or output buffer in StreamingASR.

This subclass extends the StreamingQueue:

  1. It fills put_timestamp and get_timestamp for input or output chunks.

  2. It asserts that after Signal.FINISH has been received, no more chunks are expected for this audio recording ID.

Parameters:

name (str)

get(id=None, timeout=None)[source]

Wait for an element to appear in the queue, pop it, and return it (blocking, thread-safe).

Parameters:
  • id (int | str | None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.

  • timeout (float | None) – if set, will raise TimeoutError if waiting takes longer than timeout seconds.

Return type:

tuple[TypeVar(CHUNK_TYPE, InputChunk, OutputChunk), int | str]

put(data, id=0)[source]

Add a new element into the queue (non-blocking, thread-safe).

Return type:

None

Parameters:
  • data (CHUNK_TYPE)

  • id (int | str)
class asr_eval.streaming.model.InputBuffer(name='unnamed')[source]

Bases: ASRStreamingQueue[InputChunk]

An input buffer for StreamingASR.

This subclass adds a get_with_rechunking() method. If it has been called at least once, rechunking mode is enabled and .get() cannot be called anymore.

Parameters:

name (str)

get(id=None, timeout=None)[source]

Wait for an element to appear in the queue, pop it, and return it (blocking, thread-safe).

Parameters:
  • id (int | str | None) – The required ID to get. If None, will return an element with any ID. If not None, will return only an element with the specified ID.

  • timeout (float | None) – if set, will raise TimeoutError if waiting takes longer than timeout seconds.

Return type:

tuple[InputChunk, int | str]

get_with_rechunking(size, id=None)[source]

Internally calls .get() as many times as needed and concatenates and/or slices the results to obtain the desired array size.

For example, let each input chunk contain 1000 audio frames, and we request size=2400. Then .get() will be called 3 times, and the last chunk will be split into two parts of sizes 400 and 600. An array of size 2400 will be returned, and the 600 remaining elements will be kept in the rechunking buffer. If we then request size=100, an array of size 100 will be returned without a new .get() call, the buffer will keep 500 remaining elements, and so on.

The returned array can be smaller than requested only if Signal.FINISH was reached for the ID.

Return type:

tuple[int | str, ndarray[tuple[int, ...], dtype[floating[Any]]] | ndarray[tuple[int, ...], dtype[integer[Any]]] | bytes | None, bool, float]

Returns:

  1. The ID (equals the id argument if it was specified, otherwise the first available ID).

  2. An audio chunk of the desired size (or smaller if Signal.FINISH was reached).

  3. A flag indicating whether Signal.FINISH was reached.

  4. The audio end time of the last received chunk (even if part of it is still in the rechunking buffer). TODO: maybe set the audio end time more correctly?

Parameters:
  • size (int)

  • id (int | str | None)
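The buffering behavior from the example above can be sketched as follows (conceptual; plain lists stand in for audio arrays, and get_chunk stands in for .get()):

```python
def make_rechunker(get_chunk):
    """get_chunk() returns the next variable-size chunk (here, a list).
    The returned function serves fixed-size reads, keeping the remainder
    in an internal rechunking buffer."""
    buffer = []

    def get(size):
        while len(buffer) < size:
            buffer.extend(get_chunk())   # pull chunks until we have enough
        out = buffer[:size]
        del buffer[:size]                # keep the remainder buffered
        return out

    return get
```

With 1000-frame chunks, a request of size=2400 pulls three chunks and leaves 600 frames buffered; a following size=100 request is served from the buffer alone.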

class asr_eval.streaming.model.OutputBuffer(name='unnamed')[source]

Bases: ASRStreamingQueue[OutputChunk]

An output buffer for StreamingASR.

Parameters:

name (str)

class asr_eval.streaming.model.StreamingASR(sampling_rate=16_000)[source]

Bases: ABC

An abstract streaming transcriber that is able to process multiple audio recordings in parallel.

Accepts a stream of input chunks marked by recording ID and emits a stream of output chunks.

Definitions:

  • Audio chunk: a part of a waveform. The sampling rate is defined in the class constructor. For example, a 10 sec mono recording with rate 16_000 can be represented as 10 chunks, each with shape (16_000,). Several channels may also be supported by some models. The chunk length is not restricted. NOTE: each StreamingASR implementation has audio_type and sampling_rate fields that define the required type and sampling rate of audio chunks.

  • TranscriptionChunk: a partial transcription that either adds new words to the transcription or edits the previous words. See details in the TranscriptionChunk docs.

  • Recording ID: a unique int or string identifier for an audio recording. This is useful if several recordings are streamed simultaneously and we should know which audio recording each chunk belongs to. IDs should be unique per StreamingASR object and should not be reused, or an exception will be thrown.

  • Signal.FINISH: a symbol that signals that a stream for a specific recording ID has ended. This can refer to either the input stream (audio chunks) or the output stream (TranscriptionChunk objects).

  • Exit: an exception that signals that all streams for all recording IDs have ended. This can refer to either the input stream or the output stream. After receiving Exit from the input buffer and sending Exit to the output buffer, the StreamingASR thread finishes.

Data model:

Each input chunk can be one of:

  1. An InputChunk(id=<Recording ID>, data=<Audio chunk>).

  2. An InputChunk(id=<Recording ID>, data=Signal.FINISH) - indicates that the audio for the ID has been fully sent.

Each output chunk can be one of:

  1. An OutputChunk(id=<Recording ID>, data=<TranscriptionChunk>).

  2. An OutputChunk(id=<Recording ID>, data=Signal.FINISH) - indicates that a FINISH input chunk was received for the given ID and the transcription is done.

Models may fill the .seconds_processed field in OutputChunk - the audio seconds processed (for the current recording ID) before yielding the current output chunk. This may be useful because we could send 100 chunks (say, 10 sec in total) while the model performs slow calculations and has processed only 20 chunks (2 sec in total). Depending on the testing scenario, we can treat the result as a partial transcription of either the first 2 or the first 10 seconds of the audio signal.

Sending and receiving:

After creating a StreamingASR object, we should start a thread that will process input chunks and emit output chunks. After this, new audio chunks can be sent using .input_buffer.put(...) (non-blocking), and the outputs can be received with .output_buffer.get(...) (blocks until an output becomes available). Instead of manual sending, the make_sender() function can be helpful. It prepares a sender that sends audio chunks with a delay between chunks.

Input and output buffers automatically fill the following fields:

  1. InputChunk.put_timestamp - the time when the chunk was added to StreamingASR.input_buffer.

  2. InputChunk.get_timestamp - the time when the chunk was received from StreamingASR.input_buffer.

  3. OutputChunk.put_timestamp - the time when the chunk was added to StreamingASR.output_buffer.

  4. OutputChunk.get_timestamp - the time when the chunk was received from StreamingASR.output_buffer.

Points 1 and 4 happen in the caller thread; points 2 and 3 happen in the StreamingASR thread.
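The overall threading model can be sketched with plain queue.Queue objects standing in for the input and output buffers (the MiniASR logic and chunk contents are illustrative, not the library's API):

```python
import queue
import threading

FINISH = object()  # stands in for Signal.FINISH

def run_mini_asr(input_buffer, output_buffer):
    """Background thread: consume audio chunks, emit one 'word' per chunk."""
    n = 0
    while True:
        chunk = input_buffer.get()       # blocks until the caller puts a chunk
        if chunk is FINISH:
            output_buffer.put(FINISH)    # the transcription is done
            return
        n += 1
        output_buffer.put(f"word{n}")    # non-blocking

input_buffer, output_buffer = queue.Queue(), queue.Queue()
thread = threading.Thread(target=run_mini_asr, args=(input_buffer, output_buffer))
thread.start()

for _ in range(3):
    input_buffer.put([0.0] * 1600)       # caller thread: send audio chunks
input_buffer.put(FINISH)                 # the audio has been fully sent

words = []
while (item := output_buffer.get()) is not FINISH:   # blocks for outputs
    words.append(item)
thread.join()
```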

Terminating a StreamingASR thread:

An Exit exception in the input buffer indicates that all audios have been fully sent. An Exit exception in the output buffer indicates that Exit was received from the input buffer and the StreamingASR thread has exited. This does not mean that all transcriptions are fully done.

Exception handling:

  1. Any exception raised from the StreamingASR thread will set the output buffer into the error state. The exception will then be raised when reading from the output buffer.

  2. Trying to write invalid data into the input buffer (including reusing previous IDs) may set it into the error state. The exception will then be raised when reading from the input buffer in the StreamingASR thread; then see pt. 1.

  3. Exceptions in the sender thread will set the input buffer into the error state, then see pt. 2.

  4. Exit is a special exception type indicating that the input or output stream has been closed properly.

Implementing models:

To subclass StreamingASR, one should implement the _run() method and the audio_type property. Also, a subclass __init__ method should call super().__init__, specifying the audio sampling rate.

Parameters:

sampling_rate (int)

sampling_rate: int

Sampling rate for the input audio chunks.

TODO clarify what to set for bytes or WAV.

start_thread()[source]

Start the background thread with _run() that processes input chunks and emits output chunks.

Return type:

Self

stop_thread()[source]

Stops the background thread started with start_thread().

Return type:

None

is_thread_started()[source]

Is the background thread started with start_thread() running?

Return type:

bool

abstractmethod _run()[source]

A background thread that processes input chunks and emits output chunks.

Is started with start_thread() and should live forever, usually with a while True loop. To get the next input chunk, we can use self.input_buffer.get() or get_with_rechunking() (both methods block until the next chunk is available). To emit a new output chunk, use self.output_buffer.put() (non-blocking).

For example, if 16_000 floats/sec are streamed and an external sender sends chunks of size 1600 ten times per second, but your model wants 1-second chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for any ID and return the result.

Normally, on stop_thread() an Exit exception is raised when the _run method tries to read from the input buffer. It causes an exit from _run and is handled in the wrapping method _run_and_send_exit. Thus, the Exit exception should not be handled in _run.

abstract property audio_type: Literal['float', 'int', 'bytes', 'wav']

The required input audio format. Together with sampling_rate property, forms a specification of input audio.

See also convert_audio_format() for details about formats.

property is_multithreaded: bool

Whether additional threads are started from the background thread _run().

False by default. If overridden with True, the evaluation protocol will not try to use remap_time().

class asr_eval.streaming.model.DummyASR(sampling_rate=16_000)[source]

Bases: StreamingASR

Transcribes an N-second-long audio into “1 2 … N”.

Parameters:

sampling_rate (int)

_run()[source]

A background thread that processes input chunks and emits output chunks.

Is started with start_thread() and should live forever, usually with a while True loop. To get the next input chunk, we can use self.input_buffer.get() or get_with_rechunking() (both methods block until the next chunk is available). To emit a new output chunk, use self.output_buffer.put() (non-blocking).

For example, if 16_000 floats/sec are streamed and an external sender sends chunks of size 1600 ten times per second, but your model wants 1-second chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for any ID and return the result.

Normally, on stop_thread() an Exit exception is raised when the _run method tries to read from the input buffer. It causes an exit from _run and is handled in the wrapping method _run_and_send_exit. Thus, the Exit exception should not be handled in _run.

property audio_type: Literal['float', 'int', 'bytes']

The required input audio format. Together with sampling_rate property, forms a specification of input audio.

See also convert_audio_format() for details about formats.

class asr_eval.streaming.model.TranscriptionChunk(*, uid=<factory>, text)[source]

A chunk returned by a StreamingASR model.

Contains a text and an optional ID. If we want to override a previously emitted partial transcription, we should emit a new chunk with the same ID. It will not be treated as new text to append, but as a correction of the previous chunk with this ID.

Note

A TranscriptionChunk id is not the same concept as audio recording ID.

Example

>>> from asr_eval.streaming.model import TranscriptionChunk
>>> chunks = []
>>> # append a new chunk without an explicit uid;
>>> # without an ID we cannot correct this chunk later
>>> chunks.append(TranscriptionChunk(text='word1'))
>>> # append a new chunk with id 1
>>> chunks.append(TranscriptionChunk(uid=1, text='word2'))
>>> # append a new chunk with id 2
>>> chunks.append(TranscriptionChunk(uid=2, text='word3'))
>>> print(TranscriptionChunk.join(chunks))
word1 word2 word3
>>> # correct a chunk with id 1
>>> chunks.append(TranscriptionChunk(uid=1, text='word2a word2b'))
>>> # remove a chunk with id 2
>>> chunks.append(TranscriptionChunk(uid=2, text=''))
>>> print(TranscriptionChunk.join(chunks))
word1 word2a word2b
Parameters:
  • uid (int | str)

  • text (str)

classmethod join(transcriptions)[source]

Join transcription chunks. If the transcriptions are OutputChunk instances, extracts a transcription chunk from each output chunk.

See example in the TranscriptionChunk docs.

Return type:

str

Parameters:

transcriptions (Sequence[TranscriptionChunk] | Sequence[OutputChunk | Literal[Signal.FINISH]])
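The join semantics from the example above can be sketched as follows (an assumption based on the described behavior, not the library's implementation; chunks are simplified to (uid, text) pairs):

```python
def join_chunks(chunks):
    """Latest text wins per uid; uids keep the order of first appearance;
    chunks whose latest text is empty are dropped."""
    order = []
    latest = {}
    for uid, text in chunks:
        if uid not in latest:
            order.append(uid)
        latest[uid] = text
    return " ".join(latest[uid] for uid in order if latest[uid])
```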

asr_eval.streaming.plots.partial_alignments_plot(eval, ax=None)[source]

Draw a partial alignment diagram.

See more details and examples in the user guide: Streaming evaluation.

Parameters:
asr_eval.streaming.plots.visualize_history(input_chunks, output_chunks=None, ax=None)[source]

Visualize the history of sending and receiving chunks.

See more details and examples in the user guide: Streaming evaluation.

Parameters:
asr_eval.streaming.plots.streaming_error_vs_latency_histogram(evals, ax=None, max_latency=10)[source]

Summarizes error percentage versus latency in a histogram, given evaluations for multiple samples.

See more details and examples in the user guide: Streaming evaluation.

Parameters:
asr_eval.streaming.plots.latency_plot(evals, ax=None)[source]

Summarizes latencies, given evaluations for multiple samples.

See more details and examples in the user guide: Streaming evaluation.

Parameters:
asr_eval.streaming.plots.show_last_alignments(evals, ax=None)[source]

Visualizes the last alignments (finalized transcriptions), given evaluations for multiple samples.

See more details and examples in the user guide: Streaming evaluation.

Parameters:
class asr_eval.streaming.sender.Cutoff(t_real, t_audio, arr_pos)[source]

A container for an audio position and a relative real-time moment. Used to schedule sending a waveform into StreamingASR.

Suppose we have an audio waveform and two consecutive cutoffs [c1, c2]:

[Cutoff(tr1, ta1, pos1), Cutoff(tr2, ta2, pos2)]

This means that:

  1. waveform[:c1.arr_pos] gives an audio with length c1.t_audio.

  2. waveform[:c2.arr_pos] gives an audio with length c2.t_audio.

  3. waveform[c1.arr_pos:c2.arr_pos] should be sent at the time c2.t_real.

Parameters:
  • t_real (float)

  • t_audio (float)

  • arr_pos (int)

t_real: float

A real-world time measured from the beginning of the sending process.

t_audio: float

A time in the audio timescale. For example, if we send a 10-second audio in 5 seconds (at 2x speed), t_real will be 5 at the end, and t_audio will be 10.

arr_pos: int

A position in the audio array, calculated from t_audio using the number of array elements per second (the sampling rate).

asr_eval.streaming.sender.get_uniform_cutoffs(waveform, real_time_interval_sec=1 / 25, speed_multiplier=1.0, sampling_rate=16_000)[source]

Returns a uniform schedule for sending the audio.

Parameters:
  • waveform (ndarray[tuple[int, ...], dtype[floating[Any]]]) – The audio in float32 dtype.

  • real_time_interval_sec (float) – How often, in real time, to send chunks.

  • speed_multiplier (float) – For example, if speed_multiplier=2, the audio will be sent at twice the normal speed, that is, a 10-second audio will be sent in 5 seconds.

  • sampling_rate (int) – The sampling rate of the waveform.

Return type:

list[Cutoff]
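A minimal sketch of how such a uniform schedule could be computed. This is a hypothetical reimplementation to illustrate the Cutoff semantics; the library's actual get_uniform_cutoffs() may differ in details.

```python
from dataclasses import dataclass

@dataclass
class Cutoff:
    t_real: float   # when to send, in real time (seconds from start)
    t_audio: float  # how much audio has been covered, in audio seconds
    arr_pos: int    # corresponding position in the waveform array

def uniform_cutoffs(n_samples: int, real_time_interval_sec: float = 1 / 25,
                    speed_multiplier: float = 1.0,
                    sampling_rate: int = 16_000) -> list[Cutoff]:
    # Each step advances real time by the interval, and audio time by
    # interval * speed_multiplier (so 2x speed covers 10 s of audio in 5 s).
    cutoffs: list[Cutoff] = []
    t_real = 0.0
    while True:
        t_real += real_time_interval_sec
        t_audio = t_real * speed_multiplier
        arr_pos = min(round(t_audio * sampling_rate), n_samples)
        cutoffs.append(Cutoff(t_real, t_audio, arr_pos))
        if arr_pos >= n_samples:
            return cutoffs
```

For a 1-second waveform (16 000 samples) sent in 0.25-second intervals at normal speed, this yields 4 cutoffs ending at arr_pos 16 000; at speed_multiplier=2.0 it yields 2 cutoffs, finishing at t_real=0.5.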

class asr_eval.streaming.sender.StreamingSender(cutoffs, waveform, asr, id=<factory>, verbose=False, sampling_rate=16000, _history=<factory>, _thread=None)[source]

Can be used to automate sending an audio stream to a StreamingASR.

Parameters:
  • cutoffs (list[Cutoff]) – A schedule to send the audio.

  • waveform (ndarray[tuple[int, ...], dtype[floating[Any]]]) – A waveform in float32 dtype. The sampling rate information is encoded in cutoffs, because they store both the audio time and the audio array position.

  • asr (StreamingASR) – A streaming transcriber to send chunks into.

  • id (int | str) – A recording ID to assign.

  • verbose (bool) – Whether to print each chunk info to stdout.

  • sampling_rate (int)

  • _history (list[InputChunk])

  • _thread (Thread | None)

Call start_sending() to start a sending process.

Note

Keeps the history of all sent chunks for evaluation purposes (can be retrieved with join_and_get_history()). To avoid running out of memory, ensure that senders are garbage-collected afterwards.

start_sending(without_delays=False)[source]

If without_delays=False (default), starts sending in a separate thread according to the schedule given in the constructor. If without_delays=True, sends all the chunks immediately. Non-blocking.

Return type:

Self

Parameters:

without_delays (bool)

join()[source]

Wait for the sending process to finish.

join_and_get_history()[source]

Wait for the sending process to finish and return the history of chunks sent.

Return type:

list[InputChunk]

get_status()[source]

Possible statuses:

  • not_started: Sending was not started.

  • started: Sending in progress.

  • finished: Sending finished.

Return type:

Literal['not_started', 'started', 'finished']
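The sender lifecycle (non-blocking start_sending(), blocking join_and_get_history(), and the three statuses) can be sketched with a hypothetical stand-in class. MiniSender below is illustrative, not the library class; it pushes chunks into a plain queue rather than a StreamingASR.

```python
import queue
import threading
import time

class MiniSender:
    """Illustrative stand-in for StreamingSender's thread lifecycle."""

    def __init__(self, chunks, out_queue):
        self.chunks = chunks
        self.out = out_queue
        self.history = []       # mirrors the sender's chunk history
        self._thread = None

    def _send_all(self, delay: float) -> None:
        for chunk in self.chunks:
            if delay:
                time.sleep(delay)  # a real sender would follow its cutoff schedule
            self.out.put(chunk)
            self.history.append(chunk)

    def start_sending(self, without_delays: bool = False):
        # Non-blocking: spawns a background thread and returns self.
        delay = 0.0 if without_delays else 0.01
        self._thread = threading.Thread(target=self._send_all, args=(delay,))
        self._thread.start()
        return self

    def join_and_get_history(self):
        # Blocks until sending finishes, then returns the sent chunks.
        self._thread.join()
        return self.history

    def get_status(self):
        if self._thread is None:
            return 'not_started'
        return 'started' if self._thread.is_alive() else 'finished'
```

The status moves from 'not_started' before start_sending(), through 'started' while the thread is alive, to 'finished' once all chunks have been sent.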

class asr_eval.streaming.wrappers.StreamingToOffline(streaming_model, start_thread=True)[source]

Bases: Transcriber

A wrapper that turns StreamingASR into a Transcriber. Transcribes the full audio and returns the full transcription.

The StreamingASR keeps running after .transcribe and waits for new input streams. You may want to stop it via .streaming_model.stop_thread() at the end.

Parameters:
transcribe(waveform)[source]

Transcribes a float32 waveform, typically normalized from -1 to 1.

Return type:

str

Parameters:

waveform (ndarray[tuple[int, ...], dtype[floating[Any]]])

class asr_eval.streaming.wrappers.OfflineToStreaming(offline_model, interval=0.5)[source]

Bases: StreamingASR

Converts a non-streaming (offline) ASR model into a streaming one. Calls the offline model at the given interval (on the audio timescale).

For example, let the audio be 3 seconds long, and interval=1. Will call the offline model:

  1. On the waveform slice from 0 to 1 second (when enough data received)

  2. On the waveform slice from 0 to 2 seconds (when enough data received)

  3. On the waveform slice from 0 to 3 seconds (when enough data received)

Each call completely overwrites the old transcription with the new one (achieved by emitting a new TranscriptionChunk with the same id).

TODO support longform audios somehow (with or without VAD).

TODO support batching?

TODO set also a real-time minimal interval between model calls.

TODO add a keep=True arg to .get_with_rechunking() instead of making another buffer.
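The growing-prefix call schedule described above can be sketched as follows (an illustrative helper, not part of the library): each call re-transcribes from the start of the audio up to the next interval boundary, and the final call covers the full audio.

```python
def call_slices(duration_sec: float, interval: float) -> list[tuple[float, float]]:
    # Growing prefixes: (0, interval), (0, 2 * interval), ..., (0, duration).
    slices = []
    end = interval
    while end < duration_sec:
        slices.append((0.0, end))
        end += interval
    slices.append((0.0, duration_sec))  # final slice covers the whole audio
    return slices
```

For a 3-second audio with interval=1 this gives the three calls listed above: 0-1 s, 0-2 s, and 0-3 s.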

Parameters:
_run()[source]

A background thread that processes input chunks and emits output chunks.

Started with start_thread(), it should live forever, usually in a while True loop. To get the next input chunk, we can use self.input_buffer.get() or get_with_rechunking() (both methods block until the next chunk is available). To emit a new output chunk, use self.output_buffer.put() (non-blocking).

For example, if 16_000 floats per second are streamed and an external sender sends chunks of size 1600 ten times per second, but your model wants 1-second chunks, call self.input_buffer.get_with_rechunking(size=16_000). This will block until 10 chunks are accumulated for some ID and return the result.
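The rechunking behavior can be illustrated with a simplified sketch. RechunkingBuffer below is hypothetical and non-blocking (it returns None until enough data has accumulated), whereas the real input buffer blocks until a full chunk is available.

```python
from collections import deque

class RechunkingBuffer:
    """Illustrative sketch of get_with_rechunking(): accumulate small
    incoming chunks until `size` samples are available for one recording."""

    def __init__(self):
        self.pending: dict[str, deque] = {}

    def put(self, recording_id: str, samples: list[float]) -> None:
        # Append incoming samples to the per-recording queue.
        self.pending.setdefault(recording_id, deque()).extend(samples)

    def get_with_rechunking(self, size: int):
        # Return (recording_id, chunk) for the first recording that has
        # accumulated at least `size` samples; None otherwise.
        for rec_id, buf in self.pending.items():
            if len(buf) >= size:
                return rec_id, [buf.popleft() for _ in range(size)]
        return None
```

With 10 incoming chunks of 1600 samples each, a request for size=16_000 is satisfied only after the tenth chunk arrives, matching the example above.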

Normally, on stop_thread(), an Exit exception is raised when the _run method tries to read from the input buffer. This causes _run to exit and is handled by the wrapping method _run_and_send_exit, so the Exit exception should not be caught inside _run.

property audio_type: Literal['float']

The required input audio format. Together with sampling_rate property, forms a specification of input audio.

See also convert_audio_format() for details about formats.