Skip to content

Commit 7a8a522

Browse files
committed
feat: write-through cache
Adds a write-through cache implementation to the `storage` package. The cache and its operations serve reads from an in-memory map, while writes are served from Etcd and, if successful, persisted to the in-memory map. This cache is useful for areas where we need to perform frequent range queries over small to moderate numbers of small values types. This implementation is designed to be easy and transparent to add to existing stores.
1 parent fa49e10 commit 7a8a522

11 files changed

Lines changed: 1397 additions & 98 deletions

File tree

server/internal/storage/cache.go

Lines changed: 536 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package storage
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
// These tests directly invoke the unexported cache methods to simulate
11+
// different event and operation orders that are difficult to reproduce reliably
12+
// with the public interface.
13+
func TestCacheOrdering(t *testing.T) {
14+
t.Run("stale put after watch delete", func(t *testing.T) {
15+
// Simulates when a write-through put's UpdateCache fires after the
16+
// watch has already processed the delete for the same key at a higher
17+
// revision. The put should not resurrect the key.
18+
c := newOrderingCache()
19+
item := &cacheTestItem{K: "foo", V: "bar"}
20+
21+
seedCache(c, "foo", "bar", 5)
22+
simulateWatchDelete(c, "foo", 10) // watch advances to R=10, key gone
23+
24+
// Delayed write-through put at R=5 — must not resurrect the key because
25+
// it's from an older revision.
26+
c.put(item, 5)
27+
28+
_, err := c.get("foo")
29+
assert.ErrorIs(t, err, ErrNotFound, "stale write-through put must not resurrect a watched delete")
30+
})
31+
32+
t.Run("stale delete after watch put", func(t *testing.T) {
33+
// Simulates when a write-through delete's UpdateCache fires after the
34+
// watch has already delivered a put at a higher revision. The delete
35+
// should not erase the newer watched value.
36+
c := newOrderingCache()
37+
item := &cacheTestItem{K: "foo", V: "new"}
38+
39+
simulateWatchPut(c, item, 10) // watch advances to R=10, key present
40+
41+
// Delayed write-through delete at R=5 — must not erase the key.
42+
c.delete("foo", 5)
43+
44+
val, err := c.get("foo")
45+
require.NoError(t, err)
46+
assert.Equal(t, "new", val.V, "stale write-through delete must not erase a newer watched put")
47+
})
48+
49+
t.Run("tombstone blocks stale put", func(t *testing.T) {
50+
// Simulates when write-through delete fires before its watch event,
51+
// writing a tombstone, and a stale watch put (for a revision before the
52+
// delete) must not overwrite it.
53+
c := newOrderingCache()
54+
staleItem := &cacheTestItem{K: "foo", V: "old"}
55+
56+
seedCache(c, "foo", "old", 5)
57+
c.delete("foo", 10) // write-through delete fires first, tombstone at R=10
58+
59+
// A stale watch put at R=5 must not overwrite the tombstone.
60+
c.mu.Lock()
61+
c.unlockedWrite(staleItem, 5)
62+
c.mu.Unlock()
63+
64+
_, err := c.get("foo")
65+
assert.ErrorIs(t, err, ErrNotFound, "tombstone must block a stale watch put")
66+
})
67+
68+
t.Run("tombstone allows newer put", func(t *testing.T) {
69+
// Simulates a re-create after delete: write-through delete fires first
70+
// (creating a tombstone), then a watch put at a higher revision
71+
// arrives. The put should win because it represents a genuinely newer
72+
// write in etcd's ordering.
73+
c := newOrderingCache()
74+
newItem := &cacheTestItem{K: "foo", V: "recreated"}
75+
76+
seedCache(c, "foo", "old", 5)
77+
c.delete("foo", 8) // write-through fires first
78+
79+
// Watch delivers a put at R=12 (a re-create after the delete).
80+
simulateWatchPut(c, newItem, 12)
81+
82+
val, err := c.get("foo")
83+
require.NoError(t, err)
84+
assert.Equal(t, "recreated", val.V, "watch put at higher revision must overwrite tombstone")
85+
})
86+
87+
t.Run("write-through delete then watch event", func(t *testing.T) {
88+
// Simulates the normal ordering (write-through fires before watch) for
89+
// a delete: the tombstone is written by the write-through, then the
90+
// matching watch event cleans it up. Neither the tombstone nor the
91+
// prior value should be readable after the watch event.
92+
c := newOrderingCache()
93+
94+
seedCache(c, "foo", "bar", 5)
95+
c.delete("foo", 10) // write-through fires first, tombstone at R=10
96+
97+
// The tombstone should be invisible to readers immediately.
98+
_, err := c.get("foo")
99+
assert.ErrorIs(t, err, ErrNotFound, "tombstone must be invisible to readers")
100+
assert.Len(t, c.tombstones, 1, "tombstone should be pending cleanup")
101+
102+
// The matching watch event arrives at the same revision and cleans up.
103+
simulateWatchDelete(c, "foo", 10)
104+
105+
_, err = c.get("foo")
106+
assert.ErrorIs(t, err, ErrNotFound)
107+
assert.Empty(t, c.tombstones, "tombstone must be purged once watch catches up")
108+
})
109+
}
110+
111+
// cacheTestItem is a minimal Value implementation for white-box cache tests.
112+
type cacheTestItem struct {
113+
StoredValue
114+
K string `json:"k"`
115+
V string `json:"v"`
116+
}
117+
118+
func newOrderingCache() *cache[*cacheTestItem] {
119+
return &cache[*cacheTestItem]{
120+
items: map[string]*cachedValue{},
121+
key: func(v *cacheTestItem) string { return v.K },
122+
}
123+
}
124+
125+
// seed writes a key directly at a given revision, bypassing all guards.
126+
// This simulates the state of the cache after an initial load or prior event.
127+
func seedCache(c *cache[*cacheTestItem], key, value string, revision int64) {
128+
c.unlockedWrite(&cacheTestItem{K: key, V: value}, revision)
129+
}
130+
131+
// simulateWatchPut simulates a watch event delivering a put at the given revision.
132+
// In production this runs under c.mu held by the Start() watch handler.
133+
func simulateWatchPut(c *cache[*cacheTestItem], item *cacheTestItem, revision int64) {
134+
c.mu.Lock()
135+
defer c.mu.Unlock()
136+
c.lastWatchRevision = revision
137+
c.unlockedWrite(item, revision)
138+
c.purgeTombstones(revision)
139+
}
140+
141+
// simulateWatchDelete simulates a watch event delivering a delete at the given revision.
142+
func simulateWatchDelete(c *cache[*cacheTestItem], key string, revision int64) {
143+
c.mu.Lock()
144+
defer c.mu.Unlock()
145+
c.lastWatchRevision = revision
146+
c.unlockedDelete(key, revision)
147+
c.purgeTombstones(revision)
148+
}

0 commit comments

Comments
 (0)