chore: add schema handling in the kafka serializer #66

Merged
Yash Shrivastava (alephys26) merged 4 commits into main from alephys26/serializer-improvements
May 25, 2026

@alephys26
Contributor

Description

This pull request introduces significant improvements to Avro serialization in the Kafka pipeline, specifically addressing the handling of Avro unions and type coercion when serializing JSON data. The main changes include adding schema caching, automatic tagging for Avro unions, and correct type coercion for numbers and logical types. These changes ensure that JSON data is properly transformed to match Avro expectations, improving compatibility and reliability.

Changes

Avro Serialization Improvements:

  • Added schema caching to avroCodec by introducing a loadSchema method, which fetches and parses the latest schema for each topic only once, improving performance and consistency.
  • Implemented the tagUnions function to automatically rewrite untagged unions in JSON to the Avro JSON-tagged form required by the hamba/avro library, ensuring correct serialization of union types.
  • Added type coercion logic in coerceNumber to convert float64 values (from JSON) into the appropriate Go types for Avro primitive and logical types (e.g., int32, int64, time.Time, time.Duration), preventing type errors during serialization.
  • Enhanced union schema handling with helper functions pickNonNullBranch and branchTagName to support correct tagging and disambiguation of nullable unions.
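
As a rough illustration of the nullable-union tagging described above, the sketch below wraps a non-null value in a single-key map named after the non-null branch, which is the shape Avro's JSON encoding uses for union values. It is not the PR's code: `tagNullable` and the plain `[]string` branch list are hypothetical stand-ins for the schema types that hamba/avro provides, and only unions with exactly one non-null branch are handled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tagNullable is a hypothetical helper, not the PR's tagUnions.
// branches lists the union's type names, e.g. ["null", "string"].
func tagNullable(branches []string, v any) (any, error) {
	if v == nil {
		return nil, nil // JSON null stays untagged
	}
	var nonNull []string
	for _, b := range branches {
		if b != "null" {
			nonNull = append(nonNull, b)
		}
	}
	// With more than one non-null branch the value itself would have to be
	// inspected to disambiguate; this sketch simply refuses.
	if len(nonNull) != 1 {
		return nil, fmt.Errorf("cannot pick a single branch from %v", branches)
	}
	return map[string]any{nonNull[0]: v}, nil
}

func main() {
	var payload map[string]any
	if err := json.Unmarshal([]byte(`{"nickname": "ada"}`), &payload); err != nil {
		panic(err)
	}
	tagged, err := tagNullable([]string{"null", "string"}, payload["nickname"])
	if err != nil {
		panic(err)
	}
	payload["nickname"] = tagged
	out, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"nickname":{"string":"ada"}}
}
```

A real implementation would walk the parsed schema recursively (records, arrays, maps) and derive the tag from the branch's type or full name, which is presumably what pickNonNullBranch and branchTagName are for.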

Dependency Updates:

  • Imported new dependencies: sync, time, and github.com/hamba/avro/v2 to support schema caching, logical type coercion, and Avro schema parsing.
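
The fetch-once-per-topic caching that the `sync` import supports could look roughly like the sketch below. The names `schemaCache`, `newSchemaCache`, and `load` are hypothetical, the schema is kept as a plain string instead of a parsed hamba/avro schema, and the `fetch` callback stands in for the Schema Registry lookup, so this is an assumption about the general shape and not the PR's loadSchema.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical per-topic cache guarded by a mutex.
type schemaCache struct {
	mu      sync.Mutex
	schemas map[string]string
	fetch   func(topic string) (string, error) // stand-in for the registry call
}

func newSchemaCache(fetch func(string) (string, error)) *schemaCache {
	return &schemaCache{schemas: make(map[string]string), fetch: fetch}
}

// load returns the cached schema for topic, fetching it on first use only.
// Errors are not cached, so a failed fetch is retried on the next call.
func (c *schemaCache) load(topic string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.schemas[topic]; ok {
		return s, nil
	}
	s, err := c.fetch(topic)
	if err != nil {
		return "", fmt.Errorf("load schema for %q: %w", topic, err)
	}
	c.schemas[topic] = s
	return s, nil
}

func main() {
	calls := 0
	cache := newSchemaCache(func(topic string) (string, error) {
		calls++
		return `{"type":"string"}`, nil
	})
	for i := 0; i < 3; i++ {
		if _, err := cache.load("orders"); err != nil {
			panic(err)
		}
	}
	fmt.Println("fetches:", calls) // fetches: 1
}
```

Holding the lock during the fetch keeps the sketch short but serializes lookups across topics. A cache like this also never notices a newer schema version until the process restarts, which is a trade-off worth stating for anything that caches "latest".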

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Tests

Tested locally with test pipelines. All succeeded.

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

Copilot AI review requested due to automatic review settings May 25, 2026 08:46
@alephys26 Yash Shrivastava (alephys26) requested a review from a team as a code owner May 25, 2026 08:46
Contributor

Copilot AI left a comment

Pull request overview

This PR enhances the Kafka task’s Avro serialization path by introducing schema parsing/caching and pre-processing JSON payloads to better match hamba/avro’s expectations (notably union tagging and numeric/logical type coercion).

Changes:

  • Added Schema Registry schema fetch + parse caching for Avro serialization.
  • Added a schema-guided walk to tag nullable unions into Avro’s JSON tagged-union form and coerce JSON numbers into Avro-compatible Go types.
  • Minor formatting/whitespace adjustments in the Kafka task struct and producer flush block.
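
To make the number coercion concrete: `encoding/json` decodes every JSON number into `float64` when the target is `any`, so a schema-guided step has to convert values before an Avro encoder that expects `int32`, `int64`, `time.Time`, or `time.Duration` will accept them. The sketch below is a minimal version under stated assumptions. `coerceSketch` and its string type names are hypothetical, it covers only four cases, and it is not the PR's coerceNumber.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// coerceSketch is a hypothetical helper that converts a JSON-decoded float64
// into the Go type matching an Avro primitive or logical type name.
func coerceSketch(avroType string, f float64) (any, error) {
	switch avroType {
	case "int", "long", "timestamp-millis", "time-millis":
		// All four are integer-backed, so reject fractions, NaN, and
		// values outside the int64 range up front.
		const limit = 1 << 63
		if f != math.Trunc(f) || f < -limit || f >= limit {
			return nil, fmt.Errorf("%v is not a valid %s", f, avroType)
		}
	default:
		return f, nil // float, double, and anything unhandled pass through
	}
	switch avroType {
	case "int", "time-millis":
		if f < math.MinInt32 || f > math.MaxInt32 {
			return nil, fmt.Errorf("%v overflows 32-bit %s", f, avroType)
		}
	}
	n := int64(f)
	switch avroType {
	case "int":
		return int32(n), nil
	case "timestamp-millis":
		return time.UnixMilli(n).UTC(), nil
	case "time-millis":
		return time.Duration(n) * time.Millisecond, nil
	}
	return n, nil // "long"
}

func main() {
	for _, t := range []string{"int", "long", "timestamp-millis", "time-millis", "double"} {
		v, err := coerceSketch(t, 1500)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%-17s %T\n", t, v)
	}
}
```

Rejecting fractional input instead of truncating it is a design choice in this sketch: silently turning 1.5 into 1 would hide upstream data problems.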

Reviewed changes

Copilot reviewed 1 out of 2 changed files in this pull request and generated 4 comments.

File | Description
internal/pkg/pipeline/task/kafka/serializer.go | Adds schema loading/caching and JSON rewriting (union tagging + number/logical-type coercion) before Avro serialization.
internal/pkg/pipeline/task/kafka/kafka.go | Formatting/whitespace-only adjustments.

Comment thread internal/pkg/pipeline/task/kafka/serializer.go Outdated
Comment thread internal/pkg/pipeline/task/kafka/serializer.go
Comment thread internal/pkg/pipeline/task/kafka/serializer.go Outdated
Comment thread internal/pkg/pipeline/task/kafka/serializer.go Outdated
Comment thread internal/pkg/pipeline/task/kafka/serializer.go Outdated
@alephys26 Yash Shrivastava (alephys26) merged commit fa09431 into main May 25, 2026
7 checks passed
@alephys26 Yash Shrivastava (alephys26) deleted the alephys26/serializer-improvements branch May 25, 2026 12:59

4 participants