Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pkg/lumera/chainerrors/chainerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,28 @@ func IsHealVerificationAlreadySubmitted(err error) bool {
return strings.Contains(err.Error(), "verification already submitted by creator")
}

// IsEpochReportDuplicate reports whether err means the chain refused a
// MsgSubmitEpochReport because a report for the same (epoch, reporter)
// tuple was already accepted.
//
// Two signals are recognised, checked in this order:
//   - err wraps audittypes.ErrDuplicateReport (registered code 4), or
//   - the error text, compared case-insensitively, contains the chain's
//     discriminating phrase "report already submitted for this epoch"
//     (emitted at msg_submit_epoch_report.go:142).
//
// A nil err is never a duplicate.
//
// The host-reporter submit path relies on this to tell apart two outcomes:
// the chain already holds our report, so drained proof rows are stale and
// must NOT be requeued; versus a transient submit failure, where drained
// rows must be requeued so the next tick can retry.
func IsEpochReportDuplicate(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, audittypes.ErrDuplicateReport):
		return true
	}

	// Substring fallback — chain phrase from msg_submit_epoch_report.go:142.
	const duplicatePhrase = "report already submitted for this epoch"
	lowered := strings.ToLower(err.Error())
	return strings.Contains(lowered, duplicatePhrase)
}

// IsRecheckEvidenceAlreadySubmitted reports whether err corresponds to the
// chain rejecting a duplicate recheck-evidence submission. Chain wraps
// audittypes.ErrInvalidRecheckEvidence (a generic envelope for ALL recheck
Expand Down
151 changes: 99 additions & 52 deletions pkg/storage/queries/recheck.go

Large diffs are not rendered by default.

94 changes: 70 additions & 24 deletions pkg/storage/queries/recheck_schema_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ func TestAddColumnIfMissing_Idempotent(t *testing.T) {
require.NoError(t, addColumnIfMissing(ctx, db, "t", "extra", `ALTER TABLE t ADD COLUMN extra TEXT NOT NULL DEFAULT 'x';`))
}

// TestMigrateStorageRecheckSubmissionsPK covers the C2 migration: an old
// DB with PK (epoch_id, ticket_id) is migrated to PK (epoch_id, ticket_id,
// target_account) preserving all data. Idempotent on already-migrated DBs.
func TestMigrateStorageRecheckSubmissionsPK(t *testing.T) {
// TestMigrateStorageRecheckSubmissionsPK_CollapseToTicketKey covers the
// PR286 F3 migration: an old DB with PK (epoch_id, ticket_id, target_account)
// is migrated down to PK (epoch_id, ticket_id) so local dedup matches chain
// replay (one recheck per (epoch, ticket, creator)). Multiple target rows
// for the same (epoch, ticket) collapse to one — preferring 'submitted'
// status over 'pending', then tie-breaking by lex-smallest target_account.
func TestMigrateStorageRecheckSubmissionsPK_CollapseToTicketKey(t *testing.T) {
db := sqlx.MustConnect("sqlite3", ":memory:")
defer db.Close()
ctx := context.Background()

// Seed the OLD schema (before this fix PK).
// Seed the OLD per-target PK schema.
const oldSchema = `
CREATE TABLE storage_recheck_submissions (
epoch_id INTEGER NOT NULL,
Expand All @@ -72,42 +75,65 @@ CREATE TABLE storage_recheck_submissions (
result_class INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'submitted',
submitted_at INTEGER NOT NULL,
PRIMARY KEY (epoch_id, ticket_id)
PRIMARY KEY (epoch_id, ticket_id, target_account)
);`
_, err := db.Exec(oldSchema)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-a', 'ch', 'rh', 1, 'submitted', 1234);`)
// Three rows on the same (epoch, ticket):
// target-a pending → loses to submitted
// target-b submitted → wins (lex-smallest among submitted)
// target-c submitted → loses to target-b on tie-break
// Also one independent (epoch, ticket) with a single submitted row.
_, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-a', 'ch-a', 'rh-a', 1, 'pending', 1000);`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-b', 'ch-b', 'rh-b', 1, 'submitted', 1100);`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-c', 'ch-c', 'rh-c', 1, 'submitted', 1200);`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (8, 'ticket-2', 'only-target', 'ch-x', 'rh-x', 1, 'submitted', 1300);`)
require.NoError(t, err)

// Confirm pre-migration PK shape.
pk, err := primaryKeyColumns(ctx, db, "storage_recheck_submissions")
require.NoError(t, err)
require.Equal(t, []string{"epoch_id", "ticket_id"}, pk)
require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk)

// Run migration.
require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db))

// Verify new PK shape and preserved data.
// New PK is (epoch, ticket).
pk, err = primaryKeyColumns(ctx, db, "storage_recheck_submissions")
require.NoError(t, err)
require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk)
require.Equal(t, []string{"epoch_id", "ticket_id"}, pk)

// Three (epoch=7, ticket-1) rows collapsed to ONE — and it MUST be the
// 'submitted' target-b (status preference + lex tie-break).
var n int
require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1' AND target_account='target-a'`).Scan(&n))
require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n))
require.Equal(t, 1, n)
var keptTarget, keptStatus string
require.NoError(t, db.QueryRow(`SELECT target_account, status FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&keptTarget, &keptStatus))
require.Equal(t, "target-b", keptTarget, "submitted+lex-smallest target wins the collapse")
require.Equal(t, "submitted", keptStatus)

// The other (epoch, ticket) row is preserved verbatim.
require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=8 AND ticket_id='ticket-2'`).Scan(&n))
require.Equal(t, 1, n)

