Skip to content

Commit dca09ac

Browse files
committed
dirty and works
1 parent b623a6d commit dca09ac

8 files changed

Lines changed: 133 additions & 27 deletions

File tree

csv2sql.yml

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ plugins:
1111
- name: csv2sql
1212
enabled: true
1313

14+
- name: persist
15+
enabled: true
16+
driver: mysql
17+
mysql:
18+
connection: root:qeephp@tcp(localhost:3306)/ifish?charset=utf8&parseTime=true&loc=Local
19+
20+
1421
modules:
1522
- name: pipeline
1623
enabled: true
@@ -26,27 +33,33 @@ modules:
2633
joint: read_csv
2734
enabled: true
2835
parameters:
36+
file_name: "test/import.xlsx"
2937
sheet_name: 基础数据导入表
30-
data_start_from_index: 4
38+
data_start_from_index: 3
3139
column_name:
32-
- outer_code
33-
- common_name
34-
- scientific_name
35-
- english_name
36-
- chinese_name
37-
- region_name
38-
- category_name
39-
- is_homemade
40-
- aquatic_region_id
41-
- inner_code
42-
- produce_pattern
43-
- feed_pattern
44-
- produce_pattern
40+
- id
41+
- outer_code
42+
- common_name
43+
- scientific_name
44+
- english_name
45+
- chinese_name
46+
- region_name
47+
- aquatic_category_id
48+
- category_name
49+
- is_homemade
50+
- aquatic_region_id
51+
- inner_code
52+
- produce_pattern
53+
- feed_pattern
54+
- produce_pattern
4555
row_format:
46-
- INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`, `created_time`, `updated_time`)
47-
- VALUES (<{base_info_id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>, <{created_time: }>, <{updated_time: }>);
56+
- 'INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
57+
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
58+
59+
process:
60+
- joint: import_sql
61+
enabled: true
4862

49-
process: []
5063
error:
5164
joint: on_error
5265
enabled: true

executed_sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '');
2+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '');
3+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '');
4+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '');
5+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES (100, 1700829701, 花鲈/鲈鱼/海鲈鱼, Lateolabrax japonicus, Japanese seabass, 中国花鲈, , );
6+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '');
7+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '3');
8+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '3');
9+
INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)VALUES ('100', '1700829701', '花鲈/鲈鱼/海鲈鱼', 'Lateolabrax japonicus', 'Japanese seabass', '中国花鲈', '', '3');

main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"github.com/infinitbyte/framework"
2222
"github.com/infinitbyte/framework/core/module"
2323
orm "github.com/infinitbyte/framework/core/persist"
24+
q "github.com/infinitbyte/framework/core/queue"
25+
pipe "github.com/infinitbyte/framework/core/pipeline"
2426
"github.com/infinitbyte/framework/core/util"
2527
"github.com/infinitbyte/framework/modules/api"
2628
"github.com/infinitbyte/framework/modules/filter"
@@ -72,6 +74,10 @@ func main() {
7274

7375
orm.RegisterSchema(&model.Request{})
7476

77+
//manual trigger a pipeline event
78+
context := pipe.Context{}
79+
q.Push("primary", util.ToJSONBytes(context))
80+
7581
})
7682

7783
}

pipelines/import_sql.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2016 Medcl (m AT medcl.net)
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package pipelines
18+
19+
20+
import (
21+
"github.com/infinitbyte/framework/core/pipeline"
22+
"github.com/infinitbyte/framework/core/persist"
23+
log "github.com/cihub/seelog"
24+
"github.com/infinitbyte/framework/core/util"
25+
)
26+
27+
type ImportSQLJoint struct {
28+
pipeline.Parameters
29+
}
30+
31+
func (joint ImportSQLJoint) Name() string {
32+
return "import_sql"
33+
}
34+
35+
func (joint ImportSQLJoint) Process(c *pipeline.Context) error {
36+
37+
sql:=c.MustGetString(sqlKey)
38+
39+
log.Info("start execute:",sql)
40+
41+
util.FileAppendNewLine("executed_sql",sql)
42+
43+
err:=persist.RawQuery(sql)
44+
if(err!=nil){
45+
panic(err)
46+
}
47+
48+
log.Info("sql execute success")
49+
50+
return nil
51+
}

pipelines/read_csv.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@ import (
2222
log "github.com/cihub/seelog"
2323
"github.com/infinitbyte/framework/core/pipeline"
2424
"strings"
25+
"github.com/infinitbyte/framework/core/util"
2526
)
2627

28+
var fileName pipeline.ParaKey = "file_name"
2729
var rowFormat pipeline.ParaKey = "row_format"
2830
var sheetName pipeline.ParaKey = "sheet_name"
2931
var columnName pipeline.ParaKey = "column_name"
3032
var dataFromIndex pipeline.ParaKey = "data_start_from_index"
3133

34+
var sqlKey pipeline.ParaKey = "sql"
35+
3236
type ReadCsvJoint struct {
37+
pipeline.Parameters
3338
}
3439

3540
func (joint ReadCsvJoint) Name() string {
@@ -38,22 +43,25 @@ func (joint ReadCsvJoint) Name() string {
3843

3944
func (joint ReadCsvJoint) Process(c *pipeline.Context) error {
4045

46+
log.Debug(joint.Data)
4147
log.Debug(c.Data)
42-
templates := c.MustGetStringArray(rowFormat)
48+
templates := joint.MustGetStringArray(rowFormat)
4349
log.Debug("row templates: ", templates)
4450

45-
xlsx, err := excelize.OpenFile("../test/Book1.xlsx")
51+
xlsx, err := excelize.OpenFile(joint.MustGetString(fileName))
4652
if err != nil {
4753
log.Error(err)
4854
return err
4955
}
5056
sheetMap := xlsx.GetSheetMap()
5157
log.Debug("sheets: ", sheetMap)
5258

53-
colNames := c.MustGetStringArray(columnName)
54-
dataOffset := c.MustGetInt(dataFromIndex)
59+
colNames := joint.MustGetStringArray(columnName)
60+
dataOffset := joint.MustGetInt(dataFromIndex)
61+
62+
rows := xlsx.GetRows(joint.MustGetString(sheetName))
5563

56-
rows := xlsx.GetRows(c.MustGetString(sheetName))
64+
sql := ""
5765
for offset, row := range rows {
5866
if offset < dataOffset {
5967
log.Debugf("%v < data offset: %v, ignore", offset, dataOffset)
@@ -62,28 +70,45 @@ func (joint ReadCsvJoint) Process(c *pipeline.Context) error {
6270

6371
colMap := map[string]string{}
6472

73+
hit := false
6574
for k, colCell := range row {
75+
if colCell != "" {
76+
hit = true
77+
}
6678
colName := colNames[k]
6779
colMap[colName] = colCell
68-
log.Debug("row:",offset, ": ", colName, "-", k, "-", colCell)
80+
log.Trace("row:", offset, ": ", colName, "-", k, "-", colCell)
81+
}
82+
83+
//ignore empty row
84+
if !hit {
85+
continue
6986
}
7087

7188
for _, x := range templates {
72-
line:=x
73-
log.Debug("template:",line)
89+
line := x
90+
//line:=templates
91+
log.Debug("template:", line)
7492
for k, v := range colMap {
75-
log.Debug(fmt.Sprintf("<{%v: }>",k),",", formatString(v))
93+
log.Debug(fmt.Sprintf("<{%v: }>", k), ",", formatString(v))
7694
line = strings.Replace(line, fmt.Sprintf("<{%v: }>", k), formatString(v), -1)
7795
}
7896
log.Debug(line)
97+
sql = sql + line
98+
log.Debug("sql:",sql)
7999
}
80100
}
81101

102+
c.Set(sqlKey, sql)
103+
104+
log.Info(sql)
105+
82106
return nil
83107
}
84108

85109
func formatString(str string) string {
86110
str = strings.Replace(str, "\"", "", -1)
87-
str=fmt.Sprintf("'%s'",str)
111+
str = strings.Replace(str, "'", "", -1)
112+
str = fmt.Sprintf("'%s'", util.TrimSpaces(str))
88113
return str
89114
}

plugin/plugin.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func (module CSV2SQLPlugin) Start(cfg *Config) {
3535

3636
//register pipeline joints
3737
pipeline.RegisterPipeJoint(pipelines.LoggingJoint{})
38+
pipeline.RegisterPipeJoint(pipelines.ReadCsvJoint{})
39+
pipeline.RegisterPipeJoint(pipelines.ImportSQLJoint{})
3840
}
3941

4042
func (module CSV2SQLPlugin) Stop() error {

test/import.xlsx

37.5 KB
Binary file not shown.

test/~$Book1.xlsx

-171 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)