Skip to content

Commit 67026ee

Browse files
committed
support import multi datasheet
1 parent 7aaadd6 commit 67026ee

7 files changed

Lines changed: 79 additions & 54 deletions

File tree

csv2sql.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ modules:
5353
- 'INSERT INTO `ifish`.`aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
5454
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
5555

56+
- joint: convert_sql
57+
enabled: true
58+
parameters:
59+
sheet_name: aquatic_region
60+
data_start_from_index: 1
61+
column_name:
62+
- id
63+
- code
64+
- name
65+
- is_homemade
66+
row_format:
67+
- 'INSERT INTO `aquatic_region` (`id`, `code`, `name`, `is_homemade`) VALUES (<{id: }>, <{code: }>, <{name: }>, <{is_homemade: }>);'
5668

5769
- joint: import_sql
5870
enabled: true

pipelines/convert_sql.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
"github.com/360EntSecGroup-Skylar/excelize"
2323
log "github.com/cihub/seelog"
2424
"github.com/infinitbyte/framework/core/pipeline"
25-
"strings"
2625
"github.com/infinitbyte/framework/core/util"
26+
"strings"
2727
)
2828

2929
type ConvertSQLJoint struct {
@@ -36,10 +36,10 @@ func (joint ConvertSQLJoint) Name() string {
3636

3737
const excelBytesKey = "excelBytes"
3838

39-
var rowFormat pipeline.ParaKey = "row_format"
40-
var sheetName pipeline.ParaKey = "sheet_name"
41-
var columnName pipeline.ParaKey = "column_name"
42-
var dataFromIndex pipeline.ParaKey = "data_start_from_index"
39+
var rowFormatKey pipeline.ParaKey = "row_format"
40+
var sheetNameKey pipeline.ParaKey = "sheet_name"
41+
var columnNameKey pipeline.ParaKey = "column_name"
42+
var dataFromIndexKey pipeline.ParaKey = "data_start_from_index"
4343

4444
var sqlKey pipeline.ParaKey = "sql"
4545

@@ -48,7 +48,7 @@ func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
4848
excelBytes := c.MustGetBytes(excelBytesKey)
4949
r := bytes.NewReader(excelBytes)
5050

51-
templates := joint.MustGetStringArray(rowFormat)
51+
templates := joint.MustGetStringArray(rowFormatKey)
5252
log.Debug("row templates: ", templates)
5353

5454
xlsx, err := excelize.OpenReader(r)
@@ -59,10 +59,12 @@ func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
5959
sheetMap := xlsx.GetSheetMap()
6060
log.Debug("sheets: ", sheetMap)
6161

62-
colNames := joint.MustGetStringArray(columnName)
63-
dataOffset := joint.MustGetInt(dataFromIndex)
62+
colNames := joint.MustGetStringArray(columnNameKey)
63+
dataOffset := joint.MustGetInt(dataFromIndexKey)
64+
65+
sheetName := joint.MustGetString(sheetNameKey)
6466

65-
rows := xlsx.GetRows(joint.MustGetString(sheetName))
67+
rows := xlsx.GetRows(sheetName)
6668

6769
var sqlBuffer bytes.Buffer
6870
for offset, row := range rows {
@@ -100,13 +102,19 @@ func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
100102
}
101103
}
102104

103-
c.Set(sqlKey, sqlBuffer.String())
105+
sqlMap, ok := c.GetMap(sqlKey)
106+
if !ok {
107+
sqlMap = map[string]interface{}{}
108+
}
109+
110+
sqlText := sqlBuffer.String()
111+
sqlMap[sheetName] = sqlText
112+
c.Set(sqlKey, sqlMap)
104113

105114
log.Trace(sqlBuffer.String())
106115
return nil
107116
}
108117

