Skip to content

Commit b4b0406

Browse files
committed
init commit
0 parents  commit b4b0406

30 files changed

Lines changed: 1863 additions & 0 deletions

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
*.iml
2+
*.ipr
3+
*.iws
4+
.idea/
5+
/build/
6+
/dist/
7+
/out/

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
## Tiny Dataframe
2+
3+
Inspired by [FrostDB](https://github.com/polarsignals/frostdb)
4+
and [Arrow DataFusion](https://github.com/apache/arrow-datafusion).
5+
6+
### Why Dataframe
7+
8+
Dataframe removes the complexity of handling `SQL parsing`, `SQL rewriting`, `Binding`, `SQL Query Planner` etc. Once
9+
the dataframe is mature, we can easily integrate it with an SQL engine.

go.mod

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
module tiny_dataframe
2+
3+
go 1.21
4+
5+
require (
6+
github.com/apache/arrow/go/v12 v12.0.1
7+
github.com/olekukonko/tablewriter v0.0.5
8+
github.com/parquet-go/parquet-go v0.20.0
9+
github.com/stretchr/testify v1.8.4
10+
golang.org/x/sync v0.1.0
11+
)
12+
13+
require (
14+
github.com/andybalholm/brotli v1.0.5 // indirect
15+
github.com/apache/thrift v0.16.0 // indirect
16+
github.com/davecgh/go-spew v1.1.1 // indirect
17+
github.com/goccy/go-json v0.9.11 // indirect
18+
github.com/golang/snappy v0.0.4 // indirect
19+
github.com/google/flatbuffers v2.0.8+incompatible // indirect
20+
github.com/google/uuid v1.3.0 // indirect
21+
github.com/klauspost/asmfmt v1.3.2 // indirect
22+
github.com/klauspost/compress v1.16.7 // indirect
23+
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
24+
github.com/kr/text v0.2.0 // indirect
25+
github.com/mattn/go-runewidth v0.0.9 // indirect
26+
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
27+
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
28+
github.com/pierrec/lz4/v4 v4.1.18 // indirect
29+
github.com/pmezard/go-difflib v1.0.0 // indirect
30+
github.com/segmentio/encoding v0.3.6 // indirect
31+
github.com/zeebo/xxh3 v1.0.2 // indirect
32+
golang.org/x/mod v0.8.0 // indirect
33+
golang.org/x/sys v0.10.0 // indirect
34+
golang.org/x/tools v0.6.0 // indirect
35+
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
36+
gopkg.in/yaml.v3 v3.0.1 // indirect
37+
)

pkg/a_engine/context.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package engine
2+
3+
import (
4+
"time"
5+
dataframe "tiny_dataframe/pkg/b_dataframe"
6+
physicalplan "tiny_dataframe/pkg/d_physicalplan"
7+
datasource "tiny_dataframe/pkg/f_storage_engine"
8+
containers "tiny_dataframe/pkg/g_containers"
9+
)
10+
11+
type ExecContext struct {
12+
SessionID string
13+
State *physicalplan.ExecState
14+
}
15+
16+
func NewContext() *ExecContext {
17+
sessionId := "session_" + time.Now().String()
18+
return &ExecContext{
19+
SessionID: sessionId,
20+
State: physicalplan.NewExecState(sessionId),
21+
}
22+
}
23+
24+
func (c *ExecContext) Parquet(path string, schema containers.ISchema) (dataframe.IDataFrame, error) {
25+
src, err := datasource.NewParquetDataSource(path, schema)
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
return dataframe.NewDataFrame(c.State).Scan(path, src, nil), nil
31+
}

pkg/a_engine/engine_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package engine
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
logicalplan "tiny_dataframe/pkg/c_logical_plan"
7+
)
8+
9+
func TestParquetFile(t *testing.T) {
10+
ctx := NewContext()
11+
df, err := ctx.Parquet("../../test/data/c1_c2_int64.parquet", nil)
12+
if err != nil {
13+
t.Error(err)
14+
}
15+
16+
df = df.
17+
Project(
18+
logicalplan.Column{Name: "c1"},
19+
logicalplan.Column{Name: "c2"},
20+
).
21+
Filter(logicalplan.Eq(
22+
logicalplan.Column{Name: "c1"},
23+
logicalplan.LiteralInt64{Val: 100},
24+
))
25+
26+
logicalPlan, _ := df.LogicalPlan()
27+
fmt.Println(logicalplan.PrettyPrint(logicalPlan, 0))
28+
29+
err = df.Show()
30+
if err != nil {
31+
t.Error(err)
32+
}
33+
}

pkg/b_dataframe/dataframe.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package dataframe
2+
3+
import (
4+
"context"
5+
"github.com/olekukonko/tablewriter"
6+
"os"
7+
execution "tiny_dataframe/pkg/e_exec_runtime"
8+
9+
logicalplan "tiny_dataframe/pkg/c_logical_plan"
10+
phyiscalplan "tiny_dataframe/pkg/d_physicalplan"
11+
"tiny_dataframe/pkg/d_physicalplan/operators"
12+
datasource "tiny_dataframe/pkg/f_storage_engine"
13+
containers "tiny_dataframe/pkg/g_containers"
14+
)
15+
16+
type IDataFrame interface {
17+
Scan(path string, source datasource.TableReader, proj []string) IDataFrame
18+
Project(expr ...logicalplan.Expr) IDataFrame
19+
Filter(expr logicalplan.Expr) IDataFrame
20+
Aggregate(groupBy []logicalplan.Expr, aggregateExpr []logicalplan.AggregateExpr) IDataFrame
21+
22+
Schema() (containers.ISchema, error)
23+
Collect(ctx context.Context, callback datasource.Callback) error
24+
Show() error
25+
26+
LogicalPlan() (logicalplan.LogicalPlan, error)
27+
PhysicalPlan() (operators.PhysicalPlan, error)
28+
}
29+
30+
type DataFrame struct {
31+
sessionState *phyiscalplan.ExecState
32+
planBuilder *logicalplan.Builder
33+
}
34+
35+
func NewDataFrame(sessionState *phyiscalplan.ExecState) IDataFrame {
36+
return &DataFrame{sessionState: sessionState, planBuilder: logicalplan.NewBuilder()}
37+
}
38+
39+
func (df *DataFrame) Scan(path string, source datasource.TableReader, proj []string) IDataFrame {
40+
df.planBuilder = df.planBuilder.Input(path, source, proj)
41+
return df
42+
}
43+
44+
func (df *DataFrame) Project(proj ...logicalplan.Expr) IDataFrame {
45+
//TODO: Fix builder pattern
46+
df.planBuilder = df.planBuilder.Project(proj...)
47+
return df
48+
}
49+
50+
func (df *DataFrame) Filter(predicate logicalplan.Expr) IDataFrame {
51+
df.planBuilder = df.planBuilder.Filter(predicate)
52+
return df
53+
}
54+
55+
func (df *DataFrame) Aggregate(groupBy []logicalplan.Expr, aggExpr []logicalplan.AggregateExpr) IDataFrame {
56+
df.planBuilder = df.planBuilder.Aggregate(groupBy, aggExpr)
57+
return df
58+
}
59+
60+
func (df *DataFrame) Collect(ctx context.Context, callback datasource.Callback) error {
61+
df.planBuilder = df.planBuilder.Output(callback)
62+
63+
physicalPlan, err := df.PhysicalPlan()
64+
if err != nil {
65+
return err
66+
}
67+
return physicalPlan.Execute(df.TaskContext(), callback)
68+
}
69+
70+
func (df *DataFrame) TaskContext() execution.TaskContext {
71+
return df.sessionState.TaskContext()
72+
}
73+
74+
func (df *DataFrame) Schema() (containers.ISchema, error) {
75+
build, err := df.planBuilder.Build()
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return build.Schema(), nil
81+
}
82+
83+
func (df *DataFrame) LogicalPlan() (logicalplan.LogicalPlan, error) {
84+
return df.planBuilder.Build()
85+
}
86+
87+
func (df *DataFrame) Show() error {
88+
89+
batches := make([]containers.IBatch, 0)
90+
err := df.Collect(context.TODO(), func(ctx context.Context, batch containers.IBatch) error {
91+
batches = append(batches, batch)
92+
return nil
93+
})
94+
95+
if err != nil {
96+
return err
97+
}
98+
table := tablewriter.NewWriter(os.Stdout)
99+
100+
// 1. add headers
101+
headers := make([]string, 0)
102+
schema, err := df.Schema()
103+
if err != nil {
104+
return err
105+
}
106+
for _, field := range schema.Fields() {
107+
headers = append(headers, field.Name)
108+
}
109+
table.SetHeader(headers)
110+
111+
// 2. add data
112+
for _, batch := range batches {
113+
table.AppendBulk(batch.StringTable())
114+
}
115+
116+
// 3. render
117+
table.Render()
118+
return nil
119+
}
120+
121+
func (df *DataFrame) PhysicalPlan() (operators.PhysicalPlan, error) {
122+
plan, err := df.LogicalPlan()
123+
if err != nil {
124+
return nil, err
125+
}
126+
return df.sessionState.CreatePhysicalPlan(plan)
127+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package logicalplan
2+
3+
import (
4+
"fmt"
5+
"github.com/apache/arrow/go/v12/arrow"
6+
"strconv"
7+
containers "tiny_dataframe/pkg/g_containers"
8+
)
9+
10+
type Expr interface {
11+
// DataType returns the data type of the expression. It returns error as well.
12+
DataType(schema containers.ISchema) (arrow.DataType, error)
13+
14+
// ColumnsUsed returns the columns used in the expression.
15+
//TODO: replace it with ColumnsUsedExprs() []Expr
16+
ColumnsUsed(input LogicalPlan) []arrow.Field
17+
String() string
18+
}
19+
20+
var _ Expr = Column{}
21+
var _ Expr = Alias{}
22+
23+
var _ Expr = BooleanBinaryExpr{}
24+
var _ Expr = MathExpr{}
25+
var _ Expr = AggregateExpr{}
26+
27+
var _ Expr = LiteralString{}
28+
var _ Expr = LiteralInt64{}
29+
var _ Expr = LiteralFloat64{}
30+
31+
// ---------- Column ----------
32+
33+
// Column is an expression that refers to a field of the input by name.
type Column struct {
	// TODO: should this have arrow.Field?

	// Name is matched against the field names of the schema.
	Name string
}
37+
38+
func (col Column) DataType(schema containers.ISchema) (arrow.DataType, error) {
39+
for _, f := range schema.Fields() {
40+
if f.Name == col.Name {
41+
return f.Type, nil
42+
}
43+
}
44+
return nil, fmt.Errorf("column %s not found", col.Name)
45+
}
46+
47+
func (col Column) ColumnsUsed(input LogicalPlan) []arrow.Field {
48+
schema := input.Schema()
49+
for _, f := range schema.Fields() {
50+
if f.Name == col.Name {
51+
return []arrow.Field{f}
52+
}
53+
}
54+
panic(fmt.Sprintf("column %s not found", col.Name))
55+
return []arrow.Field{}
56+
}
57+
58+
func (col Column) String() string {
59+
return "#" + col.Name
60+
}
61+
62+
// ---------- Alias ----------
63+
64+
type Alias struct {
65+
Expr Expr
66+
Alias string
67+
}
68+
69+
func (expr Alias) DataType(schema containers.ISchema) (arrow.DataType, error) {
70+
return expr.Expr.DataType(schema)
71+
}
72+
73+
func (expr Alias) ColumnsUsed(input LogicalPlan) []arrow.Field {
74+
return expr.Expr.ColumnsUsed(input)
75+
}
76+
77+
func (expr Alias) String() string {
78+
return fmt.Sprintf("%s as %s", expr.Expr.String(), expr.Alias)
79+
}
80+
81+
// ---------- Literals ----------
82+
83+
// LiteralString is a constant string expression.
type LiteralString struct {
	// Val is the literal's value.
	Val string
}
86+
87+
func (lit LiteralString) DataType(schema containers.ISchema) (arrow.DataType, error) {
88+
return arrow.BinaryTypes.String, nil
89+
}
90+
91+
func (lit LiteralString) ColumnsUsed(input LogicalPlan) []arrow.Field {
92+
return nil
93+
}
94+
95+
func (lit LiteralString) String() string {
96+
return fmt.Sprintf("'%s'", lit.Val)
97+
}
98+
99+
// LiteralInt64 is a constant 64-bit signed integer expression.
type LiteralInt64 struct {
	// Val is the literal's value.
	Val int64
}
102+
103+
func (lit LiteralInt64) DataType(schema containers.ISchema) (arrow.DataType, error) {
104+
return arrow.PrimitiveTypes.Int64, nil
105+
}
106+
107+
func (lit LiteralInt64) ColumnsUsed(input LogicalPlan) []arrow.Field {
108+
return nil
109+
}
110+
111+
func (lit LiteralInt64) String() string {
112+
return strconv.Itoa(int(lit.Val))
113+
}
114+
115+
// LiteralFloat64 is a constant 64-bit floating-point expression.
type LiteralFloat64 struct {
	// Val is the literal's value.
	Val float64
}
118+
119+
func (lit LiteralFloat64) DataType(schema containers.ISchema) (arrow.DataType, error) {
120+
return arrow.PrimitiveTypes.Float64, nil
121+
}
122+
123+
func (lit LiteralFloat64) ColumnsUsed(input LogicalPlan) []arrow.Field {
124+
return nil
125+
}
126+
127+
func (lit LiteralFloat64) String() string {
128+
return strconv.FormatFloat(lit.Val, 'f', -1, 64)
129+
}

0 commit comments

Comments
 (0)