Skip to content

Commit 9c06a83

Browse files
Add binning Runnable. (#85)
* Add the binning Runnable - Calculate the basic statistics results and binning stats results for multiple columns. * Remove the unused methods. * Remove unnecessary print. * Add new line. * Remove empty file.
1 parent 1d5535a commit 9c06a83

4 files changed

Lines changed: 165 additions & 1 deletion

File tree

runnables/binning.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import argparse
2+
import mars.dataframe as md
3+
import os
4+
from binning.binning import calc_stats
5+
from run_io.db_adapter import convertDSNToRfc1738
6+
from sqlalchemy import create_engine
7+
8+
9+
def build_argument_parser():
10+
parser = argparse.ArgumentParser(allow_abbrev=False)
11+
parser.add_argument("--dbname", type=str, required=True)
12+
parser.add_argument("--columns", type=str, required=True)
13+
parser.add_argument("--bin_methods", type=str, required=False)
14+
parser.add_argument("--bin_nums", type=str, required=False)
15+
parser.add_argument("--reverse_cumsum", type=bool, default=False)
16+
17+
return parser
18+
19+
20+
if __name__ == "__main__":
21+
parser = build_argument_parser()
22+
args, _ = parser.parse_known_args()
23+
columns = args.columns.split(',')
24+
bin_methods = args.bin_methods.split(',') if args.bin_methods else None
25+
bin_nums = [int(item) for item in args.bin_nums.split(',')] if args.bin_nums else None
26+
27+
select_input = os.getenv("SQLFLOW_TO_RUN_SELECT")
28+
output = os.getenv("SQLFLOW_TO_RUN_INTO")
29+
datasource = os.getenv("SQLFLOW_DATASOURCE")
30+
31+
url = convertDSNToRfc1738(datasource, args.dbname)
32+
engine = create_engine(url)
33+
input_md = md.read_sql(
34+
sql=select_input,
35+
con=engine)
36+
input_md.execute()
37+
38+
stats_df = calc_stats(
39+
input_md,
40+
columns,
41+
bin_methods,
42+
bin_nums,
43+
{},
44+
args.reverse_cumsum)
45+
46+
print("Persist the statistics result into the table {}".format(output))
47+
stats_df.to_sql(
48+
name=output,
49+
con=engine,
50+
index=False
51+
)

runnables/binning/__init__.py

Whitespace-only changes.

runnables/binning/binning.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import mars.dataframe as md
2+
import mars.tensor as mt
3+
import numpy as np
4+
import pandas as pd
5+
6+
7+
class BinningMethod(object):
8+
BUCKET = "bucket"
9+
QUANTILE = "quantile"
10+
LOG_BUCKET = "log_bucket"
11+
12+
13+
def binning(
14+
in_md,
15+
col_name,
16+
bin_method,
17+
bins,
18+
boundaries):
19+
if boundaries:
20+
bin_o, bins = md.cut(in_md[col_name], bins=boundaries, labels=False, retbins=True)
21+
bins_np = bins.to_numpy()
22+
else:
23+
if bin_method.lower() == BinningMethod.BUCKET.lower():
24+
bin_o, bins = md.cut(in_md[col_name], bins=bins, labels=False, retbins=True)
25+
bins_np = bins.to_numpy()
26+
elif bin_method.lower() == BinningMethod.LOG_BUCKET.lower():
27+
bin_o, bins = md.cut(mt.log(in_md[col_name]), bins=bins, labels=False, retbins=True)
28+
bins_np = np.exp(bins.to_numpy())
29+
else:
30+
raise ValueError("Unsupport binning method: {}".format(bin_method))
31+
32+
return bin_o, bins_np
33+
34+
35+
def cumsum(arr, reverse):
36+
if type(arr) == np.ndarray:
37+
sum_arr = arr
38+
elif type(arr) == pd.DataFrame:
39+
sum_arr = arr.to_numpy()
40+
else:
41+
raise ValueError("Invalid input type: {}".format(type(arr)))
42+
43+
for i in range(np.ndim(arr)):
44+
sum_arr = np.flip(np.cumsum(np.flip(sum_arr, i), i), i) if reverse else np.cumsum(sum_arr, i)
45+
46+
if type(arr) == np.ndarray:
47+
return sum_arr
48+
elif type(arr) == pd.DataFrame:
49+
return pd.DataFrame(sum_arr)
50+
else:
51+
raise ValueError("Invalid input type: {}".format(type(arr)))
52+
53+
54+
def calc_binning_stats(
55+
in_md,
56+
sel_cols,
57+
bin_methods,
58+
bin_nums,
59+
cols_bin_boundaries,
60+
reverse_cumsum=False):
61+
cols_bin_stats = []
62+
for i in range(len(sel_cols)):
63+
sel_col = sel_cols[i]
64+
bin_o, bins = binning(in_md, sel_col, bin_methods[i], bin_nums[i], cols_bin_boundaries.get(sel_col, None))
65+
bin_num = len(bins) - 1
66+
bin_prob_df = bin_o.value_counts(normalize=True).to_pandas().to_frame()
67+
bin_prob_df = bin_prob_df.reindex(range(bin_num), fill_value=0)
68+
bin_cumsum_prob_df = cumsum(bin_prob_df, reverse_cumsum)
69+
70+
cols_bin_stats.append(
71+
{
72+
"name": sel_col,
73+
"bin_boundaries": ','.join(bins.astype(str)),
74+
"bin_prob": ','.join(bin_prob_df[bin_prob_df.columns[0]].to_numpy().astype(str)),
75+
"bin_cumsum_prob": ','.join(bin_cumsum_prob_df[bin_cumsum_prob_df.columns[0]].to_numpy().astype(str))
76+
}
77+
)
78+
79+
return pd.DataFrame(cols_bin_stats)
80+
81+
82+
def calc_basic_stats(
83+
in_md,
84+
sel_cols):
85+
stats_data = [
86+
{
87+
"name": sel_col,
88+
"min": mt.min(in_md[sel_col]).to_numpy(),
89+
"max": mt.max(in_md[sel_col]).to_numpy(),
90+
"mean": mt.mean(in_md[sel_col]).to_numpy(),
91+
"median": mt.median(in_md[sel_col]).to_numpy(),
92+
"std": mt.std(in_md[sel_col]).to_numpy(),
93+
} for sel_col in sel_cols
94+
]
95+
96+
return pd.DataFrame(stats_data)
97+
98+
99+
def calc_stats(
100+
in_md,
101+
sel_cols,
102+
bin_methods,
103+
bin_nums,
104+
cols_bin_boundaries,
105+
reverse_cumsum=False):
106+
basic_stats_df = calc_basic_stats(in_md, sel_cols)
107+
cols_bin_stats_df = calc_binning_stats(in_md, sel_cols, bin_methods, bin_nums, cols_bin_boundaries, reverse_cumsum)
108+
109+
stats_df = pd.merge(basic_stats_df, cols_bin_stats_df, how='inner', on='name')
110+
111+
return stats_df
112+

runnables/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
tsfresh
22
sqlalchemy
3-
mysql
3+
mysql
4+
pymars

0 commit comments

Comments
 (0)