-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrow.go
More file actions
158 lines (137 loc) · 4.45 KB
/
row.go
File metadata and controls
158 lines (137 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package wire
import (
"context"
"errors"
"fmt"
"strings"
"github.com/jackc/pgtype"
"github.com/lib/pq/oid"
"github.com/stackql/psql-wire/internal/buffer"
"github.com/stackql/psql-wire/internal/types"
)
// Columns is an ordered collection of Column definitions. The order matters:
// Write matches each source value to the column at the same index.
type Columns []Column
// Define emits a RowDescription message for the receiver: the message header,
// the number of columns as an int16, and then one field description per
// column in slice order. The headers must be written before any data rows can
// be sent back to the client.
//
// The returned error is whatever writer.End reports.
func (columns Columns) Define(ctx context.Context, writer buffer.Writer) error {
	writer.Start(types.ServerRowDescription)
	writer.AddInt16(int16(len(columns)))

	// Each column appends its own field description to the open message.
	for i := range columns {
		columns[i].Define(ctx, writer)
	}

	return writer.End()
}
// Write emits a single DataRow message holding the given values. Each value in
// srcs is encoded by the column at the same index, using that column's type
// and format (text/binary).
//
// An error is returned, before anything is written, when the number of values
// differs from the number of columns. Encoding stops at the first column that
// fails and its error is returned as-is; otherwise the result of writer.End is
// returned.
func (columns Columns) Write(ctx context.Context, writer buffer.Writer, srcs []interface{}) (err error) {
	if want, got := len(columns), len(srcs); got != want {
		return fmt.Errorf("unexpected columns, %d columns are defined inside the given table but %d were given", want, got)
	}

	writer.Start(types.ServerDataRow)
	writer.AddInt16(int16(len(columns)))

	// The lengths are equal here, so every value has a matching column.
	for i, src := range srcs {
		if err = columns[i].Write(ctx, writer, src); err != nil {
			return err
		}
	}

	return writer.End()
}
// Column represents a table column and its attributes such as name, type and
// encode formatter. The fields are written to the client by Define as part of
// a RowDescription message, and Oid and Format additionally drive how Write
// encodes values for this column.
// https://www.postgresql.org/docs/8.3/catalog-pg-attribute.html
type Column struct {
Table int32 // table id
Name string // column name
AttrNo int16 // column attribute no (optional)
Oid oid.Oid // data type OID; sent in RowDescription and used by Write to look up the pgtype data type
Width int16 // sent as-is in RowDescription; presumably the data type size (pg_type.typlen) — confirm
TypeModifier int32 // not sent at the moment: Define always writes -1 in its place
Format FormatCode // format code sent in RowDescription; selects the encoder used by Write
}
// Define writes the column header values to the given writer.
// This method is used to define a column inside a RowDescription message,
// defining the column type, width, and name.
//
// The values are appended in a fixed order: the null-terminated name, table
// id, attribute number, type OID, width, type modifier and format code. The
// method only appends to the writer; it does not start or end a message.
// The ctx argument is not used by this method.
func (column Column) Define(ctx context.Context, writer buffer.Writer) {
writer.AddString(column.Name)
writer.AddNullTerminate()
writer.AddInt32(column.Table)
writer.AddInt16(column.AttrNo)
writer.AddInt32(int32(column.Oid))
writer.AddInt16(column.Width)
writer.AddInt32(-1) // TODO(Jeroen): type modifiers have not yet been fully implemented. Setting -1 to indicate an undefined value; column.TypeModifier is ignored here.
writer.AddInt16(int16(column.Format))
}
// Write encodes the given source value using the column type definition and
// connection info, and appends it to the given writer as one DataRow field: an
// int32 byte length followed by the value bytes. A NULL value is written as a
// length of -1 with no value bytes.
//
// For text format (column.Format == TextFormat), if the source is already a
// string or []byte, the raw bytes are written directly without going through
// pgtype encoding. This preserves the exact representation from the backend
// (e.g. sqlite's "t"/"f" for booleans) while still advertising the correct OID
// in RowDescription. In that mode a string equal to "null" (ignoring case) or
// a nil []byte is written as NULL.
//
// Every other source is encoded through the pgtype data type registered for
// column.Oid in the connection info carried by ctx.
//
// An error is returned when ctx is already done, when ctx carries no
// connection info, when column.Oid is not a known data type, or when the
// value cannot be set on, or encoded by, the data type.
func (column Column) Write(ctx context.Context, writer buffer.Writer, src interface{}) (err error) {
	if err = ctx.Err(); err != nil {
		return err
	}

	// For text format, bypass pgtype encoding when the source is already
	// a string or []byte. This avoids representation changes (e.g. pgtype.Bool
	// encoding "true"/"false" instead of the original "t"/"f").
	if column.Format == TextFormat {
		if b, ok := asTextBytes(src); ok {
			if b == nil {
				writer.AddInt32(-1) // NULL
				return nil
			}

			writer.AddInt32(int32(len(b)))
			writer.AddBytes(b)
			return nil
		}
	}

	ci := TypeInfo(ctx)
	if ci == nil {
		return errors.New("postgres connection info has not been defined inside the given context")
	}

	typed, has := ci.DataTypeForOID(uint32(column.Oid))
	if !has {
		// Report the OID that could not be resolved; formatting the column
		// itself with %T would only ever print its static Go type.
		return fmt.Errorf("unknown data type: oid %d", column.Oid)
	}

	// Boolean columns accept the literal string "null" (any case) as NULL.
	// It is replaced by a nil *bool so that Set marks the value as NULL.
	if _, isBool := typed.Value.(*pgtype.Bool); isBool {
		if s, ok := src.(string); ok && strings.EqualFold(s, "null") {
			var null *bool
			src = null
		}
	}

	if err = typed.Value.Set(src); err != nil {
		return err
	}

	// A NULL value has no byte representation: the protocol marks it with a
	// length of -1. Writing a zero length instead would send an empty value,
	// which is not the same thing (and not valid input for most types).
	// Get reports nil when the pgtype value is NULL.
	if typed.Value.Get() == nil {
		writer.AddInt32(-1) // NULL
		return nil
	}

	encoder := column.Format.Encoder(typed)
	bb, err := encoder(ci, nil)
	if err != nil {
		return err
	}

	writer.AddInt32(int32(len(bb)))
	writer.AddBytes(bb)
	return nil
}
// asTextBytes returns the raw bytes of src for text-format passthrough.
//
// The boolean result reports whether src is a string or a []byte; for any
// other type the result is (nil, false). A []byte is returned unchanged (so a
// nil slice stays nil). A string is converted to bytes, except that a string
// equal to "null" in any letter case yields (nil, true) to represent NULL.
func asTextBytes(src interface{}) ([]byte, bool) {
	if raw, isBytes := src.([]byte); isBytes {
		return raw, true
	}

	text, isString := src.(string)
	if !isString {
		return nil, false
	}

	if strings.ToLower(text) == "null" {
		return nil, true
	}

	return []byte(text), true
}