Skip to content

Commit 861ad8a

Browse files
Add binning_dim2 runnable for two-dimensional binning statistics. (#90)
* Add the binning_dim2 runnable and remove the two-dimensional binning stats from binning.py. * Update the assert failure message. * Rename binning_dim2.py to two_dim_binning.py.
1 parent 09dda05 commit 861ad8a

2 files changed

Lines changed: 112 additions & 32 deletions

File tree

runnables/binning.py

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,13 @@ def build_argument_parser():
2626
columns = args.columns.split(',')
2727
bin_method_array = args.bin_method.split(',') if args.bin_method else None
2828
bin_num_array = [int(item) for item in args.bin_num.split(',')] if args.bin_num else None
29-
two_dim_bin_cols = args.two_dim_bin_cols.split(',') if args.two_dim_bin_cols else None
3029

3130
select_input = os.getenv("SQLFLOW_TO_RUN_SELECT")
3231
output = os.getenv("SQLFLOW_TO_RUN_INTO")
3332
output_tables = output.split(',')
3433
datasource = os.getenv("SQLFLOW_DATASOURCE")
3534

36-
# Check arguments
37-
if two_dim_bin_cols:
38-
assert(len(two_dim_bin_cols) == 2)
39-
assert(len(output_tables) == 3)
35+
assert len(output_tables) == 1, "The output tables shouldn't be null and can contain only one."
4036

4137
url = convertDSNToRfc1738(datasource, args.dbname)
4238
engine = create_engine(url)
@@ -86,30 +82,3 @@ def build_argument_parser():
8682
con=engine,
8783
index=False
8884
)
89-
90-
if args.two_dim_bin_cols:
91-
print("Calculate two dimension binning result for columns: {}".format(columns))
92-
bin_prob_df, bin_cumsum_prob_df = calc_two_dim_binning_stats(
93-
input_md,
94-
columns[0],
95-
columns[1],
96-
bin_method_array[0],
97-
bin_method_array[1],
98-
bin_num_array[0],
99-
bin_num_array[1],
100-
cols_bin_boundaries.get(columns[0], None),
101-
cols_bin_boundaries.get(columns[1], None),
102-
args.reverse_cumsum)
103-
104-
print("Persist the binning probabilities into table {}".format(output_tables[1]))
105-
bin_prob_df.to_sql(
106-
name=output_tables[1],
107-
con=engine,
108-
index=False
109-
)
110-
print("Persist the binning accumulated probabilities into table {}".format(output_tables[2]))
111-
bin_cumsum_prob_df.to_sql(
112-
name=output_tables[2],
113-
con=engine,
114-
index=False
115-
)

