Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions pfb_to_zip/db_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Load export whitelist/blacklist from amanuensis project_datapoints API."""

import os
import sys
import types
from importlib import import_module
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests


def load_config_module(config_file_path: str):
    """Execute the Python config file at ``config_file_path`` and return it as a module.

    The file is loaded directly from its path instead of being looked up by
    name, so a config whose stem matches an already-imported or built-in
    module (for example ``sys.py`` or ``types.py``) is still the file that
    gets loaded, rather than the cached module of the same name.

    Args:
        config_file_path: Path to the config ``.py`` file.

    Returns:
        The module object produced by executing the file. It is not
        registered in ``sys.modules``.

    Raises:
        ValueError: If ``config_file_path`` is empty or ``None``.
        ModuleNotFoundError: If no file exists at the resolved path.
        ImportError: If the file cannot be loaded as Python source.
    """
    # Function-scope import: the file's import header only brings in
    # ``import_module`` from importlib.
    import importlib.util

    if not config_file_path:
        raise ValueError("A config file path is required")

    path = Path(config_file_path).resolve()
    if not path.is_file():
        raise ModuleNotFoundError(f"Config file not found: {path}")

    # Keep the config's directory importable so the config file can still
    # import modules that live next to it.
    parent = str(path.parent)
    if parent not in sys.path:
        sys.path.insert(0, parent)

    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load config file as a Python module: {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def get_api_config(
    base_url: str = "https://localhost",
    token: Optional[str] = None,
) -> Dict[str, Optional[str]]:
    """Resolve the amanuensis API connection settings.

    Precedence for each setting is: explicitly passed argument, then the
    environment variable, then the built-in default. An explicit value
    therefore is not overridden by ``AMANUENSIS_URL`` /
    ``AMANUENSIS_ACCESS_TOKEN`` when those happen to be set.

    Args:
        base_url: Portal base URL. The default value ``"https://localhost"``
            is treated as "not specified", so ``AMANUENSIS_URL`` applies.
        token: Bearer token. ``None`` or empty falls back to
            ``AMANUENSIS_ACCESS_TOKEN``.

    Returns:
        A dict with keys ``"base_url"`` and ``"token"``; ``"token"`` is
        ``None`` when no token is available from either source.
    """
    default_base_url = "https://localhost"
    env_url = os.environ.get("AMANUENSIS_URL")
    env_token = os.environ.get("AMANUENSIS_ACCESS_TOKEN")

    if base_url and base_url != default_base_url:
        # Caller chose a URL explicitly; the environment must not replace it.
        resolved_url = base_url
    else:
        # Empty environment values count as unset.
        resolved_url = env_url or base_url or default_base_url

    return {
        "base_url": resolved_url,
        "token": token or env_token or None,
    }


def project_datapoints_url(base_url: str, endpoint: str) -> str:
    """Build the URL of a project-datapoints endpoint.

    Trailing slashes on ``base_url`` and leading slashes on ``endpoint`` are
    dropped so the result never contains a doubled ``/``. The ``/amanuensis``
    path segment is appended only when ``base_url`` does not already end
    with it.

    Args:
        base_url: Portal base URL, with or without a trailing ``/amanuensis``.
        endpoint: Endpoint name, e.g. ``"get-datapoints"``.

    Returns:
        ``<base>/amanuensis/project-datapoints/<endpoint>``.
    """
    base = base_url.rstrip("/")
    path = endpoint.lstrip("/")
    if not base.endswith("/amanuensis"):
        base = f"{base}/amanuensis"
    return f"{base}/project-datapoints/{path}"


def _auth_headers(token: str) -> Dict[str, str]:
    """Return the ``Authorization`` header dict for a bearer token.

    Surrounding whitespace (such as a trailing newline picked up from an
    environment variable or a file) is removed so it does not end up inside
    the header value.

    Args:
        token: The bearer token.

    Returns:
        ``{"Authorization": "Bearer <token>"}``.
    """
    # str() keeps the original tolerance for non-string input.
    return {"Authorization": f"Bearer {str(token).strip()}"}


def _request_verify():
    """Return the ``verify`` argument to use for amanuensis API requests.

    Set AMANUENSIS_INSECURE_SSL=1 for local dev when the portal uses a
    self-signed cert without a localhost SAN (common with gen3-helm).

    Returns:
        ``False`` when ``AMANUENSIS_INSECURE_SSL`` is one of ``1``, ``true``,
        ``yes`` or ``on`` (case-insensitive, surrounding whitespace ignored);
        otherwise the non-blank value of ``REQUESTS_CA_BUNDLE`` if set;
        otherwise ``True``.
    """
    # NOTE: returning False turns certificate verification off for requests
    # that carry the bearer token; intended for local development only.
    flag = os.environ.get("AMANUENSIS_INSECURE_SSL", "").strip().lower()
    if flag in ("1", "true", "yes", "on"):
        return False
    # A blank value is treated as unset instead of being used as a path.
    ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE", "").strip()
    return ca_bundle or True


def fetch_project_datapoints(
    api_config: Dict[str, Any], project_id: int
) -> List[Dict[str, Any]]:
    """Fetch the datapoint rows stored for ``project_id``.

    Args:
        api_config: Dict with ``"base_url"`` and ``"token"`` entries.
        project_id: Project whose datapoints are requested.

    Returns:
        A list of row dicts. An empty/falsy response body yields ``[]``; a
        single JSON object is wrapped in a one-element list.

    Raises:
        ValueError: If no token is configured, or if the response contains
            anything other than JSON objects.
        KeyError: If ``api_config`` has no ``"base_url"`` entry.
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    token = api_config.get("token")
    if not token:
        raise ValueError("AMANUENSIS_ACCESS_TOKEN is required")

    url = project_datapoints_url(api_config["base_url"], "get-datapoints")
    response = requests.get(
        url,
        # NOTE(review): the filter is sent as a JSON body on a GET request;
        # confirm the API expects this rather than query parameters.
        json={"project_id": project_id, "many": True},
        headers=_auth_headers(token),
        timeout=30,
        verify=_request_verify(),
    )
    response.raise_for_status()

    data = response.json()
    if not data:
        return []

    rows = data if isinstance(data, list) else [data]
    # Reject malformed payloads here so callers get one clear error instead
    # of an AttributeError later when they call ``row.get(...)``.
    for row in rows:
        if not isinstance(row, dict):
            raise ValueError(
                "Unexpected project_datapoints response: expected JSON objects, "
                f"got {type(row).__name__}"
            )
    return rows