109-
110118
func formatString(str string) string {
111119
str = strings.Replace(str, "\"", "", -1)
112120
str = strings.Replace(str, "'", "", -1)

pipelines/import_sql.go

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -35,52 +35,54 @@ func (joint ImportSQLJoint) Name() string {
3535

3636
func (joint ImportSQLJoint) Process(c *pipeline.Context) error {
3737

38-
sqlText := c.MustGetString(sqlKey)
38+
sqlTextMap := c.MustGetMap(sqlKey)
3939

40-
log.Debug("start execute: ", sqlText)
40+
for k, sqlText := range sqlTextMap {
4141

42-
db, err := sql.Open("mysql", joint.MustGetString(mysqlConn))
43-
if err != nil {
44-
log.Error(err)
45-
panic(err)
46-
}
42+
log.Debug("start execute: ", k, ",sql: ", sqlText)
43+
db, err := sql.Open("mysql", joint.MustGetString(mysqlConn))
44+
if err != nil {
45+
log.Error(err)
46+
panic(err)
47+
}
4748

48-
tx, err := db.Begin()
49-
if err != nil {
50-
log.Error(err)
51-
panic(err)
52-
}
49+
tx, err := db.Begin()
50+
if err != nil {
51+
log.Error(err)
52+
panic(err)
53+
}
5354

54-
defer func() {
55-
if r := recover(); r != nil {
56-
log.Info("the database is rolled back.")
57-
err = tx.Rollback()
58-
if err != nil {
59-
log.Error(err)
60-
panic(err)
55+
defer func() {
56+
if r := recover(); r != nil {
57+
log.Info("the database is rolled back.")
58+
err = tx.Rollback()
59+
if err != nil {
60+
log.Error(err)
61+
panic(err)
62+
}
6163
}
64+
}()
65+
66+
//插入数据
67+
result, err := tx.Exec(sqlText.(string))
68+
69+
if err != nil {
70+
log.Error(err, result)
71+
panic(err)
6272
}
63-
}()
6473

65-
//插入数据
66-
result, err := tx.Exec(sqlText)
74+
rc, _ := result.RowsAffected()
75+
l, _ := result.RowsAffected()
6776

68-
if err != nil {
69-
log.Error(err, result)
70-
panic(err)
71-
}
77+
err = tx.Commit()
78+
if err != nil {
79+
log.Error(err)
80+
panic(err)
81+
}
7282

73-
rc, _ := result.RowsAffected()
74-
l, _ := result.RowsAffected()
75-
log.Debugf("%v rows affected, lastInsertID: %v", rc, l)
83+
log.Infof("sql execute success, %v rows affected, lastInsertID: %v", rc, l)
7684

77-
err = tx.Commit()
78-
if err != nil {
79-
log.Error(err)
80-
panic(err)
8185
}
8286

83-
log.Info("sql execute success")
84-
8587
return nil
8688
}

pipelines/logging.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@ func (joint LoggingJoint) Name() string {
1515

1616
func (joint LoggingJoint) Process(c *pipeline.Context) error {
1717

18-
sql := c.MustGetString(sqlKey)
18+
sqlTextMap := c.MustGetMap(sqlKey)
1919

2020
util.FileAppendNewLine("log/executed_sql.log", "")
2121
util.FileAppendNewLine("log/executed_sql.log", time.Now().String())
22-
util.FileAppendNewLine("log/executed_sql.log", sql)
23-
util.FileAppendNewLine("log/executed_sql.log", "")
22+
23+
for k, sqlText := range sqlTextMap {
24+
util.FileAppendNewLine("log/executed_sql.log", "process data sheet: "+k)
25+
util.FileAppendNewLine("log/executed_sql.log", sqlText.(string))
26+
util.FileAppendNewLine("log/executed_sql.log", "")
27+
}
2428

2529
return nil
2630
}

pipelines/read_csv.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,3 @@ func (joint ReadCsvJoint) Process(c *pipeline.Context) error {
4545

4646
return nil
4747
}
48-

pipelines/read_csv_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ func TestSeek(t *testing.T) {
6363
e.IsDebug = true
6464
joint := ReadCsvJoint{}
6565
c := pipeline.Context{}
66-
c.Set(rowFormat, []string{"insert into mytable(a,b,c) values(<{colA: }>,<{colB: }>,<{colC: }>);"})
67-
c.Set(sheetName, "Sheet1")
68-
c.Set(dataFromIndex, 2)
69-
c.Set(columnName, []string{"colA", "colB", "colC", "colD", "colE"})
66+
c.Set(rowFormatKey, []string{"insert into mytable(a,b,c) values(<{colA: }>,<{colB: }>,<{colC: }>);"})
67+
c.Set(sheetNameKey, "Sheet1")
68+
c.Set(dataFromIndexKey, 2)
69+
c.Set(columnNameKey, []string{"colA", "colB", "colC", "colD", "colE"})
7070
joint.Process(&c)
7171
}

test/import.xlsx

-4.84 KB
Binary file not shown.

0 commit comments

Comments
 (0)