11import argparse
22import mars .dataframe as md
33import os
4- from binning .binning import calc_stats
4+ import pandas as pd
5+ from bin .binning_calculator import calc_stats , calc_two_dim_binning_stats , get_cols_bin_boundaries
56from run_io .db_adapter import convertDSNToRfc1738
67from sqlalchemy import create_engine
78
@@ -12,7 +13,9 @@ def build_argument_parser():
1213 parser .add_argument ("--columns" , type = str , required = True )
1314 parser .add_argument ("--bin_methods" , type = str , required = False )
1415 parser .add_argument ("--bin_nums" , type = str , required = False )
16+ parser .add_argument ("--bin_input_table" , type = str , required = False )
1517 parser .add_argument ("--reverse_cumsum" , type = bool , default = False )
18+ parser .add_argument ("--two_dim_bin_cols" , type = str , required = False )
1619
1720 return parser
1821
@@ -23,29 +26,80 @@ def build_argument_parser():
2326 columns = args .columns .split (',' )
2427 bin_methods = args .bin_methods .split (',' ) if args .bin_methods else None
2528 bin_nums = [int (item ) for item in args .bin_nums .split (',' )] if args .bin_nums else None
29+ two_dim_bin_cols = args .two_dim_bin_cols .split (',' ) if args .two_dim_bin_cols else None
2630
2731 select_input = os .getenv ("SQLFLOW_TO_RUN_SELECT" )
2832 output = os .getenv ("SQLFLOW_TO_RUN_INTO" )
33+ output_tables = output .split (',' )
2934 datasource = os .getenv ("SQLFLOW_DATASOURCE" )
3035
36+ # Check arguments
37+ if two_dim_bin_cols :
38+ assert (len (two_dim_bin_cols ) == 2 )
39+ assert (len (output_tables ) == 3 )
40+
3141 url = convertDSNToRfc1738 (datasource , args .dbname )
3242 engine = create_engine (url )
3343 input_md = md .read_sql (
3444 sql = select_input ,
3545 con = engine )
3646 input_md .execute ()
3747
48+ cols_bin_boundaries = {}
49+ if args .bin_input_table :
50+ print ("Get provided bin boundaries from table {}" .format (args .bin_input_table ))
51+ bin_input_df = pd .read_sql_table (
52+ table_name = args .bin_input_table ,
53+ con = engine )
54+ cols_bin_boundaries = get_cols_bin_boundaries (bin_input_df )
55+
56+ if set (columns ) > cols_bin_boundaries .keys ():
57+ raise ValueError ("The provided bin boundaries contains keys: {}. But they cannot cover all the \
58+ input columns: {}" .format (cols_bin_boundaries .keys (), columns ))
59+
60+ print ("Ignore the bin_nums and bin_methods arguments" )
61+ bin_nums = [None for i in range (len (columns ))]
62+ bin_methods = [None for i in range (len (columns ))]
63+
64+ print ("Calculate the statistics result for columns: {}" .format (columns ))
3865 stats_df = calc_stats (
3966 input_md ,
4067 columns ,
4168 bin_methods ,
4269 bin_nums ,
43- {} ,
70+ cols_bin_boundaries ,
4471 args .reverse_cumsum )
4572
46- print ("Persist the statistics result into the table {}" .format (output ))
73+ print ("Persist the statistics result into the table {}" .format (output_tables [ 0 ] ))
4774 stats_df .to_sql (
48- name = output ,
75+ name = output_tables [ 0 ] ,
4976 con = engine ,
5077 index = False
5178 )
79+
80+ if args .two_dim_bin_cols :
81+ print ("Calculate two dimension binning result for columns: {}" .format (columns ))
82+ bin_prob_df , bin_cumsum_prob_df = calc_two_dim_binning_stats (
83+ input_md ,
84+ columns [0 ],
85+ columns [1 ],
86+ bin_methods [0 ],
87+ bin_methods [1 ],
88+ bin_nums [0 ],
89+ bin_nums [1 ],
90+ cols_bin_boundaries .get (columns [0 ], None ),
91+ cols_bin_boundaries .get (columns [1 ], None ),
92+ args .reverse_cumsum )
93+
94+ print ("Persist the binning probabilities into table {}" .format (output_tables [1 ]))
95+ bin_prob_df .to_sql (
96+ name = output_tables [1 ],
97+ con = engine ,
98+ index = False
99+ )
100+ print ("Persist the binning accumulated probabilities into table {}" .format (output_tables [2 ]))
101+ bin_cumsum_prob_df .to_sql (
102+ name = output_tables [2 ],
103+ con = engine ,
104+ index = False
105+ )
0 commit comments