Contents
  1. Producer
    1.1. Initialization
  2. Message Consumption
    2.1. Pull (Proactive)
    2.2. Push (Passive)

After getting an overview of Broadway’s architecture, I want to know how a basic Broadway pipeline works with only the producer and processors options, such as this sample from the official documentation:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  ...callbacks...
end

Producer

As the beginning of the Broadway pipeline, the producer module either keeps generating events/messages as a data source itself, or forwards the messages that your Messaging Middleware receives from other external systems to downstream consumers.

Broadway provides officially supported producers for Messaging Middleware such as Amazon SQS, Apache Kafka, Google Cloud Pub/Sub and RabbitMQ. How do these producers work with Broadway? What do I need to know if I want to implement one for another Messaging Middleware?

Initialization

As shown in Part 1 - Entry Point and Architecture, Topology.build_producers_specs/2 generates child specs of ProducerStage in the process supervision hierarchy. The ProducerStage module, which uses GenStage, acts as a wrapper around the producer module you specify in the pipeline and delegates function calls to it, such as handle_demand, handle_call, handle_cast, handle_info, prepare_for_draining and terminate. Its startup call sequence is:

sequenceDiagram
  participant S as Supervisor
  participant P as ProducerStage
  participant G as GenStage
  participant M as MyProducer

  S->>P: start_link(args, index, opts)
  P->>G: start_link(ProducerStage, {args, index}, opts)
  G->>G: init({ProducerStage, {args, index}})
  G-->>P: init({args, index})
  P->>M: init(my_producer_args_in_pipeline)
  M-->>G: {:producer, state, options}
  G->>G: init_producer(ProducerStage, options, state)
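To make the sequence concrete, here is a minimal sketch of the producer side, modeled on the Counter example from the GenStage documentation. The module name and state shape are illustrative, not taken from the Broadway source, and a real producer would `use GenStage` and implement the Broadway.Producer behaviour; plain functions are used here so the logic can be read (and run) in isolation:

```elixir
defmodule CounterSketch do
  # Corresponds to MyProducer.init/1 in the sequence diagram above:
  # ProducerStage forwards this {:producer, state} return value to
  # GenStage's init_producer/3.
  def init(initial_counter) do
    {:producer, initial_counter}
  end

  # Pull model: emit exactly `demand` consecutive integers as events.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end
```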

Message Consumption

There are two approaches to consuming messages from Messaging Middleware: Pull and Push. Pull is like asking your boss for tasks, rather than having your boss push them to you. (Although in real life, you might have the opposite experience unless your company uses Kanban. :D)

For above mentioned Messaging Middlewares:

  • RabbitMQ is push-based.
  • Amazon SQS and Apache Kafka are pull-based.
  • Google Cloud Pub/Sub can do both.

Because GenStage‘s philosophy is Pull, how do these two different message-consuming models work with Broadway? Let’s use the source code of the Amazon SQS and RabbitMQ Broadway producers to illustrate.

Pull (Proactive)

graph BT
  D(Amazon SQS Producer) -- pull --> C(Amazon SQS Queue)
  E(Processors in Broadway) -- pull --> D

In the source code of broadway_sqs/producer.ex, messages are received in the handle_demand callback, because that is the moment when consumers pull for messages.

@impl true
def handle_demand(incoming_demand, %{demand: demand} = state) do
  handle_receive_messages(%{state | demand: demand + incoming_demand})
end

handle_receive_messages tries to get messages from SQS to meet the demand from consumers. If the number of messages received is less than the demand, the producer schedules itself a signal to receive messages again, until all demand is met.
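The receive loop can be sketched like this, simplified from broadway_sqs. The fetch function is injected here so the demand accounting can be seen without AWS; in the real producer, this is where the SQS client is called, and the :receive_interval field name mirrors the real option:

```elixir
defmodule SqsLoopSketch do
  def handle_receive_messages(%{receive_timer: nil, demand: demand} = state, fetch)
      when demand > 0 do
    messages = fetch.(demand)
    new_demand = demand - length(messages)

    # If fewer messages arrived than demanded, schedule a
    # :receive_messages signal to ourselves and try again later.
    receive_timer =
      if new_demand > 0 do
        Process.send_after(self(), :receive_messages, state.receive_interval)
      end

    {:noreply, messages, %{state | demand: new_demand, receive_timer: receive_timer}}
  end

  def handle_receive_messages(state, _fetch) do
    # Either a timer is already pending or there is no outstanding demand.
    {:noreply, [], state}
  end
end
```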

The messages received in the scheduled task are handled in the handle_info callback.

@impl true
def handle_info(:receive_messages, state) do
  handle_receive_messages(%{state | receive_timer: nil})
end

Replying with messages in the handle_demand callback is easy to understand, because that is where GenStage passes messages to consumers. But why can handle_info be used for the same purpose?

In GenStage:

## Catch-all messages

def handle_info(msg, %{state: state} = stage) do
  noreply_callback(:handle_info, [msg, state], stage)
end


## Shared helpers

defp noreply_callback(:handle_info, [msg, state], %{mod: mod} = stage) do
  if function_exported?(mod, :handle_info, 2) do
    handle_noreply_callback(mod.handle_info(msg, state), stage)
  else
    log = '** Undefined handle_info in ~tp~n** Unhandled message: ~tp~n'
    :error_logger.warning_msg(log, [mod, msg])
    {:noreply, %{stage | state: state}}
  end
end

defp handle_noreply_callback(return, stage) do
  case return do
    {:noreply, events, state} when is_list(events) ->
      stage = dispatch_events(events, length(events), %{stage | state: state})
      {:noreply, stage}

    {:noreply, events, state, :hibernate} when is_list(events) ->
      stage = dispatch_events(events, length(events), %{stage | state: state})
      {:noreply, stage, :hibernate}

    {:stop, reason, state} ->
      {:stop, reason, %{stage | state: state}}

    other ->
      {:stop, {:bad_return_value, other}, stage}
  end
end

And in ProducerStage:

def handle_info(message, state) do
  %{module: module, module_state: module_state} = state

  message
  |> module.handle_info(module_state)
  |> handle_no_reply(state)
end

defp handle_no_reply(reply, %{transformer: transformer} = state) do
  case reply do
    {:noreply, events, new_module_state} when is_list(events) ->
      messages = transform_events(events, transformer)
      {state, messages} = maybe_rate_limit_and_buffer_messages(state, messages)
      {:noreply, messages, %{state | module_state: new_module_state}}

    {:noreply, events, new_module_state, :hibernate} ->
      messages = transform_events(events, transformer)
      {state, messages} = maybe_rate_limit_and_buffer_messages(state, messages)
      {:noreply, messages, %{state | module_state: new_module_state}, :hibernate}

    {:stop, reason, new_module_state} ->
      {:stop, reason, %{state | module_state: new_module_state}}
  end
end

So the call sequence flow is:

sequenceDiagram
  participant P as ProducerStage
  participant G as GenStage
  participant M as MyProducer

  G->>G: handle_info(msg, %{state: state} = stage)
  G->>G: noreply_callback(:handle_info, [msg, state], stage)
  G->>P: handle_info(message, %{module: module, module_state: module_state} = state)
  P->>M: handle_info(message, module_state)
  M-->>P: {:noreply, events, new_module_state}
  P->>P: handle_no_reply(reply, state)
  P-->>G: {:noreply, messages, %{state | module_state: new_module_state}}
  G->>G: handle_noreply_callback(return, stage)
  G->>G: dispatch_events(events, length(events), %{stage | state: state})

As a result, the events returned from handle_info in our producer module are dispatched to the consumers in the end.

Push (Passive)

graph TD
  A(RabbitMQ Queue) -- push --> B(RabbitMQ Producer)
  C(Processors in Broadway) -- pull --> B

According to this simple flow diagram, messages are pushed to the queue consumers whenever the queue has messages. However, it is possible that no Broadway consumer is asking for messages at that moment. There are also times when the Broadway consumers ask for messages but none have been pushed from the queue. How does the design fulfill these different scenarios?

According to the source code of the RabbitMQ Broadway producer, handle_demand does nothing:

@impl true
def handle_demand(_incoming_demand, state) do
  {:noreply, [], state}
end

As a result, the demand requests from consumers are ignored by the producer, and messages are always pushed from the producer. To apply back-pressure on behalf of the consumers, the :prefetch_count option is used in the producer to limit the message volume. The flow diagram should be this instead:

graph TD
  A(RabbitMQ Queue) -- push --> B(RabbitMQ Producer)
  B -- push --> C(Processors in Broadway)
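If I read the current broadway_rabbitmq documentation correctly, the prefetch limit is passed under the :qos option of BroadwayRabbitMQ.Producer. A sketch of the relevant configuration, with an illustrative queue name and numbers:

```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayRabbitMQ.Producer,
      queue: "my_queue",
      # At most 50 unacknowledged messages are pushed to this
      # producer process at a time, bounding memory use.
      qos: [prefetch_count: 50]
    },
    concurrency: 1
  ],
  processors: [
    default: [concurrency: 10]
  ]
)
```

The prefetch count is the push model’s substitute for GenStage demand: RabbitMQ keeps pushing until that many deliveries are unacknowledged, then waits.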

The underlying logic is:

  • The Broadway RabbitMQ producer uses :amqp, which in turn uses :amqp_client, to connect to RabbitMQ. It calls the client’s consume function to register its process as a queue consumer.

  • Every time a message is pushed from the queue, the producer’s process receives a message with the structure {:basic_deliver, payload, meta}.

  • In the same way we explored above, the handle_info callback of the RabbitMQ producer returns the message for GenStage to dispatch.

def handle_info({:basic_deliver, payload, meta}, state) do
  %{channel: channel, client: client, config: config} = state
  %{delivery_tag: tag, redelivered: redelivered} = meta

  # Omitted some code building `acknowledger` here.

  metadata =
    meta
    |> Map.take(config[:metadata])
    |> Map.put(:amqp_channel, channel)

  message = %Message{
    data: payload,
    metadata: metadata,
    acknowledger: acknowledger
  }

  {:noreply, [message], state}
end
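Stripped of the RabbitMQ specifics, the whole push model fits in a few lines. This is an illustrative sketch of my own, not broadway_rabbitmq code; a real producer would `use GenStage`, but plain functions keep the shape visible and runnable:

```elixir
defmodule TickProducerSketch do
  def init(interval) do
    # Push style: instead of waiting for demand, arrange for
    # periodic :tick messages to our own process.
    :timer.send_interval(interval, :tick)
    {:producer, 0}
  end

  # Demand from consumers is ignored; events only flow on :tick.
  def handle_demand(_demand, state), do: {:noreply, [], state}

  def handle_info(:tick, n) do
    # This return value travels through ProducerStage.handle_no_reply/2
    # into GenStage's dispatch_events/3, reaching the processors.
    {:noreply, [n], n + 1}
  end
end
```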

Now that we basically know how the producer receives and dispatches messages, we can move forward to the processors.
