feat: ✨ raw data to staging #94
Conversation
| VAS_TIME_FIELD_PATTERN = re.compile( | ||
| r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$" | ||
| ) |
We can share this with the metadata transformation
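As a rough illustration of what sharing the pattern could look like, here is a minimal sketch of a small parsing helper that both transformations might import. The helper name `parse_vas_column` and the example column names are hypothetical and not taken from this PR; only the regular expression itself comes from the diff above.

```python
from __future__ import annotations

import re

# Pattern copied from the diff above.
VAS_TIME_FIELD_PATTERN = re.compile(
    r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$"
)


def parse_vas_column(column: str) -> tuple[str, int] | None:
    """Return (field_name, minutes) for a VAS time column, or None if no match.

    Hypothetical helper, shown only to illustrate reuse of the pattern.
    """
    match = VAS_TIME_FIELD_PATTERN.match(column)
    if match is None:
        return None
    time = match.group("time")
    # "minus10" encodes the measurement taken 10 minutes before time zero.
    minutes = -10 if time == "minus10" else int(time)
    return match.group("field_name"), minutes


# Example column names are made up for illustration.
print(parse_vas_column("vas_hunger_fasted_minus10min"))
print(parse_vas_column("vas_hunger_30min"))
print(parse_vas_column("record_id"))
```

Keeping the `minus10` handling next to the pattern would mean both callers interpret the time suffix the same way.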
…easibility-data into feat/data-raw-to-staging
| .rename({"redcap_event_name": "event"}) | ||
| .with_columns( | ||
| pl.lit("Copenhagen").alias("center"), | ||
| pl.lit(resource_name).alias("resource_name"), |
I'll use this to write the parquet file then drop it
| for col in vas_cols: | ||
| match = cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(col)) | ||
| | ||
| time = match.group("time") | ||
| if time == "minus10": | ||
| time = "-10" | ||
| | ||
| cols_grouped_by_time.setdefault(int(time), []).append(col) |
I can rewrite this dictionary construction without a for loop if you want, but I think it will just be longer and more complicated
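For comparison, a loop-free version might look roughly like the sketch below, which uses `itertools.groupby`. The function names `time_of` and `group_by_time` and the example column names are hypothetical. Note that this variant orders the groups by time rather than by first appearance, so it is not a drop-in equivalent of the loop.

```python
from __future__ import annotations

import re
from itertools import groupby

# Pattern copied from the diff earlier in this conversation.
VAS_TIME_FIELD_PATTERN = re.compile(
    r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$"
)


def time_of(col: str) -> int:
    """Extract the time in minutes from a VAS column name (hypothetical helper)."""
    match = VAS_TIME_FIELD_PATTERN.match(col)
    if match is None:
        raise ValueError(f"Not a VAS time column: {col}")
    time = match.group("time")
    return -10 if time == "minus10" else int(time)


def group_by_time(vas_cols: list[str]) -> dict[int, list[str]]:
    """Group VAS columns by time without an explicit for loop."""
    # groupby only merges adjacent items, so sort by the key first.
    # sorted() is stable, so the column order within each group is preserved.
    ordered = sorted(vas_cols, key=time_of)
    return {time: list(cols) for time, cols in groupby(ordered, key=time_of)}


# Example column names are made up for illustration.
cols = ["vas_hunger_30min", "vas_thirst_minus10min", "vas_thirst_30min"]
print(group_by_time(cols))
```

The extra sort and the separate key function are the main cost, which supports the point that the `setdefault` loop is the shorter option here.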
This was done automatically, not sure if it's the right way
| @@ -1 +1,2 @@ | |||
| raw/** filter=lfs diff=lfs merge=lfs -text | |||
| staging/** filter=lfs diff=lfs merge=lfs -text | |||
Or we can track *.parquet
Not sure, is that a question?
Just an alternative
| """Selects columns and adds base columns common to all dataframes.""" | ||
| return ( | ||
| raw_df.select(["redcap_event_name"] + cols) | ||
| .rename({"redcap_event_name": "event"}) |
Having looked at more of the data, I don't think event by itself can be the PK. Raised #95
Yea, I was thinking the same.
| ) | ||
| | ||
| | ||
| def raw_to_staged(raw_df: pl.DataFrame) -> list[pl.DataFrame]: |
Maybe organize so either all the _fn are above or below the normal functions?
| pl.lit("Copenhagen").alias("center"), | ||
| pl.lit(resource_name).alias("resource_name"), |
| pl.lit("Copenhagen").alias("center"), | |
| pl.lit(resource_name).alias("resource_name"), | |
| # Only used for creating the Parquet files. | |
| pl.lit("Copenhagen").alias("center"), | |
| pl.lit(resource_name).alias("resource_name"), |
| vas_cols = so.keep( | ||
| raw_df.columns, | ||
| lambda column: VAS_TIME_FIELD_PATTERN.match(column) is not None, | ||
| ) |
This would be better done with Polars itself rather than a filter. E.g., select() can take a regex pattern or an exclude expression.
| vas_dfs = so.pairwise_fmap( | ||
| list(cols_grouped_by_time.items()), [raw_df], _create_df_for_time_group | ||
| ) | ||
| return pl.concat(vas_dfs, how="vertical") |
I think all of this would be better with a pivot https://docs.pola.rs/user-guide/transformations/pivot/
| ) | ||
| | ||
| | ||
| def load_raw_data() -> pl.DataFrame: |
This is where we need to think about the design of the Python files, etc, as we will eventually have non-REDCap raw data, while this only loads the REDCap data. Plus this function name implies reading in all raw data, but it's only reading in the latest.
Maybe to start, refactor this to take a path as an argument? And I'm not sure we need to read only the latest file every time; we'll want the orchestrator to handle which files to read and which to skip.
Hmm, yeah, everything here is just about the REDCap data, so like one step in an orchestrated flow. Maybe I will rename things to make that clear, rather than trying to anticipate how it will fit into the flow.
Right now, every time we download the raw REDCap data, we download the full set, that's why I thought taking the latest one made sense. This doesn't quite align with the earlier batch update model, and I was wondering if you had a broader vision for how raw data should be staged. E.g., when would we not stage the latest data?
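One way to reconcile the two views in this thread is to separate choosing a file from loading it, so the loader takes an explicit path and the "latest" policy lives in a small helper that an orchestrator can replace later. The sketch below covers only the selection step. The name `find_latest_file`, the `*.csv` pattern, and the assumption that raw file names begin with a sortable timestamp are all hypothetical and not taken from this PR.

```python
from __future__ import annotations

from pathlib import Path


def find_latest_file(directory: Path, pattern: str = "*.csv") -> Path:
    """Return the matching file whose name sorts last.

    Assumes file names start with a sortable timestamp (e.g. an ISO date),
    so lexicographic order is also chronological order.
    """
    candidates = sorted(directory.glob(pattern))
    if not candidates:
        raise FileNotFoundError(f"No files matching {pattern!r} in {directory}")
    return candidates[-1]
```

With this split, a loader that accepts a path could be called with `find_latest_file(...)` for now, and with whichever file the orchestrator selects later, without changing the loader itself.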
Description
This PR transforms raw data to staged data. For now, this includes special transformations only for VAS.
Closes #73
This PR needs an in-depth review.
Checklist
just run-all