Gumdrop provides a complete MQTT message broker and client implementation supporting both MQTT 3.1.1 (RFC specification OASIS Standard, 29 October 2014) and MQTT 5.0 (OASIS Standard, 7 March 2019). The broker integrates with Gumdrop's event-driven, non-blocking I/O architecture to deliver high-throughput publish/subscribe messaging with minimal resource consumption. MQTT over WebSocket is also supported, enabling browser-based clients to connect through the same service.
The MQTT implementation follows Gumdrop's standard service/listener/handler
architecture. An MQTTService owns one or more
MQTTListeners that accept TCP connections. Each connection is
handled by an MQTTProtocolHandler which parses the MQTT wire
protocol using a SAX-style event-driven parser (MQTTFrameParser)
and delegates application-level decisions to pluggable handler interfaces.
org.bluezoo.gumdrop.mqtt.MQTTService — abstract base
class providing broker infrastructure; subclass to customise connection,
publish, and subscribe behaviourorg.bluezoo.gumdrop.mqtt.MQTTListener — TCP listener
for MQTT connections (port 1883 cleartext, port 8883 TLS)org.bluezoo.gumdrop.mqtt.MQTTProtocolHandler —
per-connection protocol handler implementing the MQTT state machineorg.bluezoo.gumdrop.mqtt.codec.MQTTFrameParser —
incremental, SAX-style parser that delivers protocol events via
MQTTEventHandler callbacks without buffering entire packetsorg.bluezoo.gumdrop.mqtt.codec.MQTTPacketEncoder —
stateless encoder producing ByteBuffers for all packet typesorg.bluezoo.gumdrop.mqtt.MQTTWebSocketHandler —
bridges MQTT over WebSocket connections to the protocol handler
Unlike implementations that buffer complete MQTT packets before processing,
Gumdrop uses an incremental, SAX-style event model. The
MQTTFrameParser invokes methods on the
MQTTEventHandler interface as each protocol element is decoded.
PUBLISH payloads — which can be up to 256 MB — are streamed
in chunks via publishData(ByteBuffer) callbacks between
startPublish() and endPublish() events. SUBSCRIBE
and UNSUBSCRIBE topic filters are similarly delivered incrementally. This
design keeps memory usage bounded regardless of message size.
When a PUBLISH message is received, the protocol handler consults the
SubscriptionManager which uses a TopicTree to
resolve matching subscribers (including wildcard patterns + and
#). For large payloads, the broker performs horizontal
fan-out: each chunk of the payload is read once from storage and then
broadcast to all matching subscribers before the next chunk is read. This
minimises I/O and memory pressure compared to reading the full payload once
per subscriber.
Full support for the MQTT 3.1.1 specification (OASIS Standard):
+ single-level, # multi-level)Full support for MQTT 5.0 (OASIS Standard), which adds:
MQTTPropertiesThe protocol version is negotiated during the CONNECT handshake. MQTT 3.1.1 clients (protocol level 4) and MQTT 5.0 clients (protocol level 5) can connect to the same listener simultaneously.
<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
port="1883"/>
</service>
<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
<!-- Cleartext MQTT -->
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
port="1883"/>
<!-- MQTT over TLS -->
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
port="8883" secure="true">
<property name="keystore-file" path="keystore.p12"/>
<property name="keystore-pass" value="secret"/>
</listener>
</service>
<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
<property name="realm" ref="#realm"/>
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
port="1883"/>
</service>
<bean id="realm" class="org.bluezoo.gumdrop.auth.BasicRealm">
<!-- realm configuration -->
</bean>
| Property | Setter | Default | Description |
|---|---|---|---|
realm | setRealm(Realm) |
none | Authentication realm for client connections |
max-packet-size | setMaxPacketSize(int) |
268,435,455 | Maximum MQTT packet size in bytes (256 MB) |
| Property | Setter | Default | Description |
|---|---|---|---|
port | setPort(int) |
1883 | Listening port (8883 for MQTTS) |
max-packet-size | setMaxPacketSize(int) |
268,435,455 | Maximum packet size for this listener |
default-keep-alive | setDefaultKeepAlive(int) |
60 | Default keep-alive interval in seconds |
realm | setRealm(Realm) |
from service | Per-listener authentication realm override |
TLS properties (keystore-file, keystore-pass,
secure, need-client-auth) and lifecycle properties
(idle-timeout, read-timeout,
connection-timeout) are inherited from TCPListener
and work identically to other protocols.
The MQTT service uses Gumdrop's staged handler pattern for application logic. Three handler interfaces let you control connection authorization, publish authorization, and subscribe authorization. Each handler receives a state object that represents the valid responses — you call a method on the state to accept or reject the request. This continuation-passing style is fully asynchronous: you can defer the response (e.g. to perform a database lookup or remote service call) and invoke the state callback later, on any thread.
Called when a client sends a CONNECT packet. Use this to authorize connections based on credentials, client ID, or network origin.
public interface ConnectHandler {
void handleConnect(ConnectState state, ConnectPacket packet, Endpoint endpoint);
void disconnected();
}
The ConnectState provides:
acceptConnection() — send CONNACK with successrejectBadCredentials() — reject with bad username/passwordrejectNotAuthorized() — reject as not authorizedreject(int returnCode) — reject with any CONNACK return codeCalled when a client publishes a message. Use this for topic-level access control, content filtering, or audit logging.
public interface PublishHandler {
void authorizePublish(PublishState state, String clientId,
String topicName, int qos, boolean retain);
}
The PublishState provides:
allowPublish() — route the message to subscribersrejectPublish() — discard the messageCalled for each topic filter in a SUBSCRIBE request. Use this for topic-level access control or to downgrade the granted QoS level.
public interface SubscribeHandler {
void authorizeSubscription(SubscribeState state, String clientId,
String topicFilter, QoS requestedQoS);
}
The SubscribeState provides:
grantSubscription(QoS grantedQoS) — accept with a
(possibly downgraded) QoS levelrejectSubscription() — deny the subscription
Override the factory methods in your MQTTService subclass to
supply custom handlers:
public class MyMQTTService extends MQTTService {
@Override
protected ConnectHandler createConnectHandler(TCPListener listener) {
return new ConnectHandler() {
@Override
public void handleConnect(ConnectState state,
ConnectPacket packet,
Endpoint endpoint) {
String username = packet.getUsername();
byte[] password = packet.getPassword();
if (username != null && authenticate(username, password)) {
state.acceptConnection();
} else {
state.rejectBadCredentials();
}
}
@Override
public void disconnected() {
// Clean up resources
}
};
}
@Override
protected PublishHandler createPublishHandler(TCPListener listener) {
return (state, clientId, topic, qos, retain) -> {
if (topic.startsWith("restricted/") && !isAdmin(clientId)) {
state.rejectPublish();
} else {
state.allowPublish();
}
};
}
@Override
protected SubscribeHandler createSubscribeHandler(TCPListener listener) {
return (state, clientId, topicFilter, requestedQoS) -> {
if (topicFilter.startsWith("$SYS/")) {
state.rejectSubscription();
} else {
state.grantSubscription(requestedQoS);
}
};
}
}
DefaultMQTTService provides a ready-to-use broker that accepts all
connections, allows all publishes, and grants all subscriptions at the requested
QoS level. It requires no handler implementation — just configure listeners
and optionally a realm:
<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
<property name="realm" ref="#realm"/>
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="1883"/>
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="8883" secure="true">
<property name="keystore-file" path="keystore.p12"/>
<property name="keystore-pass" value="secret"/>
</listener>
</service>
When a realm is configured, the broker validates username/password credentials from the CONNECT packet against the realm. Without a realm, all clients are accepted. This is typically sufficient for development, internal messaging, and IoT deployments where network-level security is adequate.
The broker is composed of several collaborating components managed by the service. These are created automatically and available to custom service implementations.
Tracks all active client subscriptions and maps topic names to subscribers.
Uses a TopicTree for efficient wildcard matching.
subscribe(clientId, topicFilter, qos) — register a
subscriptionunsubscribe(clientId, topicFilter) — remove a
subscriptionresolveSubscribers(topicName) — returns a
Map<String, QoS> of matching client IDs and their
effective QoS levelsgetSessionCount() — number of active sessions
A trie-based data structure that supports MQTT topic wildcard matching.
Topic filters containing + (single-level wildcard) and
# (multi-level wildcard) are stored in the tree and matched
against concrete topic names during publish routing. The
matchWithMaxQoS(topicName) method returns the highest QoS
level per client across all matching subscriptions.
Stores the most recent retained message for each topic. When a client subscribes to a topic filter, any matching retained messages are delivered immediately. Setting a retained message with an empty payload clears the retained entry.
Stores Last Will and Testament messages registered during CONNECT. If a client
disconnects unexpectedly (without sending DISCONNECT), its will message is
published to the specified topic. Will messages are stored as
MQTTMessageContent and participate in the streaming payload
pipeline.
Tracks in-flight messages for QoS 1 and QoS 2 delivery. Manages packet ID
allocation, outbound message tracking, and the QoS 2 four-phase handshake
(PUBLISH → PUBREC → PUBREL → PUBCOMP). In-flight
messages reference MQTTMessageContent for zero-copy redelivery.
MQTT PUBLISH payloads can be up to 256 MB. To avoid buffering entire
payloads in memory, Gumdrop provides a pluggable message store abstraction
based on NIO channels. Both the broker and the client use this abstraction:
the broker streams payloads to subscribers, and the client delivers
MQTTMessageContent to MQTTMessageListener so
applications can stream large received messages via
content.openChannel() instead of materialising them as
byte[].
Three interfaces in org.bluezoo.gumdrop.mqtt.store form the
abstraction:
MQTTMessageStore — factory that creates writers for
incoming payloadsMQTTMessageWriter — extends
WritableByteChannel; accumulates payload data and produces a
readable handle on commit()MQTTMessageContent — readable handle for stored
payload data; provides size(), isBuffered(),
asByteArray(), openChannel(), and
release()publishData() events arrive from
the parser, the protocol handler writes chunks into an
MQTTMessageWriterendPublish(), the writer is committed
to produce an MQTTMessageContentopenChannel() and streamed to the endpointrelease() frees resources
When delivering a large payload to multiple subscribers, the broker reads
each chunk from the MQTTMessageContent channel once and then
sends it to all matching subscribers before reading the next chunk:
// Pseudocode for the horizontal fan-out strategy
for (Subscriber s : subscribers) {
s.sendHeader(encodePublishHeader(s));
}
while (channel.read(buf) > 0) {
for (Subscriber s : subscribers) {
s.sendChunk(buf.duplicate());
buf.rewind();
}
}
This ensures the payload is read only once regardless of subscriber count, minimising I/O and memory pressure.
The default InMemoryMessageStore buffers payloads as byte arrays.
For small messages (the common case in IoT), isBuffered() returns
true and the broker uses a fast path that encodes the entire
PUBLISH packet in a single ByteBuffer, avoiding the streaming
overhead entirely.
Override createMessageStore() in your MQTTService
subclass to provide a persistent, file-backed, or distributed store:
public class PersistentMQTTService extends MQTTService {
@Override
protected MQTTMessageStore createMessageStore() {
return new FileBackedMessageStore(Path.of("/var/mqtt/payloads"));
}
}
A file-backed implementation would write incoming payload chunks to temporary
files and return an MQTTMessageContent whose
openChannel() opens a FileChannel for zero-copy
reads. This allows the broker to handle arbitrarily large payloads without
proportional memory allocation.
The MQTTWebSocketHandler bridges WebSocket connections to the
MQTT protocol handler, enabling browser-based MQTT clients (such as
Eclipse Paho
JavaScript or MQTT.js) to connect to the broker. Binary WebSocket frames
are fed directly into the MQTTFrameParser.
To enable MQTT over WebSocket, configure a WebSocket listener on the HTTP
service that delegates to the MQTT service's WebSocket handler. The WebSocket
subprotocol mqtt should be negotiated during the upgrade
handshake.
The broker implements all three MQTT QoS levels:
| Level | Name | Delivery Guarantee | Handshake |
|---|---|---|---|
| 0 | At most once | Fire and forget | PUBLISH only |
| 1 | At least once | Acknowledged delivery | PUBLISH → PUBACK |
| 2 | Exactly once | Assured delivery | PUBLISH → PUBREC → PUBREL → PUBCOMP |
The effective QoS for a delivered message is the minimum of the
publisher's QoS and the subscriber's granted QoS, as required by the MQTT
specification. The QoSManager tracks all in-flight messages and
manages packet ID allocation for both inbound and outbound message flows.
MQTT 5.0 introduces a property mechanism carried by all packet types. The
MQTTProperties class provides type-safe access to all standard
properties:
| Property | ID | Type | Used In |
|---|---|---|---|
| Payload Format Indicator | 0x01 | Integer | PUBLISH |
| Message Expiry Interval | 0x02 | Integer | PUBLISH |
| Content Type | 0x03 | String | PUBLISH |
| Response Topic | 0x08 | String | PUBLISH |
| Correlation Data | 0x09 | Binary | PUBLISH |
| Session Expiry Interval | 0x11 | Integer | CONNECT, CONNACK, DISCONNECT |
| Authentication Method | 0x15 | String | CONNECT, CONNACK, AUTH |
| Authentication Data | 0x16 | Binary | CONNECT, CONNACK, AUTH |
| User Property | 0x26 | String pair | All packets |
Properties are accessed via getIntegerProperty(id),
getStringProperty(id), getBinaryProperty(id), and
getUserProperties(). For MQTT 3.1.1 connections, a shared
MQTTProperties.EMPTY instance is used throughout.
MQTTClient provides a fully asynchronous MQTT client that
integrates with Gumdrop's event-driven architecture. Like all Gumdrop clients,
it supports SelectorLoop affinity for use within server implementations.
MQTTClient client = new MQTTClient("broker.example.com", 1883);
client.setClientId("sensor-001");
client.setCleanSession(true);
client.setKeepAlive(30);
client.setCredentials("user", "password");
client.connect(new MQTTClientCallback() {
@Override
public void connected() {
System.out.println("Connected to broker");
}
@Override
public void connectionLost() {
System.err.println("Connection lost");
}
@Override
public void subscribeAcknowledged(int packetId) {
System.out.println("Subscription confirmed");
}
@Override
public void publishComplete(int packetId) {
System.out.println("Publish acknowledged");
}
}, new MQTTMessageListener() {
@Override
public void messageReceived(String topic, MQTTMessageContent content,
int qos, boolean retain) {
byte[] data = content.asByteArray();
try {
System.out.println(topic + ": " + new String(data, UTF_8));
} finally {
content.release();
}
}
});
The listener receives MQTTMessageContent instead of
ByteBuffer. Ownership transfers to the listener, which must call
content.release() when done. For small messages, use
content.asByteArray(). For large payloads, stream via
content.openChannel() to avoid loading the entire message into
memory:
public void messageReceived(String topic, MQTTMessageContent content,
int qos, boolean retain) {
try (ReadableByteChannel ch = content.openChannel()) {
Files.copy(Channels.newInputStream(ch), Path.of("received.dat"));
} catch (IOException e) {
// handle
} finally {
content.release();
}
}
// QoS 0 - fire and forget
client.publish("sensors/temperature", "22.5", QoS.AT_MOST_ONCE);
// QoS 1 - at least once delivery
client.publish("sensors/humidity", payload, QoS.AT_LEAST_ONCE, false);
// QoS 2 - exactly once delivery
int packetId = client.publish("commands/reboot", payload, QoS.EXACTLY_ONCE, false);
// Single topic
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE);
// Multiple topics
client.subscribe(
new String[]{"sensors/+/temperature", "commands/#"},
new QoS[]{QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE}
);
MQTTClient client = new MQTTClient("broker.example.com", 8883);
client.setSecure(true);
client.setKeystoreFile(Path.of("truststore.p12"));
client.setKeystorePass("password");
client.connect(callback, listener);
client.setWill("devices/sensor-001/status", "offline".getBytes(UTF_8),
QoS.AT_LEAST_ONCE, true);
client.connect(callback, listener);
When using the MQTT client from within a Gumdrop server, pass the endpoint's
SelectorLoop to the constructor. This ensures all MQTT callbacks
execute on the same thread as your server connection, eliminating
synchronisation overhead:
SelectorLoop loop = endpoint.getSelectorLoop(); MQTTClient mqtt = new MQTTClient(loop, "broker.internal", 1883); mqtt.connect(callback, listener);
client.setVersion(MQTTVersion.V5_0); client.connect(callback, listener);
When a TelemetryConfig is set on the service,
MQTTServerMetrics records the following OpenTelemetry metrics:
| Metric | Type | Description |
|---|---|---|
mqtt.server.connections | Counter | Total MQTT connections accepted |
mqtt.server.active_connections | UpDownCounter | Currently active connections |
mqtt.server.session.duration | Histogram | Session duration in milliseconds |
mqtt.server.publishes | Counter | PUBLISH messages received (by QoS level) |
mqtt.server.publish.size | Histogram | Publish payload size in bytes |
mqtt.server.subscribes | Counter | SUBSCRIBE operations received |
mqtt.server.unsubscribes | Counter | UNSUBSCRIBE operations received |
mqtt.server.authentications | Counter | Authentication attempts |
mqtt.server.authentications.success | Counter | Successful authentications |
mqtt.server.authentications.failure | Counter | Failed authentications |
Metrics are exported to any OpenTelemetry Collector via the framework's OTLP exporter. See the Telemetry documentation for configuration details.
A service that accepts sensor data over MQTT and stores it in a time-series database:
public class SensorGatewayService extends MQTTService {
private TimeSeriesDB db;
public void setDatabase(TimeSeriesDB db) {
this.db = db;
}
@Override
protected ConnectHandler createConnectHandler(TCPListener listener) {
return new ConnectHandler() {
@Override
public void handleConnect(ConnectState state,
ConnectPacket packet,
Endpoint endpoint) {
Realm realm = getRealm();
if (realm != null) {
String user = packet.getUsername();
byte[] pass = packet.getPassword();
if (realm.authenticate(user, new String(pass))) {
state.acceptConnection();
} else {
state.rejectBadCredentials();
}
} else {
state.acceptConnection();
}
}
@Override
public void disconnected() { }
};
}
@Override
protected PublishHandler createPublishHandler(TCPListener listener) {
return (state, clientId, topic, qos, retain) -> {
// Only allow publishing to sensor topics
if (topic.startsWith("sensors/")) {
state.allowPublish();
} else {
state.rejectPublish();
}
};
}
@Override
protected SubscribeHandler createSubscribeHandler(TCPListener listener) {
return (state, clientId, topicFilter, requestedQoS) -> {
// Dashboards subscribe, sensors publish
state.grantSubscription(requestedQoS);
};
}
}
<service id="mqtt" class="com.example.SensorGatewayService">
<property name="realm" ref="#deviceRealm"/>
<property name="database" ref="#tsdb"/>
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="1883"/>
<listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
port="8883" secure="true">
<property name="keystore-file" path="mqtt.p12"/>
<property name="keystore-pass" value="secret"/>
</listener>
</service>
Publish sensor data received via HTTP REST to an MQTT topic for distribution to subscribers:
public class SensorAPIHandler implements HTTPRequestHandler {
private MQTTClient mqttClient;
@Override
public void handleRequest(Request request, Response response) {
String sensorId = request.getPathInfo().substring(1);
byte[] data = request.getBody();
mqttClient.publish("sensors/" + sensorId + "/data", data,
QoS.AT_LEAST_ONCE, false);
response.setStatus(202);
response.setContentType("text/plain");
response.getWriter().write("Accepted");
}
}
← Back to Main Page | HTTP Server & Client | WebSocket | Telemetry
Gumdrop MQTT Server & Client