Skip to content

Commit a1772c4

Browse files
committed
Add AggFn
Add AggFn Bug Bug Bug Bug Agg init Agg init minor minor minor HashAggregate init HashAggregate init minor minor RBO RBO RBO Add SnapshotTs Add SnapshotTs Fix builder pattern init commit
1 parent 4163ea7 commit a1772c4

27 files changed

Lines changed: 721 additions & 183 deletions

README.md

Lines changed: 72 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,102 @@
11
## Tiny Dataframe
22

3-
Inspired by [FrostDB](https://github.com/polarsignals/frostdb)
4-
and [Arrow DataFusion](https://github.com/apache/arrow-datafusion).
5-
63
### Why Dataframe
74

85
Dataframe removes the complexity of handling `SQL parsing`, `SQL rewriting`, `Binding`, `SQL Query Planner`, etc. Once
96
the dataframe is mature, we can easily integrate it with an SQL engine.
107

8+
### Features
9+
- `Push based` query execution
10+
- Abstraction over `arrow.Record`, `arrow.Array` and `arrow.Schema`
11+
- Support `Parquet` reading with schema inference
12+
- `Rule Based` Optimizer
13+
- `AggFunc`: Sum
14+
- `BooleanBinaryExpr`: Eq
1115

1216
### Example
1317

1418
```go
15-
package main
19+
package simple
1620

1721
import (
18-
"fmt"
22+
"github.com/stretchr/testify/assert"
23+
"testing"
1924
"tiny_dataframe/pkg/a_engine"
2025
logicalplan "tiny_dataframe/pkg/c_logical_plan"
2126
)
2227

23-
func main() {
28+
func TestParquetFile(t *testing.T) {
2429
ctx := engine.NewContext()
25-
df, _ := ctx.Parquet("../../test/data/c1_c2_int64.parquet", nil)
26-
30+
df, err := ctx.Parquet("../../test/data/c1_c2_c3_int64.parquet", nil)
31+
/*
32+
+-----+-----+-----+
33+
| C1 | C2 | C3 |
34+
+-----+-----+-----+
35+
| 100 | 101 | 102 |
36+
| 100 | 201 | 202 |
37+
| 100 | 301 | 302 |
38+
| 200 | 401 | 402 |
39+
| 200 | 501 | 502 |
40+
| 300 | 601 | 602 |
41+
+-----+-----+-----+
42+
*/
43+
if err != nil {
44+
t.Error(err)
45+
}
46+
2747
df = df.
28-
Project(
29-
logicalplan.Column{Name: "c1"},
30-
logicalplan.Column{Name: "c2"},
31-
).
3248
Filter(logicalplan.Eq(
33-
logicalplan.Column{Name: "c1"},
34-
logicalplan.LiteralInt64{Val: 100},
35-
))
49+
logicalplan.ColumnExpr{Name: "c1"},
50+
logicalplan.LiteralInt64Expr{Val: 200},
51+
)).
52+
Project(
53+
logicalplan.ColumnExpr{Name: "c1"},
54+
logicalplan.ColumnExpr{Name: "c2"},
55+
).Aggregate(
56+
[]logicalplan.Expr{
57+
logicalplan.ColumnExpr{Name: "c1"},
58+
},
59+
[]logicalplan.AggregateExpr{
60+
{
61+
Name: "sum",
62+
Expr: logicalplan.ColumnExpr{Name: "c2"},
63+
},
64+
})
3665

3766
logicalPlan, _ := df.LogicalPlan()
3867
fmt.Println(logicalplan.PrettyPrint(logicalPlan, 0))
3968
/*
40-
Filter: #c1 = 100
41-
Projection: #c1, #c2
42-
Input: ../../test/data/c1_c2_int64.parquet; projExpr=None
69+
Aggregate: groupExpr=[#c1], aggregateExpr=[sum(#c2)]
70+
Projection: #c1, #c2
71+
Filter: #c1 = 200
72+
Input: ../../test/data/c1_c2_c3_int64.parquet; projExpr=None
73+
*/
74+
75+
logicalPlan, _ = df.OptimizedLogicalPlan()
76+
fmt.Println(logicalplan.PrettyPrint(logicalPlan, 0))
77+
/*
78+
Aggregate: groupExpr=[#c1], aggregateExpr=[sum(#c2)]
79+
Projection: #c1, #c2
80+
Filter: #c1 = 200
81+
Input: ../../test/data/c1_c2_c3_int64.parquet; projExpr=[c1 c2]
4382
*/
4483

45-
_ = df.Show()
84+
err = df.Show()
85+
if err != nil {
86+
t.Error(err)
87+
}
4688
/*
47-
+-----+-----+
48-
| C1 | C2 |
49-
+-----+-----+
50-
| 100 | 101 |
51-
+-----+-----+
89+
+-----+---------+
90+
| #0 | SUM(#1) |
91+
+-----+---------+
92+
| 200 | 902 |
93+
+-----+---------+
5294
*/
5395
}
96+
```
5497

55-
```
98+
### Reference
99+
- [FrostDB](https://github.com/polarsignals/frostdb)
100+
- [Arrow DataFusion](https://github.com/apache/arrow-datafusion)
101+
- [KQuery](https://github.com/dbminions/how-query-engine-work)
102+
- [Drogo](https://github.com/dbminions/drogo)

cmd/simple/engine_test.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,45 @@ package simple
22

33
import (
44
"fmt"
5+
"github.com/stretchr/testify/assert"
56
"testing"
67
"tiny_dataframe/pkg/a_engine"
78
logicalplan "tiny_dataframe/pkg/c_logical_plan"
89
)
910

1011
func TestParquetFile(t *testing.T) {
1112
ctx := engine.NewContext()
12-
df, err := ctx.Parquet("../../test/data/c1_c2_int64.parquet", nil)
13+
df, err := ctx.Parquet("../../test/data/c1_c2_c3_int64.parquet", nil)
1314
if err != nil {
1415
t.Error(err)
1516
}
1617

17-
df = df.
18-
Project(
19-
logicalplan.Column{Name: "c1"},
20-
logicalplan.Column{Name: "c2"},
21-
).
22-
Filter(logicalplan.Eq(
23-
logicalplan.Column{Name: "c1"},
24-
logicalplan.LiteralInt64{Val: 100},
25-
))
18+
//df = df.
19+
// Filter(logicalplan.Eq(
20+
// logicalplan.ColumnExpr{Name: "c1"},
21+
// logicalplan.LiteralInt64Expr{Val: 200},
22+
// )).
23+
// Project(
24+
// logicalplan.ColumnExpr{Name: "c1"},
25+
// logicalplan.ColumnExpr{Name: "c2"},
26+
// ).Aggregate(
27+
// []logicalplan.Expr{
28+
// logicalplan.ColumnExpr{Name: "c1"},
29+
// },
30+
// []logicalplan.AggregateExpr{
31+
// {
32+
// Name: "sum",
33+
// Expr: logicalplan.ColumnExpr{Name: "c2"},
34+
// },
35+
// })
2636

2737
logicalPlan, _ := df.LogicalPlan()
2838
fmt.Println(logicalplan.PrettyPrint(logicalPlan, 0))
39+
assert.Equal(t, "Aggregate: groupExpr=[#c1], aggregateExpr=[sum(#c2)]\n\tProjection: #c1, #c2\n\t\tFilter: #c1 = 200\n\t\t\tInput: ../../test/data/c1_c2_c3_int64.parquet; projExpr=None\n", logicalplan.PrettyPrint(logicalPlan, 0))
40+
41+
logicalPlan, _ = df.OptimizedLogicalPlan()
42+
fmt.Println(logicalplan.PrettyPrint(logicalPlan, 0))
43+
assert.Equal(t, "Aggregate: groupExpr=[#c1], aggregateExpr=[sum(#c2)]\n\tProjection: #c1, #c2\n\t\tFilter: #c1 = 200\n\t\t\tInput: ../../test/data/c1_c2_c3_int64.parquet; projExpr=[c1 c2]\n", logicalplan.PrettyPrint(logicalPlan, 0))
2944

3045
err = df.Show()
3146
if err != nil {

pkg/a_engine/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"time"
55
dataframe "tiny_dataframe/pkg/b_dataframe"
66
physicalplan "tiny_dataframe/pkg/d_physicalplan"
7-
datasource "tiny_dataframe/pkg/f_storage_engine"
7+
datasource "tiny_dataframe/pkg/f_data_source"
88
containers "tiny_dataframe/pkg/g_containers"
99
)
1010

pkg/b_dataframe/dataframe.go

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import (
44
"context"
55
"github.com/olekukonko/tablewriter"
66
"os"
7+
"tiny_dataframe/pkg/c_logical_plan/optimizer"
78
execution "tiny_dataframe/pkg/e_exec_runtime"
89

910
logicalplan "tiny_dataframe/pkg/c_logical_plan"
1011
phyiscalplan "tiny_dataframe/pkg/d_physicalplan"
1112
"tiny_dataframe/pkg/d_physicalplan/operators"
12-
datasource "tiny_dataframe/pkg/f_storage_engine"
13+
datasource "tiny_dataframe/pkg/f_data_source"
1314
containers "tiny_dataframe/pkg/g_containers"
1415
)
1516

@@ -19,46 +20,50 @@ type IDataFrame interface {
1920
Filter(expr logicalplan.Expr) IDataFrame
2021
Aggregate(groupBy []logicalplan.Expr, aggregateExpr []logicalplan.AggregateExpr) IDataFrame
2122

22-
Schema() (containers.ISchema, error)
23-
Collect(ctx context.Context, callback datasource.Callback) error
23+
TaskContext() *execution.TaskContext
2424
Show() error
2525

2626
LogicalPlan() (logicalplan.LogicalPlan, error)
27+
OptimizedLogicalPlan() (logicalplan.LogicalPlan, error)
2728
PhysicalPlan() (operators.PhysicalPlan, error)
2829
}
2930

3031
type DataFrame struct {
31-
sessionState *phyiscalplan.ExecState
32-
planBuilder *logicalplan.Builder
32+
sessionState *phyiscalplan.ExecState
33+
planBuilder *logicalplan.Builder
34+
ruleBasedOptimizer *optimizer.Optimizer
3335
}
3436

3537
func NewDataFrame(sessionState *phyiscalplan.ExecState) IDataFrame {
36-
return &DataFrame{sessionState: sessionState, planBuilder: logicalplan.NewBuilder()}
38+
return &DataFrame{
39+
sessionState: sessionState,
40+
planBuilder: logicalplan.NewBuilder(),
41+
ruleBasedOptimizer: optimizer.NewOptimizer(),
42+
}
3743
}
3844

3945
func (df *DataFrame) Scan(path string, source datasource.TableReader, proj []string) IDataFrame {
40-
df.planBuilder = df.planBuilder.Input(path, source, proj)
46+
df.planBuilder.Input(path, source, proj)
4147
return df
4248
}
4349

4450
func (df *DataFrame) Project(proj ...logicalplan.Expr) IDataFrame {
45-
//TODO: Fix builder pattern
46-
df.planBuilder = df.planBuilder.Project(proj...)
51+
df.planBuilder.Project(proj...)
4752
return df
4853
}
4954

5055
func (df *DataFrame) Filter(predicate logicalplan.Expr) IDataFrame {
51-
df.planBuilder = df.planBuilder.Filter(predicate)
56+
df.planBuilder.Filter(predicate)
5257
return df
5358
}
5459

5560
func (df *DataFrame) Aggregate(groupBy []logicalplan.Expr, aggExpr []logicalplan.AggregateExpr) IDataFrame {
56-
df.planBuilder = df.planBuilder.Aggregate(groupBy, aggExpr)
61+
df.planBuilder.Aggregate(groupBy, aggExpr)
5762
return df
5863
}
5964

60-
func (df *DataFrame) Collect(ctx context.Context, callback datasource.Callback) error {
61-
df.planBuilder = df.planBuilder.Output(callback)
65+
func (df *DataFrame) collect(ctx context.Context, callback datasource.Callback) error {
66+
df.planBuilder.Output(callback)
6267

6368
physicalPlan, err := df.PhysicalPlan()
6469
if err != nil {
@@ -67,42 +72,46 @@ func (df *DataFrame) Collect(ctx context.Context, callback datasource.Callback)
6772
return physicalPlan.Execute(df.TaskContext(), callback)
6873
}
6974

70-
func (df *DataFrame) TaskContext() execution.TaskContext {
75+
func (df *DataFrame) TaskContext() *execution.TaskContext {
7176
return df.sessionState.TaskContext()
7277
}
7378

74-
func (df *DataFrame) Schema() (containers.ISchema, error) {
75-
build, err := df.planBuilder.Build()
79+
func (df *DataFrame) LogicalPlan() (logicalplan.LogicalPlan, error) {
80+
return df.planBuilder.Build()
81+
}
82+
83+
// OptimizedLogicalPlan returns the logical plan after the rule-based optimizer has been applied. It is exposed mainly for testing.
84+
func (df *DataFrame) OptimizedLogicalPlan() (logicalplan.LogicalPlan, error) {
85+
plan, err := df.LogicalPlan()
7686
if err != nil {
7787
return nil, err
7888
}
79-
80-
return build.Schema(), nil
81-
}
82-
83-
func (df *DataFrame) LogicalPlan() (logicalplan.LogicalPlan, error) {
84-
return df.planBuilder.Build()
89+
return df.ruleBasedOptimizer.Optimize(plan), nil
8590
}
8691

8792
func (df *DataFrame) Show() error {
8893

8994
batches := make([]containers.IBatch, 0)
90-
err := df.Collect(context.TODO(), func(ctx context.Context, batch containers.IBatch) error {
95+
var schema containers.ISchema
96+
err := df.collect(context.TODO(), func(ctx context.Context, batch containers.IBatch) error {
97+
if schema == nil {
98+
schema = batch.Schema()
99+
}
91100
batches = append(batches, batch)
92101
return nil
93102
})
94103

95104
if err != nil {
96105
return err
97106
}
107+
if len(batches) == 0 {
108+
return nil
109+
}
110+
98111
table := tablewriter.NewWriter(os.Stdout)
99112

100113
// 1. add headers
101114
headers := make([]string, 0)
102-
schema, err := df.Schema()
103-
if err != nil {
104-
return err
105-
}
106115
for _, field := range schema.Fields() {
107116
headers = append(headers, field.Name)
108117
}
@@ -119,9 +128,10 @@ func (df *DataFrame) Show() error {
119128
}
120129

121130
func (df *DataFrame) PhysicalPlan() (operators.PhysicalPlan, error) {
122-
plan, err := df.LogicalPlan()
131+
plan, err := df.OptimizedLogicalPlan()
123132
if err != nil {
124133
return nil, err
125134
}
135+
126136
return df.sessionState.CreatePhysicalPlan(plan)
127137
}

0 commit comments

Comments
 (0)