// Idempotency: second run is a no-op.
require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db))

// Multi-target now allowed under the new PK.
// Post-migration: another target on the SAME (epoch, ticket) does NOT
// produce a second row — matches chain replay semantics.
store := &SQLiteStore{db: db}
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "ch2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS))
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-zzz", "ch2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS))
require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n))
require.Equal(t, 2, n)
require.Equal(t, 1, n, "post-migration insert for a new target on existing (epoch, ticket) must be a no-op")
}

// TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp covers the
// idempotent fast-path where a fresh DB created via createStorageRecheckSubmissions
// already has the multi-column PK.
// already has the (epoch_id, ticket_id) PK.
func TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp(t *testing.T) {
db := sqlx.MustConnect("sqlite3", ":memory:")
defer db.Close()
Expand All @@ -117,9 +143,14 @@ func TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp(t *testing.T) {
require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db))
pk, err := primaryKeyColumns(ctx, db, "storage_recheck_submissions")
require.NoError(t, err)
require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk)
require.Equal(t, []string{"epoch_id", "ticket_id"}, pk)
}

// TestMigrateRecheckAttemptFailuresPK covers the PR286 F3 migration of the
// failure-budget table: PK (epoch, ticket, target) → PK (epoch, ticket).
// Multiple rows for the same (epoch, ticket) collapse with SUM(attempts)
// + MAX(expires_at) so the budget reflects the most aggressive prior
// retry pressure across targets.
func TestMigrateRecheckAttemptFailuresPK(t *testing.T) {
ctx := context.Background()
db, err := sqlx.Open("sqlite3", ":memory:")
Expand All @@ -134,23 +165,38 @@ CREATE TABLE recheck_attempt_failures (
attempts INTEGER NOT NULL DEFAULT 1,
last_error TEXT,
expires_at INTEGER NOT NULL,
PRIMARY KEY (epoch_id, ticket_id)
PRIMARY KEY (epoch_id, ticket_id, target_account)
);`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-a', 1, 'boom', 999999);`)
// Two failure rows on same (epoch, ticket): attempts 1 + 2 must sum to 3.
// Use a far-future expires_at so HasRecheckAttemptFailureBudgetExceeded
// doesn't TTL-evict on read.
farFuture := time.Now().Add(24 * time.Hour).Unix()
_, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-a', 1, 'a', ?);`, farFuture)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-b', 2, 'b', ?);`, farFuture)
require.NoError(t, err)

require.NoError(t, migrateRecheckAttemptFailuresPK(ctx, db))
pk, err := primaryKeyColumns(ctx, db, "recheck_attempt_failures")
require.NoError(t, err)
require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk)
require.Equal(t, []string{"epoch_id", "ticket_id"}, pk)

store := &SQLiteStore{db: db}
require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-1", "target-b", fmt.Errorf("nope"), time.Hour))
blockedA, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-a", 2)
// After migration, the budget is shared per (epoch, ticket). With
// summed attempts=3 already on the row, a maxAttempts=3 query
// (any target) must report blocked.
blocked, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-a", 3)
require.NoError(t, err)
require.False(t, blockedA)
blockedB, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-b", 2)
require.True(t, blocked, "post-migration budget is per (epoch, ticket); summed attempts cross threshold")
blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-b", 3)
require.NoError(t, err)
require.False(t, blockedB)
require.True(t, blocked, "budget query for a different target on the same (epoch, ticket) reads the same row")

// Increment via a third target — the per-(epoch, ticket) row is
// updated in-place (ON CONFLICT clause keys on (epoch, ticket)).
require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-1", "target-c", fmt.Errorf("nope"), time.Hour))
var n int
require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM recheck_attempt_failures WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n))
require.Equal(t, 1, n, "RecordRecheckAttemptFailure must update the single (epoch, ticket) row, not create a per-target row")
}
53 changes: 32 additions & 21 deletions pkg/storage/queries/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,64 @@ import (
"github.com/stretchr/testify/require"
)

// TestRecheckSubmissionDedupPerTarget asserts the LEP-6 C2 fix: chain
// dedup is per-(epoch, ticket, target_account), so two distinct targets
// within the same (epoch, ticket) must produce two persisted rows. Before
// LEP-6 review fix, the PK was (epoch, ticket) and the second target's row was
// silently dropped — masking that supernode from chain N/R/D math.
func TestRecheckSubmissionDedupPerTarget(t *testing.T) {
// TestRecheckSubmissionDedupMatchesChainPerTicket is the PR286 F3
// regression: chain replay protection in
// lumera x/audit/v1/keeper/msg_storage_truth.go:88-90 is keyed by
// (epoch, ticket, creator), NOT target. The local supernode is the
// implicit creator, so (epoch, ticket) is the right local dedup key.
//
// Two different targets within the same (epoch, ticket) must collapse to
// ONE persisted row; the second insert is silently ignored. Under the
// previous (epoch, ticket, target) PK, the chain rejected every
// secondary-target submit, which confused local state.
func TestRecheckSubmissionDedupMatchesChainPerTicket(t *testing.T) {
db := sqlx.MustConnect("sqlite3", ":memory:")
defer db.Close()
_, err := db.Exec(createStorageRecheckSubmissions)
require.NoError(t, err)
store := &SQLiteStore{db: db}
ctx := context.Background()

// Initially nothing is recorded for either target.
// Initially nothing is recorded.
exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a")
require.NoError(t, err)
require.False(t, exists)

// First target gets recorded.
// First target gets recorded. HasRecheckSubmission now returns true
// regardless of the target argument because chain dedup is per
// (epoch, ticket, creator).
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS))
exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a")
require.NoError(t, err)
require.True(t, exists)

// Second target in the SAME (epoch, ticket) must also be recorded
// (this is the C2 fix — old behaviour silently dropped this row).
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL))
exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-b")
require.NoError(t, err)
require.NoError(t, err, "any target on the same (epoch, ticket) must read as already recorded — chain replay key is per (epoch, ticket, creator)")
require.True(t, exists)

// Confirm both rows landed.
// Second target on the SAME (epoch, ticket) is silently ignored by
// the ON CONFLICT(epoch_id, ticket_id) DO NOTHING. RecordRecheckSubmission
// is back-compat idempotent so this returns nil.
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL))

// Confirm only the first-recorded row landed.
var n int
require.NoError(t, db.QueryRowContext(ctx, `SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=?`, 7, "ticket-1").Scan(&n))
require.Equal(t, 2, n)
require.Equal(t, 1, n, "exactly one row per (epoch, ticket) to match chain replay semantics")
var retainedTarget, retainedRh string
require.NoError(t, db.QueryRowContext(ctx, `SELECT target_account, recheck_transcript_hash FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=?`, 7, "ticket-1").Scan(&retainedTarget, &retainedRh))
require.Equal(t, "target-a", retainedTarget, "first-recorded target wins")
require.Equal(t, "rh1", retainedRh, "first-recorded transcript wins")

// Same ticket in a different epoch is intentionally a different replay key.
exists, err = store.HasRecheckSubmission(ctx, 8, "ticket-1", "target-a")
require.NoError(t, err)
require.False(t, exists)

// Idempotent second-call on the same (epoch, ticket, target) is a no-op
// (ON CONFLICT DO NOTHING) — preserves first row.
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1-DIFFERENT", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS))
var rh string
require.NoError(t, db.QueryRowContext(ctx, `SELECT recheck_transcript_hash FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=? AND target_account=?`, 7, "ticket-1", "target-a").Scan(&rh))
require.Equal(t, "rh1", rh)
// RecordPendingRecheckSubmission on a (epoch, ticket) that already
// has a row surfaces the typed dedup error.
err = store.RecordPendingRecheckSubmission(ctx, 7, "ticket-1", "target-c", "orig3", "rh3", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH)
require.Error(t, err)
require.True(t, errors.Is(err, ErrLEP6RecheckAlreadyRecorded), "second target on same (epoch, ticket) must surface ErrLEP6RecheckAlreadyRecorded")
}

// TestRecordPendingRecheckSubmission_DuplicateReturnsTypedError covers the
Expand Down
Loading