diff --git a/apps/elf-mcp/src/server.rs b/apps/elf-mcp/src/server.rs index 8e98e4a..9cce54f 100644 --- a/apps/elf-mcp/src/server.rs +++ b/apps/elf-mcp/src/server.rs @@ -1143,7 +1143,44 @@ fn docs_put_schema() -> Arc { "repo": { "type": "string" }, "commit_sha": { "type": "string" }, "pr_number": { "type": "integer" }, - "issue_number": { "type": "integer" } + "issue_number": { "type": "integer" }, + "source_kind": { + "type": "string", + "enum": ["article", "social_thread", "pdf", "text_export", "repo_file", "chat_excerpt", "web_page"] + }, + "canonical_uri": { "type": "string" }, + "captured_at": { "type": "string", "format": "date-time" }, + "source_created_at": { "type": "string", "format": "date-time" }, + "trust_label": { + "type": "string", + "enum": ["trusted", "user_captured", "public_web", "third_party", "unverified"] + }, + "author": { "type": "string" }, + "handle": { "type": "string" }, + "source_content_hash": { "type": "string" }, + "excerpt_locator": { + "type": "object", + "additionalProperties": true, + "properties": { + "quote": { + "type": "object", + "required": ["exact"], + "properties": { + "exact": { "type": "string" }, + "prefix": { "type": "string" }, + "suffix": { "type": "string" } + } + }, + "position": { + "type": "object", + "required": ["start", "end"], + "properties": { + "start": { "type": "integer" }, + "end": { "type": "integer" } + } + } + } + } }, "allOf": [ { @@ -2287,6 +2324,11 @@ mod tests { } let write_policy = properties.get("write_policy").and_then(serde_json::Value::as_object); + let source_ref_properties = properties + .get("source_ref") + .and_then(|value| value.get("properties")) + .and_then(serde_json::Value::as_object) + .expect("docs_put source_ref schema is missing properties."); assert!( write_policy.is_some_and(|field| { @@ -2297,6 +2339,15 @@ mod tests { }), "Missing write_policy object/null type in docs_put schema." 
); + + for field in + ["source_kind", "canonical_uri", "captured_at", "trust_label", "excerpt_locator"] + { + assert!( + source_ref_properties.contains_key(field), + "Missing source_ref field: {field}." + ); + } } #[test] diff --git a/docs/spec/system_doc_source_ref_v1.md b/docs/spec/system_doc_source_ref_v1.md index 0249b7a..0173807 100644 --- a/docs/spec/system_doc_source_ref_v1.md +++ b/docs/spec/system_doc_source_ref_v1.md @@ -6,12 +6,15 @@ resource: docs/spec/system_doc_source_ref_v1.md status: active authority: normative owner: spec -last_verified: 2026-06-20 +last_verified: 2026-06-22 tags: - docs - spec source_refs: [] -code_refs: [] +code_refs: + - apps/elf-mcp/src/server.rs + - packages/elf-service/src/docs.rs + - packages/elf-storage/src/docs.rs related: [] drift_watch: - docs/spec/system_doc_source_ref_v1.md @@ -189,6 +192,71 @@ Boundary: create or mutate durable Memory Notes unless the caller separately invokes an explicit memory-write or reviewed promotion path. +Normalized capture output: + +- `docs_put` MUST return `source_capture.schema = "doc_source_capture/v1"`. +- `source_capture.source_record_id` MUST equal the stored `doc_documents.doc_id`. +- `source_capture.origin` MUST be the canonical source origin used for operator + inspection and deduplication. Source Library `canonical_uri` takes precedence + over legacy URL, URI, thread, search, or repo-derived origins. +- `source_capture.captured_at` MUST be the Source Library `captured_at` value + when present. If the Source Library profile is not active, the service may use + the service capture timestamp. +- `source_capture.content_hash` MUST be the BLAKE3 hex hash of the persisted + document content after write-policy transforms. +- `source_capture.visibility_scope` MUST be the document scope. +- `source_capture.title` SHOULD be copied from the request title when present. +- `source_capture.source_type` MUST be `source_kind` when present, otherwise the + normalized `doc_type`. 
+- `source_capture.source_spans` MUST list stable span references for persisted + chunks. +- `source_capture.policy_spans` MUST list excluded or redacted spans when + write-policy hooks remove or transform source content. + +Stable source records and spans: + +- `doc_documents.doc_id` is the Source Library source record id for captured + docs. It MUST be deterministic for the same tenant, effective project, agent, + scope, doc type, source identity, and persisted content hash. +- Persisted chunk ids MUST be deterministic for the same source record id and + chunk index. +- Captured source span ids MUST be deterministic for the same persisted content + hash, byte offsets, and span status. +- Captured span offsets are byte offsets into the persisted document content. +- Policy span offsets are byte offsets into the original request content before + write-policy transforms. + +`doc_source_span/v1` fields: + +- `schema` (string): exact value `doc_source_span/v1`. +- `span_id` (string UUID): stable span identifier. +- `chunk_id` (string UUID, optional): present for persisted captured chunks. +- `status` (string): `captured`, `excluded`, or `redacted`. +- `reason_code` (string, optional): required for non-captured spans. +- `start_offset` and `end_offset` (integers): byte offsets, with + `start_offset <= end_offset`. +- `content_hash` (string): BLAKE3 hex hash for the content the offsets address. +- `chunk_hash` (string, optional): BLAKE3 hex hash for captured chunk text. + +Typed policy span reasons: + +- Excluded spans MUST use `reason_code = "WRITE_POLICY_EXCLUSION"`. +- Redacted spans MUST use `reason_code = "WRITE_POLICY_REDACTION"`. +- Unsupported or policy-removed content MUST be represented through a typed span + reason or a typed validation error. It MUST NOT disappear silently from Source + Library audit surfaces. 
+ +Persisted normalized `source_ref`: + +- The stored `doc_documents.source_ref` MUST retain the caller-provided + `doc_source_ref/v1` fields and add normalized capture fields: + `source_record_id`, `origin`, `captured_at`, `content_hash`, + `visibility_scope`, `source_type`, and `source_spans`. +- When policy spans exist, stored `doc_documents.source_ref` MUST include + `policy_spans`. +- Normalized capture fields are evidence metadata only. They MUST NOT promote a + source record into approved Memory Authority. + ================================================== 6) Examples ================================================== diff --git a/docs/spec/system_source_ref_doc_pointer_v1.md b/docs/spec/system_source_ref_doc_pointer_v1.md index fcf85e4..7bc36df 100644 --- a/docs/spec/system_source_ref_doc_pointer_v1.md +++ b/docs/spec/system_source_ref_doc_pointer_v1.md @@ -6,12 +6,13 @@ resource: docs/spec/system_source_ref_doc_pointer_v1.md status: active authority: normative owner: spec -last_verified: 2026-06-20 +last_verified: 2026-06-22 tags: - docs - spec source_refs: [] -code_refs: [] +code_refs: + - packages/elf-service/src/docs.rs related: [] drift_watch: - docs/spec/system_source_ref_doc_pointer_v1.md @@ -86,10 +87,16 @@ All keys and string values SHOULD be ASCII-safe and stable over time. `ref` MAY include: - `chunk_id` (string): UUID of a specific chunk. Use when the pointer came from `docs_search_l0`. +- `source_record_id` (string): stable Source Library source record id. For Doc + Extension v1 this MUST match `doc_id`. +- `source_span_id` (string): stable Source Library span id for the returned + chunk or selector span. Notes: - `doc_id` is the canonical lookup key for hydration. - `chunk_id` is an optional anchor that can help choose a small search neighborhood. +- `source_record_id` and `source_span_id` are audit identifiers. They MUST NOT + be treated as a memory-promotion signal. 
### 3.3 `state` (optional but recommended) @@ -128,11 +135,16 @@ Rules: Optional fields: - `level` (string): `"L0"`, `"L1"` or `"L2"` as a suggested excerpt size tier for hydration. If omitted, agents should choose based on context budget. +- `span_id` (string): stable Source Library span id for the selector span. `docs_search_l0` returns a `locator.position` selector for the hit chunk. Agents may pass this selector, the returned `ref.chunk_id`, or their own quote selector to `docs_excerpts_get` for verified hydration. +`docs_search_l0` MUST return `ref.source_span_id` equal to `locator.span_id` for +the selected chunk span. `docs_excerpts_get` MUST return `locator.span_id` for +the matched quote, position, or chunk selector span. + ### 3.5 `hashes` (optional) `hashes` MAY include: @@ -208,7 +220,9 @@ The agent SHOULD: "resolver": "elf_doc_ext/v1", "ref": { "doc_id": "6b5b2f08-9a89-4c6c-9b6b-9c0c2f0b1f2d", - "chunk_id": "b2e8a8d2-4c10-4a1b-98f8-7a8702fd0cc1" + "chunk_id": "b2e8a8d2-4c10-4a1b-98f8-7a8702fd0cc1", + "source_record_id": "6b5b2f08-9a89-4c6c-9b6b-9c0c2f0b1f2d", + "source_span_id": "3190ca88-6f24-5d55-bf8f-9cecfba95b72" }, "state": { "content_hash": "baf7cfd2d5b71f5b0f5d5a08a3c38d7b43cf7a2e5a4f75d5c1b4a9072f6dd3b8", @@ -219,6 +233,7 @@ The agent SHOULD: "chunk_hash": "bd85b0e07464bde3a7f3a2b2f3c2d5d4c1c9f0d0c1a2b3c4d5e6f7a8b9c0d1e2" }, "locator": { + "span_id": "3190ca88-6f24-5d55-bf8f-9cecfba95b72", "position": { "start": 128, "end": 384 diff --git a/packages/elf-service/src/docs.rs b/packages/elf-service/src/docs.rs index 11d968c..c79c945 100644 --- a/packages/elf-service/src/docs.rs +++ b/packages/elf-service/src/docs.rs @@ -45,6 +45,8 @@ const DEFAULT_L2_MAX_BYTES: usize = 32 * 1_024; const DOC_RETRIEVAL_TRAJECTORY_SCHEMA_V1: &str = "doc_retrieval_trajectory/v1"; const DOC_SOURCE_REF_SCHEMA_V1: &str = "source_ref/v1"; const DOC_SOURCE_REF_RESOLVER_V1: &str = "elf_doc_ext/v1"; +const DOC_SOURCE_CAPTURE_SCHEMA_V1: &str = "doc_source_capture/v1"; +const 
DOC_SOURCE_SPAN_SCHEMA_V1: &str = "doc_source_span/v1"; const DOC_STATUSES: [&str; 2] = ["active", "deleted"]; const SOURCE_LIBRARY_FIELD_KEYS: [&str; 9] = [ "source_kind", @@ -129,6 +131,8 @@ pub struct DocsPutRequest { pub struct DocsPutResponse { /// Identifier of the stored document. pub doc_id: Uuid, + /// Normalized Source Library capture metadata for the stored document. + pub source_capture: DocsSourceCaptureSummary, /// Number of persisted chunks generated from the content. pub chunk_count: u32, /// Byte length of the stored content. @@ -140,6 +144,59 @@ pub struct DocsPutResponse { pub write_policy_audit: Option, } +/// Normalized Source Library capture metadata returned by `docs_put`. +#[derive(Clone, Debug, Serialize)] +pub struct DocsSourceCaptureSummary { + /// Schema identifier for this capture summary. + pub schema: String, + /// Stable source record identifier. This is also the stored `doc_id`. + pub source_record_id: Uuid, + /// Canonical source origin used for operator inspection and deduplication. + pub origin: String, + /// RFC3339 timestamp when ELF captured the source. + pub captured_at: String, + /// Whole-document BLAKE3 hash for the persisted content. + pub content_hash: String, + /// Visibility scope assigned to the source record. + pub visibility_scope: String, + #[serde(skip_serializing_if = "Option::is_none")] + /// Optional display title associated with the source record. + pub title: Option, + /// Normalized source type, derived from `source_kind` when present. + pub source_type: String, + /// Stable span references for persisted source chunks. + pub source_spans: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + /// Typed audit records for redacted or excluded source spans. + pub policy_spans: Vec, +} + +/// Stable reference to one captured or policy-affected source span. +#[derive(Clone, Debug, Serialize)] +pub struct DocsSourceSpanRef { + /// Schema identifier for this span reference. 
+ pub schema: String, + /// Stable span identifier derived from content hash and byte offsets. + pub span_id: Uuid, + #[serde(skip_serializing_if = "Option::is_none")] + /// Chunk identifier when this span is backed by a persisted chunk. + pub chunk_id: Option, + /// Span lifecycle status such as `captured`, `excluded`, or `redacted`. + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + /// Typed reason code for non-captured spans. + pub reason_code: Option, + /// Inclusive start byte offset in the relevant content hash. + pub start_offset: usize, + /// Exclusive end byte offset in the relevant content hash. + pub end_offset: usize, + /// Whole-content hash that makes the offsets replayable. + pub content_hash: String, + #[serde(skip_serializing_if = "Option::is_none")] + /// Chunk hash when this span is backed by a persisted chunk. + pub chunk_hash: Option, +} + /// Request payload for document metadata lookup. #[derive(Clone, Debug, Deserialize)] pub struct DocsGetRequest { @@ -297,6 +354,10 @@ pub struct DocsSearchL0ItemReference { pub doc_id: Uuid, /// Chunk identifier. pub chunk_id: Uuid, + /// Stable source record identifier. + pub source_record_id: Uuid, + /// Stable source span identifier for this chunk. + pub source_span_id: Uuid, } /// Freshness guard for a document-search hit. @@ -323,6 +384,8 @@ pub struct DocsSearchL0ItemHashes { /// Locator hints carried with a document-search pointer. #[derive(Clone, Debug, Serialize)] pub struct DocsSearchL0ItemLocator { + /// Stable source span identifier for the locator. + pub span_id: Uuid, /// Chunk byte position in the authoritative document content. pub position: TextPositionSelector, } @@ -430,6 +493,8 @@ pub struct DocsExcerptResponse { /// Selector resolution metadata for an excerpt. #[derive(Clone, Debug, Serialize)] pub struct DocsExcerptLocator { + /// Stable source span identifier for the matched selector span. + pub span_id: Uuid, /// Selector kind that produced the match. 
pub selector_kind: String, /// Inclusive start offset of the matched selector span. @@ -447,6 +512,19 @@ pub struct DocsExcerptLocator { pub position: Option, } +struct SourceCaptureSummaryInput<'a> { + doc_id: Uuid, + source_ref: &'a Map, + doc_type: DocType, + scope: &'a str, + title: Option<&'a str>, + content_hash: &'a str, + raw_content_hash: &'a str, + now: OffsetDateTime, + chunks: &'a [DocChunk], + write_policy_audit: Option<&'a WritePolicyAudit>, +} + #[derive(Clone, Copy)] struct DocExcerptMatch { selector_kind: ExcerptsSelectorKind, @@ -592,21 +670,57 @@ impl ElfService { let ValidatedDocsPut { doc_type, content, write_policy_audit } = validate_docs_put(&req)?; let now = OffsetDateTime::now_utc(); let embed_version = crate::embedding_version(&self.cfg); - let DocsPutRequest { tenant_id, project_id, agent_id, scope, title, source_ref, .. } = req; let chunking_profile = resolve_doc_chunking_profile(doc_type); let tokenizer = load_tokenizer(&self.cfg)?; + let tenant_id = req.tenant_id.clone(); + let project_id = req.project_id.clone(); + let agent_id = req.agent_id.clone(); + let scope = req.scope.clone(); + let title = req.title.clone(); + let source_ref = req.source_ref.clone(); + let source_ref_map = source_ref.as_object().ok_or_else(|| Error::InvalidRequest { + message: "source_ref must be a JSON object.".to_string(), + })?; let effective_project_id = if scope.trim() == "org_shared" { ORG_PROJECT_ID } else { project_id.as_str() }; let content_bytes = content.len(); - let content_hash = blake3::hash(content.as_bytes()); - let doc_id = Uuid::new_v4(); - let chunks = split_tokens_by_offsets( + let content_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); + let raw_content_hash = blake3::hash(req.content.as_bytes()).to_hex().to_string(); + let doc_id = source_record_id_for( + tenant_id.as_str(), + effective_project_id, + agent_id.as_str(), + scope.as_str(), + doc_type, + source_ref_map, + content_hash.as_str(), + ); + let mut chunks = 
split_tokens_by_offsets( content.as_str(), chunking_profile.max_tokens, chunking_profile.overlap_tokens, chunking_profile.max_chunks, &tokenizer, )?; + + for (chunk_index, chunk) in chunks.iter_mut().enumerate() { + chunk.chunk_id = doc_chunk_id_for(doc_id, chunk_index as i32); + } + + let chunk_rows = build_doc_chunk_rows(doc_id, &chunks, now); + let source_capture = build_source_capture_summary(SourceCaptureSummaryInput { + doc_id, + source_ref: source_ref_map, + doc_type, + scope: scope.as_str(), + title: title.as_deref(), + content_hash: content_hash.as_str(), + raw_content_hash: raw_content_hash.as_str(), + now, + chunks: &chunk_rows, + write_policy_audit: write_policy_audit.as_ref(), + })?; + let normalized_source_ref = normalize_source_ref_for_capture(source_ref, &source_capture)?; let doc_row = DocDocument { doc_id, tenant_id: tenant_id.clone(), @@ -616,10 +730,10 @@ impl ElfService { doc_type: doc_type.as_str().to_string(), status: "active".to_string(), title, - source_ref: docs::normalize_source_ref(Some(source_ref)), + source_ref: docs::normalize_source_ref(Some(normalized_source_ref)), content, content_bytes: content_bytes as i32, - content_hash: content_hash.to_hex().to_string(), + content_hash: content_hash.clone(), created_at: now, updated_at: now, }; @@ -627,20 +741,8 @@ impl ElfService { docs::insert_doc_document(&mut *tx, &doc_row).await?; - for (chunk_index, chunk) in chunks.iter().enumerate() { - let chunk_hash = blake3::hash(chunk.text.as_bytes()); - let chunk_row = DocChunk { - chunk_id: chunk.chunk_id, - doc_id, - chunk_index: chunk_index as i32, - start_offset: chunk.start_offset as i32, - end_offset: chunk.end_offset as i32, - chunk_text: chunk.text.clone(), - chunk_hash: chunk_hash.to_hex().to_string(), - created_at: now, - }; - - docs::insert_doc_chunk(&mut *tx, &chunk_row).await?; + for chunk_row in &chunk_rows { + docs::insert_doc_chunk(&mut *tx, chunk_row).await?; doc_outbox::enqueue_doc_outbox( &mut *tx, doc_id, @@ -666,9 +768,10 @@ 
impl ElfService { Ok(DocsPutResponse { doc_id, - chunk_count: chunks.len() as u32, + source_capture, + chunk_count: chunk_rows.len() as u32, content_bytes: content_bytes as u32, - content_hash: content_hash.to_hex().to_string(), + content_hash, write_policy_audit, }) } @@ -1090,6 +1193,7 @@ LIMIT 1", &selector_kind, match_start_offset, match_end_offset, + doc.content_hash.as_str(), ), verification: DocsExcerptVerification { verified, @@ -1132,6 +1236,14 @@ impl ExcerptsSelectorKind { Self::Position => "position", } } + + fn span_kind(&self) -> &'static str { + match self { + Self::ChunkId => "captured", + Self::Quote => "quote", + Self::Position => "position", + } + } } fn docs_search_l0_deduplicated_chunks( @@ -1236,8 +1348,15 @@ fn docs_excerpt_locator( selector_kind: &ExcerptsSelectorKind, match_start_offset: usize, match_end_offset: usize, + content_hash: &str, ) -> DocsExcerptLocator { DocsExcerptLocator { + span_id: source_span_id( + content_hash, + match_start_offset, + match_end_offset, + selector_kind.span_kind(), + ), selector_kind: selector_kind.as_str().to_string(), match_start_offset, match_end_offset, @@ -1256,7 +1375,17 @@ fn build_docs_l0_pointer(row: &DocSearchRow, chunk_id: Uuid) -> DocsSearchL0Item DocsSearchL0ItemPointer { schema: DOC_SOURCE_REF_SCHEMA_V1.to_string(), resolver: DOC_SOURCE_REF_RESOLVER_V1.to_string(), - reference: DocsSearchL0ItemReference { doc_id: row.doc_id, chunk_id }, + reference: DocsSearchL0ItemReference { + doc_id: row.doc_id, + chunk_id, + source_record_id: row.doc_id, + source_span_id: source_span_id( + row.content_hash.as_str(), + row.start_offset.max(0) as usize, + row.end_offset.max(0) as usize, + "captured", + ), + }, state: DocsSearchL0ItemState { content_hash: hashes.content_hash.clone(), chunk_hash: hashes.chunk_hash.clone(), @@ -1264,6 +1393,12 @@ fn build_docs_l0_pointer(row: &DocSearchRow, chunk_id: Uuid) -> DocsSearchL0Item }, hashes, locator: DocsSearchL0ItemLocator { + span_id: source_span_id( + 
row.content_hash.as_str(), + row.start_offset.max(0) as usize, + row.end_offset.max(0) as usize, + "captured", + ), position: TextPositionSelector { start: row.start_offset.max(0) as usize, end: row.end_offset.max(0) as usize, @@ -1272,6 +1407,321 @@ fn build_docs_l0_pointer(row: &DocSearchRow, chunk_id: Uuid) -> DocsSearchL0Item } } +fn build_doc_chunk_rows(doc_id: Uuid, chunks: &[ByteChunk], now: OffsetDateTime) -> Vec { + chunks + .iter() + .enumerate() + .map(|(chunk_index, chunk)| DocChunk { + chunk_id: doc_chunk_id_for(doc_id, chunk_index as i32), + doc_id, + chunk_index: chunk_index as i32, + start_offset: chunk.start_offset as i32, + end_offset: chunk.end_offset as i32, + chunk_text: chunk.text.clone(), + chunk_hash: blake3::hash(chunk.text.as_bytes()).to_hex().to_string(), + created_at: now, + }) + .collect() +} + +fn doc_chunk_id_for(doc_id: Uuid, chunk_index: i32) -> Uuid { + let name = format!("elf-doc-chunk/v1:{doc_id}:{chunk_index}"); + + Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()) +} + +fn source_record_id_for( + tenant_id: &str, + project_id: &str, + agent_id: &str, + scope: &str, + doc_type: DocType, + source_ref: &Map, + content_hash: &str, +) -> Uuid { + let name = serde_json::json!([ + "elf-doc-source-record/v1", + tenant_id, + project_id, + agent_id, + scope, + doc_type.as_str(), + source_identity_value(source_ref, doc_type), + content_hash, + ]) + .to_string(); + + Uuid::new_v5(&Uuid::NAMESPACE_URL, name.as_bytes()) +} + +fn source_span_id(content_hash: &str, start: usize, end: usize, span_kind: &str) -> Uuid { + let name = serde_json::json!(["elf-doc-source-span/v1", content_hash, start, end, span_kind,]) + .to_string(); + + Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()) +} + +fn build_source_capture_summary( + input: SourceCaptureSummaryInput<'_>, +) -> Result { + let SourceCaptureSummaryInput { + doc_id, + source_ref, + doc_type, + scope, + title, + content_hash, + raw_content_hash, + now, + chunks, + write_policy_audit, + } = 
input;
+    // Resolve lazily: the service-clock fallback is only formatted (and can only fail)
+    // when the caller did not supply a string `captured_at`.
+    let captured_at = match source_ref.get("captured_at").and_then(Value::as_str) {
+        Some(captured_at) => captured_at.to_string(),
+        None => format_timestamp(now)?,
+    };
+    let source_spans = chunks
+        .iter()
+        .map(|chunk| {
+            // Chunk rows store signed offsets; clamp negatives to zero before widening.
+            let start_offset = chunk.start_offset.max(0) as usize;
+            let end_offset = chunk.end_offset.max(0) as usize;
+
+            DocsSourceSpanRef {
+                schema: DOC_SOURCE_SPAN_SCHEMA_V1.to_string(),
+                span_id: source_span_id(content_hash, start_offset, end_offset, "captured"),
+                chunk_id: Some(chunk.chunk_id),
+                status: "captured".to_string(),
+                reason_code: None,
+                start_offset,
+                end_offset,
+                content_hash: content_hash.to_string(),
+                chunk_hash: Some(chunk.chunk_hash.clone()),
+            }
+        })
+        .collect();
+    // Policy spans are keyed by the hash of the raw request content, not the persisted content.
+    let policy_spans = source_policy_spans(raw_content_hash, write_policy_audit);
+
+    Ok(DocsSourceCaptureSummary {
+        schema: DOC_SOURCE_CAPTURE_SCHEMA_V1.to_string(),
+        source_record_id: doc_id,
+        origin: source_origin(source_ref, doc_type),
+        captured_at,
+        content_hash: content_hash.to_string(),
+        visibility_scope: scope.to_string(),
+        title: title.map(ToString::to_string),
+        source_type: source_type(source_ref, doc_type),
+        source_spans,
+        policy_spans,
+    })
+}
+
+/// Converts a write-policy audit into typed policy span references.
+///
+/// Returns an empty list when there is no audit. Otherwise emits one `excluded` span per
+/// exclusion followed by one `redacted` span per redaction, passing the audit offsets through
+/// unchanged and tagging every span with `raw_content_hash`.
+fn source_policy_spans(
+    raw_content_hash: &str,
+    audit: Option<&WritePolicyAudit>,
+) -> Vec {
+    let Some(audit) = audit else {
+        return Vec::new();
+    };
+    let mut spans = Vec::with_capacity(audit.exclusions.len() + audit.redactions.len());
+
+    for span in &audit.exclusions {
+        spans.push(policy_span_ref(
+            raw_content_hash,
+            span.start,
+            span.end,
+            "excluded",
+            "WRITE_POLICY_EXCLUSION",
+        ));
+    }
+    for redaction in &audit.redactions {
+        spans.push(policy_span_ref(
+            raw_content_hash,
+            redaction.span.start,
+            redaction.span.end,
+            "redacted",
+            "WRITE_POLICY_REDACTION",
+        ));
+    }
+
+    spans
+}
+
+/// Builds a span reference for policy-affected content; such spans carry a status and
+/// reason code but no chunk id or chunk hash.
+fn policy_span_ref(
+    content_hash: &str,
+    start: usize,
+    end: usize,
+    status: &str,
+    reason_code: &str,
+) -> DocsSourceSpanRef {
+    DocsSourceSpanRef {
+        schema: DOC_SOURCE_SPAN_SCHEMA_V1.to_string(),
+
span_id: source_span_id(content_hash, start, end, reason_code),
+        chunk_id: None,
+        status: status.to_string(),
+        reason_code: Some(reason_code.to_string()),
+        start_offset: start,
+        end_offset: end,
+        content_hash: content_hash.to_string(),
+        chunk_hash: None,
+    }
+}
+
+/// Extends the caller-provided `source_ref` object with normalized capture fields.
+///
+/// Writes `source_record_id`, `origin`, `captured_at`, `content_hash`, `visibility_scope`,
+/// `source_type`, and `source_spans` from `source_capture`, replacing any same-named caller
+/// keys. `title` is only filled in when the caller did not set one, and `policy_spans` is only
+/// written when the capture has policy spans.
+///
+/// # Errors
+///
+/// Returns `Error::InvalidRequest` when `source_ref` is not a JSON object or when the span
+/// lists cannot be encoded as JSON.
+fn normalize_source_ref_for_capture(
+    source_ref: Value,
+    source_capture: &DocsSourceCaptureSummary,
+) -> Result {
+    // `source_ref` is owned, so move the map out of the value instead of deep-cloning it.
+    let Value::Object(mut source_ref) = source_ref else {
+        return Err(Error::InvalidRequest {
+            message: "source_ref must be a JSON object.".to_string(),
+        });
+    };
+
+    source_ref.insert(
+        "source_record_id".to_string(),
+        Value::String(source_capture.source_record_id.to_string()),
+    );
+    source_ref.insert("origin".to_string(), Value::String(source_capture.origin.clone()));
+    source_ref.insert("captured_at".to_string(), Value::String(source_capture.captured_at.clone()));
+    source_ref
+        .insert("content_hash".to_string(), Value::String(source_capture.content_hash.clone()));
+    source_ref.insert(
+        "visibility_scope".to_string(),
+        Value::String(source_capture.visibility_scope.clone()),
+    );
+
+    // Unlike the fields above, a caller-supplied `title` is kept as-is.
+    if let Some(title) = source_capture.title.as_ref() {
+        source_ref.entry("title".to_string()).or_insert_with(|| Value::String(title.clone()));
+    }
+
+    source_ref.insert("source_type".to_string(), Value::String(source_capture.source_type.clone()));
+    source_ref
+        .insert("source_spans".to_string(), source_spans_to_value(&source_capture.source_spans)?);
+
+    if !source_capture.policy_spans.is_empty() {
+        source_ref.insert(
+            "policy_spans".to_string(),
+            source_spans_to_value(&source_capture.policy_spans)?,
+        );
+    }
+
+    Ok(Value::Object(source_ref))
+}
+
+/// Serializes span references to a JSON value, reporting encoder failures as
+/// `Error::InvalidRequest`.
+fn source_spans_to_value(spans: &[DocsSourceSpanRef]) -> Result {
+    serde_json::to_value(spans).map_err(|err| Error::InvalidRequest {
+        message: format!("failed to encode source span metadata: {err}"),
+    })
+}
+
+/// Returns the `source_kind` string from `source_ref` when it is non-blank, otherwise the
+/// name of `doc_type`.
+fn source_type(source_ref: &Map, doc_type: DocType) -> String {
+    source_ref
+        .get("source_kind")
+        .and_then(Value::as_str)
+
.filter(|value| !value.trim().is_empty())
+        .unwrap_or_else(|| doc_type.as_str())
+        .to_string()
+}
+
+/// Derives the origin string for a source record.
+///
+/// The first non-blank value among `canonical_uri`, `url`, and `uri` is returned verbatim.
+/// Without one, the origin is synthesized from doc-type-specific fields, substituting
+/// `unknown` for missing parts.
+fn source_origin(source_ref: &Map, doc_type: DocType) -> String {
+    let explicit_origin = ["canonical_uri", "url", "uri"]
+        .into_iter()
+        .find_map(|key| source_ref_string(source_ref, key));
+
+    if let Some(origin) = explicit_origin {
+        return origin.to_string();
+    }
+
+    match doc_type {
+        DocType::Chat => {
+            let thread_id = source_ref_string(source_ref, "thread_id").unwrap_or("unknown");
+
+            // The message anchor is appended only when a message id is available.
+            match source_ref_string(source_ref, "message_id") {
+                Some(message_id) => format!("thread:{}#{}", thread_id, message_id),
+                None => format!("thread:{}", thread_id),
+            }
+        }
+        DocType::Search => match source_ref_string(source_ref, "domain") {
+            Some(domain) => format!("search:{domain}"),
+            None => "search:unknown".to_string(),
+        },
+        DocType::Dev => dev_origin(source_ref),
+        DocType::Knowledge => match source_ref_string(source_ref, "ts") {
+            Some(ts) => format!("knowledge:{ts}"),
+            None => "knowledge:unknown".to_string(),
+        },
+    }
+}
+
+/// Formats a `repo:` origin for dev documents.
+///
+/// The revision suffix uses the first available of `commit_sha` (`@sha`), `pr_number`
+/// (`#pr-N`), and `issue_number` (`#issue-N`); the path segment is omitted when no
+/// non-blank `path` is present.
+fn dev_origin(source_ref: &Map) -> String {
+    let repo = source_ref_string(source_ref, "repo").unwrap_or("unknown");
+    let revision = if let Some(commit) = source_ref_string(source_ref, "commit_sha") {
+        format!("@{commit}")
+    } else if let Some(pr) = source_ref_i64(source_ref, "pr_number") {
+        format!("#pr-{pr}")
+    } else if let Some(issue) = source_ref_i64(source_ref, "issue_number") {
+        format!("#issue-{issue}")
+    } else {
+        String::new()
+    };
+
+    match source_ref_string(source_ref, "path") {
+        Some(path) => format!("repo:{repo}/{path}{revision}"),
+        None => format!("repo:{repo}{revision}"),
+    }
+}
+
+/// Builds the JSON identity component for a source: the canonical URI when present,
+/// otherwise a doc-type-specific tuple of source_ref fields.
+fn source_identity_value(source_ref: &Map, doc_type: DocType) -> Value {
+    if let Some(canonical_uri) = source_ref_string(source_ref, "canonical_uri") {
+        return serde_json::json!(["canonical_uri", canonical_uri]);
+    }
+
+    match doc_type {
+        DocType::Chat => serde_json::json!([
+
"chat", + source_ref_string(source_ref, "thread_id"), + source_ref_string(source_ref, "message_id"), + source_ref_string(source_ref, "role"), + source_ref_string(source_ref, "ts"), + ]), + DocType::Search => serde_json::json!([ + "search", + source_ref_string(source_ref, "url"), + source_ref_string(source_ref, "domain"), + source_ref_string(source_ref, "query"), + source_ref_string(source_ref, "ts"), + ]), + DocType::Dev => serde_json::json!([ + "dev", + source_ref_string(source_ref, "repo"), + source_ref_string(source_ref, "path"), + source_ref_string(source_ref, "commit_sha"), + source_ref_i64(source_ref, "pr_number"), + source_ref_i64(source_ref, "issue_number"), + ]), + DocType::Knowledge => serde_json::json!([ + "knowledge", + source_ref_string(source_ref, "uri"), + source_ref_string(source_ref, "ts"), + ]), + } +} + +fn source_ref_string<'a>(source_ref: &'a Map, key: &str) -> Option<&'a str> { + source_ref.get(key).and_then(Value::as_str).filter(|value| !value.trim().is_empty()) +} + +fn source_ref_i64(source_ref: &Map, key: &str) -> Option { + source_ref.get(key).and_then(Value::as_i64) +} + +fn format_timestamp(ts: OffsetDateTime) -> Result { + ts.format(&Rfc3339).map_err(|err| Error::InvalidRequest { + message: format!("failed to format RFC3339 timestamp: {err}"), + }) +} + fn resolve_doc_chunking_profile(doc_type: DocType) -> DocChunkingProfile { match doc_type { DocType::Chat | DocType::Search => DocChunkingProfile { @@ -2568,7 +3018,8 @@ mod tests { self, DocType, DocsPutRequest, DocsSearchL0Filters, DocsSearchL0Request, DocsSparseMode, Error, }; - use elf_domain::writegate::{WritePolicy, WriteSpan}; + use elf_domain::writegate::{WritePolicy, WritePolicyAudit, WriteRedactionResult, WriteSpan}; + use elf_storage::models::DocChunk; const TENANT_ID: &str = "tenant"; const PROJECT_ID: &str = "project"; @@ -3330,6 +3781,144 @@ mod tests { assert_eq!(validated.doc_type, DocType::Knowledge); } + #[test] + fn 
source_capture_metadata_uses_stable_record_and_span_ids() { + let now = OffsetDateTime::parse("2026-02-25T12:15:00Z", &Rfc3339) + .expect("Expected test timestamp to parse."); + let source_ref = serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + "source_kind": "article", + "canonical_uri": "https://example.com/research/source-library", + "captured_at": "2026-02-25T12:10:00Z", + "trust_label": "public_web", + }); + let source_ref = source_ref.as_object().expect("Expected source_ref object."); + let content_hash = "doc-content-hash"; + let doc_id = super::source_record_id_for( + TENANT_ID, + PROJECT_ID, + "owner", + "project_shared", + DocType::Knowledge, + source_ref, + content_hash, + ); + let repeated_doc_id = super::source_record_id_for( + TENANT_ID, + PROJECT_ID, + "owner", + "project_shared", + DocType::Knowledge, + source_ref, + content_hash, + ); + let chunk_id = super::doc_chunk_id_for(doc_id, 0); + let chunk = DocChunk { + chunk_id, + doc_id, + chunk_index: 0, + start_offset: 0, + end_offset: 42, + chunk_text: "Source libraries preserve long-form evidence.".to_string(), + chunk_hash: "chunk-content-hash".to_string(), + created_at: now, + }; + let capture = super::build_source_capture_summary(super::SourceCaptureSummaryInput { + doc_id, + source_ref, + doc_type: DocType::Knowledge, + scope: "project_shared", + title: Some("Saved article"), + content_hash, + raw_content_hash: "raw-content-hash", + now, + chunks: &[chunk], + write_policy_audit: None, + }) + .expect("Expected source capture summary."); + + assert_eq!(doc_id, repeated_doc_id); + assert_eq!(capture.schema, "doc_source_capture/v1"); + assert_eq!(capture.source_record_id, doc_id); + assert_eq!(capture.origin, "https://example.com/research/source-library"); + assert_eq!(capture.captured_at, "2026-02-25T12:10:00Z"); + assert_eq!(capture.content_hash, content_hash); + assert_eq!(capture.visibility_scope, "project_shared"); + 
assert_eq!(capture.title.as_deref(), Some("Saved article")); + assert_eq!(capture.source_type, "article"); + assert_eq!(capture.source_spans.len(), 1); + assert_eq!(capture.source_spans[0].schema, "doc_source_span/v1"); + assert_eq!(capture.source_spans[0].chunk_id, Some(chunk_id)); + assert_eq!(capture.source_spans[0].status, "captured"); + assert_eq!(capture.source_spans[0].reason_code, None); + assert_eq!(capture.source_spans[0].start_offset, 0); + assert_eq!(capture.source_spans[0].end_offset, 42); + assert_eq!( + capture.source_spans[0].span_id, + super::source_span_id(content_hash, 0, 42, "captured") + ); + } + + /* Checks that a capture summary built with a write-policy audit (one exclusion, one redaction, no chunks) reports both policy spans with their reason codes, and that the normalized source_ref carries the same capture fields. */ #[test] + fn normalized_source_ref_records_policy_span_reasons() { + let now = OffsetDateTime::parse("2026-02-25T12:15:00Z", &Rfc3339) + .expect("Expected test timestamp to parse."); + let source_ref = serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + "uri": "file:///tmp/source.txt", + }); + let source_ref_map = source_ref.as_object().expect("Expected source_ref object."); + /* One excluded span and one redacted span, so two policy spans are expected below. */ let audit = WritePolicyAudit { + exclusions: vec![WriteSpan { start: 6, end: 12 }], + redactions: vec![WriteRedactionResult { + span: WriteSpan { start: 20, end: 30 }, + replacement: "[redacted]".to_string(), + }], + }; + let doc_id = super::source_record_id_for( + TENANT_ID, + PROJECT_ID, + "owner", + "project_shared", + DocType::Knowledge, + source_ref_map, + "stored-hash", + ); + let capture = super::build_source_capture_summary(super::SourceCaptureSummaryInput { + doc_id, + source_ref: source_ref_map, + doc_type: DocType::Knowledge, + scope: "project_shared", + title: None, + content_hash: "stored-hash", + raw_content_hash: "raw-hash", + now, + chunks: &[], + write_policy_audit: Some(&audit), + }) + .expect("Expected source capture summary."); + let normalized = super::normalize_source_ref_for_capture(source_ref, &capture) + .expect("Expected normalized source_ref"); + + assert_eq!(capture.policy_spans.len(), 2); 
+ /* The exclusion is expected at index 0 and the redaction at index 1. */ assert_eq!(capture.policy_spans[0].status, "excluded"); + assert_eq!(capture.policy_spans[0].reason_code.as_deref(), Some("WRITE_POLICY_EXCLUSION")); + assert_eq!(capture.policy_spans[1].status, "redacted"); + assert_eq!(capture.policy_spans[1].reason_code.as_deref(), Some("WRITE_POLICY_REDACTION")); + assert_eq!(normalized["source_record_id"], doc_id.to_string()); + /* The expected origin matches the source_ref `uri`. */ assert_eq!(normalized["origin"], "file:///tmp/source.txt"); + /* The expected captured_at equals `now` (12:15:00Z), not the source_ref `ts` (12:00:00Z). */ assert_eq!(normalized["captured_at"], "2026-02-25T12:15:00Z"); + assert_eq!(normalized["content_hash"], "stored-hash"); + assert_eq!(normalized["visibility_scope"], "project_shared"); + /* This source_ref carries no source_kind; the expected source_type matches the "knowledge" doc_type. */ assert_eq!(normalized["source_type"], "knowledge"); + assert_eq!(normalized["policy_spans"][0]["reason_code"], "WRITE_POLICY_EXCLUSION"); + assert_eq!(normalized["policy_spans"][1]["reason_code"], "WRITE_POLICY_REDACTION"); + } + #[test] fn validate_docs_put_rejects_incomplete_source_library_metadata() { let err = docs::validate_docs_put(&DocsPutRequest { @@ -3411,8 +4000,11 @@ mod tests { assert_eq!(pointer.resolver, "elf_doc_ext/v1"); assert_eq!(pointer.hashes.content_hash, "doc-hash"); assert_eq!(pointer.hashes.chunk_hash, "chunk-hash"); + assert_eq!(pointer.reference.source_record_id, row.doc_id); + assert_eq!(pointer.reference.source_span_id, pointer.locator.span_id); assert_eq!(pointer.locator.position.start, 12); assert_eq!(pointer.locator.position.end, 64); + assert_eq!(pointer.locator.span_id, super::source_span_id("doc-hash", 12, 64, "captured")); assert_eq!(pointer.state.content_hash, pointer.hashes.content_hash); assert_eq!(pointer.state.chunk_hash, pointer.hashes.chunk_hash); } diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1.rs b/packages/elf-service/tests/acceptance/docs_extension_v1.rs index fb0e5c9..e575c3f 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1.rs @@ -318,9 +318,29 @@ async fn 
docs_put_source_library_records_do_not_create_memory_notes() { .fetch_one(&service.db.pool) .await .expect("Failed to verify doc row."); + let stored_source_ref: serde_json::Value = + sqlx::query_scalar("SELECT source_ref FROM doc_documents WHERE doc_id = $1") + .bind(put.doc_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to fetch normalized source_ref."); assert!(doc_exists); assert_eq!(after, before, "docs_put must not create durable Memory Notes."); + assert_eq!(put.source_capture.schema, "doc_source_capture/v1"); + assert_eq!(put.source_capture.source_record_id, put.doc_id); + assert_eq!(put.source_capture.origin, "https://example.com/thread/source-library-1"); + assert_eq!(put.source_capture.source_type, "social_thread"); + assert_eq!(put.source_capture.visibility_scope, "project_shared"); + assert!(!put.source_capture.source_spans.is_empty()); + assert_eq!(put.source_capture.source_spans[0].schema, "doc_source_span/v1"); + assert_eq!(put.source_capture.source_spans[0].status, "captured"); + assert_eq!(put.source_capture.source_spans[0].reason_code, None); + assert_eq!(stored_source_ref["source_record_id"], put.doc_id.to_string()); + assert_eq!(stored_source_ref["origin"], "https://example.com/thread/source-library-1"); + assert_eq!(stored_source_ref["source_type"], "social_thread"); + assert_eq!(stored_source_ref["content_hash"], put.content_hash); + assert!(stored_source_ref["source_spans"].as_array().is_some_and(|spans| !spans.is_empty())); drop(service); @@ -553,8 +573,24 @@ async fn docs_put_applies_write_policy_and_excerpt_by_chunk_id_is_verified() { assert!(excerpt.verification.verified); assert!(!excerpt.excerpt.is_empty()); assert!(!excerpt.excerpt.contains(secret)); + assert!(!excerpt.locator.span_id.is_nil()); + + let captured_chunk_span = put + .source_capture + .source_spans + .iter() + .find(|span| span.chunk_id == Some(chunk_id)) + .expect("Expected captured source span for hydrated chunk."); + + 
assert_eq!(excerpt.locator.span_id, captured_chunk_span.span_id); assert_eq!(excerpt.verification.content_hash, put.content_hash); assert!(put.write_policy_audit.is_some()); + assert_eq!(put.source_capture.policy_spans.len(), 1); + assert_eq!(put.source_capture.policy_spans[0].status, "excluded"); + assert_eq!( + put.source_capture.policy_spans[0].reason_code.as_deref(), + Some("WRITE_POLICY_EXCLUSION") + ); let _ = shutdown.send(()); @@ -1521,7 +1557,16 @@ async fn docs_search_l0_returns_pointer_and_explain_trajectory() { assert!(results.items[0].pointer.schema == "source_ref/v1"); assert!(!results.items[0].pointer.reference.doc_id.is_nil()); assert!(!results.items[0].pointer.reference.chunk_id.is_nil()); + assert_eq!( + results.items[0].pointer.reference.source_record_id, + results.items[0].pointer.reference.doc_id + ); + assert_eq!( + results.items[0].pointer.reference.source_span_id, + results.items[0].pointer.locator.span_id + ); assert_eq!(results.items[0].pointer.resolver, "elf_doc_ext/v1"); + assert!(!results.items[0].pointer.locator.span_id.is_nil()); assert!(!results.trace_id.is_nil()); let _ = shutdown.send(()); @@ -1713,6 +1758,7 @@ async fn docs_excerpts_get_supports_l0_and_returns_locator_and_optional_trajecto assert_eq!(excerpt.locator.selector_kind, "quote"); assert!(excerpt.locator.match_end_offset > excerpt.locator.match_start_offset); + assert!(!excerpt.locator.span_id.is_nil()); assert!(excerpt.excerpt.len() <= 256); assert!(excerpt.trajectory.is_some()); assert_eq!( diff --git a/packages/elf-storage/src/docs.rs b/packages/elf-storage/src/docs.rs index 5672966..2bed675 100644 --- a/packages/elf-storage/src/docs.rs +++ b/packages/elf-storage/src/docs.rs @@ -22,23 +22,37 @@ where { sqlx::query( "\ - INSERT INTO doc_documents ( - doc_id, - tenant_id, - project_id, - agent_id, - scope, - doc_type, - status, - title, - source_ref, - content, - content_bytes, - content_hash, - created_at, - updated_at - ) - VALUES 
($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)", +INSERT INTO doc_documents ( + doc_id, + tenant_id, + project_id, + agent_id, + scope, + doc_type, + status, + title, + source_ref, + content, + content_bytes, + content_hash, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) +ON CONFLICT (doc_id) DO UPDATE +SET + tenant_id = EXCLUDED.tenant_id, + project_id = EXCLUDED.project_id, + agent_id = EXCLUDED.agent_id, + scope = EXCLUDED.scope, + doc_type = EXCLUDED.doc_type, + status = EXCLUDED.status, + title = EXCLUDED.title, + source_ref = EXCLUDED.source_ref, + content = EXCLUDED.content, + content_bytes = EXCLUDED.content_bytes, + content_hash = EXCLUDED.content_hash, + updated_at = EXCLUDED.updated_at", ) .bind(doc.doc_id) .bind(doc.tenant_id.as_str()) @@ -115,7 +129,15 @@ INSERT INTO doc_chunks ( chunk_hash, created_at ) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8)", +VALUES ($1,$2,$3,$4,$5,$6,$7,$8) +ON CONFLICT (chunk_id) DO UPDATE +SET + doc_id = EXCLUDED.doc_id, + chunk_index = EXCLUDED.chunk_index, + start_offset = EXCLUDED.start_offset, + end_offset = EXCLUDED.end_offset, + chunk_text = EXCLUDED.chunk_text, + chunk_hash = EXCLUDED.chunk_hash", ) .bind(chunk.chunk_id) .bind(chunk.doc_id)