|
1 | | -import logging |
2 | 1 | import os |
3 | | -from functools import cache |
4 | 2 | from pathlib import Path |
5 | 3 |
|
6 | | -import requests |
7 | 4 | from cassandra.cluster import Cluster |
8 | 5 |
|
# Directory holding the JSON test-data files; each file is named after the
# (keyspace-qualified) table it populates, one JSON row per line.
DATA_DIR = Path(__file__).parent.resolve() / "data"
10 | 7 |
|
11 | | -TAG = "master" |
12 | | -SCHEMA_BASE = f"https://raw.githubusercontent.com/graphsense/graphsense-lib/{TAG}/src/graphsenselib/schema/resources/" |
13 | | - |
14 | | -SCHEMA_MAPPING = {"btc": "utxo", "ltc": "utxo", "eth": "account", "trx": "account_trx"} |
15 | | - |
16 | | -SCHEMA_MAPPING_OVERRIDE = {("trx", "transformed"): "account"} |
17 | | - |
18 | | -MAGIC_REPLACE_CONSTANT = "0x8BADF00D" |
19 | | -MAGIC_REPLACE_CONSTANT2 = f"{MAGIC_REPLACE_CONSTANT}_REPLICATION_CONFIG" |
20 | | - |
21 | | -SIMPLE_REPLICATION_CONFIG = "{'class': 'SimpleStrategy', 'replication_factor': 1}" |
22 | | - |
23 | | - |
24 | | -@cache |
25 | | -def get_schema_file(file: str): |
26 | | - res = requests.get(SCHEMA_BASE + file) |
27 | | - return res.text |
28 | | - |
29 | 8 |
|
def load_test_data(host, port):
    """Load test data into pre-baked Cassandra (schemas already exist).

    Reads every regular file in ``DATA_DIR``; the file name doubles as the
    (keyspace-qualified) table name and each non-blank line is one
    JSON-encoded row.  All rows are fired concurrently through the driver's
    async API, then awaited so any insert error surfaces here.

    Args:
        host: Cassandra contact point (hostname or IP).
        port: Cassandra native-protocol port.
    """
    cluster = Cluster([host], port=port)
    try:
        session = cluster.connect()

        # Collect all insert statements up front.
        inserts = []
        for file_path in DATA_DIR.iterdir():
            if not file_path.is_file():
                continue
            table_name = file_path.name
            for line in file_path.read_text().split("\n"):
                line = line.strip()
                if not line:
                    continue
                # CQL escapes a single quote inside a string literal by
                # doubling it; without this, any apostrophe in the JSON
                # payload would break (or truncate) the statement.
                payload = line.replace("'", "''")
                inserts.append(f"INSERT INTO {table_name} JSON '{payload}'")

        # Execute inserts concurrently using the async Cassandra driver,
        # then block on each future so failures are raised, not dropped.
        futures = [session.execute_async(stmt) for stmt in inserts]
        for future in futures:
            future.result()
    finally:
        # Release driver connections/threads even if an insert failed.
        cluster.shutdown()
0 commit comments