Skip to content

Commit e631805

Browse files
committed
imporve data import
1 parent 67026ee commit e631805

8 files changed

Lines changed: 167 additions & 22 deletions

File tree

README.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,130 @@
11

22
CSV2SQL, An util to convert csv data to SQL scripts.
33

4+
## How it works
5+
6+
Pipeline:
7+
8+
[Read the xlsx file] -> [Convert to SQL scripts] -> [Execute SQL scripts]
9+
10+
## How to use
11+
12+
1. Prepare the xlsx file with your data
13+
14+
Assume the file is dropped into `test/import.xlsx`, you can download it from [here](https://github.com/medcl/ifish-data-loader/blob/master/test/import.xlsx)
15+
16+
<img width="800" src="https://raw.githubusercontent.com/medcl/ifish-data-loader/master/docs/assets/img/Snip20180505_8.png">
17+
18+
2. Config the `csv2sql.yml`
19+
20+
```
21+
modules:
22+
- name: pipeline
23+
enabled: true
24+
runners:
25+
- name: process_excel
26+
enabled: true
27+
input_queue: primary
28+
max_go_routine: 1
29+
threshold_in_ms: 0
30+
timeout_in_ms: 5000
31+
default_config:
32+
start:
33+
joint: read_csv
34+
enabled: true
35+
parameters:
36+
file_name: "test/import.xlsx"
37+
process:
38+
- joint: convert_sql
39+
enabled: true
40+
parameters:
41+
sheet_name: fish_information
42+
data_start_from_index: 3
43+
column_name:
44+
- id
45+
- outer_code
46+
- common_name
47+
- scientific_name
48+
- english_name
49+
- chinese_name
50+
- region_name
51+
- aquatic_category_id
52+
- category_name
53+
- is_homemade
54+
- aquatic_region_id
55+
- inner_code
56+
- produce_pattern
57+
- feed_pattern
58+
- catch_pattern
59+
row_format:
60+
- 'INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
61+
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
62+
- 'INSERT INTO `aquatic_source` (`inner_code`, `aquatic_base_info_id`, `is_homemade`, `aquatic_region_id`, `produce_pattern`, `feed_pattern`, `catch_pattern`) '
63+
- 'VALUES (<{inner_code: }>, <{id: }>, <{is_homemade: }>, <{aquatic_region_id: }>, <{produce_pattern: }>, <{feed_pattern: }>, <{catch_pattern: }>);'
64+
65+
- joint: convert_sql
66+
enabled: false
67+
parameters:
68+
sheet_name: aquatic_region
69+
data_start_from_index: 1
70+
column_name:
71+
- id
72+
- code
73+
- name
74+
- is_homemade
75+
row_format:
76+
- 'INSERT INTO `aquatic_region` (`id`, `code`, `name`, `is_homemade`) VALUES (<{id: }>, <{code: }>, <{name: }>, <{is_homemade: }>);'
77+
78+
- joint: import_sql
79+
enabled: true
80+
parameters:
81+
mysql_conn: root:password@tcp(localhost:3306)/ifish?charset=utf8
82+
rollback_enabled: true
83+
84+
- joint: logging
85+
enabled: true
86+
87+
error:
88+
joint: on_error
89+
enabled: true
90+
```
91+
92+
Note, there are more than one joint config with name: `convert_sql`, which means you can import multi data sheet with different config,
93+
in this example the second `convert_sql` joint has been disabled, you can enable it if you wish, and also you can add more `convert_sql` joints.
94+
95+
And also as you can see, in the first `convert_sql` joint, with the data sheet `fish_information`, the config `row_format` is a array,
96+
and have more than one SQL template, separated with `;` ,which means you can generate multi SQL from one single data sheet,
97+
and then we can inset the data into different mysql tables.
98+
99+
Note, please also change MySQL connection in joint `mysql_conn` config, ie: `your_mysql_user:you_password@tcp(your_mysql_host:3306)/your_mysql_db?charset=utf8`
100+
101+
3. Start to import the data
102+
```
103+
➜ csv2sql git:(master) ✗ ./bin/csv2sql
104+
[CSV2SQL] An util to convert csv data to SQL scripts.
105+
0.1.0_SNAPSHOT, 67026ee, Sat May 5 13:23:42 2018 +0800, medcl, support import multi datasheet
106+
107+
[05-05 15:29:38] [INF] [instance.go:23] workspace: data/APP/nodes/0
108+
[05-05 15:29:38] [INF] [pipeline.go:67] pipeline: process_excel started with 1 instances
109+
[05-05 15:29:38] [INF] [api.go:147] api server listen at: http://0.0.0.0:2900
110+
[05-05 15:29:38] [INF] [ui.go:149] http server listen at: http://127.0.0.1:9001
111+
[05-05 15:29:38] [INF] [import_sql.go:87] sql execute success, 1 rows affected, lastInsertID: 1
112+
[05-05 15:29:38] [INF] [import_sql.go:87] sql execute success, 1 rows affected, lastInsertID: 1
113+
^C
114+
[CSV2SQL] got signal:interrupt, start shutting down
115+
| |
116+
_` | _ \ _ \ _` | _ \ | | -_)
117+
\__, | \___/ \___/ \__,_| _.__/ \_, | \___|
118+
____/ ___/
119+
[CSV2SQL] 0.1.0_SNAPSHOT, uptime:5.773621s
120+
121+
```
122+
123+
4. Now, check out the database, you will see 2 new records in two different table
124+
125+
<img width="800" src="https://raw.githubusercontent.com/medcl/ifish-data-loader/master/docs/assets/img/Snip20180505_5.png">
126+
<img width="800" src="https://raw.githubusercontent.com/medcl/ifish-data-loader/master/docs/assets/img/Snip20180505_6.png">
127+
4128

5129
License
6130
=======

csv2sql.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ modules:
3131
- joint: convert_sql
3232
enabled: true
3333
parameters:
34-
sheet_name: 基础数据导入表
34+
sheet_name: fish_information
3535
data_start_from_index: 3
3636
column_name:
3737
- id
@@ -48,10 +48,12 @@ modules:
4848
- inner_code
4949
- produce_pattern
5050
- feed_pattern
51-
- produce_pattern
51+
- catch_pattern
5252
row_format:
53-
- 'INSERT INTO `ifish`.`aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
53+
- 'INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
5454
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
55+
- 'INSERT INTO `aquatic_source` (`inner_code`, `aquatic_base_info_id`, `is_homemade`, `aquatic_region_id`, `produce_pattern`, `feed_pattern`, `catch_pattern`) '
56+
- 'VALUES (<{inner_code: }>, <{id: }>, <{is_homemade: }>, <{aquatic_region_id: }>, <{produce_pattern: }>, <{feed_pattern: }>, <{catch_pattern: }>);'
5557

5658
- joint: convert_sql
5759
enabled: true
@@ -70,6 +72,7 @@ modules:
7072
enabled: true
7173
parameters:
7274
mysql_conn: root:qeephp@tcp(localhost:3306)/ifish?charset=utf8
75+
rollback_enabled: true
7376

7477
- joint: logging
7578
enabled: true

doc/assets/img/Snip20180505_5.png

83.6 KB
Loading

doc/assets/img/Snip20180505_6.png

58.3 KB
Loading

doc/assets/img/Snip20180505_8.png

226 KB
Loading

pipelines/convert_sql.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,12 @@ func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
116116
}
117117

