Gumdrop

Gumdrop MQTT Server & Client

Gumdrop provides a complete MQTT message broker and client implementation supporting both MQTT 3.1.1 (RFC specification OASIS Standard, 29 October 2014) and MQTT 5.0 (OASIS Standard, 7 March 2019). The broker integrates with Gumdrop's event-driven, non-blocking I/O architecture to deliver high-throughput publish/subscribe messaging with minimal resource consumption. MQTT over WebSocket is also supported, enabling browser-based clients to connect through the same service.

Contents

Architecture

The MQTT implementation follows Gumdrop's standard service/listener/handler architecture. An MQTTService owns one or more MQTTListeners that accept TCP connections. Each connection is handled by an MQTTProtocolHandler which parses the MQTT wire protocol using a SAX-style event-driven parser (MQTTFrameParser) and delegates application-level decisions to pluggable handler interfaces.

Core Components

Event-Driven Parsing

Unlike implementations that buffer complete MQTT packets before processing, Gumdrop uses an incremental, SAX-style event model. The MQTTFrameParser invokes methods on the MQTTEventHandler interface as each protocol element is decoded. PUBLISH payloads — which can be up to 256 MB — are streamed in chunks via publishData(ByteBuffer) callbacks between startPublish() and endPublish() events. SUBSCRIBE and UNSUBSCRIBE topic filters are similarly delivered incrementally. This design keeps memory usage bounded regardless of message size.

Message Routing

