-
Notifications
You must be signed in to change notification settings - Fork 0
Add Support for Batch Publishing Non-Scalar Measurement Values #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e0bca01
7aaf379
3a34263
335e0d7
22c581b
07ee6a7
bdd5a08
b54be78
0f554ad
2cdf3dc
b2b931d
e183efa
f6e0816
ad142e1
4696c15
ebe4dc6
99ca7a9
96e2cea
c8260f5
1a4d952
31748d5
9044ef2
c555417
77d7114
1c0f0cc
9440668
f83e258
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
|
|
||
| import datetime as std_datetime | ||
| import logging | ||
| from typing import Iterable, cast | ||
| from typing import Any, Callable, Iterable, cast | ||
|
|
||
| import hightime as ht | ||
| import numpy as np | ||
|
|
@@ -49,6 +49,129 @@ | |
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _copy_batch_values( | ||
| repeated_field: Any, | ||
| batch_values: Iterable[object], | ||
| is_supported: Callable[[object], bool], | ||
| convert_value: Callable[[Any], Any], | ||
| error_message: str, | ||
| ) -> None: | ||
| for value in batch_values: | ||
| if not is_supported(value): | ||
| raise TypeError(error_message) | ||
| repeated_field.add().CopyFrom(convert_value(value)) | ||
|
|
||
|
|
||
| def _populate_vector_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, values: Iterable[object] | ||
| ) -> None: | ||
| _copy_batch_values( | ||
| publish_request.vector_values.vectors, | ||
| values, | ||
| lambda value: isinstance(value, Vector), | ||
| vector_to_protobuf, | ||
| "Unsupported iterable: all values must be Vector.", | ||
| ) | ||
|
|
||
|
|
||
| def _populate_analog_waveform_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, | ||
| first_value: AnalogWaveform[Any], | ||
| values: Iterable[object], | ||
| ) -> None: | ||
| if first_value.dtype == np.float64: | ||
| _copy_batch_values( | ||
| publish_request.double_analog_waveform_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, AnalogWaveform) and value.dtype == np.float64, | ||
| float64_analog_waveform_to_protobuf, | ||
| "Unsupported iterable: all values must be float64 AnalogWaveform.", | ||
| ) | ||
| return | ||
| elif first_value.dtype == np.int16: | ||
| _copy_batch_values( | ||
| publish_request.i16_analog_waveform_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, AnalogWaveform) and value.dtype == np.int16, | ||
| int16_analog_waveform_to_protobuf, | ||
| "Unsupported iterable: all values must be int16 AnalogWaveform.", | ||
| ) | ||
| return | ||
| raise TypeError(f"Unsupported AnalogWaveform dtype: {first_value.dtype}") | ||
|
|
||
|
|
||
| def _populate_complex_waveform_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, | ||
| first_value: ComplexWaveform[Any], | ||
| values: Iterable[object], | ||
| ) -> None: | ||
| if first_value.dtype == np.complex128: | ||
| _copy_batch_values( | ||
| publish_request.double_complex_waveform_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, ComplexWaveform) and value.dtype == np.complex128, | ||
| float64_complex_waveform_to_protobuf, | ||
| "Unsupported iterable: all values must be complex128 ComplexWaveform.", | ||
| ) | ||
| return | ||
| if first_value.dtype == ComplexInt32DType: | ||
| _copy_batch_values( | ||
| publish_request.i16_complex_waveform_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, ComplexWaveform) and value.dtype == ComplexInt32DType, | ||
| int16_complex_waveform_to_protobuf, | ||
| "Unsupported iterable: all values must be ComplexWaveform with ComplexInt32DType.", | ||
| ) | ||
| return | ||
| raise TypeError(f"Unsupported ComplexWaveform dtype: {first_value.dtype}") | ||
|
|
||
|
|
||
| def _populate_spectrum_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, | ||
| first_value: Spectrum[Any], | ||
| values: Iterable[object], | ||
| ) -> None: | ||
| if first_value.dtype != np.float64: | ||
| raise TypeError(f"Unsupported Spectrum dtype: {first_value.dtype}") | ||
|
|
||
| _copy_batch_values( | ||
| publish_request.double_spectrum_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, Spectrum) and value.dtype == np.float64, | ||
| float64_spectrum_to_protobuf, | ||
| "Unsupported iterable: all values must be float64 Spectrum.", | ||
| ) | ||
|
|
||
|
|
||
| def _populate_digital_waveform_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, values: Iterable[object] | ||
| ) -> None: | ||
| _copy_batch_values( | ||
| publish_request.digital_waveform_values.waveforms, | ||
| values, | ||
| lambda value: isinstance(value, DigitalWaveform), | ||
| digital_waveform_to_protobuf, | ||
| "Unsupported iterable: all values must be DigitalWaveform.", | ||
| ) | ||
|
|
||
|
|
||
| def _populate_xydata_batch_values( | ||
| publish_request: PublishMeasurementBatchRequest, | ||
| first_value: XYData[Any], | ||
| values: Iterable[object], | ||
| ) -> None: | ||
| if first_value.dtype != np.float64: | ||
| raise TypeError(f"Unsupported XYData dtype: {first_value.dtype}") | ||
|
|
||
| _copy_batch_values( | ||
| publish_request.x_y_data_values.x_y_data, | ||
| values, | ||
| lambda value: isinstance(value, XYData) and value.dtype == np.float64, | ||
| float64_xydata_to_protobuf, | ||
| "Unsupported iterable: all values must be float64 XYData.", | ||
| ) | ||
|
|
||
|
|
||
| def populate_publish_condition_request_value( | ||
| publish_request: PublishConditionRequest, value: object | ||
| ) -> None: | ||
|
|
@@ -162,16 +285,33 @@ def populate_publish_measurement_batch_request_values( | |
| if isinstance(values, Vector): | ||
| publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) | ||
| elif isinstance(values, Iterable): | ||
| if not values: | ||
| raise ValueError("Cannot publish an empty Iterable.") | ||
| values_iterator = iter(values) | ||
| try: | ||
| vector = Vector(values) | ||
| except (TypeError, ValueError): | ||
| raise TypeError( | ||
| f"Unsupported iterable: {values}. Subtype must be bool, float, int, or string." | ||
| ) | ||
| first_value = next(values_iterator) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a really long if / elif block. At a minimum, maybe we could split up the contents of the if blocks into small helper methods. Better is probably some kind of a strategy to produce the data to publish. I'll be out after today, so feel free to discuss with others or override this, but as is, it's pretty unreadable.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I've updated to split out some of the logic into helpers for now. I'll see if @csjall requests any further refactoring here. |
||
| except StopIteration as exc: | ||
| raise ValueError("Cannot publish an empty Iterable.") from exc | ||
|
|
||
| publish_request.scalar_values.CopyFrom(vector_to_protobuf(vector)) | ||
| if isinstance(first_value, Vector): | ||
| _populate_vector_batch_values(publish_request, values) | ||
| elif isinstance(first_value, AnalogWaveform): | ||
| _populate_analog_waveform_batch_values(publish_request, first_value, values) | ||
| elif isinstance(first_value, ComplexWaveform): | ||
| _populate_complex_waveform_batch_values(publish_request, first_value, values) | ||
| elif isinstance(first_value, Spectrum): | ||
| _populate_spectrum_batch_values(publish_request, first_value, values) | ||
| elif isinstance(first_value, DigitalWaveform): | ||
| _populate_digital_waveform_batch_values(publish_request, values) | ||
| elif isinstance(first_value, XYData): | ||
| _populate_xydata_batch_values(publish_request, first_value, values) | ||
| else: | ||
| try: | ||
| vector = Vector(values) | ||
| except (TypeError, ValueError): | ||
| raise TypeError( | ||
| f"Unsupported iterable. Subtype must be bool, float, int, string, Vector, " | ||
| "AnalogWaveform, ComplexWaveform, Spectrum, DigitalWaveform, or XYData." | ||
| ) | ||
| publish_request.scalar_values.CopyFrom(vector_to_protobuf(vector)) | ||
| else: | ||
| raise TypeError( | ||
| f"Unsupported measurement values type: {type(values)}. Please consult the documentation." | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.