118118
func formatString(str string) string {
119+
str = util.TrimSpaces(str)
120+
if str == "" {
121+
return "NULL"
122+
}
119123
str = strings.Replace(str, "\"", "", -1)
120124
str = strings.Replace(str, "'", "", -1)
121-
str = fmt.Sprintf("'%s'", util.TrimSpaces(str))
125+
str = fmt.Sprintf("'%s'", str)
122126
return str
123127
}

pipelines/import_sql.go

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ import (
2121
log "github.com/cihub/seelog"
2222
_ "github.com/go-sql-driver/mysql"
2323
"github.com/infinitbyte/framework/core/pipeline"
24+
"github.com/infinitbyte/framework/core/util"
25+
"strings"
2426
)
2527

2628
type ImportSQLJoint struct {
2729
pipeline.Parameters
2830
}
2931

3032
const mysqlConn = "mysql_conn"
33+
const enableRollback = "rollback_enabled"
3134

3235
func (joint ImportSQLJoint) Name() string {
3336
return "import_sql"
@@ -52,36 +55,47 @@ func (joint ImportSQLJoint) Process(c *pipeline.Context) error {
5255
panic(err)
5356
}
5457

55-
defer func() {
56-
if r := recover(); r != nil {
57-
log.Info("the database is rolled back.")
58-
err = tx.Rollback()
59-
if err != nil {
60-
log.Error(err)
61-
panic(err)
58+
if(c.GetBool(enableRollback,true)){
59+
defer func() {
60+
if r := recover(); r != nil {
61+
log.Info("the database is rolled back.")
62+
err = tx.Rollback()
63+
if err != nil {
64+
log.Error(err)
65+
panic(err)
66+
}
6267
}
63-
}
64-
}()
68+
}()
69+
}
6570

6671
//插入数据
67-
result, err := tx.Exec(sqlText.(string))
72+
sql := sqlText.(string)
6873

69-
if err != nil {
70-
log.Error(err, result)
71-
panic(err)
72-
}
74+
array := strings.Split(sql, ";")
75+
for _, v := range array {
76+
if v == "" {
77+
continue
78+
}
79+
result, err := tx.Exec(v)
7380

74-
rc, _ := result.RowsAffected()
75-
l, _ := result.RowsAffected()
81+
if err != nil {
82+
log.Error(err, result,v)
83+
util.FileAppendNewLine("log/executed_sql_error.log", sql)
84+
util.FileAppendNewLine("log/executed_sql_error.log", err.Error())
85+
panic(err)
86+
}
87+
88+
rc, _ := result.RowsAffected()
89+
l, _ := result.RowsAffected()
90+
log.Infof("sql execute success, %v rows affected, lastInsertID: %v", rc, l)
91+
}
7692

7793
err = tx.Commit()
7894
if err != nil {
7995
log.Error(err)
8096
panic(err)
8197
}
8298

83-
log.Infof("sql execute success, %v rows affected, lastInsertID: %v", rc, l)
84-
8599
}
86100

87101
return nil

test/import.xlsx

5.71 KB
Binary file not shown.

0 commit comments

Comments
 (0)