@@ -6,12 +6,13 @@ import (
66 "encoding/json"
77 "fmt"
88
9+ // Postgres driver for migrate.
10+ _ "github.com/golang-migrate/migrate/database/postgres"
11+ "github.com/lib/pq"
12+
913 "github.com/get-eventually/go-eventually"
1014 "github.com/get-eventually/go-eventually/eventstore"
1115 "github.com/get-eventually/go-eventually/eventstore/stream"
12- "github.com/lib/pq"
13-
14- _ "github.com/golang-migrate/migrate/database/postgres" // postgres driver for migrate
1516)
1617
1718var (
@@ -26,6 +27,7 @@ type EventStore struct {
2627 registry eventstore.Registry
2728}
2829
30+ // NewEventStore creates a new EventStore using the database connection pool provided.
2931func NewEventStore (db * sql.DB ) EventStore {
3032 return EventStore {
3133 db : db ,
@@ -59,28 +61,28 @@ func (st EventStore) Stream(
5961
6062 switch t := target .(type ) {
6163 case stream.All :
64+ args = append (args , selectt .From )
6265 query = `SELECT * FROM events
6366 WHERE global_sequence_number >= $1
6467 ORDER BY global_sequence_number ASC`
65- args = append (args , selectt .From )
6668
6769 case stream.ByType :
70+ args = append (args , selectt .From , string (t ))
6871 query = `SELECT * FROM events
6972 WHERE global_sequence_number >= $1 AND stream_type = $2
7073 ORDER BY global_sequence_number ASC`
71- args = append (args , selectt .From , string (t ))
7274
7375 case stream.ByTypes :
76+ args = append (args , selectt .From , pq .Array (t ))
7477 query = `SELECT * FROM events
7578 WHERE global_sequence_number >= $1 AND stream_type = ANY($2)
7679 ORDER BY global_sequence_number ASC`
77- args = append (args , selectt .From , pq .Array (t ))
7880
7981 case stream.ByID :
82+ args = append (args , selectt .From , t .Type , t .Name )
8083 query = `SELECT * FROM events
8184 WHERE "version" >= $1 AND stream_type = $2 AND stream_id = $3
8285 ORDER BY "version" ASC`
83- args = append (args , selectt .From , t .Type , t .Name )
8486
8587 default :
8688 return fmt .Errorf ("postgres.EventStore: unsupported stream target: %T" , t )
0 commit comments