Broadway Source Reading (Part 4 - Batching)
Updated:
GenStage pipelines
When options for batcher is configured, the processor becomes :producer_consumer
type. The GenStage pipeline becomes much more complex than the simple one shown in the post about processor.
graph TD A(Producer) --> B(Processor_1) A --> P(Processor_2) B --> C(Batcher:S3) B -.-> H(DemandDispatcher/PartitionDispatcher) P -.-> H P --> D(Batcher:SQS) C --> F(BatchProcessor_S3_1) C --> I(BatchProcessor_S3_2) C -.-> H D -.-> H D --> G(BatchProcessor_SQS_1) F -.-> K(Acknowledger) I -.-> K G -.-> K
Notes: The communications between the stages are through process message passing, while other interactions between the DemandDispatcher
, the Acknowledger
and them are through direct method calls. The Dispatcher and Acknowledger are not part of the GenStage pipelines and so here uses dotted line to indicate their interactions to have clearer separation.
In the scenario of multiple batchers, PartitionDispatcher
is used instead of DemandDispatcher
for the processors in build_processors_specs/2
and the :partitions
are set by the keys of the batchers’ config.
The interactions with dispatcher is to register the processes of the BatcherStage / BatchProcessorStage
into the dispatcher_state
of the processes of ProcessorStage / BatcherStage
respectively for event dispatching. Related source code is the handle_info/2
in the processes of ProcessorStage
and BatcherStage
matching the {:"$gen_producer", {consumer_pid, ref} = from, {:subscribe, cancel, opts}}
message. It invokes the producer_subscribe/3
function, subsequently invokes the function subscribe(opts, from, dispatcher_state)
of the DemandDispatcher / PartitionDispatcher
.
Message Consuming as a Producer Consumer (w Batcher)
The sequence flow which starts with ProducerStage
takes another path as the ProcessorStage
is changed to be :producer_consumer
with batchers. And the take_pc_events/3
function actually calls consumer_dispatch/6
that we covered in the post about processor.
sequenceDiagram participant P as ProducerStage participant D as DemandDispatcher participant PS as ProcessorStage P->>P: take_from_buffer(counter, %{stage | dispatcher_state: dispatcher_state}) P->>P: noreply_callback(:handle_demand, [counter, state], stage) P->>P: handle_demand/2 P->>P: handle_noreply_callback/2 P->>P: dispatch_events/3 P->>D: dispatch/3 alt as Producer P--)PS: handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :consumer} = stage) PS->>PS: consumer_dispatch/6 else as ProducerConsumer P--)PS: handle_info({:"$gen_consumer", {producer_pid, ref}, events}, %{type: :producer_consumer} = stage) loop take_pc_events/3 until event in queue is empty PS->>PS: send_pc_events/3 PS->>PS: consumer_dispatch/6 end end
Because the messages will be forwarded to the batchers without acknowledgement after processing. As a result, the successful_messages_to_ack
is []
, and the successful_messages_to_forward
contain all messages to batchers. Below is the sub-sequence flow of dispatch_events/3
:
sequenceDiagram participant PS as ProcessorStage participant DD as DemandDispatcher participant PD as PartitionDispatcher participant BS as BatcherStage alt w only one Batcher PS->>DD: dispatch(successful_messages_to_forward) loop dispatch_demand/3 until every batcher's demand is met DD->>DD: split_events/3 PS->>BS: send(batcher_pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect]) end else w multiple Batcher PS->>PD: dispatch(successful_messages_to_forward) loop until every partition is dispatched PD->>PD: split_events/4 PD->>PD: dispatch_per_partition/1 PD->>PD: maybe_send/3 PS->>BS: send(batcher_pid, {:"$gen_consumer", {self(), ref}, :lists.reverse(events)}, [:noconnect]) end end PS--)BS: handle_info({:"$gen_consumer", {processor_pid, ref}, events}, %{type: :producer_consumer} = stage) BS->>BS: take_pc_events/3
Again, the take_pc_events/3
function of the BatcherStage
process is triggered. As you can see, the GenStage pipelines work in such a way repeatedly on each stage. Hence, we know that the Dispatcher will dispatch the events to BatchProcessorStage
, and that process receiving the message will invoke consumer_dispatch/6
and handle_events/3
.
Below is the sub-sequence flow of handle_events/3
in BatchProcessorStage
:
sequenceDiagram participant BP as BatchProcessorStage participant A as Acknowledger participant B as MyBW BP->>BP: handle_batch/4 BP->>B: handle_batch/4 BP->>A: maybe_handle_failed_messages/3 A->>B: handle_failed/2 BP->>A: ack_messages(successful_messages_to_ack, failed_messages)
Now, we basically know how Broadway works with different pipeline options underneath. However, it’s still far from really understanding it from bottom-up. When I was working on this post, I came across one interesting article about Broadway’s concurrency and how misconfiguration might have great impact on it. The author did a lot of debugging to figure it out. I will see if I can see it from the source code level with the observation from this article.