When a PUBLISH message is received, the protocol handler consults the SubscriptionManager which uses a TopicTree to resolve matching subscribers (including wildcard patterns + and #). For large payloads, the broker performs horizontal fan-out: each chunk of the payload is read once from storage and then broadcast to all matching subscribers before the next chunk is read. This minimises I/O and memory pressure compared to reading the full payload once per subscriber.

Protocol Support

MQTT 3.1.1

Full support for the MQTT 3.1.1 specification (OASIS Standard):

MQTT 5.0

Full support for MQTT 5.0 (OASIS Standard), which adds:

The protocol version is negotiated during the CONNECT handshake. MQTT 3.1.1 clients (protocol level 4) and MQTT 5.0 clients (protocol level 5) can connect to the same listener simultaneously.

Configuration

Basic Configuration

<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
            port="1883"/>
</service>

TLS Configuration (MQTTS)

<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
    <!-- Cleartext MQTT -->
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
            port="1883"/>
    <!-- MQTT over TLS -->
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
            port="8883" secure="true">
        <property name="keystore-file" path="keystore.p12"/>
        <property name="keystore-pass" value="secret"/>
    </listener>
</service>

With Authentication

<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
    <property name="realm" ref="#realm"/>
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
            port="1883"/>
</service>

<bean id="realm" class="org.bluezoo.gumdrop.auth.BasicRealm">
    <!-- realm configuration -->
</bean>

MQTTService Properties

PropertySetterDefaultDescription
realmsetRealm(Realm) noneAuthentication realm for client connections
max-packet-sizesetMaxPacketSize(int) 268,435,455Maximum MQTT packet size in bytes (256 MB)

MQTTListener Properties

PropertySetterDefaultDescription
portsetPort(int) 1883Listening port (8883 for MQTTS)
max-packet-sizesetMaxPacketSize(int) 268,435,455Maximum packet size for this listener
default-keep-alivesetDefaultKeepAlive(int) 60Default keep-alive interval in seconds
realmsetRealm(Realm) from servicePer-listener authentication realm override

TLS properties (keystore-file, keystore-pass, secure, need-client-auth) and lifecycle properties (idle-timeout, read-timeout, connection-timeout) are inherited from TCPListener and work identically to other protocols.

Handler Interfaces

The MQTT service uses Gumdrop's staged handler pattern for application logic. Three handler interfaces let you control connection authorization, publish authorization, and subscribe authorization. Each handler receives a state object that represents the valid responses — you call a method on the state to accept or reject the request. This continuation-passing style is fully asynchronous: you can defer the response (e.g. to perform a database lookup or remote service call) and invoke the state callback later, on any thread.

ConnectHandler

Called when a client sends a CONNECT packet. Use this to authorize connections based on credentials, client ID, or network origin.

public interface ConnectHandler {
    void handleConnect(ConnectState state, ConnectPacket packet, Endpoint endpoint);
    void disconnected();
}

The ConnectState provides:

PublishHandler

Called when a client publishes a message. Use this for topic-level access control, content filtering, or audit logging.

public interface PublishHandler {
    void authorizePublish(PublishState state, String clientId,
                          String topicName, int qos, boolean retain);
}

The PublishState provides:

SubscribeHandler

Called for each topic filter in a SUBSCRIBE request. Use this for topic-level access control or to downgrade the granted QoS level.

public interface SubscribeHandler {
    void authorizeSubscription(SubscribeState state, String clientId,
                               String topicFilter, QoS requestedQoS);
}

The SubscribeState provides:

Implementing Custom Handlers

Override the factory methods in your MQTTService subclass to supply custom handlers:

public class MyMQTTService extends MQTTService {

    @Override
    protected ConnectHandler createConnectHandler(TCPListener listener) {
        return new ConnectHandler() {
            @Override
            public void handleConnect(ConnectState state,
                                      ConnectPacket packet,
                                      Endpoint endpoint) {
                String username = packet.getUsername();
                byte[] password = packet.getPassword();

                if (username != null && authenticate(username, password)) {
                    state.acceptConnection();
                } else {
                    state.rejectBadCredentials();
                }
            }

            @Override
            public void disconnected() {
                // Clean up resources
            }
        };
    }

    @Override
    protected PublishHandler createPublishHandler(TCPListener listener) {
        return (state, clientId, topic, qos, retain) -> {
            if (topic.startsWith("restricted/") && !isAdmin(clientId)) {
                state.rejectPublish();
            } else {
                state.allowPublish();
            }
        };
    }

    @Override
    protected SubscribeHandler createSubscribeHandler(TCPListener listener) {
        return (state, clientId, topicFilter, requestedQoS) -> {
            if (topicFilter.startsWith("$SYS/")) {
                state.rejectSubscription();
            } else {
                state.grantSubscription(requestedQoS);
            }
        };
    }
}

Default Service

DefaultMQTTService provides a ready-to-use broker that accepts all connections, allows all publishes, and grants all subscriptions at the requested QoS level. It requires no handler implementation — just configure listeners and optionally a realm:

<service id="mqtt" class="org.bluezoo.gumdrop.mqtt.DefaultMQTTService">
    <property name="realm" ref="#realm"/>
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="1883"/>
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="8883" secure="true">
        <property name="keystore-file" path="keystore.p12"/>
        <property name="keystore-pass" value="secret"/>
    </listener>
</service>

When a realm is configured, the broker validates username/password credentials from the CONNECT packet against the realm. Without a realm, all clients are accepted. This is typically sufficient for development, internal messaging, and IoT deployments where network-level security is adequate.

Broker Components

The broker is composed of several collaborating components managed by the service. These are created automatically and available to custom service implementations.

SubscriptionManager

Tracks all active client subscriptions and maps topic names to subscribers. Uses a TopicTree for efficient wildcard matching.

TopicTree

A trie-based data structure that supports MQTT topic wildcard matching. Topic filters containing + (single-level wildcard) and # (multi-level wildcard) are stored in the tree and matched against concrete topic names during publish routing. The matchWithMaxQoS(topicName) method returns the highest QoS level per client across all matching subscriptions.

RetainedMessageStore

Stores the most recent retained message for each topic. When a client subscribes to a topic filter, any matching retained messages are delivered immediately. Setting a retained message with an empty payload clears the retained entry.

WillManager

Stores Last Will and Testament messages registered during CONNECT. If a client disconnects unexpectedly (without sending DISCONNECT), its will message is published to the specified topic. Will messages are stored as MQTTMessageContent and participate in the streaming payload pipeline.

QoSManager

Tracks in-flight messages for QoS 1 and QoS 2 delivery. Manages packet ID allocation, outbound message tracking, and the QoS 2 four-phase handshake (PUBLISH → PUBREC → PUBREL → PUBCOMP). In-flight messages reference MQTTMessageContent for zero-copy redelivery.

Streaming Payloads & Message Store

MQTT PUBLISH payloads can be up to 256 MB. To avoid buffering entire payloads in memory, Gumdrop provides a pluggable message store abstraction based on NIO channels. Both the broker and the client use this abstraction: the broker streams payloads to subscribers, and the client delivers MQTTMessageContent to MQTTMessageListener so applications can stream large received messages via content.openChannel() instead of materialising them as byte[].

Store Interfaces

Three interfaces in org.bluezoo.gumdrop.mqtt.store form the abstraction:

Payload Lifecycle

  1. Accumulate — as publishData() events arrive from the parser, the protocol handler writes chunks into an MQTTMessageWriter
  2. Commit — on endPublish(), the writer is committed to produce an MQTTMessageContent
  3. Route — the content is passed through authorization and then to the routing pipeline
  4. Deliver — for each subscriber, the content is read via openChannel() and streamed to the endpoint
  5. Release — when no longer referenced (not retained, all QoS handshakes complete), release() frees resources

Horizontal Fan-Out

When delivering a large payload to multiple subscribers, the broker reads each chunk from the MQTTMessageContent channel once and then sends it to all matching subscribers before reading the next chunk:

// Pseudocode for the horizontal fan-out strategy
for (Subscriber s : subscribers) {
    s.sendHeader(encodePublishHeader(s));
}
while (channel.read(buf) > 0) {
    for (Subscriber s : subscribers) {
        s.sendChunk(buf.duplicate());
        buf.rewind();
    }
}

This ensures the payload is read only once regardless of subscriber count, minimising I/O and memory pressure.

In-Memory Store (Default)

The default InMemoryMessageStore buffers payloads as byte arrays. For small messages (the common case in IoT), isBuffered() returns true and the broker uses a fast path that encodes the entire PUBLISH packet in a single ByteBuffer, avoiding the streaming overhead entirely.

Custom Store Implementations

Override createMessageStore() in your MQTTService subclass to provide a persistent, file-backed, or distributed store:

public class PersistentMQTTService extends MQTTService {

    @Override
    protected MQTTMessageStore createMessageStore() {
        return new FileBackedMessageStore(Path.of("/var/mqtt/payloads"));
    }
}

A file-backed implementation would write incoming payload chunks to temporary files and return an MQTTMessageContent whose openChannel() opens a FileChannel for zero-copy reads. This allows the broker to handle arbitrarily large payloads without proportional memory allocation.

MQTT over WebSocket

The MQTTWebSocketHandler bridges WebSocket connections to the MQTT protocol handler, enabling browser-based MQTT clients (such as Eclipse Paho JavaScript or MQTT.js) to connect to the broker. Binary WebSocket frames are fed directly into the MQTTFrameParser.

To enable MQTT over WebSocket, configure a WebSocket listener on the HTTP service that delegates to the MQTT service's WebSocket handler. The WebSocket subprotocol mqtt should be negotiated during the upgrade handshake.

Quality of Service

The broker implements all three MQTT QoS levels:

LevelNameDelivery GuaranteeHandshake
0At most onceFire and forget PUBLISH only
1At least onceAcknowledged delivery PUBLISH → PUBACK
2Exactly onceAssured delivery PUBLISH → PUBREC → PUBREL → PUBCOMP

The effective QoS for a delivered message is the minimum of the publisher's QoS and the subscriber's granted QoS, as required by the MQTT specification. The QoSManager tracks all in-flight messages and manages packet ID allocation for both inbound and outbound message flows.

MQTT 5.0 Features

MQTT 5.0 introduces a property mechanism carried by all packet types. The MQTTProperties class provides type-safe access to all standard properties:

PropertyIDTypeUsed In
Payload Format Indicator0x01Integer PUBLISH
Message Expiry Interval0x02Integer PUBLISH
Content Type0x03String PUBLISH
Response Topic0x08String PUBLISH
Correlation Data0x09Binary PUBLISH
Session Expiry Interval0x11Integer CONNECT, CONNACK, DISCONNECT
Authentication Method0x15String CONNECT, CONNACK, AUTH
Authentication Data0x16Binary CONNECT, CONNACK, AUTH
User Property0x26String pair All packets

Properties are accessed via getIntegerProperty(id), getStringProperty(id), getBinaryProperty(id), and getUserProperties(). For MQTT 3.1.1 connections, a shared MQTTProperties.EMPTY instance is used throughout.

Client API

MQTTClient provides a fully asynchronous MQTT client that integrates with Gumdrop's event-driven architecture. Like all Gumdrop clients, it supports SelectorLoop affinity for use within server implementations.

Connecting

MQTTClient client = new MQTTClient("broker.example.com", 1883);
client.setClientId("sensor-001");
client.setCleanSession(true);
client.setKeepAlive(30);
client.setCredentials("user", "password");

client.connect(new MQTTClientCallback() {
    @Override
    public void connected() {
        System.out.println("Connected to broker");
    }

    @Override
    public void connectionLost() {
        System.err.println("Connection lost");
    }

    @Override
    public void subscribeAcknowledged(int packetId) {
        System.out.println("Subscription confirmed");
    }

    @Override
    public void publishComplete(int packetId) {
        System.out.println("Publish acknowledged");
    }
}, new MQTTMessageListener() {
    @Override
    public void messageReceived(String topic, MQTTMessageContent content,
                                int qos, boolean retain) {
        byte[] data = content.asByteArray();
        try {
            System.out.println(topic + ": " + new String(data, UTF_8));
        } finally {
            content.release();
        }
    }
});

The listener receives MQTTMessageContent instead of ByteBuffer. Ownership transfers to the listener, which must call content.release() when done. For small messages, use content.asByteArray(). For large payloads, stream via content.openChannel() to avoid loading the entire message into memory:

public void messageReceived(String topic, MQTTMessageContent content,
                            int qos, boolean retain) {
    try (ReadableByteChannel ch = content.openChannel()) {
        Files.copy(Channels.newInputStream(ch), Path.of("received.dat"));
    } catch (IOException e) {
        // handle
    } finally {
        content.release();
    }
}

Publishing

// QoS 0 - fire and forget
client.publish("sensors/temperature", "22.5", QoS.AT_MOST_ONCE);

// QoS 1 - at least once delivery
client.publish("sensors/humidity", payload, QoS.AT_LEAST_ONCE, false);

// QoS 2 - exactly once delivery
int packetId = client.publish("commands/reboot", payload, QoS.EXACTLY_ONCE, false);

Subscribing

// Single topic
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE);

