Broadway Source Reading (Part 3 - Processor)
Updated:
Normally, we only need to define the pipeline options for the Producer of any Messaging Middleware, Processor, and implement the handle_message/3
callback to use Broadway.
The complexity of how the handle_message/3
is called, how the messages got acknowledged and how the failed messages handled is all hidden behind. But that is the part I would like to know by reading the source code. As we have gone through the Producer part, I bet $10 dollars the questions above should be able to answer after reading the Processor part.
Startup Call Sequence
With the experience on the startup call sequence of Producer, we can find that the one for Processor is quite similar as below. One thing to be noted is that, the init
function in Subscriber
make the consumers to subscribe to all upper stages randomly.
sequenceDiagram participant S as Supervisor participant P as ProcessorStage participant SB as Subscriber participant G as GenStage S->>P: start_link(args, stage_options) P->>SB: start_link(ProcessorStage, ..., stage_opts) SB->>G: start_link(Subscriber, {ProcessorStage, ...}, stage_opts) G->>G: init({Subsriber, args}) G->>SB: init({ProcessorStage, ...}) SB->>P: init/1 alt w/o Batcher P-->>SB: {:consumer, state, []} SB-->>G: {:consumer, state, []} G->>G: init_consumer(ProcessorStage, opts, state) else w/ Batcher P-->>SB: {:producer_consumer, state, dispatcher} SB-->>G: {:producer_consumer, state, dispatcher} G->>G: init_producer_consumer(ProcessorStage, opts, state) end G->>G: consumer_init_subscribe(subscribe_to, stage) G->>G: consumer_subscribe/7 G->>G: ask/3
GenStage pipelines
Using the simplest Broadway pipeline options without Batcher, the GenStage pipeline should be constructed with only two stages: Producer and Processors.
graph TD A(Producer) --> B(Processor_1) A -.-> D(DemandDispatcher) A --> C(Processor_2) B -.-> E(Acknowledger) C -.-> E
Notes: The communications between the stages are through process message passing, while other interactions between the DemandDispatcher
, the Acknowledger
and them are through direct method calls. The Dispatcher and Acknowledger are not part of the GenStage pipelines and so here uses dotted line to indicate their interactions to have clearer separation.
Message Consuming as a Consumer (w/o Batcher)
At the end of the startup call sequence, the processor ask/3
for messages immediately after subscribing to the producers. The process of ProcessorStage sends a :"$gen_producer"
message to the producer process it subscribes to. In the producer’s handle_info/2
, it calls dispatcher_callback/3
and redirect the call to dispatcher module’s ask/3
method that passes the result to handle_dispatcher_result/2
.
There are two branches in the case
statement in handle_dispatcher_result/2
. As in this simplest GenStage pipeline, the process receivng the message is :producer
, so the path for producer should be taken.
sequenceDiagram participant P as ProducerStage participant D as DemandDispatcher participant PS as ProcessorStage Note over P: when no events in buffer P->>P: take_from_buffer(counter, %{stage | dispatcher_state: dispatcher_state}) P->>P: noreply_callback(:handle_demand, [counter, state], stage) P->>P: handle_demand/2 P->>P: handle_noreply_callback/2 P->>P: dispatch_events/3 P->>D: dispatch/3 alt as Producer P--)PS: handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :consumer} = stage) PS->>PS: consumer_dispatch/6 else as ProducerConsumer P--)PS: handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :producer_consumer} = stage) P->>P: take_pc_events/3 end
Below is the call sequence flow of consumer_dispatch/6
after :"$gen_consumer"
message received in the ProcessorStage
as a consumer. The events are acknowledged immediately because no batchers are specified in this simplest setup. The successful_messages_to_forward
is actually []
because no forwarding is required. Finally, the consumer ask
from the producer again for more events and this kind of asking/pulling events happens recursively after subscription.
sequenceDiagram participant PS as ProcessorStage participant A as Acknowledger participant B as MyBW loop consumer_dispatch/6 until event batches empty PS->>PS: handle_events/3 PS->>PS: maybe_prepare_messages/2 PS->>B: prepare_messages/2 PS->>PS: handle_messages/4 PS->>B: handle_message/3 PS->>A: maybe_handle_failed_messages/3 A->>B: handle_failed/2 PS->>A: ack_messages(successful_messages_to_ack, failed_messages) PS->>PS: dispatch_events(successful_messages_to_forward, _length, stage) PS->>PS: ask/3 end
Failed Messages Handling and Acknowledging
If we look into the ProcessorStage.handle_events/3
, it’s clear that each message will be handled by our implemented callback prepare_messages/2
and handle_message/3
of our MyBW
module. Each message will be separated into the successful_messages
and failed_messages
categories.
The failed_messages
will first be passed to handle_failed/2
of our MyBW
module so that we can do whatever necessary, such as saving them in DB or forwarding to another exception queue before the Acknowledger
acknowledge them.
The messages are grouped by each message’s acknowledger to actually ack
them. The message acknowledger is set in the producer that fulfills the scenario mentioned in documentation:
where messages are coming from different producers. Broadway will use this information to correctly identify the acknowledger and pass it among with the messages so you can properly communicate with the source of the data for acknowledgement.
What I do not quite understand is that the messages are not passed through the Map (replacing the true
value) but use the Process dict. I checked the git history and found that it’s the change made by Jose from a branch named jv-speed-up-ack. Is it faster? Why?
1 | defp group_by_acknowledger(ackers, messages, key) do |
Update: I posted the question in ElixirForum and got Jose’s reply. Take a look there. :D