Description
Applications intermittently hit the RabbitMQ error "505 UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead", which forces a connection reset and a 15-second reconnection delay.
The root cause is that send_message() in application.py calls channel.basic_publish() directly on whichever thread invokes it. Since pika's SelectConnection channels are not thread-safe, concurrent calls from multiple threads (e.g., the simulator thread firing TimeStatusPublisher and ScenarioTimeIntervalCallback simultaneously) interleave AMQP frames on the wire. Each basic_publish sends a 3-frame sequence (method → content header → content body). When two threads publish at the same time, the broker can receive a second method frame where it expects the first message's content header, so it reports the unexpected frame and closes the connection.
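The failure mode can be illustrated with a small, deterministic simulation. This is a simplified model and not pika or broker code: the names publish_frames and first_protocol_error are hypothetical, and real frames carry a channel number rather than a message id.

```python
# Simplified model of the frame sequence produced by one basic_publish.
FRAMES = ("method", "header", "body")


def publish_frames(msg_id):
    """Frames one publish would put on the wire, tagged with a message id."""
    return [(msg_id, kind) for kind in FRAMES]


def first_protocol_error(wire):
    """Index of the first frame that breaks method -> header -> body order, or None."""
    expected = 0
    for index, (_msg_id, kind) in enumerate(wire):
        if kind != FRAMES[expected]:
            return index
        expected = (expected + 1) % len(FRAMES)
    return None


a = publish_frames("A")
b = publish_frames("B")

# One publish completes before the next starts: the order is valid.
serialized = a + b

# Two threads writing at once, modeled as strict frame-by-frame alternation.
interleaved = [frame for pair in zip(a, b) for frame in pair]

print(first_protocol_error(serialized))
print(first_protocol_error(interleaved), interleaved[1])
```

In the interleaved case the checker stops at the second frame, a method frame from message B that arrives where message A's content header was expected, which mirrors the wording of the broker error.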
This issue is most likely to affect applications with frequent observer callbacks, such as the Simulator, but can occur in any application where send_message() is called from multiple threads.
Proposed Changes
Route all channel.basic_publish() calls through connection.ioloop.add_callback_threadsafe() to serialize publishes onto pika's single IO thread, eliminating frame interleaving.
Specific changes in application.py:
- Initialize _message_queue and _queue_lock in __init__: Replace lazy hasattr-based initialization with eager initialization and add a threading.Lock to protect queue access from multiple threads
- Add _do_publish() method: Performs the actual basic_publish call, designed to run exclusively on the IO thread. Re-queues the message if the connection dropped between scheduling and execution
- Refactor send_message(): Replace direct basic_publish calls with connection.ioloop.add_callback_threadsafe(functools.partial(self._do_publish, ...)) to schedule publishes on the IO thread
- Add lock protection to _process_message_queue(): Wrap all _message_queue reads and writes with _queue_lock for thread safety
The signature of send_message(app_name, app_topics, payload) is unchanged. No caller modifications are required. All existing applications (Manager, ManagedApplication, and unmanaged Application) inherit the fix automatically.
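The overall shape of the change can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual application.py: StubChannel and StubIOLoop are hypothetical stand-ins so the example runs without a broker, and the routing-key format and the "demo" exchange name are invented for the demo. Only add_callback_threadsafe, basic_publish, is_open, and the method and attribute names listed above come from pika or from this description.

```python
import functools
import queue
import threading


class StubChannel:
    """Hypothetical stand-in for a pika channel; records publishes."""

    def __init__(self):
        self.is_open = True
        self.published = []
        self.publisher_threads = set()

    def basic_publish(self, exchange, routing_key, body):
        self.publisher_threads.add(threading.get_ident())
        self.published.append((routing_key, body))


class StubIOLoop:
    """Hypothetical stand-in for connection.ioloop: one thread runs all callbacks."""

    def __init__(self):
        self._callbacks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def add_callback_threadsafe(self, callback):
        self._callbacks.put(callback)

    def _run(self):
        while True:
            callback = self._callbacks.get()
            if callback is None:  # sentinel used only by this stub
                return
            callback()

    def stop(self):
        self._callbacks.put(None)
        self._thread.join()


class Application:
    def __init__(self, channel, ioloop):
        self.channel = channel
        self.ioloop = ioloop
        self._message_queue = []  # eager initialization, no hasattr checks
        self._queue_lock = threading.Lock()

    def send_message(self, app_name, app_topics, payload):
        routing_key = f"{app_name}.{app_topics}"  # assumed format, for the demo only
        # Safe from any thread: only schedules work on the IO thread.
        self.ioloop.add_callback_threadsafe(
            functools.partial(self._do_publish, routing_key, payload)
        )

    def _do_publish(self, routing_key, body):
        # Intended to run only on the IO thread.
        if not self.channel.is_open:
            with self._queue_lock:
                self._message_queue.append((routing_key, body))
            return
        self.channel.basic_publish(exchange="demo", routing_key=routing_key, body=body)

    def _process_message_queue(self):
        # Intended to run on the IO thread, e.g. after a reconnect.
        with self._queue_lock:
            pending, self._message_queue = self._message_queue, []
        for routing_key, body in pending:
            self._do_publish(routing_key, body)


channel = StubChannel()
ioloop = StubIOLoop()
app = Application(channel, ioloop)

# Four caller threads publish concurrently through the unchanged signature.
workers = [
    threading.Thread(
        target=lambda n=n: [app.send_message("sim", "status", f"{n}-{i}") for i in range(50)]
    )
    for n in range(4)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
ioloop.stop()
```

After the run, every message has been published and all basic_publish calls were made from the single loop thread, regardless of which caller thread invoked send_message().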