Merged
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
-journaled (6.2.7)
+journaled (6.2.8)
activejob
activerecord
activesupport
2 changes: 1 addition & 1 deletion gemfiles/rails_7_2.gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-journaled (6.2.7)
+journaled (6.2.8)
activejob
activerecord
activesupport
2 changes: 1 addition & 1 deletion gemfiles/rails_8_0.gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-journaled (6.2.7)
+journaled (6.2.8)
activejob
activerecord
activesupport
19 changes: 11 additions & 8 deletions lib/journaled/kinesis_batch_sender.rb
@@ -15,7 +15,8 @@ class KinesisBatchSender
'ValidationException',
].freeze

-BATCH_TOO_LARGE_PATTERN = /too large/i
+# Kinesis rejects any record whose data blob exceeds 1 MB.
+KINESIS_MAX_RECORD_BYTES = 1_048_576

# Send a batch of database events to Kinesis
#
@@ -43,9 +44,7 @@ def send_stream_batch(stream_name, stream_events)
response = kinesis_client.put_records(stream_name:, records:)
process_response(response, stream_events)
rescue Aws::Kinesis::Errors::ValidationException => e
-raise unless e.message.match?(BATCH_TOO_LARGE_PATTERN)
-
-handle_batch_too_large(stream_name, stream_events)
+handle_validation_error(stream_name, stream_events, e.message)
Comment on lines -46 to +47

@smudge smudge May 29, 2026

Member

Yup, and just flagging this for myself (in case I ever need to reconstruct my understanding of the root cause): the `raise unless` is what essentially creates the poison-pill jobs, since nothing upstream of this will mark these jobs as failed.
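To make that failure mode concrete, here is a minimal simulation. It is not the gem's worker: `ToyWorker`, the two lambda senders, and the use of `ArgumentError` as a stand-in for the Kinesis exception are all hypothetical. The only assumption carried over from the comment is that events leave the outbox solely when the sender reports them as succeeded or failed.

```ruby
# Hypothetical stand-in for an outbox worker; names are illustrative only
# and are not part of journaled's API.
class ToyWorker
  attr_reader :outbox, :attempts

  def initialize(outbox, sender)
    @outbox = outbox
    @sender = sender
    @attempts = 0
  end

  # One pass: fetch the pending batch, try to send it, and remove only the
  # events the sender reported on (succeeded or permanently failed).
  def run_once
    batch = @outbox.dup
    return if batch.empty?

    @attempts += 1
    result = @sender.call(batch)
    @outbox -= result[:succeeded] + result[:failed]
  rescue ArgumentError
    # Nothing marks the batch as failed, so it stays pending and is picked
    # up again on the next pass.
  end
end

# Sender that re-raises on a validation problem (the old `raise unless` path).
raising_sender = ->(_batch) { raise ArgumentError, 'Invalid partition key' }

# Sender that converts the same problem into permanent failures.
isolating_sender = ->(batch) { { succeeded: [], failed: batch } }

stuck = ToyWorker.new(%w[e1 e2], raising_sender)
3.times { stuck.run_once }

drained = ToyWorker.new(%w[e1 e2], isolating_sender)
3.times { drained.run_once }

puts "raising:   #{stuck.outbox.size} pending after #{stuck.attempts} attempts"
puts "isolating: #{drained.outbox.size} pending after #{drained.attempts} attempts"
```

In this toy model the raising sender leaves both events pending no matter how many passes run, while the isolating sender clears the outbox on the first pass.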

rescue StandardError => e
# Handle transient errors (throttling, network issues, service unavailable)
handle_transient_batch_error(e, stream_events)
@@ -94,24 +93,28 @@ def create_failed_event(event, error_code:, error_message:, transient:)
)
end

-def handle_batch_too_large(stream_name, stream_events)
+# Isolate the offending record(s) when Kinesis rejects the batch with a
+# ValidationException (aggregate payload too large, per-record size limit,
+# invalid partition key, etc.). Splits the batch in half and retries
+# recursively until a single offending event is marked as a permanent failure.
+def handle_validation_error(stream_name, stream_events, error_message)
if stream_events.size <= 1
-# Single event exceeds payload limit — treat as permanent failure
return {
succeeded: [],
failed: stream_events.map do |event|
create_failed_event(
event,
error_code: 'ValidationException',
-error_message: 'Record exceeds Kinesis payload limit',
+error_message:,
transient: false,
)
end,
}
end

Rails.logger.warn(
-"[journaled] Batch too large for Kinesis (#{stream_events.size} events), splitting in half and retrying",
+"[journaled] Kinesis ValidationException for batch of #{stream_events.size} events " \
+"(#{error_message}), splitting in half and retrying",
)

mid = stream_events.size / 2
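The split-and-retry strategy described in the new method comment can be sketched in isolation. This is a simplified illustration rather than the gem's implementation: `FakeValidationError`, `send_or_bisect`, and the lambda standing in for `put_records` are hypothetical names, and the real method works with failed-event objects rather than plain arrays.

```ruby
# Hypothetical error standing in for Aws::Kinesis::Errors::ValidationException.
class FakeValidationError < StandardError; end

# Send a batch; on a validation error, split it in half and retry each half
# until the offending event(s) are isolated as single-element failures.
def send_or_bisect(events, sender)
  sender.call(events)
  { succeeded: events, failed: [] }
rescue FakeValidationError => e
  if events.size <= 1
    # A batch of one cannot be split further: record a permanent failure
    # that carries the original error message.
    return { succeeded: [], failed: events.map { |ev| [ev, e.message] } }
  end

  mid = events.size / 2
  left = send_or_bisect(events[0...mid], sender)
  right = send_or_bisect(events[mid..], sender)
  {
    succeeded: left[:succeeded] + right[:succeeded],
    failed: left[:failed] + right[:failed],
  }
end

# Toy sender: rejects any batch containing an event longer than 5 characters.
sender = lambda do |batch|
  raise FakeValidationError, 'record too large' if batch.any? { |ev| ev.size > 5 }
end

result = send_or_bisect(%w[a bb oversized cc d], sender)
puts result.inspect
```

Because a single-event batch always terminates the recursion, a rejected batch can no longer escape as an unhandled exception in this sketch; the trade-off is that a rejection unrelated to event data (for example a bad stream name) would mark every event in the batch as permanently failed.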
10 changes: 10 additions & 0 deletions lib/journaled/outbox/adapter.rb
@@ -14,6 +14,7 @@ module Outbox
# 4. Start workers: bundle exec rake journaled_worker:work
class Adapter < Journaled::DeliveryAdapter
class TableNotFoundError < StandardError; end
class RecordTooLargeError < StandardError; end

# Delivers events by inserting them into the database
#
@@ -29,6 +30,15 @@ def self.deliver(events:, **)
# Exclude the application-level id - the database will generate its own using uuid_generate_v7()
event_data = event.journaled_attributes.except(:id)

# The DB-generated id adds bytes to the JSON payload at send time, so a
# placeholder id keeps this estimate honest.
payload_bytesize = event_data.merge(id: SecureRandom.uuid).to_json.bytesize
if payload_bytesize > KinesisBatchSender::KINESIS_MAX_RECORD_BYTES
raise RecordTooLargeError, "Journaled event '#{event.journaled_attributes[:event_type]}' " \

Member

Also, re: @samandmoore's point:

> hm. my thinking is that I'd rather not lose an event nor error for the user.
>
> so if we can accept the event and figure out how to fix the event to get it to flow through the pipeline, that seems better in some sense than breaking for the user.

I guess I'm sort of 🤞 that we would detect most cases before they reach a production request. But even then, I guess we have two choices:

  1. Disallow an operation from proceeding if it is fundamentally not loggable. (This has been where I gravitate, kind of for simplicity until we have a better sense for what we need to solve beyond that.)
  2. Allow it to proceed but fire the events directly into the dead letter queue in the hopes that they can be manually cleaned up later.

I'm less a fan of option 2 because I think we ultimately want to do away with the DLQ -- it's a bit of a crutch that makes it easier for us to ignore (rather than tighten the ratchet on) guaranteeing deliverability from the moment we construct the event payload.

"exceeds Kinesis #{KinesisBatchSender::KINESIS_MAX_RECORD_BYTES}-byte per-record limit " \
"(#{payload_bytesize} bytes); refusing to enqueue."
end
Comment on lines +35 to +40

Contributor Author

We need this here rather than as a validation on `Journaled::Outbox::Event` because we use `insert_all`, which bypasses model-level validations.

Do you think it's worth adding as a db constraint (albeit slightly less precise) instead?

Member

UUIDs are fixed-length, so it seems like we could maintain a reasonable length estimate (perhaps with some padding) in either place. (It all kinda comes down to the way the string serialization of the JSON works over in the outbox worker, right?)
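As a rough check of the fixed-length point, the sketch below measures how many bytes an `id` key holding a UUID adds to a non-empty JSON object. `MAX_RECORD_BYTES` mirrors the constant in the diff; `uuid_id_overhead` and `fits_in_record?` are hypothetical helpers, and the sketch assumes the worker serializes with plain `to_json`, which may not match the real serialization exactly.

```ruby
require 'json'
require 'securerandom'

# 1 MB per-record limit, mirroring KINESIS_MAX_RECORD_BYTES in the diff.
MAX_RECORD_BYTES = 1_048_576

# Bytes added to a non-empty JSON object by an "id" key holding a UUID:
# the separator and key `,"id":"` (7 bytes), 36 UUID characters, and a
# closing quote (1 byte).
def uuid_id_overhead(attrs)
  with_id = attrs.merge(id: SecureRandom.uuid).to_json.bytesize
  with_id - attrs.to_json.bytesize
end

# Hypothetical guard: estimate the serialized size with a fixed pad instead
# of generating a placeholder UUID on every call.
def fits_in_record?(attrs, pad: 44)
  attrs.to_json.bytesize + pad <= MAX_RECORD_BYTES
end

small = { event_type: 'small_event', payload: 'x' * 10 }
huge = { event_type: 'big_event', payload: 'x' * MAX_RECORD_BYTES }

puts uuid_id_overhead(small)  # constant regardless of the other attributes
puts fits_in_record?(small)
puts fits_in_record?(huge)
```

Since the overhead does not depend on the other attributes, the same constant (plus whatever safety padding is chosen) could in principle back both the application-level check and a coarser database constraint.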

> Do you think it's worth adding as a db constraint (albeit slightly less precise) instead?

I would say *in addition to* rather than *instead* -- the DB constraint can be a backstop that maintains the invariant, and the `raise` call should be at least as strict in terms of padding estimates, but it is also where you get the developer-friendly error message and exception type.

Member

And we don't strictly need the DB constraint as part of this PR, but if it's trivial to add then happy to review that too.

But mainly I think it's useful to both validate the data in motion and then express the hard requirements in the schema, and both are valuable.


{
event_type: event.journaled_attributes[:event_type],
event_data:,
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Journaled
-VERSION = "6.2.7"
+VERSION = "6.2.8"
end
24 changes: 15 additions & 9 deletions spec/lib/journaled/kinesis_batch_sender_spec.rb
@@ -163,7 +163,7 @@
it 'logs a warning about the split' do
subject.send_batch(events)

-expect(Rails.logger).to have_received(:warn).with(/Batch too large.*splitting in half/)
+expect(Rails.logger).to have_received(:warn).with(/Kinesis ValidationException.*splitting in half/)
end
end

@@ -180,7 +180,7 @@
.and_raise(Aws::Kinesis::Errors::ValidationException.new(nil, 'Request Payload is too large'))
end

-it 'marks the event as a permanent failure' do
+it 'marks the event as a permanent failure with the Kinesis error message' do
result = subject.send_batch(events)

expect(result[:succeeded]).to be_empty
@@ -189,7 +189,7 @@
failure = result[:failed].first
expect(failure.event).to eq(event)
expect(failure.error_code).to eq('ValidationException')
-expect(failure.error_message).to eq('Record exceeds Kinesis payload limit')
+expect(failure.error_message).to eq('Request Payload is too large')
expect(failure.permanent?).to be true
end
end
@@ -202,17 +202,23 @@
let(:event) { create_database_event }
let(:events) { [event] }

-context 'when entire batch fails with validation exception' do
+context 'when entire batch fails with a non-payload ValidationException' do
before do
allow(kinesis_client).to receive(:put_records)
.and_raise(Aws::Kinesis::Errors::ValidationException.new(nil, 'Invalid stream name'))
end

-it 'raises the exception (configuration error, not event data error)' do
-expect { subject.send_batch(events) }.to raise_error(
-Aws::Kinesis::Errors::ValidationException,
-'Invalid stream name',
-)
+it 'isolates the offending event(s) as permanent failures with the original error message' do
+result = subject.send_batch(events)
+
+expect(result[:succeeded]).to be_empty
+expect(result[:failed].length).to eq(1)
+
+failure = result[:failed].first
+expect(failure.event).to eq(event)
+expect(failure.error_code).to eq('ValidationException')
+expect(failure.error_message).to eq('Invalid stream name')
+expect(failure.permanent?).to be true
end
end

43 changes: 43 additions & 0 deletions spec/lib/journaled/outbox/adapter_spec.rb
@@ -136,6 +136,49 @@
}.to change { Journaled::Outbox::Event.count }.by(2)
end
end

context 'when an event exceeds the Kinesis per-record size limit' do
let(:huge_payload) { 'x' * (Journaled::KinesisBatchSender::KINESIS_MAX_RECORD_BYTES + 1) }
let(:oversized_event) do
instance_double(
event_class,
id: SecureRandom.uuid,
journaled_attributes: {
id: 'big_id',
event_type: 'big_event',
payload: huge_payload,
created_at: Time.current,
},
journaled_partition_key: 'test_partition_key',
journaled_stream_name: 'test_stream',
)
end

context 'with a single oversized event' do
let(:events) { [oversized_event] }

it 'raises RecordTooLargeError and inserts nothing' do
expect {
described_class.deliver(events:, enqueue_opts:)
}.to raise_error(
Journaled::Outbox::Adapter::RecordTooLargeError,
/big_event.*exceeds Kinesis.*per-record limit/,
)
expect(Journaled::Outbox::Event.count).to eq(0)
end
end

context 'mixed with a normally-sized event' do
let(:events) { [event, oversized_event] }

it 'raises and inserts none of the events in the batch' do
expect {
described_class.deliver(events:, enqueue_opts:)
}.to raise_error(Journaled::Outbox::Adapter::RecordTooLargeError)
expect(Journaled::Outbox::Event.count).to eq(0)
end
end
end
end

context 'when tables do not exist' do