# Stream Processing

## Overview

When data is saved from a QUA program, it is first stored in the OPX on-board memory. From there, it is transferred to the server and eventually to the user PC. Stream processing makes it possible to process data as it is being transferred to the QM server. This reduces both the amount of data that has to be saved on the user PC and the post-processing computation time.

Consider, as a simple example, a case where we would like to characterize the success rate of preparing a qubit in the $$|1 \rangle$$ state. We want to play the pulse, read out the qubit’s state and decide whether it is $$|1\rangle$$ or $$|0\rangle$$. We then repeat this a large number of times, say $$1 \times 10^5$$. Collecting the individual results of all $$10^5$$ experiments is rarely of interest. Instead, we can collect only the final average of the set of experiments, or how this average develops as data is accumulated.

A running average is a straightforward example, but many other manipulations are possible. Stream processing allows arithmetic operations and data reshaping to occur on the server, in parallel with experimental runs on the OPX.
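To make the running-average idea concrete, here is a plain-Python sketch (not QUA code) of what is conceptually computed as items arrive. The function name `running_average` and the sample outcomes are hypothetical and serve only as an illustration.

```python
def running_average(items):
    """Yield the average of all items seen so far, one value per incoming item."""
    total = 0.0
    for count, item in enumerate(items, start=1):
        total += item
        yield total / count


# Hypothetical state-discrimination outcomes: 1 for |1>, 0 for |0>.
outcomes = [1, 1, 0, 1]
averages = list(running_average(outcomes))
print(averages)  # [1.0, 1.0, 0.6666666666666666, 0.75]
```

Only the last entry is needed if the final success rate is all that matters; the full list shows how the estimate develops as data accumulates.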

## Basic Syntax and Examples

To use the server processing feature, a stream must be defined. A stream consists of a stream variable, a pipeline and a terminal. The processed stream items can be accessed on the client PC, where they are referred to as results, by creating result handles on the client PC. In what follows, we introduce each of these components and explain how they are used.

To initiate a stream, we declare a stream variable, using the following syntax inside a QUA program:

my_stream = declare_stream()


To pass a variable to a stream, we use the save() statement or the measure() statement. This creates a data transfer path through which stream items are processed and then saved to either a permanent or overriding storage (see Glossary for more details).

To illustrate how pipelines and terminals are created and used, we show below a full QUA program which saves data to a stream, manipulates it and stores the results to terminals. In addition, you can take a look at a few examples in our GitHub library.

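with program() as prog:
    my_stream1 = declare_stream(adc_trace=True)  # stream for raw ADC samples
    my_stream2 = declare_stream()  # stream for the QUA variable a
    a = declare(fixed)
    assign(a, 0.3)
    save(a, my_stream2)
    measure('my_pulse', 'qe', my_stream1)
    with stream_processing():
        # Pipeline 1: analog input 1 only, with timestamps, Save terminal
        my_stream1.input1().with_timestamps().save('adc_results')
        # Pipeline 2: terminated immediately with a save_all terminal
        my_stream2.save_all('a_results')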


Here two streams are created. The first, my_stream1, is used to stream raw ADC samples. The second, my_stream2, is used to stream the value of the QUA variable a.

The pipelines and terminals are defined under the with stream_processing() context. Pipelines initiate with a stream variable and terminate with a save() or save_all() function, which acts as a terminal. In this example the pipelines are simple: The first selects only the data from analog input1 and adds timestamps to it. The second is immediately terminated.

The terminal used for the pipeline initiated with my_stream1 is a Save terminal. This is a memory-less terminal that holds only the last value transferred, which can result in data loss if data is not fetched quickly enough from the client PC. The terminal is given the tag 'adc_results', which can later be referenced using a result handle. The terminal for my_stream2 is a save_all terminal, which does store all the results (up to a memory limitation, see Server PC storage and data limitations). The tag for this terminal is 'a_results'.
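The difference between the two terminal types can be sketched with a small plain-Python model. This is not the QM implementation; the class names `SaveTerminal` and `SaveAllTerminal` and the `push` method are hypothetical and only mimic the overriding versus appending behaviour described above.

```python
class SaveTerminal:
    """Hypothetical model of an overriding terminal: keeps only the latest item."""

    def __init__(self):
        self.result = None

    def push(self, item):
        self.result = item  # the previous item is lost


class SaveAllTerminal:
    """Hypothetical model of an appending terminal: keeps the full history."""

    def __init__(self):
        self.result = []

    def push(self, item):
        self.result.append(item)


last_only = SaveTerminal()
history = SaveAllTerminal()
for item in [0.1, 0.2, 0.3]:
    last_only.push(item)
    history.push(item)

print(last_only.result)  # 0.3
print(history.result)    # [0.1, 0.2, 0.3]
```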

To access the results on the client PC side, we create a Result handle. This allows us to retrieve (“fetch”) the stream items on the client PC side, or to perform other query or storage operations on it. To continue our example:

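job = qm.execute(prog)
res = job.result_handles
adc_res = res.adc_results.fetch_all()  # results of the 'adc_results' terminal
a_res = res.a_results.fetch_all()  # results of the 'a_results' terminal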


This example collects all results contained in the stream using the fetch_all() method. Alternatively, we can get only the most recent result by calling the fetch() method. Note that both of these methods are called on a result handle. This structure and its usage are described below.

Note

It is still possible to use the syntax from previous versions for creating a stream that is only terminated using save_all(). The following examples are equivalent:

with program() as prog:
    my_stream = declare_stream()
    a = declare(fixed)
    assign(a, 0.3)
    save(a, my_stream)
    with stream_processing():
        my_stream.save_all('a_results')

with program() as prog:
    a = declare(fixed)
    assign(a, 0.3)
    save(a, 'a_results')


Warning

Saving values to the same stream from different OPXs is not supported.

The following is a simple example of a program that acquires a single raw ADC trace:

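with program() as prog:
    adc_stream = declare_stream(adc_trace=True)
    measure('my_pulse', 'qe', adc_stream)
    with stream_processing():
        adc_stream.input1().save('raw_adc')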

By setting adc_trace=True we specify that data should be grouped into individual ADC traces rather than passed sample by sample, such that each trace will be of size (1, measurement_duration). Next, to populate the stream with results, we specify the relevant stream in the measure statement. Finally, in the stream_processing context, the pipeline specifies that we acquire data from analog input 1 and save it with the tag raw_adc, which we can later refer to on the client PC.

Note

Setting adc_trace=True is equivalent to writing stream.buffer(pulse_len + 2*smearing). See the details on buffer below and the documentation on smearing.

To record a raw ADC stream, it is required to play a digital marker that is associated with the measurement element. Only samples that arrive while the digital trigger is on will be recorded in the stream pipeline. This means it is possible to “gate” the raw ADC stream by using different sequences of digital waveforms in the readout operation used for the measure() command.

Fig. 1: The raw ADC stream. The blue line is the raw ADC stream when the digital trigger is on for the entire measurement. The orange dots are the raw ADC samples when the digital trigger is switched on/off every eight samples.

Note

The digital waveform only affects the raw ADC streams. It does not change the data used when processing the measurement using integration, demodulation etc.

## Results handles

The stream items stored in the save/save_all terminals on the server PC can be fetched to the client PC using the result_handles property of the qm.QmJob object. The fields of the result_handles property contain references to the stream terminals, and their names are the same as the tags given to the terminals. A handle to a specific terminal with tag "my_result", for example, is accessed with my_result = job.result_handles.get("my_result"). A shorthand notation for this is my_result = job.result_handles.my_result.

We can query either the state of a single result handle or that of the collection of all result handles. For example, one can query the processing state using the is_processing() method, or use the wait_for_all_values() method to suspend Python execution until either a timeout has occurred or saving has completed. In case the handle references results from a save terminal, we can also wait for a specific number of results to arrive using the wait_for_values(count) method. The number of stream items referenced by a specific result handle can be obtained by calling count_so_far() on that handle, or equivalently by calling len(my_result).

Saving results in numpy format to a local variable is done using the fetch and fetch_all methods as specified below. We can also save results to the file system by calling save_to_store(path).

## fetch and fetch_all

To transfer the results from the server PC to the client PC, fetch and fetch_all commands are called on a result handle, for example:

my_stream_res=res.my_stream.fetch_all()


where my_stream is the result tag given to the save/save_all terminal.

fetch collects the most recent result in the stream and fetch_all collects all saved data. When a save terminal is used, fetch_all is equivalent to fetch.

It is also possible to collect only a portion of the collected data by passing fetch a slice object. For example, say your result stream contains [0,1,2,3,4,5,6,7,8,9]. fetch(0) returns 0, fetch(4) returns 4 and fetch(slice(2,5)) returns [2,3,4], since a Python slice excludes its stop index. Any valid Python slice object can be used.
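As a reminder of how Python slice objects select items, the sketch below applies a few of them to an ordinary list that stands in for the saved stream items. It assumes that fetch follows standard Python slicing semantics; the list `stream_items` is a hypothetical stand-in and no QM call is made.

```python
# Hypothetical stand-in for the items held by a save_all terminal.
stream_items = list(range(10))

print(stream_items[0])                     # 0
print(stream_items[4])                     # 4
print(stream_items[slice(1, 4)])           # [1, 2, 3]  (stop index excluded)
print(stream_items[slice(None, None, 3)])  # [0, 3, 6, 9]  (every third item)
print(stream_items[slice(-2, None)])       # [8, 9]  (last two items)
```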

Warning

When fetching the data while using pause()-resume() with a save() command, there is no guarantee that the data you fetch at a given moment is the data point from the current iteration. There can be some delay from the moment the data is acquired until it is available for fetching. Moreover, the streams are asynchronous. For example, the result I[j] might become available a few moments before Q[j]. If you fetch the data before it is available in the Q stream, you will end up with a mixed data point I[j],Q[j-1]. It is therefore better to use the save_all() command and fetch the desired data.
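One way to avoid mixed data points on the client side is to keep only the indices that are present in both fetched arrays. The following is a minimal plain-Python sketch of that idea; the helper `pair_complete_points` and the sample values are hypothetical, and the lists stand in for arrays obtained with fetch_all() from two save_all terminals.

```python
def pair_complete_points(i_values, q_values):
    """Pair I[j] with Q[j], keeping only indices available in both lists."""
    n = min(len(i_values), len(q_values))
    return list(zip(i_values[:n], q_values[:n]))


# Hypothetical snapshot: the I stream is one item ahead of the Q stream.
i_fetched = [0.10, 0.11, 0.12]
q_fetched = [0.20, 0.21]

print(pair_complete_points(i_fetched, q_fetched))  # [(0.1, 0.2), (0.11, 0.21)]
```

The unmatched I[2] is simply left for the next fetch, once Q[2] has arrived.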

## Combining streams using the zip operator

Streams can be zipped together into tuples of results. This is similar to creating a buffer but combines data from two separate streams rather than reshaping a single one. In this case, the resulting zipped stream will have a shape dictated by the number of zipped tuples. Each element of this tuple is a named field value_i, where i is the named field number (the index of the zipped stream).

A_stream.zip(B_stream).save_all("zipped_tuple")
...
zipped = job.result_handles.zipped_tuple.fetch_all()


zipped has a shape based on the number of items n. It has two named fields: value_0 and value_1.
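The layout of a zipped result can be pictured with a plain-Python model in which each zipped tuple becomes one record with the fields value_0 and value_1. This is only an illustration using dictionaries; the actual result is a numpy structured array, and the sample values below are hypothetical.

```python
# Hypothetical items from two streams of equal length.
a_items = [1, 2, 3]
b_items = [10, 20, 30]

# One record per zipped tuple, with fields named after the stream index.
zipped_model = [{"value_0": a, "value_1": b} for a, b in zip(a_items, b_items)]

print(len(zipped_model))           # 3
print(zipped_model[1]["value_0"])  # 2
print(zipped_model[1]["value_1"])  # 20
```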

## Data restructuring with buffer()

The buffer method allows reshaping of the incoming stream items. For example

my_stream.buffer(2).save_all('output_name')


Will result in a stream of number pairs. And

my_stream.buffer(2,2).save('output_name')


Will result in a stream of 2 by 2 matrices. This is useful inside a doubly nested for loop as we can loop over a pair of independent variables and save the resulting dependent variable with the correct structure. We can then perform additional processing. For example, we can perform a running average over the resulting matrix in the following way:

my_stream.buffer(2,2).average().save('output_name')


This allows us to collect the resulting matrix and observe how the average evolved with the subsequent runs in real-time.
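The effect of buffering followed by a running average can be modelled in plain Python. The sketch below is not the server implementation; the helper `buffer` is hypothetical, and the example uses an item count that is an exact multiple of the buffer size so that no partial buffer arises.

```python
def buffer(items, size):
    """Group a list into consecutive chunks of `size` items."""
    return [items[k:k + size] for k in range(0, len(items), size)]


# Two runs of a 2 x 2 sweep, streamed as a flat sequence of 8 values.
flat = [1, 2, 3, 4, 3, 4, 5, 6]
matrices = buffer(buffer(flat, 2), 2)  # two 2 x 2 matrices

# Element-wise running average over the matrices received so far.
sums = [[0.0, 0.0], [0.0, 0.0]]
averages = []
for count, matrix in enumerate(matrices, start=1):
    sums = [[s + v for s, v in zip(s_row, m_row)]
            for s_row, m_row in zip(sums, matrix)]
    averages.append([[s / count for s in row] for row in sums])

print(matrices[0])   # [[1, 2], [3, 4]]
print(averages[-1])  # [[2.0, 3.0], [4.0, 5.0]]
```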

### Buffer limitations

It is not possible to process more than 100e6 buffered variables at the same time. This limit applies to the total number of variables streamed at the same time.

For example, in the following snippet, the first line can be processed by itself, but adding the second line pushes the system beyond its limit:

my_stream1.buffer(int(100e6)).save()
my_stream2.buffer(int(3e5)).save()


Note

Streaming a buffered variable with timestamps is akin to streaming double the number of variables. When using with_timestamps, each variable should be counted as two variables when considering this limitation.
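A quick budget check along these lines can be done before running a program. The sketch below is a hypothetical helper, not part of the QM API: it sums the buffer sizes of all pipelines, counts a pipeline with timestamps twice, and compares the total with the 100e6 limit quoted above.

```python
BUFFER_LIMIT = int(100e6)  # limit quoted in the text above


def buffered_variable_count(pipelines):
    """Sum buffer sizes; a pipeline with timestamps counts twice.

    `pipelines` is a list of (buffer_size, uses_timestamps) pairs.
    """
    total = 0
    for size, uses_timestamps in pipelines:
        total += size * (2 if uses_timestamps else 1)
    return total


first_only = [(int(100e6), False)]
both = [(int(100e6), False), (int(3e5), False)]
stamped = [(int(60e6), True)]  # 60e6 values plus 60e6 timestamps

print(buffered_variable_count(first_only) <= BUFFER_LIMIT)  # True
print(buffered_variable_count(both) <= BUFFER_LIMIT)        # False
print(buffered_variable_count(stamped) <= BUFFER_LIMIT)     # False
```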

## Mapping operations over streams with the map function

The map function allows you to perform an operation on each element in a stream. It becomes especially useful when combined with the buffer function because then operations can be performed on each filled buffer. For example, one can generate a buffer of size 3, and average over the samples in each triplet of numbers in the buffer. This is done as follows:

A_stream.buffer(3).map(FUNCTIONS.average()).save_all('averaged_stream')


There are multiple FUNCTIONS that can be used in this way, for example FUNCTIONS.dot_product() and FUNCTIONS.multiply_by(scalar_or_vector). Both of these have shortcut operators, so you can simply use .multiply_by as an operator in a stream pipeline. However, not all functions have a shortcut, and it is useful to learn what the options are. Some of these functions are demod, boolean_to_int, fft, tuple_convolution, convolution and tuple_multiply, but there are others. You can use Python’s introspection to see what is available.
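The combination of buffer and map can be pictured with Python's built-in map applied to chunks of a list. This is a plain-Python model under the assumption that the mapped function is applied once per filled buffer; the helpers `buffer` and `average` are hypothetical stand-ins for the stream operations.

```python
def buffer(items, size):
    """Group a list into consecutive chunks of `size` items."""
    return [items[k:k + size] for k in range(0, len(items), size)]


def average(chunk):
    """Average of the values in one filled buffer."""
    return sum(chunk) / len(chunk)


# Hypothetical stream items; the count is an exact multiple of the buffer size.
items = [1, 2, 3, 10, 20, 30]
averaged_stream = list(map(average, buffer(items, 3)))

print(averaged_stream)  # [2.0, 20.0]
```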

### Average on axis

When working with multi-dimensional buffers, one can specify the axis over which to average. For example, in the following three-dimensional buffer, averaging is done on the first (zeroth) axis with the following syntax:

A_stream.buffer(axis2_size).buffer(axis1_size).buffer(axis0_size).map(FUNCTIONS.average(0)).save_all(f"avg_axis0")


A list of axes can also be passed, in which case averaging is done over all of the passed axes. This matches the behaviour of numpy, for example.

A_stream[i].buffer(axis2_size).buffer(axis1_size).buffer(axis0_size).map(FUNCTIONS.average([0, 1, 2])).save_all(f"avg_axis012")
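The axis convention can be illustrated on a small two-dimensional example in plain Python. This sketch assumes the numpy-style meaning of axes mentioned above and uses hypothetical values; it does not call FUNCTIONS.average itself.

```python
# One filled buffer of shape (2, 3): axis 0 has size 2, axis 1 has size 3.
buffered = [[1, 2, 3],
            [5, 6, 7]]

# Averaging over axis 0 collapses the rows: one value per column remains.
avg_axis0 = [sum(col) / len(col) for col in zip(*buffered)]

# Averaging over axis 1 collapses the columns: one value per row remains.
avg_axis1 = [sum(row) / len(row) for row in buffered]

# Averaging over both axes leaves a single number.
values = [v for row in buffered for v in row]
avg_all = sum(values) / len(values)

print(avg_axis0)  # [3.0, 4.0, 5.0]
print(avg_axis1)  # [2.0, 6.0]
print(avg_all)    # 4.0
```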


## Flat data structure option

The data structure returned by calling fetch_all is a numpy structured array. In simple cases, where buffering is not used, the shape of this array is given exactly by the number of saved items. If timestamps are also included, there is a single column of values and a second column with timestamps. If, however, the data is buffered or manipulated in some way, the number of items in the output array will be the number of filled (or partially filled) buffers. For example, consider the following buffered stream:

stream.buffer(10).buffer(5).save_all()


If 300 items enter the stream, the output numpy array has shape (6,), since 10*5*6 = 300. Each item of the array has shape 5 × 10, but because the inner dimensions are “hidden”, the array is not indexable in the way you might expect.

To modify this behaviour, an option to “flatten” the result array has been added. This is called by passing the following parameter to fetch_all:

job.result_handles.samples.fetch_all(flat_struct=True)


In this case, the result will have the shape (6,5,10) and indexing can be performed as usual for a numpy array.

If the flat_struct flag is used on zipped streams, each named field will have a shape as described for a single stream.
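The shape bookkeeping for buffer(10).buffer(5) with 300 items can be reproduced with nested Python lists. This is only a model of the flattened layout, using the item index as a hypothetical value so that the position of each item is easy to follow.

```python
items = list(range(300))  # hypothetical stream items, value == arrival index

# buffer(10): rows of 10 consecutive items.
rows = [items[k:k + 10] for k in range(0, len(items), 10)]  # 30 rows
# buffer(5): blocks of 5 consecutive rows.
blocks = [rows[k:k + 5] for k in range(0, len(rows), 5)]    # 6 blocks

shape = (len(blocks), len(blocks[0]), len(blocks[0][0]))
print(shape)            # (6, 5, 10)
print(blocks[2][3][4])  # 134, i.e. 2*50 + 3*10 + 4
```

The innermost index runs over the first buffer that was applied, which is why the size 10 appears last in the shape.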

## Stream Processing Arithmetics

It is possible to use the server to perform arithmetic operations on a single stream or between different streams. A complete list of the possible operations can be found in _ResultStream(). In general, an operation can be performed between a stream and itself, between two different streams, or between a stream and a scalar. The operation is done element-wise, so both streams need to be buffered identically. In the case of an operation between a stream and a scalar, the operation must be done before buffering the stream.

Note

It is possible to do multiplication between a buffered stream and a vector using the multiply_by() function. The vector length must be identical to the stream buffer.

Some simple syntax examples:

(stream + 1).save("example")  # Adding 1 to each element in the stream
(stream_1 + stream_2).buffer(5).save("example")  # Adding element-wise stream1 to stream2 and buffering the result
(stream_1.buffer(5) + stream_2.buffer(5)).save("example")  # First buffering the streams and then adding them to each other
((stream1+1)*3+(stream2+1)/2).buffer(5).average().save_all("example")  # A combination of stream and scalar arithmetics


Notice that lines two and three in the above examples result in identical outputs.

Warning

An operation between two streams of different total length is performed only as many times as there are items in the shorter stream, pairing items in the order in which they appear in the pipeline, regardless of the location of the save() command in the QUA program. For example, if stream1 has n items and stream2 has m items, where m < n, the result of (stream1+stream2).save_all() is an array of length m, holding the first m elements of stream1 added element-wise to stream2.
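The truncation to the shorter stream behaves like Python's built-in zip, which stops at the end of the shorter input. The following plain-Python model uses hypothetical values and is not QUA code.

```python
stream1_items = [1, 2, 3, 4, 5]  # n = 5 items
stream2_items = [10, 20, 30]     # m = 3 items

# Element-wise sum; zip stops after the m items of the shorter stream.
summed = [a + b for a, b in zip(stream1_items, stream2_items)]

print(summed)       # [11, 22, 33]
print(len(summed))  # 3
```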

### Examples

A simple example that shows the utility of stream arithmetic is the ability to calculate the variance of a measured variable during the measurement. In the figure below, we see the results of repeatedly generating a random integer between 0 and 9, and streaming two streams as written below:

stream.average().save_all("average")
((stream*stream).average()-(stream.average()*stream.average())).save_all("variance")


where the first line streams the running average (in blue in the figure), and the second line calculates the variance and streams the results (in orange in the figure).

Fig. 2: Running average stream (blue) and the variance stream (orange) of a randomly generated number on the OPX+. The horizontal lines mark the theoretical average (blue) and variance (orange).
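The variance pipeline relies on the identity Var(x) = ⟨x²⟩ − ⟨x⟩², evaluated with running averages. The sketch below reproduces this in plain Python with the standard random module instead of the OPX random number generator; the seed and the number of samples are arbitrary choices made for the illustration. For a uniform integer between 0 and 9 the theoretical average is 4.5 and the theoretical variance is (10² − 1)/12 = 8.25.

```python
import random

random.seed(0)  # arbitrary seed, only to make the sketch repeatable
samples = [random.randint(0, 9) for _ in range(10000)]

sum_x = 0.0
sum_x2 = 0.0
running_avg = []
running_var = []
for n, x in enumerate(samples, start=1):
    sum_x += x
    sum_x2 += x * x
    mean = sum_x / n
    running_avg.append(mean)                       # <x>
    running_var.append(sum_x2 / n - mean * mean)   # <x^2> - <x>^2

# Both values should approach the theoretical 4.5 and 8.25.
print(round(running_avg[-1], 2), round(running_var[-1], 2))
```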

In the following example, we see the use of the histogram. Using time tagging, we get an array of timestamps indicating at which points in the measurement we counted a pulse (simulating a photon). By repeating the time tagging measurement, we can stream the results into a histogram telling us how many pulses we counted in each time bin.

times_st.histogram([[i, i + (resolution - 1)] for i in range(0, meas_len, resolution)]).save("times_hist")


The above code line shows how to use the histogram in the stream processing where meas_len is the measurement length, and resolution is the required histogram bin size.

Fig. 3: A histogram of the number of counts as a function of measurement time. The time bin resolution is 20ns.
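To see what the bin list passed to histogram looks like, the list comprehension can be evaluated on its own in plain Python. The values of meas_len and resolution and the time tags below are hypothetical, and the counting step assumes that both edges of each bin are inclusive; it is a client-side model, not the server implementation.

```python
meas_len = 100   # hypothetical measurement length, in ns
resolution = 20  # hypothetical bin size, in ns

bins = [[i, i + (resolution - 1)] for i in range(0, meas_len, resolution)]
print(bins)  # [[0, 19], [20, 39], [40, 59], [60, 79], [80, 99]]

# Hypothetical time tags collected over several repetitions.
time_tags = [3, 19, 20, 45, 47, 99]

# Count the tags in each bin, assuming both bin edges are inclusive.
counts = [sum(low <= t <= high for t in time_tags) for low, high in bins]
print(counts)  # [2, 1, 2, 0, 1]
```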

## Legacy Save

Before explicitly declared streams were introduced, QUA saved data directly to a tag: save(var, 'tag'), where 'tag' is a string identifier. When using this legacy saving mechanism, the output can be retrieved without change, as below:

fetch_all_data = job.result_handles.get("A").fetch_all()


This retrieves both values and timestamps. Alternatively, if only values (or only timestamps) are needed, we have introduced the following notation:

fetch_only_values = job.result_handles.get("A").fetch_all(flat_struct=True)
fetch_only_timestamps = job.result_handles.get("A_timestamps").fetch_all(flat_struct=True)


This provides only values (or only timestamps) and is slightly more performant than the previous option. In future versions, flat_struct=True will be the default option.

## Server PC storage and data limitations

A save_all terminal stores all of the stream items created during the execution of a QUA program as results in the permanent memory of the server PC. The permanent memory of the server PC is large but not unlimited, and therefore it is possible in some cases to create more results than can be stored on the server PC. The server PC permanent memory can store up to 100GB of result data without danger of data loss.

Warning

If more than 100GB are stored in the permanent memory, the data may be erased after a short time to ensure the permanent storage is not overfilled. To ensure that job results are not erased, always make sure that no more than 100GB of data are generated by the QUA program without being fetched.

The results will not be immediately erased from the server once they are fetched. To clear the permanent storage and ensure that all of the 100GB are available, use the qm.QuantumMachineManager.clear_all_job_results() method.

When collecting results from a stream, the system can usually estimate how much data is expected to be received. For example, if the measure command plays a pulse with a duration of 1000 samples, we expect to collect an output of that length. In cases where the expected amount of data does not match the received amount, an error message will appear when the res.my_stream.fetch_all() or .fetch() commands are called. It can also be manually checked by calling

res.my_stream.has_data_loss()


## Glossary

Stream Variable

A source (or start point) of a pipeline. It receives values of QUA variables or raw input samples as input.

Save Terminal

An overriding named sink (or end point) of a pipeline that creates a result. Each new stream item received at this terminal overrides the previous one. The result is a single item, the last one received in a stream.

Warning

Using a save terminal can result in data loss. It is to be used only in cases where data loss is acceptable, such as streaming plots.

Save All Terminal

A named sink (or end point) of a pipeline that creates a result, which stores all the stream items in the pipeline. All stream items are appended to the result, so the complete history of the stream is preserved.

Pipeline

An ordered collection of operations that is performed on an input stream to return a processed version of the stream (output stream). In the context of stream processing, it is the collection of operations that can be performed starting from a stream variable and terminating in a “save terminal” or “save all terminal” node.

Stream

An ordered set of values that are being sent from a QUA program into a pipeline. A stream is set up when a QUA program is started and is torn down when it is finished.

Stream item

A single discrete unit of data of a stream. Can be one of the following:

• ICP result

• Single input sample from the controller

• An input trace vector (single measure statement)

It is possible to attach a timestamp to each item using with_timestamps.

Result

The last stream item or the accumulated stream items, as created by a save or a save_all terminal, respectively.

Result Handle

An object through which results can be fetched