Skip to content

Commit 3362182

Browse files
committed
refactor sql execution
1 parent dca09ac commit 3362182

13 files changed

Lines changed: 75 additions & 205 deletions

File tree

config/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package config
22

33
type AppConfig struct {
4-
UIEnabled bool `config:"ui_enabled"`
4+
UIEnabled bool `config:"ui_enabled"`
55
}
66

77
var appConfig AppConfig
@@ -13,4 +13,3 @@ func GetAppConfig() AppConfig {
1313
func SetAppConfig(cfg AppConfig) {
1414
appConfig = cfg
1515
}
16-

csv2sql.yml

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,6 @@ plugins:
1111
- name: csv2sql
1212
enabled: true
1313

14-
- name: persist
15-
enabled: true
16-
driver: mysql
17-
mysql:
18-
connection: root:qeephp@tcp(localhost:3306)/ifish?charset=utf8&parseTime=true&loc=Local
19-
20-
2114
modules:
2215
- name: pipeline
2316
enabled: true
@@ -53,12 +46,17 @@ modules:
5346
- feed_pattern
5447
- produce_pattern
5548
row_format:
56-
- 'INSERT INTO `aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
49+
- 'INSERT INTO `ifish`.`aquatic_base_info` (`id`, `outer_code`, `common_name`, `scientific_name`, `english_name`, `chinese_name`, `region_name`, `aquatic_category_id`)'
5750
- 'VALUES (<{id: }>, <{outer_code: }>, <{common_name: }>, <{scientific_name: }>, <{english_name: }>, <{chinese_name: }>, <{region_name: }>, <{aquatic_category_id: }>);'
5851

5952
process:
6053
- joint: import_sql
6154
enabled: true
55+
parameters:
56+
mysql_conn: root:qeephp@tcp(localhost:3306)/ifish?charset=utf8
57+
58+
- joint: logging
59+
enabled: true
6260

6361
error:
6462
joint: on_error

executed_sql

Lines changed: 0 additions & 9 deletions
This file was deleted.

main.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ import (
2020
_ "expvar"
2121
"github.com/infinitbyte/framework"
2222
"github.com/infinitbyte/framework/core/module"
23-
orm "github.com/infinitbyte/framework/core/persist"
24-
q "github.com/infinitbyte/framework/core/queue"
2523
pipe "github.com/infinitbyte/framework/core/pipeline"
24+
q "github.com/infinitbyte/framework/core/queue"
2625
"github.com/infinitbyte/framework/core/util"
2726
"github.com/infinitbyte/framework/modules/api"
2827
"github.com/infinitbyte/framework/modules/filter"
@@ -34,7 +33,6 @@ import (
3433
"github.com/infinitbyte/framework/modules/ui"
3534
"github.com/medcl/csv2sql/config"
3635
"github.com/medcl/csv2sql/plugin"
37-
"github.com/medcl/csv2sql/model"
3836
)
3937

4038
func main() {
@@ -72,8 +70,6 @@ func main() {
7270
//start modules
7371
module.Start()
7472

75-
orm.RegisterSchema(&model.Request{})
76-
7773
//manual trigger a pipeline event
7874
context := pipe.Context{}
7975
q.Push("primary", util.ToJSONBytes(context))

model/request.go

Lines changed: 0 additions & 151 deletions
This file was deleted.

pipelines/convert_sql.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ limitations under the License.
1616

1717
package pipelines
1818

19-
2019
import (
2120
"github.com/infinitbyte/framework/core/pipeline"
2221
)
@@ -30,6 +29,5 @@ func (joint ConvertSQLJoint) Name() string {
3029

3130
func (joint ConvertSQLJoint) Process(c *pipeline.Context) error {
3231

33-
3432
return nil
3533
}

pipelines/detect_max_id.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ limitations under the License.
1616

1717
package pipelines
1818

19-
2019
import (
2120
"github.com/infinitbyte/framework/core/pipeline"
2221
)
@@ -30,6 +29,5 @@ func (joint DetectMaxIDJoint) Name() string {
3029

3130
func (joint DetectMaxIDJoint) Process(c *pipeline.Context) error {
3231

33-
3432
return nil
3533
}

pipelines/import_sql.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,66 @@ limitations under the License.
1616

1717
package pipelines
1818

19-
2019
import (
21-
"github.com/infinitbyte/framework/core/pipeline"
22-
"github.com/infinitbyte/framework/core/persist"
20+
"database/sql"
2321
log "github.com/cihub/seelog"
22+
_ "github.com/go-sql-driver/mysql"
23+
"github.com/infinitbyte/framework/core/pipeline"
2424
"github.com/infinitbyte/framework/core/util"
2525
)
2626

2727
type ImportSQLJoint struct {
2828
pipeline.Parameters
2929
}
3030

31+
const mysqlConn = "mysql_conn"
32+
3133
func (joint ImportSQLJoint) Name() string {
3234
return "import_sql"
3335
}
3436

3537
func (joint ImportSQLJoint) Process(c *pipeline.Context) error {
3638

37-
sql:=c.MustGetString(sqlKey)
39+
sqlText := c.MustGetString(sqlKey)
40+
41+
log.Debug("start execute: ", sqlText)
42+
43+
db, err := sql.Open("mysql", joint.MustGetString(mysqlConn))
44+
if err != nil {
45+
log.Error(err)
46+
panic(err)
47+
}
3848

39-
log.Info("start execute:",sql)
49+
tx, err := db.Begin()
50+
if err != nil {
51+
log.Error(err)
52+
panic(err)
53+
}
54+
55+
defer func() {
56+
if r := recover(); r != nil {
57+
log.Info("the database is rolled back.")
58+
err = tx.Rollback()
59+
if err != nil {
60+
log.Error(err)
61+
panic(err)
62+
}
63+
}
64+
}()
65+
66+
//插入数据
67+
result, err := tx.Exec(sqlText)
68+
69+
if err != nil {
70+
log.Error(err, result)
71+
panic(err)
72+
}
4073

41-
util.FileAppendNewLine("executed_sql",sql)
74+
log.Debug(result)
4275

43-
err:=persist.RawQuery(sql)
44-
if(err!=nil){
76+
err = tx.Commit()
77+
if err != nil {
78+
log.Error(err)
4579
panic(err)
4680
}
4781

pipelines/logging.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package pipelines
22

33
import (
44
"github.com/infinitbyte/framework/core/pipeline"
5+
"github.com/infinitbyte/framework/core/util"
6+
"time"
57
)
68

79
type LoggingJoint struct {
@@ -13,6 +15,12 @@ func (joint LoggingJoint) Name() string {
1315

1416
func (joint LoggingJoint) Process(c *pipeline.Context) error {
1517

18+
sql := c.MustGetString(sqlKey)
19+
20+
util.FileAppendNewLine("log/executed_sql.log", "")
21+
util.FileAppendNewLine("log/executed_sql.log", time.Now().String())
22+
util.FileAppendNewLine("log/executed_sql.log", sql)
23+
util.FileAppendNewLine("log/executed_sql.log", "")
1624

1725
return nil
1826
}

0 commit comments

Comments
 (0)