Broadway Source Reading (Part 2 - Producer)
After getting an overview of Broadway's architecture, I want to know how a basic Broadway pipeline works with only the Producer and Processor options, such as this sample from the official documentation:
```elixir
defmodule MyBroadway do
  # (the full sample is in the official Broadway documentation)
end
```
Producer
As the beginning of the Broadway pipeline, the producer module either keeps generating events/messages as the data source itself, or forwards the messages that your messaging middleware receives from external systems to downstream consumers.
Broadway provides officially supported producers for messaging middleware such as Amazon SQS, Apache Kafka, Google Cloud Pub/Sub and RabbitMQ. How do these producers work with Broadway? What do I need to know if I want to implement one for another messaging middleware?
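Before diving into the source, it helps to see how small a producer can be. This sketch is adapted from the `Counter` example in the Broadway documentation; any plain `GenStage` producer can fill the producer slot:

```elixir
defmodule Counter do
  use GenStage

  @behaviour Broadway.Producer

  @impl true
  def init(initial) do
    {:producer, initial}
  end

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # emit exactly `demand` integers and advance the counter
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end
```

Note that events emitted by a plain producer like this usually need a `:transformer` option in the pipeline to wrap them into `%Broadway.Message{}` structs; the officially supported producers build the messages themselves.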
Initialization
As shown in Part 1 - Entry Point and Architecture, `Topology.build_producers_specs/2` generates child specs of `ProducerStage` in the process supervision hierarchy. The `ProducerStage` module, which uses `GenStage`, acts as a wrapper of the producer module you specify in the pipeline and delegates function calls to it, such as `handle_demand`, `handle_call`, `handle_cast`, `handle_info`, `prepare_for_draining` and `terminate`. Its startup call sequence is:
```mermaid
sequenceDiagram
    participant S as Supervisor
    participant P as ProducerStage
    participant G as GenStage
    participant M as MyProducer
    S->>P: start_link(args, index, opts)
    P->>G: start_link(ProducerStage, {args, index}, opts)
    G->>G: init({ProducerStage, {args, index}})
    G-->>P: init({args, index})
    P->>M: init(my_producer_args_in_pipeline)
    M-->>G: {:producer, state, options}
    G->>G: init_producer(ProducerStage, options, state)
```
Message Consumption
There are two approaches to consuming messages from messaging middleware: pull and push. Pull is like asking your boss for tasks instead of your boss pushing them to you. (Although in real life, you might have the opposite experience unless your company uses Kanban. :D)
For the messaging middleware mentioned above:
- RabbitMQ is push-based.
- Amazon SQS and Apache Kafka are pull-based.
- Google Cloud Pub/Sub can do both.
Because `GenStage`'s philosophy is pull-based, how do these two different message-consumption models work with Broadway? Let's use the source code of the Amazon SQS and RabbitMQ Broadway producers to illustrate.
Pull (Proactive)
```mermaid
graph BT
    D(Amazon SQS Producer) -- pull --> C(Amazon SQS Queue)
    E(Processors in Broadway) -- pull --> D
```
In the source code of `broadway_sqs/producer.ex`, messages are received in the `handle_demand` callback, because that is the moment when consumers pull for messages.
```elixir
@impl true
def handle_demand(incoming_demand, %{demand: demand} = state) do
  handle_receive_messages(%{state | demand: demand + incoming_demand})
end
```
`handle_receive_messages` tries to fetch enough messages from SQS to meet the demand from the consumers. If the count of received messages is less than the demand, the producer schedules a message to itself to receive again later, until all demand is met.
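A sketch of that demand-meeting loop is below. The names (`handle_receive_messages`, `:receive_messages`, `receive_interval`, `receive_messages_from_sqs`) are modeled on `broadway_sqs`, but the body is an illustration of the described behavior, not the verbatim source:

```elixir
# Illustrative sketch of the demand-meeting loop described above.
defp handle_receive_messages(%{demand: demand} = state) when demand > 0 do
  messages = receive_messages_from_sqs(state, demand)
  new_demand = demand - length(messages)

  # If SQS returned fewer messages than demanded, schedule another
  # attempt by sending :receive_messages to ourselves after an interval.
  receive_timer =
    if new_demand > 0 do
      Process.send_after(self(), :receive_messages, state.receive_interval)
    end

  {:noreply, messages, %{state | demand: new_demand, receive_timer: receive_timer}}
end

defp handle_receive_messages(state), do: {:noreply, [], state}
```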
The messages received in the scheduled task are handled in the `handle_info` callback.
```elixir
@impl true
def handle_info(:receive_messages, %{receive_timer: nil} = state) do
  {:noreply, [], state}
end

def handle_info(:receive_messages, state) do
  handle_receive_messages(%{state | receive_timer: nil})
end
```
Returning messages from the `handle_demand` callback is easy to understand, because that is where `GenStage` passes messages to consumers. But why can `handle_info` be used for the same purpose?

In `GenStage`:
```elixir
## Catch-all messages

def handle_info(msg, %{state: state} = stage) do
  noreply_callback(:handle_info, [msg, state], stage)
end
```
And in `ProducerStage`:
```elixir
def handle_info(message, %{module: module, module_state: module_state} = state) do
  message
  |> module.handle_info(module_state)
  |> handle_no_reply(state)
end
```
So the call sequence flow is:
```mermaid
sequenceDiagram
    participant P as ProducerStage
    participant G as GenStage
    participant M as MyProducer
    G->>G: handle_info(msg, %{state: state} = stage)
    G->>G: noreply_callback(:handle_info, [msg, state], stage)
    G->>P: handle_info(message, %{module: module, module_state: module_state} = state)
    P->>M: handle_info(message, module_state)
    M-->>P: {:noreply, events, new_module_state}
    P->>P: handle_no_reply(reply, state)
    P-->>G: {:noreply, messages, %{state | module_state: new_module_state}}
    G->>G: handle_noreply_callback(return, stage)
    G->>G: dispatch_events(events, length(events), %{stage | state: state})
```
As a result, the events returned from `handle_info` in our producer module are dispatched to the consumers at the end.
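To see that dispatch point in isolation, here is a hypothetical `GenStage` producer that emits all of its events from `handle_info` and none from `handle_demand`. This is a toy sketch; `GenStage` itself buffers the unmet demand until events arrive:

```elixir
defmodule TickProducer do
  use GenStage

  def init(interval_ms) do
    Process.send_after(self(), :tick, interval_ms)
    {:producer, {0, interval_ms}}
  end

  # demand is ignored here; GenStage buffers it until events arrive
  def handle_demand(_demand, state), do: {:noreply, [], state}

  def handle_info(:tick, {counter, interval_ms}) do
    Process.send_after(self(), :tick, interval_ms)
    # events returned here go through the same dispatch_events path
    # as events returned from handle_demand
    {:noreply, [counter], {counter + 1, interval_ms}}
  end
end
```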
Push (Passive)
```mermaid
graph TD
    A(RabbitMQ Queue) -- push --> B(RabbitMQ Producer)
    C(Processors in Broadway) -- pull --> B
```
According to this simple flow diagram, messages are pushed to the queue consumers whenever the queue has messages. However, it's possible that no Broadway consumer asks for any message at that moment. There are also times when the Broadway consumers ask for messages but none have been pushed from the queues. How does the design fulfill these different scenarios?
According to the source code of the RabbitMQ Broadway producer, `handle_demand` does nothing.
```elixir
@impl true
def handle_demand(_incoming_demand, state) do
  {:noreply, [], state}
end
```
As a result, demand requests from consumers are ignored by the producer, and messages are always pushed from the producer. To provide back-pressure for the consumers, the `:prefetch_count` option is used in the producer to limit the message volume. The flow diagram should be this instead:
```mermaid
graph TD
    A(RabbitMQ Queue) -- push --> B(RabbitMQ Producer)
    B -- push --> C(Processors in Broadway)
```
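In `broadway_rabbitmq`, this limit is configured through the producer's `:qos` option. A minimal sketch (the queue name and prefetch value are placeholders):

```elixir
producer: [
  module: {BroadwayRabbitMQ.Producer,
    queue: "my_queue",
    # the broker pushes at most 50 unacknowledged messages
    # to this producer at a time
    qos: [prefetch_count: 50]
  }
]
```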
The underlying logic is:
1. The Broadway RabbitMQ producer uses `:amqp` (which uses `:amqp_client`) to connect to RabbitMQ, and calls the client's consume function to register its process as a queue consumer.
2. Every time a message is pushed from the queue, the producer's process receives a message with the structure `{:basic_deliver, payload, meta}`.
3. In the same way we explored above, the `handle_info` callback of the RabbitMQ producer returns the message for `GenStage` to dispatch.
```elixir
def handle_info({:basic_deliver, payload, meta}, state) do
  # (simplified) the full source builds a %Broadway.Message{} from
  # payload and meta, with an acknowledger attached; build_message/3
  # here stands in for that logic
  message = build_message(payload, meta, state)
  {:noreply, [message], state}
end
```
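Outside of Broadway, the same push flow can be reproduced directly with the `:amqp` library. This sketch (queue name and prefetch value are placeholders) registers the calling process as a consumer and waits for a `:basic_deliver` message:

```elixir
{:ok, conn} = AMQP.Connection.open()
{:ok, chan} = AMQP.Channel.open(conn)

# back-pressure: the broker pushes at most 50 unacked messages
:ok = AMQP.Basic.qos(chan, prefetch_count: 50)
{:ok, _consumer_tag} = AMQP.Basic.consume(chan, "my_queue")

receive do
  {:basic_deliver, payload, meta} ->
    IO.inspect(payload)
    AMQP.Basic.ack(chan, meta.delivery_tag)
end
```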
Now that we basically know how the producer receives and dispatches messages, we can move on to the processors next.