Skip to content

Commit 0905973

Browse files
committed
feat: watch improvements
Makes several substantial improvements to the `storage.Watch` operations: - `Watch` now does a synchronous `Get` operation before starting the watch to get the current value(s) and to get the start revision for the watch. Callers no longer have to do this operation themselves. - `Watch` now restarts itself automatically using the most recently fetched revision. We were previously repeating this restart logic in every caller. Restarts are rate-limited to 1 per second. - `Watch` reports errors over an error channel, matching the convention that we've established everywhere. We had an unused `Until` operation and I opted to remove it rather than update it for these changes.
1 parent fa785be commit 0905973

10 files changed

Lines changed: 314 additions & 238 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ require (
152152
golang.org/x/sync v0.19.0 // indirect
153153
golang.org/x/sys v0.39.0 // indirect
154154
golang.org/x/text v0.32.0 // indirect
155-
golang.org/x/time v0.9.0 // indirect
155+
golang.org/x/time v0.9.0
156156
golang.org/x/tools v0.40.0 // indirect
157157
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect
158158
google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 // indirect

server/internal/election/candidate.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func NewCandidate(
6060
ttl: ttl,
6161
errCh: make(chan error, 1),
6262
onClaim: onClaim,
63+
watchOp: store.Watch(electionName),
6364
}
6465
}
6566