// Multiple topics
client.subscribe(
    new String[]{"sensors/+/temperature", "commands/#"},
    new QoS[]{QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE}
);

TLS Connections

MQTTClient client = new MQTTClient("broker.example.com", 8883);
client.setSecure(true);
client.setKeystoreFile(Path.of("truststore.p12"));
client.setKeystorePass("password");
client.connect(callback, listener);

Last Will and Testament

client.setWill("devices/sensor-001/status", "offline".getBytes(UTF_8),
               QoS.AT_LEAST_ONCE, true);
client.connect(callback, listener);

SelectorLoop Affinity

When using the MQTT client from within a Gumdrop server, pass the endpoint's SelectorLoop to the constructor. This ensures all MQTT callbacks execute on the same thread as your server connection, eliminating synchronisation overhead:

SelectorLoop loop = endpoint.getSelectorLoop();
MQTTClient mqtt = new MQTTClient(loop, "broker.internal", 1883);
mqtt.connect(callback, listener);

MQTT 5.0 Client

client.setVersion(MQTTVersion.V5_0);
client.connect(callback, listener);

Telemetry

When a TelemetryConfig is set on the service, MQTTServerMetrics records the following OpenTelemetry metrics:

MetricTypeDescription
mqtt.server.connectionsCounter Total MQTT connections accepted
mqtt.server.active_connectionsUpDownCounter Currently active connections
mqtt.server.session.durationHistogram Session duration in milliseconds
mqtt.server.publishesCounter PUBLISH messages received (by QoS level)
mqtt.server.publish.sizeHistogram Publish payload size in bytes
mqtt.server.subscribesCounter SUBSCRIBE operations received
mqtt.server.unsubscribesCounter UNSUBSCRIBE operations received
mqtt.server.authenticationsCounter Authentication attempts
mqtt.server.authentications.successCounter Successful authentications
mqtt.server.authentications.failureCounter Failed authentications

