Gumdrop

Gumdrop Redis Client

Gumdrop provides a fully asynchronous Redis client that integrates with the framework's event-driven architecture. Like all Gumdrop clients, the Redis client uses non-blocking I/O and callbacks, making it ideal for use within server implementations that need to access Redis for caching, session storage, rate limiting, or pub/sub messaging.

Contents

Architecture

The Redis client follows the same patterns as other Gumdrop clients (LDAP, SMTP, HTTP). It uses the RESP (Redis Serialization Protocol) for wire-level communication, with a streaming decoder that handles partial data gracefully.

Key Components

Asynchronous Model

All Redis operations are asynchronous. When you call a command method, it immediately encodes and sends the command, then returns. When the response arrives (possibly much later), your callback is invoked on the connection's SelectorLoop thread.

This model supports natural pipelining—you can issue many commands without waiting for responses:

session.set("key1", "value1", handler);
session.set("key2", "value2", handler);
session.set("key3", "value3", handler);
session.get("key1", getHandler);
// All commands sent immediately; responses arrive in order

Basic Usage

Connect to Redis and execute commands using callback handlers:

RedisClient client = new RedisClient(selectorLoop, "localhost", 6379);

client.connect(new RedisConnectionReady() {
    @Override
    public void handleReady(RedisSession session) {
        // Connection established, start using Redis
        session.ping(new StringResultHandler() {
            @Override
            public void handleResult(String result, RedisSession s) {
                // result is "PONG"
                System.out.println("Redis says: " + result);
            }
            
            @Override
            public void handleError(String error, RedisSession s) {
                System.err.println("Error: " + error);
            }
        });
    }
    
    @Override
    public void onConnected(Endpoint endpoint) {
        // TCP connected
    }
    
    @Override
    public void onDisconnected() {
        // Connection closed
    }
    
    @Override
    public void onSecurityEstablished(SecurityInfo info) {
        // TLS handshake complete (for secure connections)
    }
    
    @Override
    public void onError(Exception e) {
        e.printStackTrace();
    }
});

Storing and Retrieving Values

session.set("user:1:name", "Alice", new StringResultHandler() {
    @Override
    public void handleResult(String result, RedisSession s) {
        // result is "OK"
        // Now retrieve the value
        s.get("user:1:name", new BulkResultHandler() {
            @Override
            public void handleResult(byte[] value, RedisSession session) {
                String name = new String(value, StandardCharsets.UTF_8);
                System.out.println("Name: " + name);  // "Alice"
            }
            
            @Override
            public void handleNull(RedisSession session) {
                System.out.println("Key not found");
            }
            
            @Override
            public void handleError(String error, RedisSession session) {
                System.err.println("Error: " + error);
            }
        });
    }
    
    @Override
    public void handleError(String error, RedisSession s) {
        System.err.println("SET failed: " + error);
    }
});

SelectorLoop Affinity

When using the Redis client from within a Gumdrop server (such as an HTTP servlet, SMTP handler, or any other protocol implementation), the client connection should be assigned to the same SelectorLoop as the originating server connection. This provides several important benefits:

Obtaining the SelectorLoop

Within a Gumdrop endpoint handler, obtain the current SelectorLoop from the endpoint:

// Inside an HTTP servlet or protocol handler
SelectorLoop loop = endpoint.getSelectorLoop();

// Create Redis client bound to this loop
RedisClient redis = new RedisClient(loop, "localhost", 6379);

Callback Threading

When using SelectorLoop affinity, Redis callbacks execute on the same thread as your server connection. This means you can safely access connection state without synchronisation:

// Inside SMTP handler's mailFrom() callback
public void mailFrom(String sender, MailFromCallback callback) {
    SelectorLoop loop = connection.getSelectorLoop();
    
    // Check sender reputation in Redis
    redisSession.get("reputation:" + sender, new BulkResultHandler() {
        @Override
        public void handleResult(byte[] value, RedisSession session) {
            int score = Integer.parseInt(new String(value, UTF_8));
            if (score < 0) {
                callback.mailFromReply(SenderPolicyResult.REJECT_SPAM_REPUTATION);
            } else {
                callback.mailFromReply(SenderPolicyResult.ACCEPT);
            }
        }
        
        @Override
        public void handleNull(RedisSession session) {
            // Unknown sender, accept for now
            callback.mailFromReply(SenderPolicyResult.ACCEPT);
        }
        
        @Override
        public void handleError(String error, RedisSession session) {
            // Redis unavailable, fail open
            callback.mailFromReply(SenderPolicyResult.ACCEPT);
        }
    });
}

