diff --git a/Gemfile.lock b/Gemfile.lock index 4ef3b81..53c1864 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - journaled (6.2.7) + journaled (6.2.8) activejob activerecord activesupport diff --git a/gemfiles/rails_7_2.gemfile.lock b/gemfiles/rails_7_2.gemfile.lock index 39a258e..271f926 100644 --- a/gemfiles/rails_7_2.gemfile.lock +++ b/gemfiles/rails_7_2.gemfile.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - journaled (6.2.7) + journaled (6.2.8) activejob activerecord activesupport diff --git a/gemfiles/rails_8_0.gemfile.lock b/gemfiles/rails_8_0.gemfile.lock index fe44814..dbc43ac 100644 --- a/gemfiles/rails_8_0.gemfile.lock +++ b/gemfiles/rails_8_0.gemfile.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - journaled (6.2.7) + journaled (6.2.8) activejob activerecord activesupport diff --git a/lib/journaled/kinesis_batch_sender.rb b/lib/journaled/kinesis_batch_sender.rb index fd59ed0..17fde1c 100644 --- a/lib/journaled/kinesis_batch_sender.rb +++ b/lib/journaled/kinesis_batch_sender.rb @@ -15,7 +15,8 @@ class KinesisBatchSender 'ValidationException', ].freeze - BATCH_TOO_LARGE_PATTERN = /too large/i + # Kinesis rejects any record whose data blob exceeds 1 MiB. 
+ KINESIS_MAX_RECORD_BYTES = 1_048_576 # Send a batch of database events to Kinesis # @@ -43,9 +44,7 @@ def send_stream_batch(stream_name, stream_events) response = kinesis_client.put_records(stream_name:, records:) process_response(response, stream_events) rescue Aws::Kinesis::Errors::ValidationException => e - raise unless e.message.match?(BATCH_TOO_LARGE_PATTERN) - - handle_batch_too_large(stream_name, stream_events) + handle_validation_error(stream_name, stream_events, e.message) rescue StandardError => e # Handle transient errors (throttling, network issues, service unavailable) handle_transient_batch_error(e, stream_events) @@ -94,16 +93,19 @@ def create_failed_event(event, error_code:, error_message:, transient:) ) end - def handle_batch_too_large(stream_name, stream_events) + # Isolate the offending record(s) when Kinesis rejects the batch with a + # ValidationException (aggregate payload too large, per-record size limit, + # invalid partition key, etc.). Splits the batch in half and retries + # recursively until a single offending event is marked as a permanent failure. 
+ def handle_validation_error(stream_name, stream_events, error_message) if stream_events.size <= 1 - # Single event exceeds payload limit — treat as permanent failure return { succeeded: [], failed: stream_events.map do |event| create_failed_event( event, error_code: 'ValidationException', - error_message: 'Record exceeds Kinesis payload limit', + error_message:, transient: false, ) end, @@ -111,7 +113,8 @@ def handle_batch_too_large(stream_name, stream_events) end Rails.logger.warn( - "[journaled] Batch too large for Kinesis (#{stream_events.size} events), splitting in half and retrying", + "[journaled] Kinesis ValidationException for batch of #{stream_events.size} events " \ + "(#{error_message}), splitting in half and retrying", ) mid = stream_events.size / 2 diff --git a/lib/journaled/outbox/adapter.rb b/lib/journaled/outbox/adapter.rb index 03ff8b7..253e61e 100644 --- a/lib/journaled/outbox/adapter.rb +++ b/lib/journaled/outbox/adapter.rb @@ -14,6 +14,7 @@ module Outbox # 4. Start workers: bundle exec rake journaled_worker:work class Adapter < Journaled::DeliveryAdapter class TableNotFoundError < StandardError; end + class RecordTooLargeError < StandardError; end # Delivers events by inserting them into the database # @@ -29,6 +30,15 @@ def self.deliver(events:, **) # Exclude the application-level id - the database will generate its own using uuid_generate_v7() event_data = event.journaled_attributes.except(:id) + # The DB-generated id adds bytes to the JSON payload at send time, so a + # placeholder id keeps this estimate honest. + payload_bytesize = event_data.merge(id: SecureRandom.uuid).to_json.bytesize + if payload_bytesize > KinesisBatchSender::KINESIS_MAX_RECORD_BYTES + raise RecordTooLargeError, "Journaled event '#{event.journaled_attributes[:event_type]}' " \ + "exceeds Kinesis #{KinesisBatchSender::KINESIS_MAX_RECORD_BYTES}-byte per-record limit " \ + "(#{payload_bytesize} bytes); refusing to enqueue." 
+ end + { event_type: event.journaled_attributes[:event_type], event_data:, diff --git a/lib/journaled/version.rb b/lib/journaled/version.rb index e265308..2338765 100644 --- a/lib/journaled/version.rb +++ b/lib/journaled/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Journaled - VERSION = "6.2.7" + VERSION = "6.2.8" end diff --git a/spec/lib/journaled/kinesis_batch_sender_spec.rb b/spec/lib/journaled/kinesis_batch_sender_spec.rb index ead6f37..7f1133f 100644 --- a/spec/lib/journaled/kinesis_batch_sender_spec.rb +++ b/spec/lib/journaled/kinesis_batch_sender_spec.rb @@ -163,7 +163,7 @@ it 'logs a warning about the split' do subject.send_batch(events) - expect(Rails.logger).to have_received(:warn).with(/Batch too large.*splitting in half/) + expect(Rails.logger).to have_received(:warn).with(/Kinesis ValidationException.*splitting in half/) end end @@ -180,7 +180,7 @@ .and_raise(Aws::Kinesis::Errors::ValidationException.new(nil, 'Request Payload is too large')) end - it 'marks the event as a permanent failure' do + it 'marks the event as a permanent failure with the Kinesis error message' do result = subject.send_batch(events) expect(result[:succeeded]).to be_empty @@ -189,7 +189,7 @@ failure = result[:failed].first expect(failure.event).to eq(event) expect(failure.error_code).to eq('ValidationException') - expect(failure.error_message).to eq('Record exceeds Kinesis payload limit') + expect(failure.error_message).to eq('Request Payload is too large') expect(failure.permanent?).to be true end end @@ -202,17 +202,23 @@ let(:event) { create_database_event } let(:events) { [event] } - context 'when entire batch fails with validation exception' do + context 'when entire batch fails with a non-payload ValidationException' do before do allow(kinesis_client).to receive(:put_records) .and_raise(Aws::Kinesis::Errors::ValidationException.new(nil, 'Invalid stream name')) end - it 'raises the exception (configuration error, not event data error)' do - expect { 
subject.send_batch(events) }.to raise_error( - Aws::Kinesis::Errors::ValidationException, - 'Invalid stream name', - ) + it 'isolates the offending event(s) as permanent failures with the original error message' do + result = subject.send_batch(events) + + expect(result[:succeeded]).to be_empty + expect(result[:failed].length).to eq(1) + + failure = result[:failed].first + expect(failure.event).to eq(event) + expect(failure.error_code).to eq('ValidationException') + expect(failure.error_message).to eq('Invalid stream name') + expect(failure.permanent?).to be true end end diff --git a/spec/lib/journaled/outbox/adapter_spec.rb b/spec/lib/journaled/outbox/adapter_spec.rb index 7b043c1..012f47b 100644 --- a/spec/lib/journaled/outbox/adapter_spec.rb +++ b/spec/lib/journaled/outbox/adapter_spec.rb @@ -136,6 +136,49 @@ }.to change { Journaled::Outbox::Event.count }.by(2) end end + + context 'when an event exceeds the Kinesis per-record size limit' do + let(:huge_payload) { 'x' * (Journaled::KinesisBatchSender::KINESIS_MAX_RECORD_BYTES + 1) } + let(:oversized_event) do + instance_double( + event_class, + id: SecureRandom.uuid, + journaled_attributes: { + id: 'big_id', + event_type: 'big_event', + payload: huge_payload, + created_at: Time.current, + }, + journaled_partition_key: 'test_partition_key', + journaled_stream_name: 'test_stream', + ) + end + + context 'with a single oversized event' do + let(:events) { [oversized_event] } + + it 'raises RecordTooLargeError and inserts nothing' do + expect { + described_class.deliver(events:, enqueue_opts:) + }.to raise_error( + Journaled::Outbox::Adapter::RecordTooLargeError, + /big_event.*exceeds Kinesis.*per-record limit/, + ) + expect(Journaled::Outbox::Event.count).to eq(0) + end + end + + context 'mixed with a normally-sized event' do + let(:events) { [event, oversized_event] } + + it 'raises and inserts none of the events in the batch' do + expect { + described_class.deliver(events:, enqueue_opts:) + }.to 
raise_error(Journaled::Outbox::Adapter::RecordTooLargeError) + expect(Journaled::Outbox::Event.count).to eq(0) + end + end + end end context 'when tables do not exist' do