Metrics are exported to any OpenTelemetry Collector via the framework's OTLP exporter. See the Telemetry documentation for configuration details.

Examples

IoT Sensor Gateway

A service that accepts sensor data over MQTT and stores it in a time-series database:

public class SensorGatewayService extends MQTTService {

    private TimeSeriesDB db;

    public void setDatabase(TimeSeriesDB db) {
        this.db = db;
    }

    @Override
    protected ConnectHandler createConnectHandler(TCPListener listener) {
        return new ConnectHandler() {
            @Override
            public void handleConnect(ConnectState state,
                                      ConnectPacket packet,
                                      Endpoint endpoint) {
                Realm realm = getRealm();
                if (realm != null) {
                    String user = packet.getUsername();
                    byte[] pass = packet.getPassword();
                    if (realm.authenticate(user, new String(pass))) {
                        state.acceptConnection();
                    } else {
                        state.rejectBadCredentials();
                    }
                } else {
                    state.acceptConnection();
                }
            }

            @Override
            public void disconnected() { }
        };
    }

    @Override
    protected PublishHandler createPublishHandler(TCPListener listener) {
        return (state, clientId, topic, qos, retain) -> {
            // Only allow publishing to sensor topics
            if (topic.startsWith("sensors/")) {
                state.allowPublish();
            } else {
                state.rejectPublish();
            }
        };
    }

