Building RelayMQ (Part 2): Implementing Consumer Subscriptions
Moving from pull-based consumption to push-based consumer subscriptions
I recently started building RelayMQ, a message broker written in Scala, inspired by the internal architecture of message brokers like RabbitMQ.
The goal is to deeply understand how messaging systems actually work internally, by building my own version.
In the previous post, I implemented a minimal in-memory message broker with queues, producers and pull-based consumers.
This post focuses on the next major step: implementing push-based consumer subscriptions.
Why Subscriptions?
The first version of RelayMQ supported pull-based consumption.
A consumer had to explicitly request messages:
CONSUME queue
This means:
- Consumers continuously poll the broker
- The broker has no idea who is interested in a queue
- Message delivery cannot be optimized
The message flow looked like this:
Producer
↓
Queue
↓
Consumer → CONSUME request
This model works, but it is inefficient for real systems.
Most production message brokers instead use consumer subscriptions.
Push-Based Consumption
With subscriptions, a consumer registers interest in a queue once, and the broker automatically pushes messages whenever they arrive.
Example:
SUBSCRIBE messagebox
After subscribing, the consumer does not need to poll.
RelayMQ will now automatically deliver messages:
Producer
↓
Queue
↓
Subscribed Consumer
When a message arrives:
MESSAGE <messageId> <payload>
This is much closer to how real message brokers operate.
Consumer State in RelayMQ
To support subscriptions, the queue needed to track connected consumers.
Each queue now maintains the following state:
readyMessages
inFlightMessages
consumers
consumerMessages
Where:
readyMessages → messages waiting to be delivered
inFlightMessages → messages currently being processed by consumers
consumers → active consumers subscribed to the queue
consumerMessages → tracks which messages belong to which consumer
This allows RelayMQ to correctly handle acknowledgements and message redelivery.
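As a rough illustration, the four pieces of state could be modelled like this. It is a minimal sketch, not RelayMQ's actual code: `Message` is a hypothetical stand-in for `RelayMQMessage`, and `QueueState`, `enqueue` and `takeFor` are names invented for the example.

```scala
import scala.collection.mutable

object QueueStateSketch {
  // Hypothetical stand-in for RelayMQMessage; the real class may carry more fields.
  final case class Message(id: String, payload: String)

  // Holds the four pieces of per-queue state described above.
  final class QueueState {
    // Messages waiting to be delivered, in arrival order.
    val readyMessages: mutable.Queue[Message] = mutable.Queue.empty
    // Messages delivered but not yet acknowledged, keyed by message id.
    val inFlightMessages: mutable.Map[String, Message] = mutable.Map.empty
    // Ids of the consumers currently subscribed to this queue.
    val consumers: mutable.LinkedHashSet[String] = mutable.LinkedHashSet.empty
    // Consumer id -> ids of the messages that consumer currently holds.
    val consumerMessages: mutable.Map[String, mutable.Set[String]] = mutable.Map.empty

    def enqueue(message: Message): Unit = readyMessages.enqueue(message)

    // Moves the oldest ready message to in-flight and records which consumer owns it.
    def takeFor(consumerId: String): Option[Message] =
      if (readyMessages.isEmpty || !consumers.contains(consumerId)) None
      else {
        val message = readyMessages.dequeue()
        inFlightMessages.update(message.id, message)
        consumerMessages.getOrElseUpdate(consumerId, mutable.Set.empty[String]) += message.id
        Some(message)
      }
  }
}
```

Keeping `consumerMessages` separate from `inFlightMessages` makes it cheap to answer "which messages does this consumer hold?" when an ACK arrives or a connection drops.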
Message Lifecycle with Subscriptions
Once subscriptions were introduced, the message lifecycle changed slightly.
Producer
↓
Queue (READY)
↓
Consumer (DELIVERED)
↓
IN_FLIGHT
↓
ACK → removed
NACK → back to READY
A message is delivered immediately when:
- a new message arrives
- a consumer subscribes
- a message is acknowledged
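The three triggers above can be sketched as a single `dispatch` step that runs after each event. This is a simplified model under stated assumptions, not the real RelayMQ queue: `PushQueue` and its methods are hypothetical, delivery is modelled as a callback, and each consumer is assumed to hold at most one unacknowledged message at a time (one plausible reading of why an ACK triggers the next delivery).

```scala
import scala.collection.mutable

object LifecycleSketch {
  // Hypothetical stand-in for RelayMQMessage.
  final case class Message(id: String, payload: String)

  final class PushQueue {
    private val ready = mutable.Queue.empty[Message]
    // messageId -> (owning consumer id, message)
    private val inFlight = mutable.Map.empty[String, (String, Message)]
    // consumer id -> callback that pushes a message to that consumer
    private val consumers = mutable.LinkedHashMap.empty[String, Message => Unit]

    def publish(message: Message): Unit = synchronized {
      ready.enqueue(message)
      dispatch() // trigger 1: a new message arrives
    }

    def subscribe(consumerId: String)(deliver: Message => Unit): Unit = synchronized {
      consumers.update(consumerId, deliver)
      dispatch() // trigger 2: a consumer subscribes
    }

    // Only the consumer that holds the message may acknowledge it.
    def ack(consumerId: String, messageId: String): Boolean = synchronized {
      val owned = inFlight.get(messageId).exists(_._1 == consumerId)
      if (owned) {
        inFlight.remove(messageId) // ACK -> removed
        dispatch()                 // trigger 3: a message is acknowledged
      }
      owned
    }

    // The post does not list NACK as a delivery trigger, so this sketch only requeues.
    def nack(consumerId: String, messageId: String): Boolean = synchronized {
      inFlight.get(messageId) match {
        case Some((owner, message)) if owner == consumerId =>
          inFlight.remove(messageId)
          ready.enqueue(message) // NACK -> back to READY
          true
        case _ => false
      }
    }

    def readyCount: Int = synchronized(ready.size)
    def inFlightCount: Int = synchronized(inFlight.size)

    // Hands ready messages to consumers that hold no unacknowledged message.
    private def dispatch(): Unit = {
      val busy = inFlight.values.map(_._1).toSet
      val idle = consumers.keys.filterNot(busy).toList
      idle.foreach { consumerId =>
        if (ready.nonEmpty) {
          val message = ready.dequeue()
          inFlight.update(message.id, (consumerId, message))
          consumers(consumerId)(message)
        }
      }
    }
  }
}
```

Checking ownership inside `ack` and `nack` is also one way to reject invalid acknowledgements, such as a consumer acknowledging a message it never received.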
Consumer Implementation
Consumers are represented internally using a simple interface:
package relaymq.core.consumer
import relaymq.core.message.RelayMQMessage
import java.util.UUID
trait RelayMQConsumer {
  // Generated once per consumer. A `def` here would return a new id on every call.
  val id: String = UUID.randomUUID().toString
  /**
   * The id of the connection associated with this consumer.
   *
   * For now, a consumer is tied to exactly one connection, which means
   * only one SUBSCRIBE can be issued per TCP connection.
   */
  def connectionId: String
  // Pushes a message to this consumer.
  def deliver(message: RelayMQMessage): Unit
}
The server module provides a concrete implementation that writes messages directly to the TCP connection.
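A connection-backed consumer might look roughly like the following. This is a hedged sketch rather than the server module's real class: `StreamConsumer`, the local `Consumer` trait and `Message` are stand-ins invented here, and the newline terminator is an assumption about the wire format. In a real server, `out` would be the socket's output stream.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.UUID

object SocketConsumerSketch {
  // Hypothetical stand-in for RelayMQMessage.
  final case class Message(id: String, payload: String)

  // Local simplified copy of the consumer interface, so the sketch is self-contained.
  trait Consumer {
    val id: String = UUID.randomUUID().toString
    def connectionId: String
    def deliver(message: Message): Unit
  }

  // Writes one "MESSAGE <messageId> <payload>" line per delivery.
  final class StreamConsumer(val connectionId: String, out: OutputStream) extends Consumer {
    override def deliver(message: Message): Unit = out.synchronized {
      // Assumption: frames are newline-terminated UTF-8 text.
      val line = s"MESSAGE ${message.id} ${message.payload}\n"
      out.write(line.getBytes(StandardCharsets.UTF_8))
      out.flush()
    }
  }
}
```

Synchronizing on the stream keeps two concurrent deliveries from interleaving their bytes on the same connection.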
Connection Management
Because consumers are tied to client connections, RelayMQ also tracks active connections.
ConnectionManager
|__ Map[connectionId, ConnectionHandler]
Each connection can register a consumer through the broker.
connectionId → consumer
This allows the broker to identify which consumer issued an ACK or NACK.
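The connectionId → consumer mapping can be sketched as a small registry. The names `ConsumerRegistry`, `register` and `consumerFor` are hypothetical, and the simplified `Consumer` case class stands in for `RelayMQConsumer`. Only the `connectionConsumer` map name and the error text are taken from this post.

```scala
import scala.collection.concurrent.TrieMap

object ConnectionRegistrySketch {
  // Simplified stand-in for RelayMQConsumer.
  final case class Consumer(id: String, connectionId: String)

  final class ConsumerRegistry {
    private val connectionConsumer = TrieMap.empty[String, Consumer]

    // Registers a consumer for its connection; fails if the connection already has one.
    def register(consumer: Consumer): Either[String, Consumer] =
      connectionConsumer.putIfAbsent(consumer.connectionId, consumer) match {
        case Some(_) => Left("A consumer already present for this connection")
        case None    => Right(consumer)
      }

    // Resolves the consumer behind an ACK or NACK from the connection it arrived on.
    def consumerFor(connectionId: String): Option[Consumer] =
      connectionConsumer.get(connectionId)

    // Forgets the mapping, for example when the connection closes.
    def remove(connectionId: String): Option[Consumer] =
      connectionConsumer.remove(connectionId)
  }
}
```

Using `putIfAbsent` makes the "one consumer per connection" check and the insert a single atomic step, so two racing SUBSCRIBE commands cannot both succeed.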
Handling Consumer Disconnects
Handling connection failures turned out to be an important edge case.
If a consumer disconnects while processing a message, the broker must:
- Remove the consumer from the queue
- Requeue all messages that were in-flight
- Make those messages available for other consumers
RelayMQ handles this during connection shutdown by notifying the broker when a connection closes.
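import scala.util.control.NonFatal
def onConnectionClosed(connectionId: String): Unit = {
  // Drop the connection → consumer mapping; nothing happens if none was registered.
  connectionConsumer.remove(connectionId).foreach { consumer =>
    LOG.info(s"Connection closed for consumer ${consumer.id}")
    queues.values.foreach { queue =>
      try {
        // Unsubscribing is expected to requeue this consumer's in-flight messages.
        queue.unSubscribe(consumer.id)
      } catch {
        // NonFatal avoids swallowing errors such as OutOfMemoryError.
        case NonFatal(_) => // consumer may not belong to this queue
      }
    }
  }
}
This ensures no messages remain stuck in-flight.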
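What `unSubscribe` has to do on the queue side can be sketched as follows. This is an illustration under assumptions, not the actual RelayMQ implementation: the `Queue` class here is a toy, requeued messages are placed at the front of the ready list (the post does not say where they go), and the method returns `false` instead of throwing when the consumer is not subscribed.

```scala
import scala.collection.mutable

object UnsubscribeSketch {
  // Hypothetical stand-in for RelayMQMessage.
  final case class Message(id: String, payload: String)

  final class Queue {
    val readyMessages = mutable.ArrayDeque.empty[Message]
    val inFlightMessages = mutable.Map.empty[String, Message]
    val consumers = mutable.Set.empty[String]
    val consumerMessages = mutable.Map.empty[String, mutable.LinkedHashSet[String]]

    // Removes the consumer and returns its unacknowledged messages to READY.
    // Returns false when the consumer was not subscribed to this queue.
    def unSubscribe(consumerId: String): Boolean = synchronized {
      if (!consumers.remove(consumerId)) false
      else {
        // Ids of the messages this consumer held, in delivery order.
        val held = consumerMessages.remove(consumerId).toList.flatten
        val requeued = held.flatMap(id => inFlightMessages.remove(id))
        // Prepend in reverse so the oldest held message is redelivered first.
        requeued.reverse.foreach(m => readyMessages.prepend(m))
        true
      }
    }
  }
}
```

Returning a Boolean for the "not subscribed here" case is one alternative to relying on exceptions when the broker walks over every queue.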
Testing Push-Based Delivery
Connect using telnet:
telnet localhost 5672
Terminal 1 - Producer
DECLARE_QUEUE messagebox
Ok(Queue messagebox created)
PUBLISH messagebox hello
Ok(Message produced successfully)
Terminal 2 - Consumer
SUBSCRIBE messagebox
Ok(Subscribed...)
MESSAGE 5963ee6a-dcdf-4180-a5fe-fe969a0588f3 hello
The message is pushed automatically as soon as the consumer subscribes.
Issuing SUBSCRIBE a second time on the same connection is rejected:
SUBSCRIBE messagebox
Ok(Subscribed...)
MESSAGE 933d1fbd-5268-467b-a843-ca53ff1bb3d2 hoo
SUBSCRIBE messagebox
Error(A consumer already present for this connection)
The intended constraint, one consumer per TCP connection, works as expected.
Challenges Discovered
While implementing subscriptions, a few design challenges surfaced:
- Tracking which messages belong to which consumer
- Handling consumer disconnects safely
- Preventing invalid ACK/NACK operations
- Avoiding race conditions when delivering messages
The current implementation uses synchronized blocks for correctness.
However, this approach will likely become a bottleneck as the system grows.
Two architectural directions I'm currently exploring:
- Single-writer queue model
- Event-loop based queue execution
Both approaches process queue operations sequentially while allowing the system to scale across multiple queues.
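To make the single-writer idea concrete, here is a minimal sketch in which every operation on a queue is submitted to that queue's own single-threaded executor. It is an exploration aid, not a committed design: `SingleWriterQueue` and its methods are hypothetical names, and a real broker would also need error handling and backpressure.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
import scala.collection.mutable

object SingleWriterSketch {
  // All state is touched only by the queue's own thread, so no locks are needed.
  final class SingleWriterQueue(val name: String) {
    private val executor: ExecutorService = Executors.newSingleThreadExecutor()
    private val ready = mutable.Queue.empty[String]

    // Callers on any thread enqueue a task; the writer thread applies it in order.
    def publish(payload: String): Unit =
      executor.execute(() => ready.enqueue(payload))

    // Reads also go through the writer thread, so they see a consistent state.
    def size(): Int =
      executor.submit(new Callable[Int] { def call(): Int = ready.size }).get()

    def shutdown(): Unit = {
      executor.shutdown()
      executor.awaitTermination(1, TimeUnit.SECONDS)
    }
  }
}
```

Because each queue owns its executor, operations on one queue are serialized while different queues can still make progress in parallel.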
What Comes Next
My next plans for RelayMQ are:
- Prefetch support
- Implementing Exchanges & Exchange-based routing
- Message persistence using a WAL (write-ahead log)
Stay Tuned!
Takeaway
Implementing consumer subscriptions revealed how much complexity lies behind something that initially appears simple.
Even a small broker must carefully manage:
- message ownership
- delivery guarantees
- consumer lifecycle
- failure handling
Follow-up Topics
- Building RelayMQ (Part 3): Implementing Exchanges & Message Routing
- Building RelayMQ (Part 4): Implementing Write-Ahead-Logging (WAL) based persistence