From abba893f9d7c35dbc9d538704fef64701845e7ee Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 25 May 2026 17:14:30 +0000 Subject: [PATCH] fix(lep6): align host_reporter and recheck dedup with chain semantics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Matee's prod-gate review on PR #286. F1 (Critical) — host_reporter SHADOW/SOFT idempotency: Previously skipped epoch report when mode != UNSPECIFIED with empty proof rows. This converted a transient local proof-generation gap into missing audit reports → postponement. Fix: gate compound-proof coverage check to FULL mode only; SHADOW/SOFT/UNSPECIFIED submit the epoch report even with empty LEP-6 proof rows (INFO log). F2 (Critical) — transactional proof-row drain: CollectResults(epochID) drained rows before SubmitEpochReport; on tx/RPC failure rows were lost. Fix: requeue proof rows on non- duplicate submit failure via new requeueProofResults helper, with new IsEpochReportDuplicate(err) predicate to avoid requeue on benign duplicate-report errors. F3 (High) — recheck dedup key matches chain: Local PK was (epoch_id, ticket_id, target_account); chain replay protection in x/audit msg_storage_truth uses (epoch, ticket, creator). Fix: collapse local PK to (epoch_id, ticket_id); target_account becomes metadata. Finder seen-map keyed the same; attestor treats ErrLEP6RecheckAlreadyRecorded as success. Non-destructive migration with deterministic row collapse. F4 (Medium) — go.sum drift for lumera v1.12.0: Deferred — chain-side release-engineering issue. Public Go proxy cached original v1.12.0 bytes (h1:prh3k8y...) immutably; the v1.12.0 tag was later force-moved to a different commit (h1:ZtGvnw... in current go.sum). Cannot be resolved from supernode without either restoring the tag pointer on the chain side or republishing as v1.12.1. Validation: - 53/53 unit-test packages PASS - 7/7 LEP-6 system e2e tests PASS (make test-lep6, 1479.84s) including StorageTruthEnforcementLifecycle (F1+F2 path) and NegativePerCaseCorruptionMatrix + CrashRestartMidHeal (F3 path) --- pkg/lumera/chainerrors/chainerrors.go | 22 +++ pkg/storage/queries/recheck.go | 151 ++++++++++++------ .../queries/recheck_schema_migration_test.go | 94 ++++++++--- pkg/storage/queries/recheck_test.go | 53 +++--- supernode/host_reporter/service.go | 74 +++++++-- supernode/host_reporter/tick_behavior_test.go | 147 +++++++++++++++-- supernode/recheck/finder.go | 14 +- supernode/recheck/finder_service_test.go | 2 +- supernode/recheck/recheck_regression_test.go | 40 +++-- supernode/recheck/test_helpers_test.go | 43 +++-- 10 files changed, 477 insertions(+), 163 deletions(-) diff --git a/pkg/lumera/chainerrors/chainerrors.go b/pkg/lumera/chainerrors/chainerrors.go index 3457130e..e09b3802 100644 --- a/pkg/lumera/chainerrors/chainerrors.go +++ b/pkg/lumera/chainerrors/chainerrors.go @@ -120,6 +120,28 @@ func IsHealVerificationAlreadySubmitted(err error) bool { return strings.Contains(err.Error(), "verification already submitted by creator") } +// IsEpochReportDuplicate reports whether err corresponds to the chain +// rejecting MsgSubmitEpochReport because a report for the same +// (epoch, reporter) tuple has already been accepted. Chain wraps +// audittypes.ErrDuplicateReport (registered code 4) with the discriminating +// phrase "report already submitted for this epoch" at +// msg_submit_epoch_report.go:142. +// +// Callers in the host-reporter submit path use this to distinguish +// "chain has our report already (drained proof rows are stale; do NOT +// requeue)" from "transient submit failure (must requeue drained rows +// so the next tick can retry)". +func IsEpochReportDuplicate(err error) bool { + if err == nil { + return false + } + if errors.Is(err, audittypes.ErrDuplicateReport) { + return true + } + // Substring fallback — chain phrase from msg_submit_epoch_report.go:142. + return strings.Contains(strings.ToLower(err.Error()), "report already submitted for this epoch") +} + // IsRecheckEvidenceAlreadySubmitted reports whether err corresponds to the // chain rejecting a duplicate recheck-evidence submission. Chain wraps // audittypes.ErrInvalidRecheckEvidence (a generic envelope for ALL recheck diff --git a/pkg/storage/queries/recheck.go b/pkg/storage/queries/recheck.go index bfe4fafc..c6ac0db4 100644 --- a/pkg/storage/queries/recheck.go +++ b/pkg/storage/queries/recheck.go @@ -22,16 +22,16 @@ type RecheckSubmissionRecord struct { } // ErrLEP6RecheckAlreadyRecorded is returned by RecordPendingRecheckSubmission -// when a row already exists for (epoch_id, ticket_id, target_account). The -// caller (recheck attestor) treats this as "another tick already pre-staged -// this candidate" — same idempotency semantics as -// ErrLEP6ClaimAlreadyRecorded / ErrLEP6VerificationAlreadyRecorded. +// when a row already exists for (epoch_id, ticket_id). The caller (recheck +// attestor) treats this as "another tick already pre-staged this candidate" +// — same idempotency semantics as ErrLEP6ClaimAlreadyRecorded / +// ErrLEP6VerificationAlreadyRecorded. // -// Wave 1 fix for L3: previous code used `INSERT OR IGNORE` which silently -// hid duplicates AND any real INSERT error (constraint violation, locked -// DB), then the caller submitted to chain anyway — the chain rejected and -// we deleted the row. Now duplicates are surfaced as a typed error and -// real INSERT failures propagate. +// PR286 F3 fix: dedup is keyed by (epoch_id, ticket_id) to match the chain +// replay key in lumera x/audit/v1/keeper/msg_storage_truth.go:88-90 +// (HasRecheckEvidence(epoch, ticket, creator)). The local supernode is the +// implicit creator, so (epoch, ticket) suffices. target_account is recorded +// as metadata of the selected candidate but does NOT participate in dedup. var ErrLEP6RecheckAlreadyRecorded = errors.New("lep6: recheck submission already recorded") const createStorageRecheckSubmissions = ` @@ -44,7 +44,7 @@ CREATE TABLE IF NOT EXISTS storage_recheck_submissions ( result_class INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'submitted', submitted_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id, target_account) + PRIMARY KEY (epoch_id, ticket_id) );` const createStorageRecheckSubmissionStatusIndex = `CREATE INDEX IF NOT EXISTS idx_storage_recheck_submissions_status ON storage_recheck_submissions(status);` @@ -58,11 +58,21 @@ CREATE TABLE IF NOT EXISTS recheck_attempt_failures ( attempts INTEGER NOT NULL DEFAULT 1, last_error TEXT, expires_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id, target_account) + PRIMARY KEY (epoch_id, ticket_id) );` const createRecheckAttemptFailuresExpiresIndex = `CREATE INDEX IF NOT EXISTS idx_recheck_attempt_failures_expires ON recheck_attempt_failures(expires_at);` +// migrateRecheckAttemptFailuresPK collapses the per-target PK +// (epoch_id, ticket_id, target_account) back to (epoch_id, ticket_id) — +// PR286 F3 fix. Failure budget aligns with the chain replay key, so retries +// for any target within the same (epoch, ticket) share one budget. +// +// Collapse rule: SUM attempts across target rows and MAX expires_at so the +// resulting row reflects the most aggressive prior retry pressure. Keep +// lex-smallest target_account as metadata for observability. +// +// Idempotent: returns nil if already on the (epoch, ticket) PK. func migrateRecheckAttemptFailuresPK(ctx context.Context, db sqliteExecQuerier) error { pkCols, err := primaryKeyColumns(ctx, db, "recheck_attempt_failures") if err != nil { @@ -75,7 +85,7 @@ func migrateRecheckAttemptFailuresPK(ctx context.Context, db sqliteExecQuerier) break } } - if hasTarget { + if !hasTarget { return nil } if len(pkCols) == 0 { @@ -108,7 +118,7 @@ CREATE TABLE recheck_attempt_failures_new ( attempts INTEGER NOT NULL DEFAULT 1, last_error TEXT, expires_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id, target_account) + PRIMARY KEY (epoch_id, ticket_id) );` if _, err := tx.ExecContext(ctx, createNew); err != nil { return fmt.Errorf("create new recheck failure table: %w", err) @@ -116,8 +126,15 @@ CREATE TABLE recheck_attempt_failures_new ( const copyData = ` INSERT INTO recheck_attempt_failures_new (epoch_id, ticket_id, target_account, attempts, last_error, expires_at) -SELECT epoch_id, ticket_id, target_account, attempts, last_error, expires_at -FROM recheck_attempt_failures;` +SELECT + epoch_id, + ticket_id, + MIN(target_account) AS target_account, + SUM(attempts) AS attempts, + COALESCE(MAX(last_error), '') AS last_error, + MAX(expires_at) AS expires_at +FROM recheck_attempt_failures +GROUP BY epoch_id, ticket_id;` if _, err := tx.ExecContext(ctx, copyData); err != nil { return fmt.Errorf("copy recheck failure rows: %w", err) } @@ -135,17 +152,25 @@ FROM recheck_attempt_failures;` } // migrateStorageRecheckSubmissionsPK migrates an old DB whose -// storage_recheck_submissions table has PK (epoch_id, ticket_id) up to the -// Wave 1 schema with PK (epoch_id, ticket_id, target_account). +// storage_recheck_submissions table has PK +// (epoch_id, ticket_id, target_account) down to (epoch_id, ticket_id) — +// PR286 F3 fix. The per-target PK never matched chain replay semantics: +// chain accepts one recheck per (epoch, ticket, creator), so a second +// target row was always going to be chain-rejected. +// +// Collapse rule: per (epoch, ticket) prefer 'submitted' over 'pending' +// (so we don't roll back already-confirmed local state), then tie-break by +// lex-smallest target_account for determinism. Dropped rows correspond to +// target candidates that the chain would never have accepted anyway; the +// next finder tick will rediscover any still-eligible candidates from +// chain epoch reports. // // SQLite cannot ALTER PRIMARY KEY in place; we rebuild via the canonical -// "create _new, copy, drop, rename" pattern inside a single transaction so -// a crash mid-migration leaves the DB consistent. +// "create _new, copy, drop, rename" pattern inside a single transaction +// so a crash mid-migration leaves the DB consistent. // // Idempotent: if the table is already on the new PK shape, this returns // nil after the PRAGMA introspection check (no DDL run). -// -// Wave 1 fix for C2. func migrateStorageRecheckSubmissionsPK(ctx context.Context, db sqliteExecQuerier) error { pkCols, err := primaryKeyColumns(ctx, db, "storage_recheck_submissions") if err != nil { @@ -158,18 +183,13 @@ func migrateStorageRecheckSubmissionsPK(ctx context.Context, db sqliteExecQuerie break } } - if hasTarget { - return nil // already migrated + if !hasTarget { + return nil // already migrated to (epoch, ticket) } if len(pkCols) == 0 { - // Defensive: PRAGMA returned no PK columns. The CREATE TABLE - // above always sets a PK so this would only happen on a bizarre - // custom build; bail rather than silently rebuild. return fmt.Errorf("storage_recheck_submissions has no detectable primary key") } - // Run inside a transaction so we don't end up with the new table but - // the old data partially copied. exec, ok := db.(interface { BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) }) @@ -187,6 +207,9 @@ func migrateStorageRecheckSubmissionsPK(ctx context.Context, db sqliteExecQuerie } }() + if _, err := tx.ExecContext(ctx, `DROP TABLE IF EXISTS storage_recheck_submissions_new;`); err != nil { + return fmt.Errorf("drop stale recheck migration table: %w", err) + } const createNew = ` CREATE TABLE storage_recheck_submissions_new ( epoch_id INTEGER NOT NULL, @@ -197,18 +220,26 @@ CREATE TABLE storage_recheck_submissions_new ( result_class INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'submitted', submitted_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id, target_account) + PRIMARY KEY (epoch_id, ticket_id) );` if _, err := tx.ExecContext(ctx, createNew); err != nil { return fmt.Errorf("create new recheck table: %w", err) } + // Deterministic collapse: ORDER BY prefers 'submitted' over 'pending' + // (via CASE so case sensitivity / future status values are explicit), + // tie-break by target_account ascending. INSERT OR IGNORE keeps only + // the first row per (epoch, ticket) — which is the preferred one + // given the ORDER BY above. const copyData = ` -INSERT INTO storage_recheck_submissions_new +INSERT OR IGNORE INTO storage_recheck_submissions_new (epoch_id, ticket_id, target_account, challenged_transcript_hash, recheck_transcript_hash, result_class, status, submitted_at) SELECT epoch_id, ticket_id, target_account, challenged_transcript_hash, recheck_transcript_hash, result_class, - COALESCE(status, 'submitted'), submitted_at -FROM storage_recheck_submissions;` + COALESCE(status, 'submitted') AS status, submitted_at +FROM storage_recheck_submissions +ORDER BY epoch_id, ticket_id, + CASE COALESCE(status, 'submitted') WHEN 'submitted' THEN 0 ELSE 1 END, + target_account;` if _, err := tx.ExecContext(ctx, copyData); err != nil { return fmt.Errorf("copy recheck rows: %w", err) } @@ -226,13 +257,18 @@ FROM storage_recheck_submissions;` } // HasRecheckSubmission reports whether a row exists for the -// (epoch_id, ticket_id, target_account) tuple — Wave 1 fix for C2 (chain -// dedup is per-target, so multiple targets in one (epoch, ticket) must -// each be tracked separately). +// (epoch_id, ticket_id) tuple — PR286 F3 fix. targetAccount is retained in +// the signature for API stability (and is the selected candidate's target, +// recorded as metadata at INSERT time) but does NOT participate in dedup +// because chain replay protection in lumera +// x/audit/v1/keeper/msg_storage_truth.go:88-90 is keyed by +// (epoch, ticket, creator) — the creator is implicit (this supernode), +// leaving (epoch, ticket) as the local dedup key. func (s *SQLiteStore) HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount string) (bool, error) { - const stmt = `SELECT 1 FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? AND target_account = ? LIMIT 1` + _ = targetAccount // intentionally unused: chain replay key is (epoch, ticket, creator) + const stmt = `SELECT 1 FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? LIMIT 1` var one int - err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID, targetAccount).Scan(&one) + err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID).Scan(&one) if err != nil { if err == sql.ErrNoRows { return false, nil @@ -244,8 +280,10 @@ func (s *SQLiteStore) HasRecheckSubmission(ctx context.Context, epochID uint64, // RecordPendingRecheckSubmission pre-stages a recheck submission row before // chain submit. Returns ErrLEP6RecheckAlreadyRecorded when a row already -// exists for the (epoch, ticket, target) tuple — Wave 1 fix for L3 (no -// more silent INSERT-OR-IGNORE). +// exists for (epoch, ticket) — PR286 F3 fix. targetAccount is stored as +// metadata of the selected candidate; another target for the same +// (epoch, ticket) would be rejected as already-recorded because chain +// accepts only one recheck per (epoch, ticket, creator). func (s *SQLiteStore) RecordPendingRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error { return s.recordRecheckSubmissionWithStatus(ctx, epochID, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash, resultClass, "pending", true) } @@ -264,7 +302,7 @@ func (s *SQLiteStore) recordRecheckSubmissionWithStatus(ctx context.Context, epo const stmt = `INSERT INTO storage_recheck_submissions (epoch_id, ticket_id, target_account, challenged_transcript_hash, recheck_transcript_hash, result_class, status, submitted_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) -ON CONFLICT(epoch_id, ticket_id, target_account) DO NOTHING` +ON CONFLICT(epoch_id, ticket_id) DO NOTHING` res, err := s.db.ExecContext(ctx, stmt, epochID, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash, int32(resultClass), status, time.Now().Unix()) if err != nil { return fmt.Errorf("insert recheck submission: %w", err) @@ -278,22 +316,27 @@ ON CONFLICT(epoch_id, ticket_id, target_account) DO NOTHING` return nil } -// MarkRecheckSubmissionSubmitted flips a (epoch, ticket, target) row from -// 'pending' to 'submitted'. Threading target_account is the C2 fix: -// without it, two pending rows for the same (epoch, ticket) would both -// be marked when only one was actually submitted. +// MarkRecheckSubmissionSubmitted flips a (epoch, ticket) row from +// 'pending' to 'submitted' — PR286 F3 fix. targetAccount is accepted for +// API stability but ignored in the WHERE clause: chain dedup is per +// (epoch, ticket, creator), so the single local row is the only one. func (s *SQLiteStore) MarkRecheckSubmissionSubmitted(ctx context.Context, epochID uint64, ticketID, targetAccount string) error { - _, err := s.db.ExecContext(ctx, `UPDATE storage_recheck_submissions SET status = 'submitted', submitted_at = ? WHERE epoch_id = ? AND ticket_id = ? AND target_account = ?`, time.Now().Unix(), epochID, ticketID, targetAccount) + _ = targetAccount + _, err := s.db.ExecContext(ctx, `UPDATE storage_recheck_submissions SET status = 'submitted', submitted_at = ? WHERE epoch_id = ? AND ticket_id = ?`, time.Now().Unix(), epochID, ticketID) return err } -// DeletePendingRecheckSubmission deletes a single (epoch, ticket, target) -// pending row after a hard tx failure — Wave 1 C2 fix. +// DeletePendingRecheckSubmission deletes a (epoch, ticket) pending row +// after a hard tx failure — PR286 F3 fix. func (s *SQLiteStore) DeletePendingRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount string) error { - _, err := s.db.ExecContext(ctx, `DELETE FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? AND target_account = ? AND status = 'pending'`, epochID, ticketID, targetAccount) + _ = targetAccount + _, err := s.db.ExecContext(ctx, `DELETE FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? AND status = 'pending'`, epochID, ticketID) return err } +// RecordRecheckAttemptFailure records / increments the per-(epoch, ticket) +// failure counter — PR286 F3 fix. targetAccount is preserved as metadata +// of the most recent attempt but does NOT participate in the PK. func (s *SQLiteStore) RecordRecheckAttemptFailure(ctx context.Context, epochID uint64, ticketID, targetAccount string, err error, ttl time.Duration) error { if epochID == 0 || ticketID == "" { return fmt.Errorf("epoch_id and ticket_id are required") @@ -305,19 +348,23 @@ func (s *SQLiteStore) RecordRecheckAttemptFailure(ctx context.Context, epochID u expiresAt := time.Now().Add(ttl).Unix() const stmt = `INSERT INTO recheck_attempt_failures (epoch_id, ticket_id, target_account, attempts, last_error, expires_at) VALUES (?, ?, ?, 1, ?, ?) -ON CONFLICT(epoch_id, ticket_id, target_account) DO UPDATE SET attempts = attempts + 1, last_error = excluded.last_error, expires_at = excluded.expires_at` +ON CONFLICT(epoch_id, ticket_id) DO UPDATE SET attempts = attempts + 1, last_error = excluded.last_error, expires_at = excluded.expires_at, target_account = excluded.target_account` _, execErr := s.db.ExecContext(ctx, stmt, epochID, ticketID, targetAccount, msg, expiresAt) return execErr } +// HasRecheckAttemptFailureBudgetExceeded reads the per-(epoch, ticket) +// failure counter — PR286 F3 fix. targetAccount accepted for API stability +// but does not participate in the lookup. func (s *SQLiteStore) HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) { + _ = targetAccount if maxAttempts <= 0 { return false, nil } - const stmt = `SELECT attempts, expires_at FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? AND target_account = ? LIMIT 1` + const stmt = `SELECT attempts, expires_at FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? LIMIT 1` var attempts int var expiresAt int64 - err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID, targetAccount).Scan(&attempts, &expiresAt) + err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID).Scan(&attempts, &expiresAt) if err == sql.ErrNoRows { return false, nil } @@ -325,7 +372,7 @@ func (s *SQLiteStore) HasRecheckAttemptFailureBudgetExceeded(ctx context.Context return false, err } if expiresAt <= time.Now().Unix() { - _, _ = s.db.ExecContext(ctx, `DELETE FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? AND target_account = ?`, epochID, ticketID, targetAccount) + _, _ = s.db.ExecContext(ctx, `DELETE FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ?`, epochID, ticketID) return false, nil } return attempts >= maxAttempts, nil diff --git a/pkg/storage/queries/recheck_schema_migration_test.go b/pkg/storage/queries/recheck_schema_migration_test.go index 605e4bdd..4690a381 100644 --- a/pkg/storage/queries/recheck_schema_migration_test.go +++ b/pkg/storage/queries/recheck_schema_migration_test.go @@ -53,15 +53,18 @@ func TestAddColumnIfMissing_Idempotent(t *testing.T) { require.NoError(t, addColumnIfMissing(ctx, db, "t", "extra", `ALTER TABLE t ADD COLUMN extra TEXT NOT NULL DEFAULT 'x';`)) } -// TestMigrateStorageRecheckSubmissionsPK covers the C2 migration: an old -// DB with PK (epoch_id, ticket_id) is migrated to PK (epoch_id, ticket_id, -// target_account) preserving all data. Idempotent on already-migrated DBs. -func TestMigrateStorageRecheckSubmissionsPK(t *testing.T) { +// TestMigrateStorageRecheckSubmissionsPK_CollapseToTicketKey covers the +// PR286 F3 migration: an old DB with PK (epoch_id, ticket_id, target_account) +// is migrated down to PK (epoch_id, ticket_id) so local dedup matches chain +// replay (one recheck per (epoch, ticket, creator)). Multiple target rows +// for the same (epoch, ticket) collapse to one — preferring 'submitted' +// status over 'pending', then tie-breaking by lex-smallest target_account. +func TestMigrateStorageRecheckSubmissionsPK_CollapseToTicketKey(t *testing.T) { db := sqlx.MustConnect("sqlite3", ":memory:") defer db.Close() ctx := context.Background() - // Seed the OLD schema (before this fix PK). + // Seed the OLD per-target PK schema. const oldSchema = ` CREATE TABLE storage_recheck_submissions ( epoch_id INTEGER NOT NULL, @@ -72,42 +75,65 @@ CREATE TABLE storage_recheck_submissions ( result_class INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'submitted', submitted_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id) + PRIMARY KEY (epoch_id, ticket_id, target_account) );` _, err := db.Exec(oldSchema) require.NoError(t, err) - _, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-a', 'ch', 'rh', 1, 'submitted', 1234);`) + // Three rows on the same (epoch, ticket): + // target-a pending → loses to submitted + // target-b submitted → wins (lex-smallest among submitted) + // target-c submitted → loses to target-b on tie-break + // Also one independent (epoch, ticket) with a single submitted row. + _, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-a', 'ch-a', 'rh-a', 1, 'pending', 1000);`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-b', 'ch-b', 'rh-b', 1, 'submitted', 1100);`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (7, 'ticket-1', 'target-c', 'ch-c', 'rh-c', 1, 'submitted', 1200);`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO storage_recheck_submissions VALUES (8, 'ticket-2', 'only-target', 'ch-x', 'rh-x', 1, 'submitted', 1300);`) require.NoError(t, err) // Confirm pre-migration PK shape. pk, err := primaryKeyColumns(ctx, db, "storage_recheck_submissions") require.NoError(t, err) - require.Equal(t, []string{"epoch_id", "ticket_id"}, pk) + require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) // Run migration. require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db)) - // Verify new PK shape and preserved data. + // New PK is (epoch, ticket). pk, err = primaryKeyColumns(ctx, db, "storage_recheck_submissions") require.NoError(t, err) - require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) + require.Equal(t, []string{"epoch_id", "ticket_id"}, pk) + + // Three (epoch=7, ticket-1) rows collapsed to ONE — and it MUST be the + // 'submitted' target-b (status preference + lex tie-break). var n int - require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1' AND target_account='target-a'`).Scan(&n)) + require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n)) + require.Equal(t, 1, n) + var keptTarget, keptStatus string + require.NoError(t, db.QueryRow(`SELECT target_account, status FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&keptTarget, &keptStatus)) + require.Equal(t, "target-b", keptTarget, "submitted+lex-smallest target wins the collapse") + require.Equal(t, "submitted", keptStatus) + + // The other (epoch, ticket) row is preserved verbatim. + require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=8 AND ticket_id='ticket-2'`).Scan(&n)) require.Equal(t, 1, n) // Idempotency: second run is a no-op. require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db)) - // Multi-target now allowed under the new PK. + // Post-migration: another target on the SAME (epoch, ticket) does NOT + // produce a second row — matches chain replay semantics. store := &SQLiteStore{db: db} - require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "ch2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) + require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-zzz", "ch2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n)) - require.Equal(t, 2, n) + require.Equal(t, 1, n, "post-migration insert for a new target on existing (epoch, ticket) must be a no-op") } // TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp covers the // idempotent fast-path where a fresh DB created via createStorageRecheckSubmissions -// already has the multi-column PK. +// already has the (epoch_id, ticket_id) PK. func TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp(t *testing.T) { db := sqlx.MustConnect("sqlite3", ":memory:") defer db.Close() @@ -117,9 +143,14 @@ func TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp(t *testing.T) { require.NoError(t, migrateStorageRecheckSubmissionsPK(ctx, db)) pk, err := primaryKeyColumns(ctx, db, "storage_recheck_submissions") require.NoError(t, err) - require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) + require.Equal(t, []string{"epoch_id", "ticket_id"}, pk) } +// TestMigrateRecheckAttemptFailuresPK covers the PR286 F3 migration of the +// failure-budget table: PK (epoch, ticket, target) → PK (epoch, ticket). +// Multiple rows for the same (epoch, ticket) collapse with SUM(attempts) +// + MAX(expires_at) so the budget reflects the most aggressive prior +// retry pressure across targets. func TestMigrateRecheckAttemptFailuresPK(t *testing.T) { ctx := context.Background() db, err := sqlx.Open("sqlite3", ":memory:") @@ -134,23 +165,38 @@ CREATE TABLE recheck_attempt_failures ( attempts INTEGER NOT NULL DEFAULT 1, last_error TEXT, expires_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id) + PRIMARY KEY (epoch_id, ticket_id, target_account) );`) require.NoError(t, err) - _, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-a', 1, 'boom', 999999);`) + // Two failure rows on same (epoch, ticket): attempts 1 + 2 must sum to 3. + // Use a far-future expires_at so HasRecheckAttemptFailureBudgetExceeded + // doesn't TTL-evict on read. + farFuture := time.Now().Add(24 * time.Hour).Unix() + _, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-a', 1, 'a', ?);`, farFuture) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-b', 2, 'b', ?);`, farFuture) require.NoError(t, err) require.NoError(t, migrateRecheckAttemptFailuresPK(ctx, db)) pk, err := primaryKeyColumns(ctx, db, "recheck_attempt_failures") require.NoError(t, err) - require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) + require.Equal(t, []string{"epoch_id", "ticket_id"}, pk) store := &SQLiteStore{db: db} - require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-1", "target-b", fmt.Errorf("nope"), time.Hour)) - blockedA, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-a", 2) + // After migration, the budget is shared per (epoch, ticket). With + // summed attempts=3 already on the row, a maxAttempts=3 query + // (any target) must report blocked. + blocked, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-a", 3) require.NoError(t, err) - require.False(t, blockedA) - blockedB, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-b", 2) + require.True(t, blocked, "post-migration budget is per (epoch, ticket); summed attempts cross threshold") + blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-b", 3) require.NoError(t, err) - require.False(t, blockedB) + require.True(t, blocked, "budget query for a different target on the same (epoch, ticket) reads the same row") + + // Increment via a third target — the per-(epoch, ticket) row is + // updated in-place (ON CONFLICT clause keys on (epoch, ticket)). + require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-1", "target-c", fmt.Errorf("nope"), time.Hour)) + var n int + require.NoError(t, db.QueryRow(`SELECT COUNT(*) FROM recheck_attempt_failures WHERE epoch_id=7 AND ticket_id='ticket-1'`).Scan(&n)) + require.Equal(t, 1, n, "RecordRecheckAttemptFailure must update the single (epoch, ticket) row, not create a per-target row") } diff --git a/pkg/storage/queries/recheck_test.go b/pkg/storage/queries/recheck_test.go index 87d76dde..eedf0cdd 100644 --- a/pkg/storage/queries/recheck_test.go +++ b/pkg/storage/queries/recheck_test.go @@ -13,12 +13,17 @@ import ( "github.com/stretchr/testify/require" ) -// TestRecheckSubmissionDedupPerTarget asserts the LEP-6 C2 fix: chain -// dedup is per-(epoch, ticket, target_account), so two distinct targets -// within the same (epoch, ticket) must produce two persisted rows. Before -// LEP-6 review fix, the PK was (epoch, ticket) and the second target's row was -// silently dropped — masking that supernode from chain N/R/D math. -func TestRecheckSubmissionDedupPerTarget(t *testing.T) { +// TestRecheckSubmissionDedupMatchesChainPerTicket is the PR286 F3 +// regression: chain replay protection in +// lumera x/audit/v1/keeper/msg_storage_truth.go:88-90 is keyed by +// (epoch, ticket, creator), NOT target. The local supernode is the +// implicit creator, so (epoch, ticket) is the right local dedup key. +// +// Two different targets within the same (epoch, ticket) must collapse to +// ONE persisted row; the second insert is silently ignored. The previous +// (epoch, ticket, target) PK was rejected by chain on every secondary +// target submit and confused local state. +func TestRecheckSubmissionDedupMatchesChainPerTicket(t *testing.T) { db := sqlx.MustConnect("sqlite3", ":memory:") defer db.Close() _, err := db.Exec(createStorageRecheckSubmissions) @@ -26,40 +31,46 @@ func TestRecheckSubmissionDedupPerTarget(t *testing.T) { store := &SQLiteStore{db: db} ctx := context.Background() - // Initially nothing is recorded for either target. + // Initially nothing is recorded. exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a") require.NoError(t, err) require.False(t, exists) - // First target gets recorded. + // First target gets recorded. HasRecheckSubmission now returns true + // regardless of the target argument because chain dedup is per + // (epoch, ticket, creator). require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a") require.NoError(t, err) require.True(t, exists) - - // Second target in the SAME (epoch, ticket) must also be recorded - // (this is the C2 fix — old behaviour silently dropped this row). - require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL)) exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-b") - require.NoError(t, err) + require.NoError(t, err, "any target on the same (epoch, ticket) must read as already recorded — chain replay key is per (epoch, ticket, creator)") require.True(t, exists) - // Confirm both rows landed. + // Second target on the SAME (epoch, ticket) is silently ignored by + // the ON CONFLICT(epoch_id, ticket_id) DO NOTHING. RecordRecheckSubmission + // is back-compat idempotent so this returns nil. + require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL)) + + // Confirm only the first-recorded row landed. var n int require.NoError(t, db.QueryRowContext(ctx, `SELECT COUNT(*) FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=?`, 7, "ticket-1").Scan(&n)) - require.Equal(t, 2, n) + require.Equal(t, 1, n, "exactly one row per (epoch, ticket) to match chain replay semantics") + var retainedTarget, retainedRh string + require.NoError(t, db.QueryRowContext(ctx, `SELECT target_account, recheck_transcript_hash FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=?`, 7, "ticket-1").Scan(&retainedTarget, &retainedRh)) + require.Equal(t, "target-a", retainedTarget, "first-recorded target wins") + require.Equal(t, "rh1", retainedRh, "first-recorded transcript wins") // Same ticket in a different epoch is intentionally a different replay key. exists, err = store.HasRecheckSubmission(ctx, 8, "ticket-1", "target-a") require.NoError(t, err) require.False(t, exists) - // Idempotent second-call on the same (epoch, ticket, target) is a no-op - // (ON CONFLICT DO NOTHING) — preserves first row. - require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1-DIFFERENT", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) - var rh string - require.NoError(t, db.QueryRowContext(ctx, `SELECT recheck_transcript_hash FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=? AND target_account=?`, 7, "ticket-1", "target-a").Scan(&rh)) - require.Equal(t, "rh1", rh) + // RecordPendingRecheckSubmission on a (epoch, ticket) that already + // has a row surfaces the typed dedup error. + err = store.RecordPendingRecheckSubmission(ctx, 7, "ticket-1", "target-c", "orig3", "rh3", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH) + require.Error(t, err) + require.True(t, errors.Is(err, ErrLEP6RecheckAlreadyRecorded), "second target on same (epoch, ticket) must surface ErrLEP6RecheckAlreadyRecorded") } // TestRecordPendingRecheckSubmission_DuplicateReturnsTypedError covers the diff --git a/supernode/host_reporter/service.go b/supernode/host_reporter/service.go index d8390046..c2902adc 100644 --- a/supernode/host_reporter/service.go +++ b/supernode/host_reporter/service.go @@ -16,6 +16,7 @@ import ( audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/lumera" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/chainerrors" "github.com/LumeraProtocol/supernode/v2/pkg/reachability" statussvc "github.com/LumeraProtocol/supernode/v2/supernode/status" "github.com/cosmos/cosmos-sdk/crypto/keyring" @@ -184,23 +185,21 @@ func (s *Service) tick(ctx context.Context) { storageChallengeObservations := s.buildStorageChallengeObservations(tickCtx, epochID, assignResp.RequiredOpenPorts, assignResp.TargetSupernodeAccounts) var storageProofResults []*audittypes.StorageProofResult - if proofResultProvider := s.getProofResultProvider(); proofResultProvider != nil { + proofResultProvider := s.getProofResultProvider() + if proofResultProvider != nil { storageProofResults = proofResultProvider.CollectResults(epochID) mode, modeOK := s.storageTruthEnforcementMode(tickCtx) - if modeOK && mode != audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED && len(assignResp.TargetSupernodeAccounts) > 0 && len(storageProofResults) == 0 { - logtrace.Warn(tickCtx, "epoch report skipped: waiting for LEP-6 storage proof results", logtrace.Fields{ - "epoch_id": epochID, - "assigned_targets": len(assignResp.TargetSupernodeAccounts), - "mode": mode.String(), - }) - return - } if modeOK && mode == audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL { + // FULL mode is the only mode where the chain enforces compound + // storage-proof coverage (one RECENT + one OLD per assigned target). + // See lumera x/audit/v1/keeper/msg_submit_epoch_report.go:143 + // (enforceCompoundStorageProofs := mode == FULL). If our local + // drain doesn't satisfy that, we MUST skip this epoch and + // requeue the partial rows so the next tick can try again with + // a complete set. complete, reason := storageProofCoverageComplete(storageProofResults, assignResp.TargetSupernodeAccounts) if !complete { - if requeuer, ok := proofResultProvider.(ProofResultRequeuer); ok { - requeuer.RequeueResults(epochID, storageProofResults) - } + requeueProofResults(proofResultProvider, epochID, storageProofResults) logtrace.Warn(tickCtx, "epoch report skipped: incomplete FULL-mode storage proof coverage", logtrace.Fields{ "epoch_id": epochID, "assigned_targets": len(assignResp.TargetSupernodeAccounts), @@ -209,6 +208,21 @@ func (s *Service) tick(ctx context.Context) { }) return } + } else if modeOK && len(assignResp.TargetSupernodeAccounts) > 0 && len(storageProofResults) == 0 { + // SHADOW / SOFT / UNSPECIFIED: chain accepts empty StorageProofResults + // (only FULL enforces compound coverage). Submitting the host / + // peer-observation report is mandatory regardless — withholding it + // would feed audit_missing_reports and risk self-postponement + // (ConsecutiveEpochsToPostpone defaults to 1). The trade-off is + // that a same-epoch idempotency window can cause late-arriving + // proof rows to be rejected as duplicate; that is acceptable in + // observational modes because SHADOW/SOFT proofs do not affect + // scoring (LEP-6 PR286 review F1). + logtrace.Info(tickCtx, "epoch report: submitting in non-FULL mode with empty LEP-6 proof rows", logtrace.Fields{ + "epoch_id": epochID, + "assigned_targets": len(assignResp.TargetSupernodeAccounts), + "mode": mode.String(), + }) } } @@ -226,9 +240,25 @@ func (s *Service) tick(ctx context.Context) { // for existing diagnostics/tests. if _, err := s.lumera.AuditMsg().SubmitEpochReport(tickCtx, epochID, hostReport, storageChallengeObservations, storageProofResults); err != nil { - logtrace.Warn(tickCtx, "epoch report submit failed", logtrace.Fields{ - "epoch_id": epochID, - "error": err.Error(), + // LEP-6 PR286 review F2: CollectResults destructively drained the + // proof buffer. On submit failure we MUST decide whether those rows + // can ever be re-submitted: + // - chain duplicate (report for this epoch already accepted) → + // drained rows are stale; do not requeue, just log; + // - any other error (transient RPC / sequence / validation) → + // requeue so next tick can retry with the same proofs. + if chainerrors.IsEpochReportDuplicate(err) { + logtrace.Info(tickCtx, "epoch report submit returned chain duplicate; drained proof rows discarded", logtrace.Fields{ + "epoch_id": epochID, + "proof_results": len(storageProofResults), + }) + return + } + requeueProofResults(proofResultProvider, epochID, storageProofResults) + logtrace.Warn(tickCtx, "epoch report submit failed; drained proof rows requeued for next tick", logtrace.Fields{ + "epoch_id": epochID, + "proof_results": len(storageProofResults), + "error": err.Error(), }) return } @@ -248,6 +278,20 @@ func (s *Service) storageTruthEnforcementMode(ctx context.Context) (audittypes.S return paramsResp.Params.StorageTruthEnforcementMode, true } +// requeueProofResults returns the drained proof rows to the provider's +// buffer when the host reporter has decided not to ship them this tick +// (e.g. FULL-mode incomplete coverage or submit failure). Providers that +// don't implement ProofResultRequeuer silently drop the rows — same +// semantics as before requeueing was added. +func requeueProofResults(provider ProofResultProvider, epochID uint64, results []*audittypes.StorageProofResult) { + if provider == nil || len(results) == 0 { + return + } + if requeuer, ok := provider.(ProofResultRequeuer); ok { + requeuer.RequeueResults(epochID, results) + } +} + func storageProofCoverageComplete(results []*audittypes.StorageProofResult, targets []string) (bool, string) { if len(targets) == 0 { return true, "" diff --git a/supernode/host_reporter/tick_behavior_test.go b/supernode/host_reporter/tick_behavior_test.go index 309238de..4db639df 100644 --- a/supernode/host_reporter/tick_behavior_test.go +++ b/supernode/host_reporter/tick_behavior_test.go @@ -321,7 +321,24 @@ func TestTick_AttachedProofResultProviderIsDrainedAndForwarded(t *testing.T) { } } -func TestTick_SOFTModeWithAssignedTargetsWaitsForLEP6ProofResults(t *testing.T) { +// TestTick_SHADOWModeSubmitsEmptyProofs is the LEP-6 PR286 F1 regression: +// in SHADOW the chain only enforces compound proof coverage in FULL mode +// (see lumera x/audit/v1/keeper/msg_submit_epoch_report.go:143). The host +// reporter MUST submit the epoch report even when local LEP-6 proof rows +// are empty, otherwise it stops sending host/peer observations entirely +// and feeds the audit_missing_reports postponement path. +func TestTick_SHADOWModeSubmitsEmptyProofs(t *testing.T) { + testTickSubmitsEmptyProofsForMode(t, audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SHADOW) +} + +// TestTick_SOFTModeSubmitsEmptyProofs covers the same F1 fix as SHADOW — +// SOFT is also an observational mode and chain accepts empty proof rows. +func TestTick_SOFTModeSubmitsEmptyProofs(t *testing.T) { + testTickSubmitsEmptyProofsForMode(t, audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SOFT) +} + +func testTickSubmitsEmptyProofsForMode(t *testing.T, mode audittypes.StorageTruthEnforcementMode) { + t.Helper() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -333,7 +350,7 @@ func TestTick_SOFTModeWithAssignedTargetsWaitsForLEP6ProofResults(t *testing.T) assigned: &audittypes.QueryAssignedTargetsResponse{ TargetSupernodeAccounts: []string{"snA"}, }, - params: audittypes.Params{StorageTruthEnforcementMode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SOFT}, + params: audittypes.Params{StorageTruthEnforcementMode: mode}, } auditMsg := auditmsgmod.NewMockModule(ctrl) node := nodemod.NewMockModule(ctrl) @@ -348,8 +365,8 @@ func TestTick_SOFTModeWithAssignedTargetsWaitsForLEP6ProofResults(t *testing.T) provider := &stubProofResultProvider{} auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), uint64(13), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ uint64, _ audittypes.HostReport, _ []*audittypes.StorageChallengeObservation, proofs []*audittypes.StorageProofResult) (*sdktx.BroadcastTxResponse, error) { - if len(proofs) != 1 || proofs[0].TicketId != "ticket-13" { - t.Fatalf("expected delayed proof result to be submitted, got %+v", proofs) + if len(proofs) != 0 { + t.Fatalf("expected empty proof results in mode %s, got %d", mode, len(proofs)) } return &sdktx.BroadcastTxResponse{}, nil }, @@ -360,24 +377,122 @@ func TestTick_SOFTModeWithAssignedTargetsWaitsForLEP6ProofResults(t *testing.T) t.Fatalf("new service: %v", err) } svc.SetProofResultProvider(provider) - - // First tick models the production race: host_reporter fires before the - // LEP-6 dispatcher appends same-epoch proof results. It must not submit an - // empty storage_proof_results report because that report is idempotent and - // would permanently block the later proof rows for this epoch. + svc.dialTimeout = 10 * time.Millisecond svc.tick(context.Background()) - provider.results = []*audittypes.StorageProofResult{{ + if len(provider.requeuedEpochs) != 0 { + t.Fatalf("expected no requeue when proofs were submitted (empty is fine in %s mode), got %v", mode, provider.requeuedEpochs) + } +} + +// TestTick_SubmitFailureRequeuesProofResults covers the LEP-6 PR286 F2 +// regression: CollectResults destructively drains the proof buffer; if +// SubmitEpochReport then fails with anything other than a chain duplicate, +// the drained rows MUST be requeued so the next tick can retry them. +func TestTick_SubmitFailureRequeuesProofResults(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 14}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 14}}, + epochReportErr: status.Error(codes.NotFound, "not found"), + assigned: &audittypes.QueryAssignedTargetsResponse{ + TargetSupernodeAccounts: []string{"snA"}, + }, + // SHADOW so an empty-coverage drain still reaches Submit (FULL would + // short-circuit on the coverage gate). + params: audittypes.Params{StorageTruthEnforcementMode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SHADOW}, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + sn.EXPECT().GetSupernodeWithLatestAddress(gomock.Any(), "snA").AnyTimes().Return(&supernodemod.SuperNodeInfo{LatestAddress: "127.0.0.1:4444"}, nil) + + drained := []*audittypes.StorageProofResult{{ TargetSupernodeAccount: "snA", - BucketType: audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECENT, - TicketId: "ticket-13", - TranscriptHash: "hash-13", - ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, + TicketId: "ticket-14", + TranscriptHash: "hash-14", }} + provider := &stubProofResultProvider{results: drained} + + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), uint64(14), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, errors.New("rpc unavailable: connect: connection refused")). + Times(1) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.SetProofResultProvider(provider) + svc.dialTimeout = 10 * time.Millisecond + svc.tick(context.Background()) + + if len(provider.requeuedEpochs) != 1 || provider.requeuedEpochs[0] != 14 { + t.Fatalf("expected drained proofs requeued on submit failure for epoch 14, got %v", provider.requeuedEpochs) + } + if len(provider.results) != 1 || provider.results[0].TicketId != "ticket-14" { + t.Fatalf("expected requeued proof rows preserved verbatim, got %+v", provider.results) + } +} + +// TestTick_DuplicateReportErrorDoesNotRequeue ensures that when chain +// returns ErrDuplicateReport (report already submitted for this epoch), +// the drained proof rows are NOT requeued — they are stale and another +// submit would just be rejected again. This is the "do not requeue stale +// rows" branch of the F2 fix. +func TestTick_DuplicateReportErrorDoesNotRequeue(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 15}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 15}}, + epochReportErr: status.Error(codes.NotFound, "not found"), + assigned: &audittypes.QueryAssignedTargetsResponse{ + TargetSupernodeAccounts: []string{"snA"}, + }, + params: audittypes.Params{StorageTruthEnforcementMode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SHADOW}, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + sn.EXPECT().GetSupernodeWithLatestAddress(gomock.Any(), "snA").AnyTimes().Return(&supernodemod.SuperNodeInfo{LatestAddress: "127.0.0.1:4444"}, nil) + + provider := &stubProofResultProvider{results: []*audittypes.StorageProofResult{{ + TargetSupernodeAccount: "snA", + TicketId: "ticket-15", + TranscriptHash: "hash-15", + }}} + + // Match the chain phrase from lumera x/audit/v1/keeper/msg_submit_epoch_report.go:142. + dupErr := errors.New("report already submitted for this epoch") + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), uint64(15), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, dupErr). + Times(1) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.SetProofResultProvider(provider) + svc.dialTimeout = 10 * time.Millisecond svc.tick(context.Background()) - if len(provider.queriedEpochs) != 2 || provider.queriedEpochs[0] != 13 || provider.queriedEpochs[1] != 13 { - t.Fatalf("expected provider queried twice for epoch 13, got %v", provider.queriedEpochs) + if len(provider.requeuedEpochs) != 0 { + t.Fatalf("expected NO requeue on chain-duplicate response, got %v", provider.requeuedEpochs) } } diff --git a/supernode/recheck/finder.go b/supernode/recheck/finder.go index d9093f15..3be3fd5e 100644 --- a/supernode/recheck/finder.go +++ b/supernode/recheck/finder.go @@ -105,11 +105,15 @@ func (f *Finder) Find(ctx context.Context) ([]Candidate, error) { if !c.Valid() || c.TargetAccount == f.self || c.OriginalReporter == f.self { continue } - // C2 fix: chain dedup is per-(epoch, ticket, target) — multi- - // target candidates within the same (epoch, ticket) must each - // produce a separate recheck. Key the seen map and the - // HasRecheckSubmission lookup on the full triple. - key := fmt.Sprintf("%d/%s/%s", c.EpochID, c.TicketID, c.TargetAccount) + // PR286 F3 fix: chain dedup is per (epoch, ticket, creator) — + // see lumera x/audit/v1/keeper/msg_storage_truth.go:88-90. + // Multiple target candidates within the same (epoch, ticket) + // can only produce ONE chain-accepted recheck per local + // creator (this supernode). Collapse to a deterministic + // choice using the existing (TicketID, TargetAccount) sort + // above — the first target wins lex-smallest, the rest are + // silently dropped by the seen-key dedup below. + key := fmt.Sprintf("%d/%s", c.EpochID, c.TicketID) if _, ok := seen[key]; ok { continue } diff --git a/supernode/recheck/finder_service_test.go b/supernode/recheck/finder_service_test.go index 35c78140..ecb82dab 100644 --- a/supernode/recheck/finder_service_test.go +++ b/supernode/recheck/finder_service_test.go @@ -112,7 +112,7 @@ func TestService_TickModeGateAndSubmit(t *testing.T) { func TestService_TickSkipsRecheckWhenFailureBudgetExhausted(t *testing.T) { ctx := context.Background() store := newMemoryStore() - store.failures[failureKey(10, "t", "target")] = 2 + store.failures[failureKey(10, "t")] = 2 msg := &recordingAuditMsg{} a := &stubAudit{current: 10, mode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL, reports: map[uint64]audittypes.EpochReport{10: {StorageProofResults: []*audittypes.StorageProofResult{resFrom("peer", "t", "target", "h", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH)}}}} r := &stubRechecker{result: RecheckResult{TranscriptHash: "rh", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS}} diff --git a/supernode/recheck/recheck_regression_test.go b/supernode/recheck/recheck_regression_test.go index cd90a04a..2bcba5a2 100644 --- a/supernode/recheck/recheck_regression_test.go +++ b/supernode/recheck/recheck_regression_test.go @@ -9,12 +9,16 @@ import ( "github.com/stretchr/testify/require" ) -// TestAttestor_MultiTargetSameTicketBothPersist is the LEP-6 C2 -// regression test Matee called out: two distinct targets within the same -// (epoch, ticket) must each produce a persisted dedup row and a chain -// submit. The previous PK collapsed both into one row and dropped the -// second submit. -func TestAttestor_MultiTargetSameTicketBothPersist(t *testing.T) { +// TestAttestor_MultiTargetSameTicketCollapsesToOne is the PR286 F3 +// regression: chain replay protection is per (epoch, ticket, creator), +// NOT target. Two distinct target candidates within the same +// (epoch, ticket) submitted by the same local creator must collapse — +// the first lands, the second is rejected by the local dedup (mirroring +// what chain would do) and produces no second chain submit. +// +// This replaces the pre-F3 TestAttestor_MultiTargetSameTicketBothPersist, +// which asserted the buggy per-target behavior. +func TestAttestor_MultiTargetSameTicketCollapsesToOne(t *testing.T) { callSeq = 0 ctx := context.Background() store := newMemoryStore() @@ -33,20 +37,26 @@ func TestAttestor_MultiTargetSameTicketBothPersist(t *testing.T) { } result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, Details: "ok"} + // First target lands; second target on same (epoch, ticket) collides + // with the existing local dedup row. The attestor treats the + // ErrLEP6RecheckAlreadyRecorded surface as "another tick already + // handled this candidate", emits a stage_dedup metric, and returns + // nil — no second chain submit. require.NoError(t, a.Submit(ctx, mk("target-a"), result)) - require.NoError(t, a.Submit(ctx, mk("target-b"), result)) + require.NoError(t, a.Submit(ctx, mk("target-b"), result), "second target must be a no-op via stage_dedup") - // Both targets must be persisted in the dedup store. - exA, err := store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a") + // Local dedup row exists; checking via either target returns true + // because the key is (epoch, ticket). + ex, err := store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-a") require.NoError(t, err) - require.True(t, exA, "target-a must be persisted") - exB, err := store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-b") + require.True(t, ex, "(epoch, ticket) row must be persisted") + ex, err = store.HasRecheckSubmission(ctx, 7, "ticket-1", "target-b") require.NoError(t, err) - require.True(t, exB, "target-b must be persisted (C2 regression)") + require.True(t, ex, "HasRecheckSubmission must read positive for any target on same (epoch, ticket) — chain replay key is per (epoch, ticket, creator)") - // Both must have produced a chain submit. - require.Len(t, msg.calls, 2) - require.NotEqual(t, msg.calls[0].target, msg.calls[1].target) + // Exactly one chain submit fired — the first one. + require.Len(t, msg.calls, 1, "PR286 F3: chain accepts one recheck per (epoch, ticket, creator); only one local submit must fire") + require.Equal(t, "target-a", msg.calls[0].target) } // fakeReporterErrAudit is a stub that fails for "reporter-bad" and returns diff --git a/supernode/recheck/test_helpers_test.go b/supernode/recheck/test_helpers_test.go index 970656f4..a70f8b41 100644 --- a/supernode/recheck/test_helpers_test.go +++ b/supernode/recheck/test_helpers_test.go @@ -7,6 +7,7 @@ import ( "time" audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/storage/queries" sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) @@ -27,46 +28,60 @@ func newMemoryStore() *memoryStore { return &memoryStore{seen: map[string]bool{}, failures: map[string]int{}} } func (m *memoryStore) HasRecheckSubmission(_ context.Context, epochID uint64, ticketID, targetAccount string) (bool, error) { - return m.seen[key(epochID, ticketID, targetAccount)], nil + _ = targetAccount + return m.seen[key(epochID, ticketID)], nil } func (m *memoryStore) RecordPendingRecheckSubmission(_ context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error { + _ = targetAccount callSeq++ m.recordCallIndex = callSeq - k := key(epochID, ticketID, targetAccount) + k := key(epochID, ticketID) if m.seen[k] { - // Match production SQLite ON CONFLICT DO NOTHING semantics. - return nil + // Match production SQLite ON CONFLICT(epoch_id, ticket_id) DO NOTHING. + // PR286 F3: a second target on the same (epoch, ticket) collides + // with the existing row; the caller treats this as + // ErrLEP6RecheckAlreadyRecorded. + return queries.ErrLEP6RecheckAlreadyRecorded } m.seen[k] = true return nil } func (m *memoryStore) MarkRecheckSubmissionSubmitted(_ context.Context, epochID uint64, ticketID, targetAccount string) error { - m.seen[key(epochID, ticketID, targetAccount)] = true + _ = targetAccount + m.seen[key(epochID, ticketID)] = true return nil } func (m *memoryStore) DeletePendingRecheckSubmission(_ context.Context, epochID uint64, ticketID, targetAccount string) error { - delete(m.seen, key(epochID, ticketID, targetAccount)) + _ = targetAccount + delete(m.seen, key(epochID, ticketID)) return nil } func (m *memoryStore) RecordRecheckSubmission(_ context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error { + _ = targetAccount callSeq++ m.recordCallIndex = callSeq - m.seen[key(epochID, ticketID, targetAccount)] = true + m.seen[key(epochID, ticketID)] = true return nil } func (m *memoryStore) RecordRecheckAttemptFailure(_ context.Context, epochID uint64, ticketID, targetAccount string, err error, ttl time.Duration) error { - m.failures[failureKey(epochID, ticketID, targetAccount)]++ + _ = targetAccount + m.failures[failureKey(epochID, ticketID)]++ return nil } func (m *memoryStore) HasRecheckAttemptFailureBudgetExceeded(_ context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) { - return maxAttempts > 0 && m.failures[failureKey(epochID, ticketID, targetAccount)] >= maxAttempts, nil + _ = targetAccount + return maxAttempts > 0 && m.failures[failureKey(epochID, ticketID)] >= maxAttempts, nil } func (m *memoryStore) PurgeExpiredRecheckAttemptFailures(_ context.Context) error { return nil } -func key(epochID uint64, ticketID, targetAccount string) string { - return fmt.Sprintf("%d/%s/%s", epochID, ticketID, targetAccount) -} -func failureKey(epochID uint64, ticketID, targetAccount string) string { - return fmt.Sprintf("%d/%s/%s", epochID, ticketID, targetAccount) + +// PR286 F3: dedup keys are (epoch, ticket) to match chain replay +// protection (epoch, ticket, creator), with creator implicit (this +// supernode). +func key(epochID uint64, ticketID string) string { + return fmt.Sprintf("%d/%s", epochID, ticketID) +} +func failureKey(epochID uint64, ticketID string) string { + return fmt.Sprintf("%d/%s", epochID, ticketID) } type recordingAuditMsg struct {