    @Override
    protected SubscribeHandler createSubscribeHandler(TCPListener listener) {
        return (state, clientId, topicFilter, requestedQoS) -> {
            // Dashboards subscribe, sensors publish
            state.grantSubscription(requestedQoS);
        };
    }
}

Minimal Configuration

<service id="mqtt" class="com.example.SensorGatewayService">
    <property name="realm" ref="#deviceRealm"/>
    <property name="database" ref="#tsdb"/>
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener" port="1883"/>
    <listener class="org.bluezoo.gumdrop.mqtt.MQTTListener"
            port="8883" secure="true">
        <property name="keystore-file" path="mqtt.p12"/>
        <property name="keystore-pass" value="secret"/>
    </listener>
</service>

Using MQTT Client within an HTTP Microservice

Publish sensor data received via HTTP REST to an MQTT topic for distribution to subscribers:

public class SensorAPIHandler implements HTTPRequestHandler {

    private MQTTClient mqttClient;

    @Override
    public void handleRequest(Request request, Response response) {
        String sensorId = request.getPathInfo().substring(1);
        byte[] data = request.getBody();

        mqttClient.publish("sensors/" + sensorId + "/data", data,
                           QoS.AT_LEAST_ONCE, false);

        response.setStatus(202);
        response.setContentType("text/plain");
        response.getWriter().write("Accepted");
    }
}

← Back to Main Page | HTTP Server & Client | WebSocket | Telemetry

Gumdrop MQTT Server & Client