runnables/two_dim_binning.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import argparse
2+
import mars.dataframe as md
3+
import os
4+
import pandas as pd
5+
from bin.binning_calculator import calc_stats, calc_two_dim_binning_stats, get_cols_bin_boundaries
6+
from run_io.db_adapter import convertDSNToRfc1738
7+
from sqlalchemy import create_engine
8+
9+
10+
def _str_to_bool(value):
    """Convert a command-line string into a real boolean.

    argparse's ``type=bool`` simply calls ``bool(text)``, which is True for
    every non-empty string -- including "False" and "0". This converter maps
    the usual textual spellings explicitly instead.

    Args:
        value: The raw argument text (or an already-converted bool).

    Returns:
        True or False.

    Raises:
        argparse.ArgumentTypeError: If the text is not a recognised boolean
            spelling.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "1"):
        return True
    # The empty string stays False, matching what bool("") used to yield.
    if normalized in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(
        "Expected a boolean value, got {!r}".format(value))


def build_argument_parser():
    """Build the command-line parser for the two dimension binning runnable.

    Returns:
        An ``argparse.ArgumentParser`` (prefix abbreviations disabled) with:
          --dbname          required string.
          --columns         required string; the caller splits it on ','.
          --bin_method      optional string; the caller splits it on ','.
          --bin_num         optional string; the caller splits it on ',' and
                            converts each item to int.
          --bin_input_table optional string.
          --reverse_cumsum  optional boolean given as text such as
                            "True"/"False"; defaults to False.
    """
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--dbname", type=str, required=True)
    parser.add_argument("--columns", type=str, required=True)
    parser.add_argument("--bin_method", type=str, required=False)
    parser.add_argument("--bin_num", type=str, required=False)
    parser.add_argument("--bin_input_table", type=str, required=False)
    # type=bool would turn "--reverse_cumsum False" into True; parse the text.
    parser.add_argument("--reverse_cumsum", type=_str_to_bool, default=False)

    return parser
20+
21+
22+
if __name__ == "__main__":
    # Script entry point: read the rows selected by SQLFLOW_TO_RUN_SELECT,
    # compute per-column statistics and the two dimension binning results for
    # exactly two columns, and persist them into the three tables named (comma
    # separated) in SQLFLOW_TO_RUN_INTO.
    parser = build_argument_parser()
    args, _ = parser.parse_known_args()
    columns = args.columns.split(',')
    bin_method_array = args.bin_method.split(',') if args.bin_method else None
    bin_num_array = [int(item) for item in args.bin_num.split(',')] if args.bin_num else None

    select_input = os.getenv("SQLFLOW_TO_RUN_SELECT")
    output = os.getenv("SQLFLOW_TO_RUN_INTO")
    # An unset/empty variable becomes an empty list so the count check below
    # reports the problem instead of an AttributeError on None.
    output_tables = output.split(',') if output else []
    datasource = os.getenv("SQLFLOW_DATASOURCE")

    # Check arguments. Raise explicitly rather than assert, because asserts
    # are stripped when Python runs with -O.
    if len(columns) != 2:
        raise ValueError("The column number should only be 2")
    if len(output_tables) != 3:
        raise ValueError("The output table number should only be 3")

    url = convertDSNToRfc1738(datasource, args.dbname)
    engine = create_engine(url)
    input_md = md.read_sql(
        sql=select_input,
        con=engine)
    input_md.execute()

    cols_bin_boundaries = {}
    if args.bin_input_table:
        print("Get provided bin boundaries from table {}".format(args.bin_input_table))
        bin_input_df = pd.read_sql_table(
            table_name=args.bin_input_table,
            con=engine)
        cols_bin_boundaries = get_cols_bin_boundaries(bin_input_df)

        # Every input column needs provided boundaries. A strict-superset test
        # misses partially overlapping key sets, so compute the missing
        # columns directly.
        missing_columns = set(columns) - set(cols_bin_boundaries.keys())
        if missing_columns:
            raise ValueError(
                "The provided bin boundaries contains keys: {}. "
                "But they cannot cover all the input columns: {}".format(
                    cols_bin_boundaries.keys(), columns))

        print("Ignore the bin_num and bin_method arguments")
        bin_num_array = [None] * len(columns)
        bin_method_array = [None] * len(columns)
    else:
        # Without provided boundaries both arguments are needed; fail with a
        # clear message instead of a TypeError from len(None).
        if not bin_num_array or not bin_method_array:
            raise ValueError(
                "The bin_num and bin_method arguments are required "
                "when bin_input_table is not provided")

        # A single value is broadcast to every column.
        if len(bin_num_array) == 1:
            bin_num_array = bin_num_array * len(columns)
        elif len(bin_num_array) != len(columns):
            raise ValueError(
                "The number of bin_num values should be 1 or equal to "
                "the number of columns")

        if len(bin_method_array) == 1:
            bin_method_array = bin_method_array * len(columns)
        elif len(bin_method_array) != len(columns):
            raise ValueError(
                "The number of bin_method values should be 1 or equal to "
                "the number of columns")

    print("Calculate the statistics result for columns: {}".format(columns))
    stats_df = calc_stats(
        input_md,
        columns,
        bin_method_array,
        bin_num_array,
        cols_bin_boundaries,
        args.reverse_cumsum)

    print("Persist the statistics result into the table {}".format(output_tables[0]))
    stats_df.to_sql(
        name=output_tables[0],
        con=engine,
        index=False
    )

    print("Calculate two dimension binning result for columns: {}".format(columns))
    bin_prob_df, bin_cumsum_prob_df = calc_two_dim_binning_stats(
        input_md,
        columns[0],
        columns[1],
        bin_method_array[0],
        bin_method_array[1],
        bin_num_array[0],
        bin_num_array[1],
        cols_bin_boundaries.get(columns[0], None),
        cols_bin_boundaries.get(columns[1], None),
        args.reverse_cumsum)

    print("Persist the binning probabilities into table {}".format(output_tables[1]))
    bin_prob_df.to_sql(
        name=output_tables[1],
        con=engine,
        index=False
    )
    print("Persist the binning accumulated probabilities into table {}".format(output_tables[2]))
    bin_cumsum_prob_df.to_sql(
        name=output_tables[2],
        con=engine,
        index=False
    )

0 commit comments

Comments
 (0)