def add_project_datapoint(
    api_config: Dict[str, Any],
    term: str,
    value_list: List[str],
    dtype: str,
    project_id: int,
) -> None:
    """POST one datapoint to the project-datapoints ``add-datapoints`` endpoint.

    Args:
        api_config: Dict with ``"base_url"`` and ``"token"`` entries.
        term: Sent as the ``"term"`` field of the request body.
        value_list: Sent as the ``"value_list"`` field.
        dtype: Sent as the ``"type"`` field.
        project_id: Sent as the ``"project_id"`` field.

    Raises:
        ValueError: If ``api_config`` carries no token.
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    bearer = api_config.get("token")
    if not bearer:
        raise ValueError("AMANUENSIS_ACCESS_TOKEN is required")

    payload = {
        "term": term,
        "value_list": value_list,
        "type": dtype,
        "project_id": project_id,
    }
    endpoint_url = project_datapoints_url(api_config["base_url"], "add-datapoints")

    result = requests.post(
        endpoint_url,
        json=payload,
        headers=_auth_headers(bearer),
        timeout=30,
        verify=_request_verify(),
    )
    # Surface HTTP error statuses as exceptions; nothing is returned on success.
    result.raise_for_status()


def load_config(project_id: int, api_config: Dict[str, Any], fallback_module):
    """Build the export config for ``project_id`` from project_datapoints.

    ``white_list`` and ``black_list`` come from the API rows (``type`` ``"w"``
    and ``"b"`` respectively, keyed by ``term``). ``exclude_files`` and
    ``data_dictionary`` always come from ``fallback_module``.

    ``fallback_module`` itself is returned unchanged when the API call fails,
    returns no rows, or returns no usable whitelist/blacklist rows.

    Args:
        project_id: Project whose datapoints are loaded.
        api_config: Dict with ``"base_url"`` and ``"token"`` entries.
        fallback_module: Local config object providing ``exclude_files``,
            ``data_dictionary`` and the fallback lists.

    Returns:
        A ``(config, summary)`` tuple, where ``summary`` is a one-line
        description of which source the config came from.
    """
    try:
        rows = fetch_project_datapoints(api_config, project_id)
    except Exception as exc:
        # Deliberate best-effort boundary: any fetch failure means "use the
        # local file", and the reason is reported in the summary.
        return fallback_module, (
            f"Config source: local file (could not fetch from amanuensis API: {exc})"
        )

    if not rows:
        return fallback_module, (
            f"Config source: local file (no datapoints found for project_id={project_id})"
        )

    white_list = {}
    black_list = {}
    for row in rows:
        # Skip malformed rows instead of crashing on ``row.get``.
        if not isinstance(row, dict):
            continue
        term = row.get("term")
        if not term:
            # A row without a term cannot be matched to anything.
            continue
        values = row.get("value_list") or []
        if isinstance(values, str):
            # Keep a lone string as one value rather than splitting it
            # into characters.
            values = [values]
        dtype = row.get("type")
        # NOTE: a later row with the same term replaces an earlier one.
        if dtype == "w":
            white_list[term] = list(values)
        elif dtype == "b":
            black_list[term] = list(values)

    if not white_list and not black_list:
        # Rows came back but none was usable; an empty config would carry no
        # lists at all, so keep the local file's lists instead.
        return fallback_module, (
            "Config source: local file (no whitelist/blacklist datapoints "
            f"found for project_id={project_id})"
        )

    config = types.SimpleNamespace(
        white_list=white_list,
        black_list=black_list,
        exclude_files=list(getattr(fallback_module, "exclude_files", [])),
        data_dictionary=getattr(fallback_module, "data_dictionary", None),
    )
    summary = (
        f"Config source: amanuensis API (project_id={project_id}, "
        f"{len(white_list)} whitelist and {len(black_list)} blacklist entries). "
        f"exclude_files and data_dictionary from local file."
    )
    return config, summary
57 changes: 41 additions & 16 deletions pfb_to_zip/pfb_to_zip.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import argparse
import sys
from importlib import import_module
from pathlib import Path
import os
import re
Expand All @@ -16,6 +14,8 @@
from pfb.exporters import tsv
from dictionaryutils.utils import node_values_to_codes

from db_config import get_api_config, load_config, load_config_module


def to_folder_name(value: str) -> str:
value = value.lower()
Expand All @@ -31,19 +31,32 @@ def __init__(self):


class PFBExporter:
def __init__(self, pfb_file_path:str, tmp_folder:str, output_path:str, config_file_path:str, ontology:str=None, extra_analysis:str=None) -> None:
def __init__(
self,
pfb_file_path: str,
tmp_folder: str,
output_path: str,
config_file_path: str,
ontology: str = None,
extra_analysis: str = None,
project_id: int = None,
api_config: dict = None,
) -> None:
self.pfb_file_path = pfb_file_path
self.tmp_folder = tmp_folder if tmp_folder else "./tmp"
self.output_path = output_path if output_path else "./"
self.ontology = ontology
self.analysis_path = extra_analysis
self.zip_file_output_path = None

# Retrieve config file module
path, file = config_file_path.rsplit('/', 1)
file = file[:-3]
sys.path.append(path)
self.config = import_module(file)
fallback_config = load_config_module(config_file_path)
if project_id is not None:
self.config, self.config_source = load_config(
project_id, api_config or get_api_config(), fallback_config
)
else:
self.config = fallback_config
self.config_source = "Config source: local file (--project-id not set)"

self.data_dictionary = None
if self.ontology and self.ontology == "ncit":
Expand Down Expand Up @@ -148,7 +161,6 @@ def filter_attributes(self, is_black_list=False):
a list of attributes to whitelist, by default, and a black_list
boolean to indicate if the list of attributes should be blacklisted instead.
'''
invalid_attributes = {}
attribute_list = self.config.black_list if is_black_list else self.config.white_list

for file in os.listdir(self.zip_folder + "/tsvs_original"):
Expand All @@ -164,9 +176,6 @@ def filter_attributes(self, is_black_list=False):
if attribute not in attribute_list[file.split(".")[0]]
]
else:
invalid_attributes[file.split(".")[0]] = [
a for a in attribute_list[file.split(".")[0]] if a not in header
]
filtered_header = [
attribute
for attribute in header
Expand All @@ -187,8 +196,6 @@ def filter_attributes(self, is_black_list=False):
print(file + " NOT FILTERED, no config is present for it.")
# just copy it over to the filtered folder
copy(self.zip_folder + "/tsvs_original/" + file, self.zip_folder + "/tsvs/" + file)
if any(a for a in invalid_attributes.values()):
raise RuntimeError(f'Invalid attributes in config: {({k:v for k,v in invalid_attributes.items() if v})}')


# TODO not working need to be updated
Expand Down Expand Up @@ -348,7 +355,10 @@ def main():
# EXAMPLE: python pfb_to_zip.py -i ./export_2023-03-27T02_42_17.avro -o ./outputs/ -c ./config.py -d https://portal.pedscommons.org/api/v0/submission/_dictionary/_all -t ncit

parser = argparse.ArgumentParser(description="Build ZIP bundle for data delivery after project request has been approved")
parser.add_argument('-c', '--config', help='The config file')
parser.add_argument('-c', '--config', help='Fallback config file (exclude_files, data_dictionary, and white/black lists if API unavailable)')
parser.add_argument('-p', '--project-id', type=int, help='Load white_list and black_list from amanuensis project_datapoints for this project')
parser.add_argument('--amanuensis-url', default=os.environ.get('AMANUENSIS_URL', 'https://localhost'), help='Portal base URL (default: AMANUENSIS_URL or https://localhost)')
parser.add_argument('--access-token', default=os.environ.get('AMANUENSIS_ACCESS_TOKEN'), help='Bearer token with amanuensis access (default: AMANUENSIS_ACCESS_TOKEN env var)')
parser.add_argument('-i', '--input', help='Input PFB file path')
parser.add_argument('-o', '--output', help='Output ZIP directory')
parser.add_argument('-t', '--terminology', help='The ontology you want to transform GEN3 values to.')
Expand All @@ -359,6 +369,11 @@ def main():
input_path = args.input
output_path = args.output
config_file = args.config
project_id = args.project_id
api_config = get_api_config(
base_url=args.amanuensis_url,
token=args.access_token,
)
ontology = args.terminology
analysis_script_consortia = args.analysis
except argparse.ArgumentError as err:
Expand All @@ -367,7 +382,16 @@ def main():


tmp_folder = "./tmp"
pfb_export = PFBExporter(input_path, tmp_folder, output_path, config_file, ontology, True if analysis_script_consortia and analysis_script_consortia != "" else False)
pfb_export = PFBExporter(
input_path,
tmp_folder,
output_path,
config_file,
ontology,
True if analysis_script_consortia and analysis_script_consortia != "" else False,
project_id=project_id,
api_config=api_config,
)
if not pfb_export:
print("One or more problems occurred during the initialization of the PFBExporter class")
exit()
Expand All @@ -381,6 +405,7 @@ def main():
pfb_export.add_external_references_material()
pfb_export.zip()
pfb_export.clean_up()
print(pfb_export.config_source)



Expand Down
Loading