Skip to content

Commit 7aaadd6

Browse files
committed
read_csv refactor
1 parent b49ae1b commit 7aaadd6

5 files changed

Lines changed: 94 additions & 72 deletions

File tree

csv2sql.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ modules:
2727
enabled: true
2828
parameters:
2929
file_name: "test/import.xlsx"
30+
process:
31+
- joint: convert_sql
32+
enabled: true
33+
parameters:
3034
sheet_name: 基础数据导入表
3135
data_start_from_index: 3
3236
column_name:
@@ -49,7 +53,7 @@ modules:
4953
- 'INSERT INTO `ifish`.`aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
5054
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
5155

52-
process:
56+
5357
- joint: import_sql
5458
enabled: true
5559
parameters:

pipelines/convert_sql.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,99 @@ limitations under the License.
1717
package pipelines
1818

1919
import (
20+
"bytes"
21+
"fmt"
22+
"github.com/360EntSecGroup-Skylar/excelize"
23+
log "github.com/cihub/seelog"
2024
"github.com/infinitbyte/framework/core/pipeline"
25+
"strings"
26+
"github.com/infinitbyte/framework/core/util"
2127
)
2228

2329
type ConvertSQLJoint struct {
30+
pipeline.Parameters
2431
}
2532

2633
func (joint ConvertSQLJoint) Name() string {
2734
return "convert_sql"
2835
}
2936

37+
const excelBytesKey = "excelBytes"
38+
39+
var rowFormat pipeline.ParaKey = "row_format"
40+
var sheetName pipeline.ParaKey = "sheet_name"
41+
var columnName pipeline.ParaKey = "column_name"
42+
var dataFromIndex pipeline.ParaKey = "data_start_from_index"
43+
44+
var sqlKey pipeline.ParaKey = "sql"
45+
3046
func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
3147

48+
excelBytes := c.MustGetBytes(excelBytesKey)
49+
r := bytes.NewReader(excelBytes)
50+
51+
templates := joint.MustGetStringArray(rowFormat)
52+
log.Debug("row templates: ", templates)
53+
54+
xlsx, err := excelize.OpenReader(r)
55+
if err != nil {
56+
log.Error(err)
57+
return err
58+
}
59+
sheetMap := xlsx.GetSheetMap()
60+
log.Debug("sheets: ", sheetMap)
61+
62+
colNames := joint.MustGetStringArray(columnName)
63+
dataOffset := joint.MustGetInt(dataFromIndex)
64+
65+
rows := xlsx.GetRows(joint.MustGetString(sheetName))
66+
67+
var sqlBuffer bytes.Buffer
68+
for offset, row := range rows {
69+
if offset < dataOffset {
70+
log.Debugf("%v < data offset: %v, ignore", offset, dataOffset)
71+
continue
72+
}
73+
74+
colMap := map[string]string{}
75+
76+
hit := false
77+
for k, colCell := range row {
78+
if colCell != "" {
79+
hit = true
80+
}
81+
colName := colNames[k]
82+
colMap[colName] = colCell
83+
log.Trace("row:", offset, ": ", colName, "-", k, "-", colCell)
84+
}
85+
86+
//ignore empty row
87+
if !hit {
88+
continue
89+
}
90+
91+
for _, x := range templates {
92+
line := x
93+
log.Debug("template:", line)
94+
for k, v := range colMap {
95+
log.Debug(fmt.Sprintf("<{%v: }>", k), ",", formatString(v))
96+
line = strings.Replace(line, fmt.Sprintf("<{%v: }>", k), formatString(v), -1)
97+
}
98+
log.Debug(line)
99+
sqlBuffer.WriteString(line)
100+
}
101+
}
102+
103+
c.Set(sqlKey, sqlBuffer.String())
104+
105+
log.Trace(sqlBuffer.String())
32106
return nil
33107
}
108+
109+
110+
func formatString(str string) string {
111+
str = strings.Replace(str, "\"", "", -1)
112+
str = strings.Replace(str, "'", "", -1)
113+
str = fmt.Sprintf("'%s'", util.TrimSpaces(str))
114+
return str
115+
}

pipelines/import_sql.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func (joint ImportSQLJoint) Process(c *pipeline.Context) error {
7070
panic(err)
7171
}
7272

