33from __future__ import annotations
44
55from enum import IntEnum
6+ import logging
67from typing import TYPE_CHECKING
78
89import numpy as np
910import pandas as pd
1011
11- from odsbox .datamatrices_to_pandas import to_pandas
12- from odsbox .proto .ods_pb2 import ValueMatrixRequestStruct # pylint: disable=E0611
12+ from odsbox .datamatrices_to_pandas import extract_column_unit_ids , to_pandas
13+ from odsbox .proto .ods_pb2 import DataMatrices , ValueMatrixRequestStruct # pylint: disable=E0611
1314
1415if TYPE_CHECKING :
1516 from .con_i import ConI
@@ -60,9 +61,27 @@ class BulkReader:
6061 to create customer specific code to retrieve bulk data.
6162 """
6263
64+ _log : logging .Logger = logging .getLogger (__name__ )
65+
6366 def __init__ (self , con_i : ConI ) -> None :
6467 """Initialize the BulkReader with a ConI instance."""
6568 self .__con_i = con_i
69+ self ._unit_name_lookup_cache : dict [int , str ] | None = None
70+
71+ def unit_name_lookup (self , update : bool = False ) -> dict [int , str ]:
72+ """
73+ Get a mapping of unit id to unit name. This is used to cache the unit names for better readability of the data.
74+ :param bool update: If True, force update the cache.
75+ :return dict[int, str]: A dictionary mapping unit id to unit name.
76+ """
77+ if self ._unit_name_lookup_cache is None or update :
78+ try :
79+ units_df = self .__con_i .query ({"AoUnit" : {}, "$attributes" : {"id" : 1 , "name" : 1 }})
80+ self ._unit_name_lookup_cache = dict (zip (units_df ["id" ], units_df ["name" ]))
81+ except Exception as e :
82+ self ._unit_name_lookup_cache = {}
83+ self ._log .warning (f"Failed to load unit names: { e } " )
84+ return self ._unit_name_lookup_cache
6685
6786 @staticmethod
6887 def __apply_sequence_representation (
@@ -209,7 +228,9 @@ def query(
209228 :param int values_limit: Maximum number of values to be retrieved in this chunk. 0 means all remaining values.
210229 :param bool calculate_raw: Whether to calculate raw values for certain raw sequence representations.
211230 :raises requests.HTTPError: If access fails.
212- :return DataFrame: The Pandas.DataFrame contains the local_column.values as DataFrame column.
231+ :return DataFrame: The Pandas.DataFrame contains the local_column metadata and values as DataFrame columns.
232+ ``df.attrs["unit_names"]`` is set to a ``dict[str, str]`` mapping each local column
233+ name to its unit name (empty string when the unit id is unknown or zero).
213234 """
214235
215236 lc_meta_df : pd .DataFrame = self .__con_i .query_data (
@@ -264,14 +285,15 @@ def query(
264285 },
265286 }
266287 )
288+ unit_names = self ._extract_unit_names (localcolumn_bulk_dms )
267289 localcolumn_bulk_df = to_pandas (
268290 localcolumn_bulk_dms , date_as_timestamp = date_as_timestamp , prefer_np_array_for_unknown = True
269291 )
270292 del localcolumn_bulk_dms # free memory
271293 localcolumn_bulk_df .columns = [attr for attr in attributes ]
272294
273295 # merge metadata into bulk, preserving bulk order (left join)
274- merged = localcolumn_bulk_df .merge (lc_meta_df , left_on = "id" , right_index = True , how = "left" )
296+ merged : pd . DataFrame = localcolumn_bulk_df .merge (lc_meta_df , left_on = "id" , right_index = True , how = "left" )
275297
276298 missing_meta_ids = merged [merged ["name" ].isna ()]["id" ].unique ()
277299 if len (missing_meta_ids ):
@@ -290,6 +312,7 @@ def query(
290312 remaining_cols = [col for col in existing_cols if col not in desired_first_cols ]
291313 new_column_order = desired_first_cols + remaining_cols
292314 merged = merged [new_column_order ]
315+ self ._attach_unit_attr (merged , merged ["name" ], unit_names )
293316
294317 return merged
295318
@@ -327,7 +350,9 @@ def data_read(
327350 :param int values_start: Zero-based starting index for the values to be processed. Used for chunk loading.
328351 :param int values_limit: Maximum number of values to be retrieved in this chunk. 0 means all remaining values.
329352 :raises requests.HTTPError: If access fails.
330- :return DataFrame: The Pandas.DataFrame contains the local_column.values as DataFrame column.
353+ :return DataFrame: The Pandas.DataFrame contains one column per local column, named after the local
354+ column name. ``df.attrs["unit_names"]`` is set to a ``dict[str, str]`` mapping
355+ each column name to its unit name (empty string when the unit id is unknown or zero).
331356 """
332357
333358 conditions = {"submatrix" : submatrix_iid }
@@ -342,6 +367,7 @@ def data_read(
342367
343368 # Create DataFrame from column data
344369 rv = pd .DataFrame ({r ["name" ]: r ["values" ] for _ , r in localcolumn_df .iterrows ()})
370+ rv .attrs ["unit_names" ] = localcolumn_df .attrs .get ("unit_names" , {})
345371
346372 # Set independent column as index if requested
347373 if set_independent_as_index :
@@ -381,32 +407,66 @@ def valuematrix_read(
381407 :param int values_start: Zero-based starting index for the values to be processed. Used for chunk loading.
382408 :param int values_limit: Maximum number of values to be retrieved in this chunk. 0 means all remaining values.
383409 :raises requests.HTTPError: If access fails.
384- :return DataFrame: The Pandas.DataFrame contains the local_column.values as DataFrame column.
410+ :return DataFrame: The Pandas.DataFrame contains one column per local column, named after the local
411+ column name. ``df.attrs["unit_names"]`` is set to a ``dict[str, str]`` mapping
412+ each column name to its unit name (empty string when the unit id is unknown or zero).
385413 """
386414 sm_e = self .__con_i .mc .entity_by_base_name ("AoSubmatrix" )
387415 lc_e = self .__con_i .mc .entity_by_base_name ("AoLocalColumn" )
388416 name_patterns = column_patterns or ["*" ]
389417
390- df = to_pandas (
391- self .__con_i .valuematrix_read (
392- ValueMatrixRequestStruct (
393- aid = sm_e .aid ,
394- iid = submatrix_iid ,
395- columns = [ValueMatrixRequestStruct .ColumnItem (name = name_pattern ) for name_pattern in name_patterns ],
396- attributes = [
397- self .__con_i .mc .attribute_by_base_name (lc_e , "name" ).name ,
398- self .__con_i .mc .attribute_by_base_name (lc_e , "values" ).name ,
399- ],
400- mode = ValueMatrixRequestStruct .ModeEnum .MO_CALCULATED ,
401- values_start = values_start ,
402- values_limit = values_limit ,
403- )
404- ),
405- date_as_timestamp = date_as_timestamp ,
406- prefer_np_array_for_unknown = True ,
418+ raw_dms = self .__con_i .valuematrix_read (
419+ ValueMatrixRequestStruct (
420+ aid = sm_e .aid ,
421+ iid = submatrix_iid ,
422+ columns = [ValueMatrixRequestStruct .ColumnItem (name = name_pattern ) for name_pattern in name_patterns ],
423+ attributes = [
424+ self .__con_i .mc .attribute_by_base_name (lc_e , "name" ).name ,
425+ self .__con_i .mc .attribute_by_base_name (lc_e , "values" ).name ,
426+ ],
427+ mode = ValueMatrixRequestStruct .ModeEnum .MO_CALCULATED ,
428+ values_start = values_start ,
429+ values_limit = values_limit ,
430+ )
407431 )
432+ unit_names = self ._extract_unit_names (raw_dms )
433+ df = to_pandas (raw_dms , date_as_timestamp = date_as_timestamp , prefer_np_array_for_unknown = True )
434+ del raw_dms # free memory
408435 df .columns = ["name" , "values" ]
409- return pd .DataFrame ({name : values for name , values in zip (df ["name" ].values , df ["values" ].values )})
436+ rv = pd .DataFrame ({name : values for name , values in zip (df ["name" ].values , df ["values" ].values )})
437+ self ._attach_unit_attr (rv , df ["name" ], unit_names )
438+ return rv
439+
440+ def _attach_unit_attr (self , df : pd .DataFrame , column_names : pd .Series , unit_names : list [str ]) -> None :
441+ """
442+ Attach a ``unit_names`` mapping to ``df.attrs``.
443+
444+ Sets ``df.attrs["unit_names"]`` to a ``dict`` mapping each name in *column_names* to the
445+ corresponding entry in *unit_names*. Nothing is written when *unit_names* is empty or its
446+ length does not match *column_names* (a warning is logged in case of an unexpected error).
447+
448+ :param pd.DataFrame df: The DataFrame to annotate.
449+ :param pd.Series column_names: Series of column names in the same order as *unit_names*.
450+ :param list[str] unit_names: Unit name for each column (empty string for unknown units).
451+ """
452+ try :
453+ if unit_names and len (column_names ) == len (unit_names ):
454+ df .attrs ["unit_names" ] = dict (zip (column_names .values , unit_names ))
455+ except Exception as e :
456+ self ._log .warning (f"Failed to attach unit names: { e } " )
457+
458+ def _extract_unit_names (self , data_matrices : DataMatrices ) -> list [str ]:
459+ """
460+ Extract unit names for the columns in the provided data matrices.
461+
462+ :param data_matrices: The data matrices containing the columns for which to extract unit names.
463+ :return list[str]: A list of unit names corresponding to the columns.
464+
465+ """
466+ unit_id_lookup = self .unit_name_lookup ()
467+ column_unit_ids = extract_column_unit_ids (data_matrices )
468+
469+ return [unit_id_lookup .get (unit_id , "" ) for unit_id in column_unit_ids ]
410470
411471 @staticmethod
412472 def add_column_filters (
0 commit comments