@@ -105,7 +106,9 @@ func (c *Candidate) Start(ctx context.Context) error {
105106
c.done = done
106107
c.ticker = ticker
107108

108-
go c.watch(ctx)
109+
if err := c.watch(ctx, done); err != nil {
110+
return err
111+
}
109112

110113
if c.IsLeader() {
111114
c.logger.Debug().Msg("i am the current leader")
@@ -129,11 +132,7 @@ func (c *Candidate) Stop(ctx context.Context) error {
129132
return nil
130133
}
131134

132-
if c.watchOp != nil {
133-
c.watchOp.Close()
134-
c.watchOp = nil
135-
}
136-
135+
c.watchOp.Close()
137136
c.done <- struct{}{}
138137
c.running = false
139138

@@ -221,33 +220,42 @@ func (c *Candidate) attemptClaim(ctx context.Context) error {
221220
return nil
222221
}
223222

224-
func (c *Candidate) watch(ctx context.Context) {
225-
c.mu.Lock()
226-
defer c.mu.Unlock()
227-
223+
func (c *Candidate) watch(ctx context.Context, done chan struct{}) error {
228224
c.logger.Debug().Msg("starting watch")
229225

230-
c.watchOp = c.store.Watch(c.electionName)
231-
err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) {
226+
err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) error {
232227
switch evt.Type {
233228
case storage.EventTypeDelete:
234229
// The delete event will fire simultaneously with the ticker in some
235230
// types of outages, so the claim might have already been created
236231
// when this handler runs, even though it's for a 'delete' event.
237232
if err := c.lockAndCheckClaim(ctx); err != nil {
238-
c.errCh <- err
233+
return err
239234
}
240235
case storage.EventTypeError:
241-
c.logger.Debug().Err(evt.Err).Msg("encountered a watch error")
242-
if errors.Is(evt.Err, storage.ErrWatchClosed) && c.running {
243-
// Restart the watch if we're still running.
244-
c.watch(ctx)
245-
}
236+
c.logger.Warn().Err(evt.Err).Msg("encountered a watch error")
237+
case storage.EventTypeUnknown:
238+
c.logger.Debug().Msg("encountered unknown watch event type")
246239
}
240+
return nil
247241
})
248242
if err != nil {
249-
c.errCh <- fmt.Errorf("failed to start watch: %w", err)
243+
return fmt.Errorf("failed to start watch: %w", err)
250244
}
245+
go func() {
246+
for {
247+
select {
248+
case <-ctx.Done():
249+
return
250+
case <-done:
251+
return
252+
case err := <-c.watchOp.Error():
253+
c.errCh <- err
254+
}
255+
}
256+
}()
257+
258+
return nil
251259
}
252260

253261
func (c *Candidate) release(ctx context.Context) error {

server/internal/election/candidate_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ import (
1616
func TestCandidate(t *testing.T) {
1717
server := storagetest.NewEtcdTestServer(t)
1818
client := server.Client(t)
19-
loggerFactory := testutils.LoggerFactory(t)
2019
store := election.NewElectionStore(client, uuid.NewString())
21-
electionSvc := election.NewService(store, loggerFactory)
2220

2321
t.Run("basic functionality", func(t *testing.T) {
22+
loggerFactory := testutils.LoggerFactory(t)
23+
electionSvc := election.NewService(store, loggerFactory)
24+
2425
ctx := t.Context()
2526
name := election.Name(uuid.NewString())
2627
candidate := electionSvc.NewCandidate(name, "host-1", time.Second)
@@ -62,6 +63,9 @@ func TestCandidate(t *testing.T) {
6263
})
6364

6465
t.Run("multiple candidates", func(t *testing.T) {
66+
loggerFactory := testutils.LoggerFactory(t)
67+
electionSvc := election.NewService(store, loggerFactory)
68+
6569
bElected := make(chan struct{}, 1)
6670

6771
ctx := t.Context()

server/internal/migrate/runner.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ func (r *Runner) Run(ctx context.Context) error {
7474
ctx, cancel := context.WithTimeout(ctx, migrationTimeout)
7575
defer cancel()
7676

77-
r.watch(ctx)
77+
r.watchOp = r.store.Revision.Watch()
78+
if err := r.watch(ctx); err != nil {
79+
return err
80+
}
7881
defer r.watchOp.Close()
7982

8083
r.candidate.AddHandlers(func(_ context.Context) {
@@ -99,25 +102,15 @@ func (r *Runner) Run(ctx context.Context) error {
99102
}
100103
}
101104

102-
func (r *Runner) watch(ctx context.Context) {
105+
func (r *Runner) watch(ctx context.Context) error {
103106
r.logger.Debug().Msg("starting watch")
104107

105108
if len(r.migrations) == 0 {
106-
r.errCh <- errors.New("watch called with empty migrations list")
107-
return
109+
return errors.New("watch called with empty migrations list")
108110
}
109111
targetRevision := r.migrations[len(r.migrations)-1].Identifier()
110112

111-
// Ensure that any previous watches were closed. Close thread-safe and
112-
// idempotent.
113-
if r.watchOp != nil {
114-
r.watchOp.Close()
115-
}
116-
117-
// Since we're not specifying a start version on the watch, this will always
118-
// fire for the current revision.
119-
r.watchOp = r.store.Revision.Watch()
120-
err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) {
113+
err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) error {
121114
switch evt.Type {
122115
case storage.EventTypePut:
123116
if evt.Value.Identifier == targetRevision {
@@ -126,15 +119,18 @@ func (r *Runner) watch(ctx context.Context) {
126119
})
127120
}
128121
case storage.EventTypeError:
129-
r.logger.Debug().Err(evt.Err).Msg("encountered a watch error")
130-
if errors.Is(evt.Err, storage.ErrWatchClosed) {
131-
r.watch(ctx)
132-
}
122+
r.logger.Warn().Err(evt.Err).Msg("encountered error in watch")
123+
case storage.EventTypeUnknown:
124+
r.logger.Debug().Msg("encountered unknown watch event type")
133125
}
126+
return nil
134127
})
135128
if err != nil {
136-
r.errCh <- fmt.Errorf("failed to start watch: %w", err)
129+
return fmt.Errorf("failed to start watch: %w", err)
137130
}
131+
r.watchOp.PropagateErrors(ctx, r.errCh)
132+
133+
return nil
138134
}
139135

140136
func (r *Runner) runMigrations(ctx context.Context) error {

server/internal/scheduler/service.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package scheduler
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"path"
87
"sync"
@@ -68,17 +67,9 @@ func (s *Service) Start(ctx context.Context) error {
6867
scheduler.WithDistributedElector(s.elector)
6968
s.scheduler = scheduler
7069

71-
jobs, err := s.store.GetAll().Exec(ctx)
72-
if err != nil {
73-
s.logger.Debug().Err(err).Msg("failed to retrieve scheduled jobs from store")
74-
}
75-
for _, job := range jobs {
76-
if err := s.registerJob(ctx, job); err != nil {
77-
return fmt.Errorf("failed to register scheduled job: %w", err)
78-
}
70+
if err := s.watchJobChanges(ctx); err != nil {
71+
return err
7972
}
80-
81-
go s.watchJobChanges(ctx)
8273
s.scheduler.StartAsync()
8374

8475
return nil
@@ -163,36 +154,34 @@ func (s *Service) ListScheduledJobs() []string {
163154
}
164155
return ids
165156
}
166-
func (s *Service) watchJobChanges(ctx context.Context) {
157+
func (s *Service) watchJobChanges(ctx context.Context) error {
167158
s.logger.Debug().Msg("watching for scheduled job changes")
168159

169160
s.watchOp = s.store.WatchJobs()
170-
err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) {
161+
err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) error {
171162
switch e.Type {
172163
case storage.EventTypePut:
173164
s.logger.Debug().Str("job_id", e.Value.ID).Msg("detected job creation or update")
174165
if err := s.registerJob(ctx, e.Value); err != nil {
175-
s.errCh <- fmt.Errorf("failed to register job from watch: %w", err)
166+
return fmt.Errorf("failed to register job from watch: %w", err)
176167
}
177168
case storage.EventTypeDelete:
178169
jobID := path.Base(e.Key)
179170
s.logger.Debug().Str("job_id", jobID).Msg("detected job deletion")
180171
s.UnregisterJob(jobID)
181172
case storage.EventTypeError:
182-
s.logger.Debug().Err(e.Err).Msg("encountered a watch error")
183-
if errors.Is(e.Err, storage.ErrWatchClosed) {
184-
defer s.watchJobChanges(ctx)
185-
}
186-
default:
187-
s.logger.Warn().
188-
Err(e.Err).
189-
Str("event_type", string(e.Type)).
190-
Msg("unhandled event type in scheduled job watch")
173+
s.logger.Warn().Err(e.Err).Msg("encountered error in watch")
174+
case storage.EventTypeUnknown:
175+
s.logger.Debug().Msg("encountered unknown watch event type")
191176
}
177+
return nil
192178
})
193179
if err != nil {
194-
s.logger.Debug().Err(err).Msg("job watch exited with error")
180+
return fmt.Errorf("failed to initialize job watch: %w", err)
195181
}
182+
s.watchOp.PropagateErrors(ctx, s.errCh)
183+
184+
return nil
196185
}
197186

198187
func (s *Service) Error() <-chan error {

server/internal/storage/errors.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,3 @@ var ErrDuplicateKeysInTransaction = errors.New("duplicate keys in transaction")
2424
// ErrWatchAlreadyInProgress indicates that the WatchOp has already been started
2525
// and cannot be started again until it's closed.
2626
var ErrWatchAlreadyInProgress = errors.New("watch already in progress")
27-
28-
// ErrWatchUntilTimedOut indicates that the condition given to Watch.Until was
29-
// not met before the given timeout.
30-
var ErrWatchUntilTimedOut = errors.New("timed out waiting for watch condition")
31-
32-
// ErrWatchClosed indicates that the server has forced the watch to close.
33-
// Callers should either restart or recreate the watch in that case.
34-
var ErrWatchClosed = errors.New("watch closed by server")

server/internal/storage/interface.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,15 @@ type DeleteValueOp[V Value] interface {
9494
type EventType string
9595

9696
const (
97-
EventTypePut = "put"
98-
EventTypeDelete = "delete"
99-
EventTypeError = "error"
100-
EventTypeUnknown = "unknown"
97+
EventTypePut EventType = "put"
98+
EventTypeDelete EventType = "delete"
99+
EventTypeError EventType = "error"
100+
EventTypeUnknown EventType = "unknown"
101101
)
102102

103103
// Event is generated by a modification to the watched key.
104104
type Event[V Value] struct {
105-
// Err will be non-nil when Type is "error". The error will be an
106-
// ErrWatchClosed if the server forced the watch to close. In that case,
107-
// callers are able to call Watch() again to restart the watch.
105+
// Err will be non-nil when Type is "error".
108106
Err error
109107
Type EventType
110108
Key string
@@ -115,16 +113,20 @@ type Event[V Value] struct {
115113

116114
// WatchOp watches one or more keys for modifications.
117115
type WatchOp[V Value] interface {
118-
// Watch persistently watches for modifications until Close is called or
119-
// until Etcd returns an error. Watch will automatically call Close in case
120-
// of an error. This method is non-blocking.
121-
Watch(ctx context.Context, handle func(e *Event[V])) error
122-
// Until blocks until either the timeout has elapsed or until handle returns
123-
// true. Until automatically calls Close before returning.
124-
Until(ctx context.Context, timeout time.Duration, handle func(e *Event[V]) bool) error
125-
// Close cancels the active watch and enables callers to use Watch or Until
126-
// again.
116+
// Watch performs an initial Get of current items, calls handle for each,
117+
// then persistently watches for modifications, calling handle for each
118+
// event. The initial Get is blocking, and then it starts the watch in a
119+
// goroutine. Errors originating from the watch or handler are reported via
120+
// the Error() channel.
121+
Watch(ctx context.Context, handle func(e *Event[V]) error) error
122+
// Close cancels the active watch and enables callers to use Watch again.
127123
Close()
124+
// Error reports errors that originate from the watch or from the handler.
125+
Error() <-chan error
126+
// PropagateErrors will propagate errors from the watch's Error() channel to
127+
// the given error channel in a goroutine until the given context is
128+
// complete.
129+
PropagateErrors(ctx context.Context, ch chan error)
128130
}
129131

130132
// VersionUpdater are the methods that an operation can implement to support

0 commit comments

Comments
 (0)