73-
rc,_:=result.RowsAffected()
74-
l,_:=result.RowsAffected()
75-
log.Debugf("%v rows affected, lastInsertID: %v",rc,l)
73+
rc, _ := result.RowsAffected()
74+
l, _ := result.RowsAffected()
75+
log.Debugf("%v rows affected, lastInsertID: %v", rc, l)
7676

7777
err = tx.Commit()
7878
if err != nil {

pipelines/read_csv.go

Lines changed: 3 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,12 @@ limitations under the License.
1717
package pipelines
1818

1919
import (
20-
"bytes"
21-
"fmt"
22-
"github.com/360EntSecGroup-Skylar/excelize"
2320
log "github.com/cihub/seelog"
2421
"github.com/infinitbyte/framework/core/pipeline"
2522
"github.com/infinitbyte/framework/core/util"
26-
"strings"
2723
)
2824

2925
var fileName pipeline.ParaKey = "file_name"
30-
var rowFormat pipeline.ParaKey = "row_format"
31-
var sheetName pipeline.ParaKey = "sheet_name"
32-
var columnName pipeline.ParaKey = "column_name"
33-
var dataFromIndex pipeline.ParaKey = "data_start_from_index"
34-
35-
var sqlKey pipeline.ParaKey = "sql"
3626

3727
type ReadCsvJoint struct {
3828
pipeline.Parameters
@@ -44,70 +34,15 @@ func (joint ReadCsvJoint) Name() string {
4434

4535
func (joint ReadCsvJoint) Process(c *pipeline.Context) error {
4636

47-
log.Debug(joint.Data)
48-
log.Debug(c.Data)
49-
templates := joint.MustGetStringArray(rowFormat)
50-
log.Debug("row templates: ", templates)
51-
52-
xlsx, err := excelize.OpenFile(joint.MustGetString(fileName))
37+
excelBytes, err := util.FileGetContent(joint.MustGetString(fileName))
5338
if err != nil {
5439
log.Error(err)
40+
panic(err)
5541
return err
5642
}
57-
sheetMap := xlsx.GetSheetMap()
58-
log.Debug("sheets: ", sheetMap)
59-
60-
colNames := joint.MustGetStringArray(columnName)
61-
dataOffset := joint.MustGetInt(dataFromIndex)
62-
63-
rows := xlsx.GetRows(joint.MustGetString(sheetName))
64-
65-
var sqlBuffer bytes.Buffer
66-
for offset, row := range rows {
67-
if offset < dataOffset {
68-
log.Debugf("%v < data offset: %v, ignore", offset, dataOffset)
69-
continue
70-
}
71-
72-
colMap := map[string]string{}
73-
74-
hit := false
75-
for k, colCell := range row {
76-
if colCell != "" {
77-
hit = true
78-
}
79-
colName := colNames[k]
80-
colMap[colName] = colCell
81-
log.Trace("row:", offset, ": ", colName, "-", k, "-", colCell)
82-
}
83-
84-
//ignore empty row
85-
if !hit {
86-
continue
87-
}
88-
89-
for _, x := range templates {
90-
line := x
91-
log.Debug("template:", line)
92-
for k, v := range colMap {
93-
log.Debug(fmt.Sprintf("<{%v: }>", k), ",", formatString(v))
94-
line = strings.Replace(line, fmt.Sprintf("<{%v: }>", k), formatString(v), -1)
95-
}
96-
log.Debug(line)
97-
sqlBuffer.WriteString(line)
98-
}
99-
}
100-
101-
c.Set(sqlKey, sqlBuffer.String())
10243

103-
log.Trace(sqlBuffer.String())
44+
c.Set(excelBytesKey, excelBytes)
10445

10546
return nil
10647
}
10748

108-
func formatString(str string) string {
109-
str = strings.Replace(str, "\"", "", -1)
110-
str = strings.Replace(str, "'", "", -1)
111-
str = fmt.Sprintf("'%s'", util.TrimSpaces(str))
112-
return str
113-
}

plugin/plugin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (module CSV2SQLPlugin) Start(cfg *Config) {
3636
//register pipeline joints
3737
pipeline.RegisterPipeJoint(pipelines.LoggingJoint{})
3838
pipeline.RegisterPipeJoint(pipelines.ReadCsvJoint{})
39+
pipeline.RegisterPipeJoint(pipelines.ConvertSQLJoint{})
3940
pipeline.RegisterPipeJoint(pipelines.ImportSQLJoint{})
4041
}
4142

0 commit comments

Comments
 (0)