Note how the callback.mailFromReply() is invoked directly from the Redis handler—this is safe because both are on the same SelectorLoop thread.

Connection Pooling Pattern

For high-throughput servers, maintain a pool of Redis connections per SelectorLoop. This avoids connection setup overhead and supports pipelining:

public class RedisPool {
    private final Map<SelectorLoop, RedisSession> sessions = new ConcurrentHashMap<>();
    private final String host;
    private final int port;
    
    public RedisPool(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    public void getSession(SelectorLoop loop, final SessionCallback callback) {
        RedisSession existing = sessions.get(loop);
        if (existing != null) {
            callback.onSession(existing);
            return;
        }
        
        // Create new connection for this SelectorLoop
        RedisClient client = new RedisClient(loop, host, port);
        client.connect(new RedisConnectionReady() {
            @Override
            public void handleReady(RedisSession session) {
                sessions.put(loop, session);
                callback.onSession(session);
            }
            
            @Override
            public void onConnected(Endpoint endpoint) { }
            
            @Override
            public void onDisconnected() {
                sessions.remove(loop);
            }
            
            @Override
            public void onSecurityEstablished(SecurityInfo info) { }
            
            @Override
            public void onError(Exception e) {
                callback.onError(e);
            }
        });
    }
    
    public interface SessionCallback {
        void onSession(RedisSession session);
        void onError(Exception e);
    }
}

Supported Commands

The RedisSession interface provides methods for all common Redis commands, organised by data type.

Connection Commands

MethodRedis CommandDescription
auth(password, handler)AUTHAuthenticate (Redis 5 and earlier)
auth(user, password, handler)AUTHAuthenticate with ACL (Redis 6+)
hello(protover, handler)HELLONegotiate RESP protocol version (Redis 6+)
hello(protover, user, pass, handler)HELLONegotiate protocol with auth
clientSetName(name, handler)CLIENT SETNAMESet connection name
clientGetName(handler)CLIENT GETNAMEGet connection name
clientId(handler)CLIENT IDGet connection ID
ping(handler)PINGTest connection
select(index, handler)SELECTSelect database
echo(message, handler)ECHOEcho message
reset(handler)RESETReset connection state (Redis 6.2+)
quit()QUITClose connection

String Commands

MethodRedis CommandDescription
get(key, handler)GETGet value
set(key, value, handler)SETSet value
setex(key, seconds, value, handler)SETEXSet with expiration
setnx(key, value, handler)SETNXSet if not exists
incr(key, handler)INCRIncrement
incrby(key, amount, handler)INCRBYIncrement by amount
decr(key, handler)DECRDecrement
mget(handler, keys...)MGETGet multiple
mset(handler, keysAndValues...)MSETSet multiple

Key Commands

MethodRedis CommandDescription
del(handler, keys...)DELDelete keys
exists(key, handler)EXISTSCheck existence
expire(key, seconds, handler)EXPIRESet expiration
ttl(key, handler)TTLGet time to live
persist(key, handler)PERSISTRemove expiration
keys(pattern, handler)KEYSFind keys by pattern
type(key, handler)TYPEGet key type
scan(cursor, handler)SCANIterate key space
scan(cursor, match, count, handler)SCANIterate with MATCH/COUNT

Hash Commands

MethodRedis CommandDescription
hget(key, field, handler)HGETGet field
hset(key, field, value, handler)HSETSet field
hgetall(key, handler)HGETALLGet all fields
hdel(key, handler, fields...)HDELDelete fields
hexists(key, field, handler)HEXISTSCheck field exists
hincrby(key, field, amount, handler)HINCRBYIncrement field
hscan(key, cursor, handler)HSCANIterate hash fields

List Commands

MethodRedis CommandDescription
lpush(key, handler, values...)LPUSHPush to head
rpush(key, handler, values...)RPUSHPush to tail
lpop(key, handler)LPOPPop from head
rpop(key, handler)RPOPPop from tail
lrange(key, start, stop, handler)LRANGEGet range
llen(key, handler)LLENGet length
blpop(timeout, handler, keys...)BLPOPBlocking pop from head
brpop(timeout, handler, keys...)BRPOPBlocking pop from tail
blmove(src, dest, from, to, timeout, handler)BLMOVEBlocking move between lists

Set Commands

MethodRedis CommandDescription
sadd(key, handler, members...)SADDAdd members
srem(key, handler, members...)SREMRemove members
smembers(key, handler)SMEMBERSGet all members
sismember(key, member, handler)SISMEMBERCheck membership
scard(key, handler)SCARDGet cardinality
sscan(key, cursor, handler)SSCANIterate set members

Sorted Set Commands

MethodRedis CommandDescription
zadd(key, score, member, handler)ZADDAdd with score
zrange(key, start, stop, handler)ZRANGEGet by rank
zrevrange(key, start, stop, handler)ZREVRANGEGet by rank (descending)
zscore(key, member, handler)ZSCOREGet score
zrank(key, member, handler)ZRANKGet rank
zincrby(key, increment, member, handler)ZINCRBYIncrement score
zscan(key, cursor, handler)ZSCANIterate sorted set members

Stream Commands

MethodRedis CommandDescription
xadd(key, id, handler, fields...)XADDAppend entry to stream
xlen(key, handler)XLENGet stream length
xrange(key, start, end, handler)XRANGEGet entries in range
xrevrange(key, end, start, handler)XREVRANGEGet entries in reverse
xread(count, block, handler, keysAndIds...)XREADRead from streams (optionally blocking)
xtrim(key, maxLen, handler)XTRIMTrim stream to max length
xack(key, group, handler, ids...)XACKAcknowledge entries for consumer group
xgroupCreate(key, group, id, handler)XGROUP CREATECreate consumer group
xgroupDestroy(key, group, handler)XGROUP DESTROYDelete consumer group
xpending(key, group, handler)XPENDINGGet pending entries for group

Generic Command

For commands not covered by specific methods, use the generic command() method:

session.command(handler, "OBJECT", "ENCODING", "mykey");

Result Handlers

Each command type has a corresponding result handler interface:

BulkResultHandler Example

session.get("key", new BulkResultHandler() {
    @Override
    public void handleResult(byte[] value, RedisSession session) {
        // Value found - convert bytes to string if needed
        String s = new String(value, StandardCharsets.UTF_8);
    }
    
    @Override
    public void handleNull(RedisSession session) {
        // Key doesn't exist
    }
    
    @Override
    public void handleError(String error, RedisSession session) {
        // Redis error (e.g., WRONGTYPE)
    }
});

ArrayResultHandler Example

session.hgetall("user:1", new ArrayResultHandler() {
    @Override
    public void handleResult(List<RESPValue> array, RedisSession session) {
        // HGETALL returns alternating field/value pairs
        for (int i = 0; i < array.size(); i += 2) {
            String field = array.get(i).asString();
            String value = array.get(i + 1).asString();
            System.out.println(field + " = " + value);
        }
    }
    
    @Override
    public void handleNull(RedisSession session) {
        // Key doesn't exist
    }
    
    @Override
    public void handleError(String error, RedisSession session) {
        // Error occurred
    }
});

RESP3 Protocol

The Redis client supports both RESP2 and RESP3 protocol versions. RESP3 is negotiated using the HELLO command (Redis 6+). RESP3 adds several new data types: Map (%), Set (~), Double (,), Boolean (#), Null (_), Push (>), Verbatim String (=), Big Number ((), and Blob Error (!).

// Negotiate RESP3 protocol
session.hello(3, new ArrayResultHandler() {
    @Override
    public void handleResult(List<RESPValue> array, RedisSession session) {
        // Server properties (flattened map): server, version, proto, id, mode, role
        for (int i = 0; i < array.size(); i += 2) {
            System.out.println(array.get(i).asString() + ": " + array.get(i + 1).asString());
        }
    }
    // ...
});

// HELLO with authentication (single round-trip)
session.hello(3, "default", "password", handler);

RESP3 Maps received by ArrayResultHandler are automatically flattened to alternating key/value lists for compatibility. The RESP3 Push type (>) is used for out-of-band Pub/Sub message delivery, replacing the RESP2 array-based detection.

The CLIENT commands are useful for connection management and debugging:

session.clientSetName("my-connection", handler);
session.clientGetName(handler);      // returns "my-connection"
session.clientId(handler);           // returns unique connection ID

The RESET command (Redis 6.2+) restores the connection to its initial state, clearing any Pub/Sub subscriptions, MULTI transactions, WATCH keys, and authentication:

session.reset(handler);  // receives "RESET"

Cursor-based Iteration (SCAN)

SCAN commands provide incremental iteration over key spaces and collections without blocking the server. Unlike KEYS, which scans the entire key space in one call, SCAN returns a small batch per invocation and uses a cursor to track position.

// Iterate all keys matching "user:*"
session.scan("0", "user:*", 100, new ScanResultHandler() {
    @Override
    public void handleResult(String cursor, List<RESPValue> elements,
                             RedisSession session) {
        for (RESPValue key : elements) {
            System.out.println("Key: " + key.asString());
        }
        if (!"0".equals(cursor)) {
            // Continue iteration
            session.scan(cursor, "user:*", 100, this);
        }
    }
    // ...
});

The same pattern applies to hscan() (hash fields), sscan() (set members), and zscan() (sorted set members with scores). Each accepts optional MATCH and COUNT parameters.

Blocking Commands

Blocking list commands wait until an element is available or a timeout expires. These are commonly used to implement job queues and message consumers.

// Block until an element is available on "queue" (5 second timeout)
session.blpop(5.0, new ArrayResultHandler() {
    @Override
    public void handleResult(List<RESPValue> array, RedisSession session) {
        // array[0] = key, array[1] = value
        String key = array.get(0).asString();
        String value = array.get(1).asString();
        System.out.println("Got " + value + " from " + key);
    }

    @Override
    public void handleNull(RedisSession session) {
        // Timeout expired, no element available
    }
    // ...
}, "queue");

BLMOVE (Redis 6.2+) atomically moves an element between lists with blocking:

session.blmove("source", "destination", "LEFT", "RIGHT", 5.0, handler);

Streams

Redis Streams provide an append-only log data structure with consumer groups, suitable for event sourcing, message brokering, and activity feeds (Redis 5.0+).

// Append an entry
session.xadd("mystream", "*", handler, "sensor", "temperature", "value", "22.5");

// Read a range of entries
session.xrange("mystream", "-", "+", 10, handler);

// Read new entries with blocking
session.xread(10, 5000, handler,
    "mystream", "$");  // STREAMS mystream $

// Create a consumer group and acknowledge entries
session.xgroupCreate("mystream", "mygroup", "0", handler);
session.xack("mystream", "mygroup", handler, "1526569495631-0");

Pub/Sub

Redis Pub/Sub enables real-time messaging between services. When you subscribe to channels, the connection enters Pub/Sub mode and messages are delivered through the MessageHandler.

session.subscribe(new MessageHandler() {
    @Override
    public void handleMessage(String channel, byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        System.out.println("Received on " + channel + ": " + text);
    }
    
    @Override
    public void handlePatternMessage(String pattern, String channel, byte[] message) {
        // For psubscribe patterns
    }
    
    @Override
    public void handleSubscribed(String channel, int subscriptionCount) {
        System.out.println("Subscribed to " + channel);
    }
    
    @Override
    public void handlePatternSubscribed(String pattern, int subscriptionCount) {
        System.out.println("Pattern subscribed: " + pattern);
    }
    
    @Override
    public void handleUnsubscribed(String channel, int subscriptionCount) {
        System.out.println("Unsubscribed from " + channel);
    }
    
    @Override
    public void handlePatternUnsubscribed(String pattern, int subscriptionCount) {
        System.out.println("Pattern unsubscribed: " + pattern);
    }
    
    @Override
    public void handleError(String error) {
        System.err.println("Pub/Sub error: " + error);
    }
}, "events", "notifications");

Pattern Subscriptions

// Subscribe to all channels matching a pattern
session.psubscribe(messageHandler, "user:*", "order:*");

Publishing

Publishing is done on a separate connection (not in Pub/Sub mode):

session.publish("events", "user logged in", new IntegerResultHandler() {
    @Override
    public void handleResult(long subscriberCount, RedisSession session) {
        System.out.println("Message sent to " + subscriberCount + " subscribers");
    }
    
    @Override
    public void handleError(String error, RedisSession session) {
        System.err.println("Publish failed: " + error);
    }
});

Transactions

Redis transactions provide atomic execution of multiple commands:

// Start transaction
session.multi(new StringResultHandler() {
    @Override
    public void handleResult(String result, RedisSession session) {
        // Queue commands (responses will be QUEUED)
        session.incr("counter", null);
        session.get("counter", null);
        session.set("last_updated", String.valueOf(System.currentTimeMillis()), null);
        
        // Execute all queued commands
        session.exec(new ArrayResultHandler() {
            @Override
            public void handleResult(List<RESPValue> results, RedisSession session) {
                // results[0] = INCR result (integer)
                // results[1] = GET result (bulk string)
                // results[2] = SET result ("OK")
                long newValue = results.get(0).asLong();
                String current = results.get(1).asString();
            }
            
            @Override
            public void handleNull(RedisSession session) {
                // Transaction aborted (WATCH key was modified)
            }
            
            @Override
            public void handleError(String error, RedisSession session) {
                System.err.println("Transaction failed: " + error);
            }
        });
    }
    
    @Override
    public void handleError(String error, RedisSession session) {
        System.err.println("MULTI failed: " + error);
    }
});

Optimistic Locking with WATCH

// Watch key for changes
session.watch(new StringResultHandler() {
    @Override
    public void handleResult(String result, RedisSession session) {
        // Read current value
        session.get("balance", new BulkResultHandler() {
            @Override
            public void handleResult(byte[] value, RedisSession session) {
                int balance = Integer.parseInt(new String(value, UTF_8));
                int newBalance = balance - 100;
                
                // Start transaction
                session.multi(new StringResultHandler() {
                    @Override
                    public void handleResult(String r, RedisSession s) {
                        s.set("balance", String.valueOf(newBalance), null);
                        s.exec(new ArrayResultHandler() {
                            @Override
                            public void handleResult(List<RESPValue> results, RedisSession s) {
                                // Transaction succeeded
                            }
                            
                            @Override
                            public void handleNull(RedisSession s) {
                                // Someone modified balance - retry
                            }
                            
                            @Override
                            public void handleError(String e, RedisSession s) { }
                        });
                    }
                    
                    @Override
                    public void handleError(String e, RedisSession s) { }
                });
            }
            
            @Override
            public void handleNull(RedisSession session) { }
            
            @Override
            public void handleError(String error, RedisSession session) { }
        });
    }
    
    @Override
    public void handleError(String error, RedisSession session) { }
}, "balance");

TLS Connections

Connect to Redis over TLS by enabling secure mode and configuring the truststore:

RedisClient client = new RedisClient(selectorLoop, "redis.example.com", 6379);
client.setSecure(true);
client.setKeystoreFile("/path/to/truststore.p12");
client.setKeystorePass("password");
client.connect(handler);

For mutual TLS (client certificates), configure the keystore and a custom trust manager. RedisClient does not expose truststore properties directly; use setTrustManager() with a PinnedCertTrustManager or custom X509TrustManager:

client.setSecure(true);
client.setKeystoreFile("/path/to/client-keystore.p12");
client.setKeystorePass("password");
client.setTrustManager(new PinnedCertTrustManager(fingerprints));

Authentication

Password Authentication (Redis 5 and earlier)

client.connect(new RedisConnectionReady() {
    @Override
    public void handleReady(RedisSession session) {
        session.auth("your-password", new StringResultHandler() {
            @Override
            public void handleResult(String result, RedisSession s) {
                // Authenticated - start using Redis
                s.set("key", "value", myHandler);
            }
            
            @Override
            public void handleError(String error, RedisSession s) {
                System.err.println("Authentication failed: " + error);
                s.close();
            }
        });
    }
    // ... other callbacks
});

ACL Authentication (Redis 6+)

session.auth("username", "password", new StringResultHandler() {
    @Override
    public void handleResult(String result, RedisSession s) {
        // Authenticated as specific user
    }
    
    @Override
    public void handleError(String error, RedisSession s) {
        // Authentication failed
    }
});

Server Integration Examples

Rate Limiting in SMTP Handler

public class RateLimitingMailHandler implements SMTPConnectionHandler {
    private final RedisSession redis;
    private final int maxMessagesPerMinute = 60;
    
    @Override
    public void mailFrom(String sender, MailFromCallback callback) {
        String key = "rate:" + sender;
        
        redis.incr(key, new IntegerResultHandler() {
            @Override
            public void handleResult(long count, RedisSession session) {
                if (count == 1) {
                    // First message - set expiry
                    session.expire(key, 60, new BooleanResultHandler() {
                        @Override
                        public void handleResult(boolean set, RedisSession s) {
                            checkLimit(count, callback);
                        }
                        
                        @Override
                        public void handleError(String error, RedisSession s) {
                            callback.mailFromReply(SenderPolicyResult.ACCEPT);
                        }
                    });
                } else {
                    checkLimit(count, callback);
                }
            }
            
            @Override
            public void handleError(String error, RedisSession session) {
                // Redis unavailable - fail open
                callback.mailFromReply(SenderPolicyResult.ACCEPT);
            }
        });
    }
    
    private void checkLimit(long count, MailFromCallback callback) {
        if (count > maxMessagesPerMinute) {
            callback.mailFromReply(SenderPolicyResult.TEMP_REJECT_RATE_LIMIT);
        } else {
            callback.mailFromReply(SenderPolicyResult.ACCEPT);
        }
    }
}

Session Storage in Servlet

public class RedisSessionServlet extends HttpServlet {
    private RedisPool redisPool;
    
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
            throws IOException {
        String sessionId = req.getSession().getId();
        SelectorLoop loop = ((Request) req).getEndpoint().getSelectorLoop();
        
        redisPool.getSession(loop, new RedisPool.SessionCallback() {
            @Override
            public void onSession(RedisSession redis) {
                redis.hgetall("session:" + sessionId, new ArrayResultHandler() {
                    @Override
                    public void handleResult(List<RESPValue> array, RedisSession s) {
                        Map<String, String> attrs = new HashMap<>();
                        for (int i = 0; i < array.size(); i += 2) {
                            attrs.put(array.get(i).asString(), 
                                     array.get(i + 1).asString());
                        }
                        req.setAttribute("redisSession", attrs);
                        // Continue processing...
                    }
                    
                    @Override
                    public void handleNull(RedisSession s) {
                        req.setAttribute("redisSession", Collections.emptyMap());
                    }
                    
                    @Override
                    public void handleError(String error, RedisSession s) {
                        resp.sendError(500, "Redis error: " + error);
                    }
                });
            }
            
            @Override
            public void onError(Exception e) {
                resp.sendError(500, "Redis unavailable");
            }
        });
    }
}

Caching with Expiration

public class CachingService {
    private final RedisSession redis;
    private final int cacheTTLSeconds = 300; // 5 minutes
    
    public void getCached(String key, CacheCallback callback) {
        redis.get("cache:" + key, new BulkResultHandler() {
            @Override
            public void handleResult(byte[] value, RedisSession session) {
                callback.onHit(new String(value, UTF_8));
            }
            
            @Override
            public void handleNull(RedisSession session) {
                callback.onMiss();
            }
            
            @Override
            public void handleError(String error, RedisSession session) {
                callback.onError(new Exception(error));
            }
        });
    }
    
    public void cache(String key, String value) {
        redis.setex("cache:" + key, cacheTTLSeconds, value, 
            new StringResultHandler() {
                @Override
                public void handleResult(String r, RedisSession s) { }
                
                @Override
                public void handleError(String e, RedisSession s) {
                    // Log error but don't fail
                }
            });
    }
    
    public interface CacheCallback {
        void onHit(String value);
        void onMiss();
        void onError(Exception e);
    }
}

← Back to Main Page | HTTP Server & Client | SMTP Server & Client | Telemetry

Gumdrop Redis Client