Gumdrop provides a fully asynchronous Redis client that integrates with the framework's event-driven architecture. Like all Gumdrop clients, the Redis client uses non-blocking I/O and callbacks, making it ideal for use within server implementations that need to access Redis for caching, session storage, rate limiting, or pub/sub messaging.
The Redis client follows the same patterns as other Gumdrop clients (LDAP, SMTP, HTTP). It uses RESP (the Redis Serialization Protocol) for wire-level communication, with a streaming decoder that handles partial data gracefully.
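To make the wire format concrete, here is a minimal sketch of RESP2 command framing and of a decoder that tolerates partial input. It is not Gumdrop's codec: the names `RespSketch`, `encode` and `tryDecodeBulk` are hypothetical, and only non-null bulk string replies are decoded.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of the Gumdrop API.
final class RespSketch {

    // Encode a command as a RESP array of bulk strings:
    // *<argc>\r\n then, per argument, $<byte length>\r\n<bytes>\r\n
    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeAscii(out, "*" + args.length + "\r\n");
        for (String arg : args) {
            byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
            writeAscii(out, "$" + bytes.length + "\r\n");
            out.write(bytes, 0, bytes.length);
            writeAscii(out, "\r\n");
        }
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.US_ASCII);
        out.write(b, 0, b.length);
    }

    // Try to decode one non-null bulk string reply from the first
    // 'available' bytes of buf. Returns null when more bytes are needed,
    // so the caller can retry after the next read.
    static String tryDecodeBulk(byte[] buf, int available) {
        if (available < 1) {
            return null;
        }
        if (buf[0] != '$') {
            throw new IllegalArgumentException("not a bulk string reply");
        }
        int i = 1;
        while (i + 1 < available && !(buf[i] == '\r' && buf[i + 1] == '\n')) {
            i++;
        }
        if (i + 1 >= available) {
            return null; // length line not complete yet
        }
        int length = Integer.parseInt(new String(buf, 1, i - 1, StandardCharsets.US_ASCII));
        if (length < 0) {
            throw new IllegalArgumentException("null bulk strings are not handled in this sketch");
        }
        int start = i + 2;
        if (available < start + length + 2) {
            return null; // payload or trailing CRLF not complete yet
        }
        return new String(buf, start, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] reply = "$5\r\nhello\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(tryDecodeBulk(reply, 8));            // only part of the reply has arrived
        System.out.println(tryDecodeBulk(reply, reply.length)); // the whole reply has arrived
    }
}
```

Returning "not enough data yet" instead of failing is what lets a decoder sit behind non-blocking reads, where a reply may arrive split across several buffers.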
org.bluezoo.gumdrop.redis.client.RedisClient - connection factory that creates connections to a Redis server
org.bluezoo.gumdrop.redis.client.RedisClientProtocolHandler - endpoint handler implementing the RESP protocol
org.bluezoo.gumdrop.redis.client.RedisSession - interface exposing all Redis operations
org.bluezoo.gumdrop.redis.client.RedisConnectionReady - entry point handler receiving the ready notification
org.bluezoo.gumdrop.redis.codec - RESP encoder/decoder
All Redis operations are asynchronous. When you call a command method, it immediately encodes and sends the command, then returns. When the response arrives (possibly much later), your callback is invoked on the connection's SelectorLoop thread.
This model supports pipelining naturally, since you can issue many commands without waiting for responses:
session.set("key1", "value1", handler);
session.set("key2", "value2", handler);
session.set("key3", "value3", handler);
session.get("key1", getHandler);
// All commands sent immediately; responses arrive in order
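Because replies on a single connection arrive in the order the commands were sent, matching a reply to its callback only needs a FIFO queue. The following is a rough sketch of that idea, not Gumdrop's implementation; `PipelineSketch` and its methods are hypothetical, and commands and replies are plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of the Gumdrop API.
final class PipelineSketch {

    // Callbacks waiting for a reply, oldest first.
    private final Queue<Consumer<String>> pending = new ArrayDeque<>();

    // Stand-in for the socket: commands "written" so far.
    final List<String> wire = new ArrayList<>();

    // Send immediately and remember who wants the reply.
    void send(String command, Consumer<String> callback) {
        wire.add(command);
        pending.add(callback);
    }

    // Called once per decoded reply; the oldest pending callback owns it.
    void onReply(String reply) {
        Consumer<String> callback = pending.poll();
        if (callback == null) {
            throw new IllegalStateException("reply received with no pending command");
        }
        callback.accept(reply);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        p.send("SET key1 value1", r -> System.out.println("first: " + r));
        p.send("GET key1", r -> System.out.println("second: " + r));
        p.onReply("OK");
        p.onReply("value1");
    }
}
```

No command identifiers are needed in this scheme; ordering alone ties each reply to its handler.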
Connect to Redis and execute commands using callback handlers:
RedisClient client = new RedisClient(selectorLoop, "localhost", 6379);
client.connect(new RedisConnectionReady() {
@Override
public void handleReady(RedisSession session) {
// Connection established, start using Redis
session.ping(new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession s) {
// result is "PONG"
System.out.println("Redis says: " + result);
}
@Override
public void handleError(String error, RedisSession s) {
System.err.println("Error: " + error);
}
});
}
@Override
public void onConnected(Endpoint endpoint) {
// TCP connected
}
@Override
public void onDisconnected() {
// Connection closed
}
@Override
public void onSecurityEstablished(SecurityInfo info) {
// TLS handshake complete (for secure connections)
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
});
session.set("user:1:name", "Alice", new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession s) {
// result is "OK"
// Now retrieve the value
s.get("user:1:name", new BulkResultHandler() {
@Override
public void handleResult(byte[] value, RedisSession session) {
String name = new String(value, StandardCharsets.UTF_8);
System.out.println("Name: " + name); // "Alice"
}
@Override
public void handleNull(RedisSession session) {
System.out.println("Key not found");
}
@Override
public void handleError(String error, RedisSession session) {
System.err.println("Error: " + error);
}
});
}
@Override
public void handleError(String error, RedisSession s) {
System.err.println("SET failed: " + error);
}
});
When using the Redis client from within a Gumdrop server (such as an HTTP servlet, SMTP handler, or any other protocol implementation), the client connection should be assigned to the same SelectorLoop as the originating server connection, so that Redis callbacks run on the same thread as the server connection's own handlers.
Within a Gumdrop endpoint handler, obtain the current SelectorLoop from the endpoint:
// Inside an HTTP servlet or protocol handler
SelectorLoop loop = endpoint.getSelectorLoop();
// Create Redis client bound to this loop
RedisClient redis = new RedisClient(loop, "localhost", 6379);
When using SelectorLoop affinity, Redis callbacks execute on the same thread as your server connection. This means you can safely access connection state without synchronisation:
// Inside SMTP handler's mailFrom() callback
public void mailFrom(String sender, MailFromCallback callback) {
SelectorLoop loop = connection.getSelectorLoop();
// Check sender reputation in Redis
redisSession.get("reputation:" + sender, new BulkResultHandler() {
@Override
public void handleResult(byte[] value, RedisSession session) {
int score = Integer.parseInt(new String(value, UTF_8));
if (score < 0) {
callback.mailFromReply(SenderPolicyResult.REJECT_SPAM_REPUTATION);
} else {
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
}
@Override
public void handleNull(RedisSession session) {
// Unknown sender, accept for now
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
@Override
public void handleError(String error, RedisSession session) {
// Redis unavailable, fail open
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
});
}
Note that callback.mailFromReply() is invoked directly from the Redis handler. This is safe because both run on the same SelectorLoop thread.
For high-throughput servers, keep one long-lived Redis connection per SelectorLoop and reuse it. This avoids connection setup overhead and supports pipelining:
public class RedisPool {
private final Map<SelectorLoop, RedisSession> sessions = new ConcurrentHashMap<>();
private final String host;
private final int port;
public RedisPool(String host, int port) {
this.host = host;
this.port = port;
}
public void getSession(SelectorLoop loop, final SessionCallback callback) {
RedisSession existing = sessions.get(loop);
if (existing != null) {
callback.onSession(existing);
return;
}
// Create new connection for this SelectorLoop
RedisClient client = new RedisClient(loop, host, port);
client.connect(new RedisConnectionReady() {
@Override
public void handleReady(RedisSession session) {
sessions.put(loop, session);
callback.onSession(session);
}
@Override
public void onConnected(Endpoint endpoint) { }
@Override
public void onDisconnected() {
sessions.remove(loop);
}
@Override
public void onSecurityEstablished(SecurityInfo info) { }
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
public interface SessionCallback {
void onSession(RedisSession session);
void onError(Exception e);
}
}
The RedisSession interface provides methods for all common Redis
commands, organised by data type.
| Method | Redis Command | Description |
|---|---|---|
| auth(password, handler) | AUTH | Authenticate (Redis 5 and earlier) |
| auth(user, password, handler) | AUTH | Authenticate with ACL (Redis 6+) |
| hello(protover, handler) | HELLO | Negotiate RESP protocol version (Redis 6+) |
| hello(protover, user, pass, handler) | HELLO | Negotiate protocol with auth |
| clientSetName(name, handler) | CLIENT SETNAME | Set connection name |
| clientGetName(handler) | CLIENT GETNAME | Get connection name |
| clientId(handler) | CLIENT ID | Get connection ID |
| ping(handler) | PING | Test connection |
| select(index, handler) | SELECT | Select database |
| echo(message, handler) | ECHO | Echo message |
| reset(handler) | RESET | Reset connection state (Redis 6.2+) |
| quit() | QUIT | Close connection |
| Method | Redis Command | Description |
|---|---|---|
| get(key, handler) | GET | Get value |
| set(key, value, handler) | SET | Set value |
| setex(key, seconds, value, handler) | SETEX | Set with expiration |
| setnx(key, value, handler) | SETNX | Set if not exists |
| incr(key, handler) | INCR | Increment |
| incrby(key, amount, handler) | INCRBY | Increment by amount |
| decr(key, handler) | DECR | Decrement |
| mget(handler, keys...) | MGET | Get multiple |
| mset(handler, keysAndValues...) | MSET | Set multiple |
| Method | Redis Command | Description |
|---|---|---|
| del(handler, keys...) | DEL | Delete keys |
| exists(key, handler) | EXISTS | Check existence |
| expire(key, seconds, handler) | EXPIRE | Set expiration |
| ttl(key, handler) | TTL | Get time to live |
| persist(key, handler) | PERSIST | Remove expiration |
| keys(pattern, handler) | KEYS | Find keys by pattern |
| type(key, handler) | TYPE | Get key type |
| scan(cursor, handler) | SCAN | Iterate key space |
| scan(cursor, match, count, handler) | SCAN | Iterate with MATCH/COUNT |
| Method | Redis Command | Description |
|---|---|---|
| hget(key, field, handler) | HGET | Get field |
| hset(key, field, value, handler) | HSET | Set field |
| hgetall(key, handler) | HGETALL | Get all fields |
| hdel(key, handler, fields...) | HDEL | Delete fields |
| hexists(key, field, handler) | HEXISTS | Check field exists |
| hincrby(key, field, amount, handler) | HINCRBY | Increment field |
| hscan(key, cursor, handler) | HSCAN | Iterate hash fields |
| Method | Redis Command | Description |
|---|---|---|
| lpush(key, handler, values...) | LPUSH | Push to head |
| rpush(key, handler, values...) | RPUSH | Push to tail |
| lpop(key, handler) | LPOP | Pop from head |
| rpop(key, handler) | RPOP | Pop from tail |
| lrange(key, start, stop, handler) | LRANGE | Get range |
| llen(key, handler) | LLEN | Get length |
| blpop(timeout, handler, keys...) | BLPOP | Blocking pop from head |
| brpop(timeout, handler, keys...) | BRPOP | Blocking pop from tail |
| blmove(src, dest, from, to, timeout, handler) | BLMOVE | Blocking move between lists |
| Method | Redis Command | Description |
|---|---|---|
| sadd(key, handler, members...) | SADD | Add members |
| srem(key, handler, members...) | SREM | Remove members |
| smembers(key, handler) | SMEMBERS | Get all members |
| sismember(key, member, handler) | SISMEMBER | Check membership |
| scard(key, handler) | SCARD | Get cardinality |
| sscan(key, cursor, handler) | SSCAN | Iterate set members |
| Method | Redis Command | Description |
|---|---|---|
| zadd(key, score, member, handler) | ZADD | Add with score |
| zrange(key, start, stop, handler) | ZRANGE | Get by rank |
| zrevrange(key, start, stop, handler) | ZREVRANGE | Get by rank (descending) |
| zscore(key, member, handler) | ZSCORE | Get score |
| zrank(key, member, handler) | ZRANK | Get rank |
| zincrby(key, increment, member, handler) | ZINCRBY | Increment score |
| zscan(key, cursor, handler) | ZSCAN | Iterate sorted set members |
| Method | Redis Command | Description |
|---|---|---|
| xadd(key, id, handler, fields...) | XADD | Append entry to stream |
| xlen(key, handler) | XLEN | Get stream length |
| xrange(key, start, end, handler) | XRANGE | Get entries in range |
| xrevrange(key, end, start, handler) | XREVRANGE | Get entries in reverse |
| xread(count, block, handler, keysAndIds...) | XREAD | Read from streams (optionally blocking) |
| xtrim(key, maxLen, handler) | XTRIM | Trim stream to max length |
| xack(key, group, handler, ids...) | XACK | Acknowledge entries for consumer group |
| xgroupCreate(key, group, id, handler) | XGROUP CREATE | Create consumer group |
| xgroupDestroy(key, group, handler) | XGROUP DESTROY | Delete consumer group |
| xpending(key, group, handler) | XPENDING | Get pending entries for group |
For commands not covered by specific methods, use the generic
command() method:
session.command(handler, "OBJECT", "ENCODING", "mykey");
Each command type has a corresponding result handler interface:
StringResultHandler - for commands returning simple strings ("OK", "PONG")
BulkResultHandler - for commands returning bulk strings or null (GET, HGET)
IntegerResultHandler - for commands returning integers (INCR, LPUSH, DEL)
BooleanResultHandler - convenience for integer-as-boolean (EXISTS, SISMEMBER)
ArrayResultHandler - for commands returning arrays (KEYS, LRANGE, HGETALL)
MessageHandler - for Pub/Sub messages
ScanResultHandler - for SCAN family commands (cursor + elements)
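In RESP2 the first byte of every reply identifies its type, which is why each command family pairs with a particular handler interface. The sketch below shows only that classification step. The `ReplyKindSketch` class and its `Kind` enum are hypothetical, and the handler names in the comments are an assumed mapping based on the list above, not a description of Gumdrop's internal dispatch.

```java
// Hypothetical illustration only; not part of the Gumdrop API.
final class ReplyKindSketch {

    enum Kind { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, ARRAY, UNKNOWN }

    // Classify a RESP2 reply by its leading type byte.
    static Kind classify(byte first) {
        switch ((char) (first & 0xFF)) {
            case '+': return Kind.SIMPLE_STRING; // e.g. +OK, +PONG   (StringResultHandler, assumed)
            case '-': return Kind.ERROR;         // e.g. -WRONGTYPE   (handleError, assumed)
            case ':': return Kind.INTEGER;       // e.g. :42          (IntegerResultHandler, assumed)
            case '$': return Kind.BULK_STRING;   // e.g. $5 or $-1    (BulkResultHandler, assumed)
            case '*': return Kind.ARRAY;         // e.g. *2           (ArrayResultHandler, assumed)
            default:  return Kind.UNKNOWN;       // RESP3 adds further type bytes
        }
    }

    public static void main(String[] args) {
        System.out.println(classify((byte) '+'));
        System.out.println(classify((byte) '$'));
    }
}
```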
session.get("key", new BulkResultHandler() {
@Override
public void handleResult(byte[] value, RedisSession session) {
// Value found - convert bytes to string if needed
String s = new String(value, StandardCharsets.UTF_8);
}
@Override
public void handleNull(RedisSession session) {
// Key doesn't exist
}
@Override
public void handleError(String error, RedisSession session) {
// Redis error (e.g., WRONGTYPE)
}
});
session.hgetall("user:1", new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> array, RedisSession session) {
// HGETALL returns alternating field/value pairs
for (int i = 0; i < array.size(); i += 2) {
String field = array.get(i).asString();
String value = array.get(i + 1).asString();
System.out.println(field + " = " + value);
}
}
@Override
public void handleNull(RedisSession session) {
// Key doesn't exist
}
@Override
public void handleError(String error, RedisSession session) {
// Error occurred
}
});
The Redis client supports both RESP2 and RESP3 protocol versions. RESP3 is
negotiated using the HELLO command (Redis 6+). RESP3 adds several
new data types: Map (%), Set (~), Double (,), Boolean (#), Null (_), Push (>),
Verbatim String (=), Big Number ((), and Blob Error (!).
// Negotiate RESP3 protocol
session.hello(3, new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> array, RedisSession session) {
// Server properties (flattened map): server, version, proto, id, mode, role
for (int i = 0; i < array.size(); i += 2) {
System.out.println(array.get(i).asString() + ": " + array.get(i + 1).asString());
}
}
// ...
});
// HELLO with authentication (single round-trip)
session.hello(3, "default", "password", handler);
RESP3 Maps received by ArrayResultHandler are automatically
flattened to alternating key/value lists for compatibility. The RESP3 Push
type (>) is used for out-of-band Pub/Sub message delivery, replacing the
RESP2 array-based detection.
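Since both HGETALL replies and flattened RESP3 maps arrive as alternating key/value lists, a small helper that rebuilds a map can be convenient. This is a minimal sketch under the assumption that the elements have already been converted to strings (for instance with asString()); `FlatMapSketch` and `toMap` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the Gumdrop API.
final class FlatMapSketch {

    // Rebuild a map from [k1, v1, k2, v2, ...], preserving reply order.
    static Map<String, String> toMap(List<String> flat) {
        if (flat.size() % 2 != 0) {
            throw new IllegalArgumentException("expected an even number of elements, got " + flat.size());
        }
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < flat.size(); i += 2) {
            map.put(flat.get(i), flat.get(i + 1));
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(toMap(Arrays.asList("server", "redis", "proto", "3")));
    }
}
```

Rejecting odd-length input up front avoids an out-of-bounds read when the reply was not a key/value list after all.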
The CLIENT commands are useful for connection management and
debugging:
session.clientSetName("my-connection", handler);
session.clientGetName(handler); // returns "my-connection"
session.clientId(handler); // returns unique connection ID
The RESET command (Redis 6.2+) restores the connection to its
initial state, clearing any Pub/Sub subscriptions, MULTI transactions, WATCH
keys, and authentication:
session.reset(handler); // receives "RESET"
SCAN commands provide incremental iteration over key spaces and collections
without blocking the server. Unlike KEYS, which scans the entire
key space in one call, SCAN returns a small batch per invocation and uses a
cursor to track position.
// Iterate all keys matching "user:*"
session.scan("0", "user:*", 100, new ScanResultHandler() {
@Override
public void handleResult(String cursor, List<RESPValue> elements,
RedisSession session) {
for (RESPValue key : elements) {
System.out.println("Key: " + key.asString());
}
if (!"0".equals(cursor)) {
// Continue iteration
session.scan(cursor, "user:*", 100, this);
}
}
// ...
});
The same pattern applies to hscan() (hash fields),
sscan() (set members), and zscan() (sorted set
members with scores). Each accepts optional MATCH and COUNT parameters.
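The cursor contract is easy to get wrong, so here is the iteration loop in isolation, run against an in-memory stand-in rather than a server. Everything in it is hypothetical (`ScanLoopSketch`, `fakeScan`, `Page`), and the stand-in uses list offsets as cursors purely for simplicity; real Redis cursors are opaque values that should only be passed back unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the Gumdrop API.
final class ScanLoopSketch {

    // One SCAN reply: the cursor for the next call plus a batch of elements.
    static final class Page {
        final String cursor;
        final List<String> elements;

        Page(String cursor, List<String> elements) {
            this.cursor = cursor;
            this.elements = elements;
        }
    }

    // Stand-in for the server: pages through a fixed list of keys.
    static Page fakeScan(List<String> keys, String cursor, int count) {
        int start = Integer.parseInt(cursor);
        int end = Math.min(start + count, keys.size());
        String next = end >= keys.size() ? "0" : String.valueOf(end);
        return new Page(next, keys.subList(start, end));
    }

    // Start at cursor "0" and stop when the server hands "0" back.
    static List<String> scanAll(List<String> keys, int count) {
        List<String> out = new ArrayList<>();
        String cursor = "0";
        do {
            Page page = fakeScan(keys, cursor, count);
            out.addAll(page.elements);
            cursor = page.cursor;
        } while (!"0".equals(cursor));
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user:1", "user:2", "user:3", "user:4", "user:5");
        System.out.println(scanAll(keys, 2));
    }
}
```

With a real server, a batch may be empty while the cursor is still non-zero, and a key may be returned more than once, so callers typically keep iterating until the cursor is "0" and deduplicate if that matters.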
Blocking list commands wait until an element is available or a timeout expires. These are commonly used to implement job queues and message consumers.
// Block until an element is available on "queue" (5 second timeout)
session.blpop(5.0, new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> array, RedisSession session) {
// array[0] = key, array[1] = value
String key = array.get(0).asString();
String value = array.get(1).asString();
System.out.println("Got " + value + " from " + key);
}
@Override
public void handleNull(RedisSession session) {
// Timeout expired, no element available
}
// ...
}, "queue");
BLMOVE (Redis 6.2+) atomically moves an element between lists
with blocking:
session.blmove("source", "destination", "LEFT", "RIGHT", 5.0, handler);
Redis Streams provide an append-only log data structure with consumer groups, suitable for event sourcing, message brokering, and activity feeds (Redis 5.0+).
// Append an entry
session.xadd("mystream", "*", handler, "sensor", "temperature", "value", "22.5");
// Read a range of entries
session.xrange("mystream", "-", "+", 10, handler);
// Read new entries with blocking
session.xread(10, 5000, handler,
"mystream", "$"); // STREAMS mystream $
// Create a consumer group and acknowledge entries
session.xgroupCreate("mystream", "mygroup", "0", handler);
session.xack("mystream", "mygroup", handler, "1526569495631-0");
Redis Pub/Sub enables real-time messaging between services. When you subscribe
to channels, the connection enters Pub/Sub mode and messages are delivered
through the MessageHandler.
session.subscribe(new MessageHandler() {
@Override
public void handleMessage(String channel, byte[] message) {
String text = new String(message, StandardCharsets.UTF_8);
System.out.println("Received on " + channel + ": " + text);
}
@Override
public void handlePatternMessage(String pattern, String channel, byte[] message) {
// For psubscribe patterns
}
@Override
public void handleSubscribed(String channel, int subscriptionCount) {
System.out.println("Subscribed to " + channel);
}
@Override
public void handlePatternSubscribed(String pattern, int subscriptionCount) {
System.out.println("Pattern subscribed: " + pattern);
}
@Override
public void handleUnsubscribed(String channel, int subscriptionCount) {
System.out.println("Unsubscribed from " + channel);
}
@Override
public void handlePatternUnsubscribed(String pattern, int subscriptionCount) {
System.out.println("Pattern unsubscribed: " + pattern);
}
@Override
public void handleError(String error) {
System.err.println("Pub/Sub error: " + error);
}
}, "events", "notifications");
// Subscribe to all channels matching a pattern
session.psubscribe(messageHandler, "user:*", "order:*");
Publishing is done on a separate connection (not in Pub/Sub mode):
session.publish("events", "user logged in", new IntegerResultHandler() {
@Override
public void handleResult(long subscriberCount, RedisSession session) {
System.out.println("Message sent to " + subscriberCount + " subscribers");
}
@Override
public void handleError(String error, RedisSession session) {
System.err.println("Publish failed: " + error);
}
});
Redis transactions provide atomic execution of multiple commands:
// Start transaction
session.multi(new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession session) {
// Queue commands (responses will be QUEUED)
session.incr("counter", null);
session.get("counter", null);
session.set("last_updated", String.valueOf(System.currentTimeMillis()), null);
// Execute all queued commands
session.exec(new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> results, RedisSession session) {
// results[0] = INCR result (integer)
// results[1] = GET result (bulk string)
// results[2] = SET result ("OK")
long newValue = results.get(0).asLong();
String current = results.get(1).asString();
}
@Override
public void handleNull(RedisSession session) {
// Transaction aborted (WATCH key was modified)
}
@Override
public void handleError(String error, RedisSession session) {
System.err.println("Transaction failed: " + error);
}
});
}
@Override
public void handleError(String error, RedisSession session) {
System.err.println("MULTI failed: " + error);
}
});
// Watch key for changes
session.watch(new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession session) {
// Read current value
session.get("balance", new BulkResultHandler() {
@Override
public void handleResult(byte[] value, RedisSession session) {
int balance = Integer.parseInt(new String(value, UTF_8));
int newBalance = balance - 100;
// Start transaction
session.multi(new StringResultHandler() {
@Override
public void handleResult(String r, RedisSession s) {
s.set("balance", String.valueOf(newBalance), null);
s.exec(new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> results, RedisSession s) {
// Transaction succeeded
}
@Override
public void handleNull(RedisSession s) {
// Someone modified balance - retry
}
@Override
public void handleError(String e, RedisSession s) { }
});
}
@Override
public void handleError(String e, RedisSession s) { }
});
}
@Override
public void handleNull(RedisSession session) { }
@Override
public void handleError(String error, RedisSession session) { }
});
}
@Override
public void handleError(String error, RedisSession session) { }
}, "balance");
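The WATCH example above leaves the retry step as a comment. The general shape of an optimistic retry loop can be sketched with an in-memory analogy, where `AtomicInteger.compareAndSet` plays the role of WATCH followed by MULTI/EXEC. This is only an analogy under stated assumptions: the `OptimisticRetrySketch` class is hypothetical, and unlike WATCH, compare-and-set checks the value rather than detecting any intervening write.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

// Hypothetical illustration only; not part of the Gumdrop API.
final class OptimisticRetrySketch {

    // Read, compute, and commit only if nothing changed in between;
    // otherwise start over, up to maxAttempts times.
    static int update(AtomicInteger cell, IntUnaryOperator change, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int seen = cell.get();                 // analogous to WATCH + GET
            int next = change.applyAsInt(seen);    // compute the new value
            if (cell.compareAndSet(seen, next)) {  // analogous to MULTI/SET/EXEC
                return next;
            }
            // Analogous to EXEC returning null: the value changed, so retry.
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        AtomicInteger balance = new AtomicInteger(500);
        System.out.println(update(balance, b -> b - 100, 3));
    }
}
```

Bounding the number of attempts keeps a heavily contended key from turning the retry into an unbounded loop.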
Connect to Redis over TLS by enabling secure mode and configuring the truststore:
RedisClient client = new RedisClient(selectorLoop, "redis.example.com", 6379);
client.setSecure(true);
client.setKeystoreFile("/path/to/truststore.p12");
client.setKeystorePass("password");
client.connect(handler);
For mutual TLS (client certificates), configure the keystore and a custom trust
manager. RedisClient does not expose truststore properties directly;
use setTrustManager() with a PinnedCertTrustManager or
custom X509TrustManager:
client.setSecure(true);
client.setKeystoreFile("/path/to/client-keystore.p12");
client.setKeystorePass("password");
client.setTrustManager(new PinnedCertTrustManager(fingerprints));
client.connect(new RedisConnectionReady() {
@Override
public void handleReady(RedisSession session) {
session.auth("your-password", new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession s) {
// Authenticated - start using Redis
s.set("key", "value", myHandler);
}
@Override
public void handleError(String error, RedisSession s) {
System.err.println("Authentication failed: " + error);
s.close();
}
});
}
// ... other callbacks
});
session.auth("username", "password", new StringResultHandler() {
@Override
public void handleResult(String result, RedisSession s) {
// Authenticated as specific user
}
@Override
public void handleError(String error, RedisSession s) {
// Authentication failed
}
});
public class RateLimitingMailHandler implements SMTPConnectionHandler {
private final RedisSession redis;
private final int maxMessagesPerMinute = 60;
@Override
public void mailFrom(String sender, MailFromCallback callback) {
String key = "rate:" + sender;
redis.incr(key, new IntegerResultHandler() {
@Override
public void handleResult(long count, RedisSession session) {
if (count == 1) {
// First message - set expiry
session.expire(key, 60, new BooleanResultHandler() {
@Override
public void handleResult(boolean set, RedisSession s) {
checkLimit(count, callback);
}
@Override
public void handleError(String error, RedisSession s) {
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
});
} else {
checkLimit(count, callback);
}
}
@Override
public void handleError(String error, RedisSession session) {
// Redis unavailable - fail open
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
});
}
private void checkLimit(long count, MailFromCallback callback) {
if (count > maxMessagesPerMinute) {
callback.mailFromReply(SenderPolicyResult.TEMP_REJECT_RATE_LIMIT);
} else {
callback.mailFromReply(SenderPolicyResult.ACCEPT);
}
}
}
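The handler above implements a fixed-window counter: the first INCR in a window creates the key, EXPIRE bounds the window, and later increments are compared against the limit. The same logic can be exercised without a server using the in-memory sketch below. The `FixedWindowSketch` class and its injected clock are hypothetical and exist only to make the behaviour testable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical illustration only; not part of the Gumdrop API.
final class FixedWindowSketch {

    private static final class Window {
        long count;
        long expiresAtMillis;
    }

    private final Map<String, Window> windows = new HashMap<>();
    private final int limit;
    private final long windowMillis;
    private final LongSupplier clock;

    FixedWindowSketch(int limit, long windowMillis, LongSupplier clock) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    // Returns true if this event is within the limit for the current window.
    boolean allow(String key) {
        long now = clock.getAsLong();
        Window w = windows.get(key);
        if (w == null || now >= w.expiresAtMillis) {
            // Key absent or expired: start a new window (INCR creates it, EXPIRE bounds it).
            w = new Window();
            w.expiresAtMillis = now + windowMillis;
            windows.put(key, w);
        }
        w.count++;                 // INCR
        return w.count <= limit;   // same comparison as checkLimit()
    }

    public static void main(String[] args) {
        FixedWindowSketch limiter = new FixedWindowSketch(60, 60_000L, System::currentTimeMillis);
        System.out.println(limiter.allow("rate:alice@example.com"));
    }
}
```

One difference is worth keeping in mind: in the Redis version INCR and EXPIRE are separate commands, so if the EXPIRE is never applied the counter key has no time to live and the sender stays limited until the key is removed.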
public class RedisSessionServlet extends HttpServlet {
private RedisPool redisPool;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
String sessionId = req.getSession().getId();
SelectorLoop loop = ((Request) req).getEndpoint().getSelectorLoop();
redisPool.getSession(loop, new RedisPool.SessionCallback() {
@Override
public void onSession(RedisSession redis) {
redis.hgetall("session:" + sessionId, new ArrayResultHandler() {
@Override
public void handleResult(List<RESPValue> array, RedisSession s) {
Map<String, String> attrs = new HashMap<>();
for (int i = 0; i < array.size(); i += 2) {
attrs.put(array.get(i).asString(),
array.get(i + 1).asString());
}
req.setAttribute("redisSession", attrs);
// Continue processing...
}
@Override
public void handleNull(RedisSession s) {
req.setAttribute("redisSession", Collections.emptyMap());
}
@Override
public void handleError(String error, RedisSession s) {
resp.sendError(500, "Redis error: " + error);
}
});
}
@Override
public void onError(Exception e) {
resp.sendError(500, "Redis unavailable");
}
});
}
}
public class CachingService {
private final RedisSession redis;
private final int cacheTTLSeconds = 300; // 5 minutes
public void getCached(String key, CacheCallback callback) {
redis.get("cache:" + key, new BulkResultHandler() {
@Override
public void handleResult(byte[] value, RedisSession session) {
callback.onHit(new String(value, UTF_8));
}
@Override
public void handleNull(RedisSession session) {
callback.onMiss();
}
@Override
public void handleError(String error, RedisSession session) {
callback.onError(new Exception(error));
}
});
}
public void cache(String key, String value) {
redis.setex("cache:" + key, cacheTTLSeconds, value,
new StringResultHandler() {
@Override
public void handleResult(String r, RedisSession s) { }
@Override
public void handleError(String e, RedisSession s) {
// Log error but don't fail
}
});
}
public interface CacheCallback {
void onHit(String value);
void onMiss();
void onError(Exception e);
}
}
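CachingService exposes the two halves of a cache-aside pattern: look up first, and on a miss load the value from the source of truth and store it with a TTL. How a caller might combine them is sketched below against an in-memory map instead of Redis. The `CacheAsideSketch` class, its loader function and its injected clock are all hypothetical; a real caller would chain the asynchronous callbacks instead of returning a value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical illustration only; not part of the Gumdrop API.
final class CacheAsideSketch {

    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;
    int loads; // how many times the loader ran, for illustration

    CacheAsideSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    String get(String key, Function<String, String> loader) {
        long now = clock.getAsLong();
        Entry entry = cache.get(key);
        if (entry != null && now < entry.expiresAtMillis) {
            return entry.value;                             // hit (onHit)
        }
        String value = loader.apply(key);                   // miss (onMiss): go to the source
        loads++;
        cache.put(key, new Entry(value, now + ttlMillis));  // store with TTL, like SETEX
        return value;
    }

    public static void main(String[] args) {
        CacheAsideSketch cache = new CacheAsideSketch(300_000L, System::currentTimeMillis);
        System.out.println(cache.get("user:1", k -> "loaded " + k));
        System.out.println(cache.get("user:1", k -> "not called on a hit"));
    }
}
```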
Gumdrop Redis Client