Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@ layout: default

All notable user-facing changes to this project are documented in this file.

## Release [4.0.0.RC2] 24-Mar-2026

{: .highlight}
This is a release candidate for 4.0.0. It targets Spring Boot 4.x and Spring Framework 7.x.
Please test thoroughly before using in production.

### Features
* **Pluggable message ID generation** — added `RqueueMessageIdGenerator` with a
default UUIDv4 implementation so applications can override message ID generation
with a custom bean, including time-ordered strategies such as UUIDv7.
* **Worker registry for dashboard visibility** — added an optional
`rqueue.worker.registry.enabled` registry that tracks worker metadata and
queue-level poller activity for dashboard use.
* **Workers dashboard page** — added a dedicated workers view showing worker
identity, queue pollers, last poll activity, and recent capacity exhaustion.
* **Queue and workers pagination** — added server-side pagination for dashboard
queue and worker listings, with configurable page sizes.
* **Dashboard enqueue controls for scheduled messages** — messages in scheduled
queues can now be moved back to the main queue from the dashboard, including
explicit front/rear enqueue options for non-periodic messages.
* **Dashboard refresh and usability improvements** — refreshed queue, worker, and
explorer UI with improved layouts, duration formatting, feedback modals, and
more readable queue metadata.

## Release [4.0.0.RC1] 18-Mar-2026

{: .highlight}
Expand Down
41 changes: 40 additions & 1 deletion docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,49 @@ The serialized form encodes both the envelope class and the type parameter:
`List<Event<Order>>` are not supported.
- Multi-level nesting (e.g. `Wrapper<Event<T>>`) is not supported.

## Message ID Generator

Rqueue now resolves message IDs through the `RqueueMessageIdGenerator` abstraction.
By default, Rqueue registers a UUIDv4-based implementation, but applications can
override it by defining their own bean.

This is useful when you need:

- time-ordered IDs such as UUIDv7
- custom prefixes or tenant-aware IDs
- IDs generated by an external system

```java
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;

@Configuration
public class RqueueConfiguration {

@Bean
public RqueueMessageIdGenerator rqueueMessageIdGenerator() {
return () -> java.util.UUID.randomUUID().toString();
}
}
```

{: .note}
The default implementation is still UUIDv4. Custom generators are applied to the
normal enqueue APIs such as `enqueue`, `enqueueIn`, `enqueueAt`, and `enqueuePeriodic`
whenever Rqueue generates the message ID internally.

## Additional Configuration

- **`rqueue.retry.per.poll`**: Determines how many times a polled message is retried
immediately if processing fails, before it is moved back to the queue for a
subsequent poll. The default value is `1`. If increased to `N`, the message will
be retried `N` times consecutively within the same polling cycle.

- **`rqueue.worker.registry.enabled`**: Enables worker and queue-poller tracking for
the dashboard. Default: `true`.
- **`rqueue.worker.registry.worker.ttl`**: TTL in seconds for worker metadata stored
in Redis. Default: `300`.
- **`rqueue.worker.registry.worker.heartbeat.interval`**: Interval in seconds for
refreshing worker metadata. Default: `60`.
- **`rqueue.worker.registry.queue.ttl`**: TTL in seconds for queue poller hashes.
Default: `3600`.
- **`rqueue.worker.registry.queue.heartbeat.interval`**: Interval in seconds for
queue poller heartbeats. Default: `15`.
42 changes: 42 additions & 0 deletions docs/dashboard.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ message processing.

* **Task Operations**: Facilitates moving tasks between different queues.

* **Worker Visibility**: Shows which worker is polling a queue, when it last polled,
and whether the queue recently ran out of execution capacity.

* **Scheduled Message Recovery**: Allows non-periodic scheduled messages to be moved
back to the main queue from the dashboard, either at the front or rear.

Access the dashboard at: [http://localhost:8080/rqueue](http://localhost:8080/rqueue)

## Configuration
Expand Down Expand Up @@ -71,6 +77,8 @@ Example URL with a configured prefix:
* `rqueue.web.enable`: Enable or disable the web dashboard (default: `true`).
* `rqueue.web.max.message.move.count`: Maximum number of messages to move in a single request
from the utility tab (default: `1000`).
* `rqueue.web.queue.page.size`: Number of queue cards shown per page (default: `12`).
* `rqueue.web.worker.page.size`: Number of worker cards shown per page (default: `10`).
* `rqueue.web.collect.listener.stats`: Enable collection of task execution statistics
(default: `false`).
* `rqueue.web.collect.listener.stats.thread.count`: Number of threads used for metrics aggregation.
Expand All @@ -82,6 +90,40 @@ Example URL with a configured prefix:
* `rqueue.web.collect.statistic.aggregate.shutdown.wait.time`: Wait time in milliseconds for
forced aggregation of pending events during application shutdown.

## Worker Registry

The dashboard can optionally maintain lightweight worker metadata in Redis to show:

- worker host and process ID
- queue-level polling activity
- recent queue capacity exhaustion
- worker and queue drill-down views

This feature is controlled by the following properties:

- `rqueue.worker.registry.enabled`
- `rqueue.worker.registry.worker.ttl`
- `rqueue.worker.registry.worker.heartbeat.interval`
- `rqueue.worker.registry.queue.ttl`
- `rqueue.worker.registry.queue.heartbeat.interval`

{: .note}
The worker registry is intended for dashboard visibility. Instance-level liveness
should still be monitored through your infrastructure or platform health checks.

## Queue Explorer Actions

The queue explorer supports queue-specific administrative actions:

- delete pending, running, dead-letter, or scheduled messages
- move messages between Redis collections from the Utility tab
- enqueue scheduled messages back to the main queue

For scheduled messages:

- periodic messages can be deleted, but are not offered queue-to-front or queue-to-rear actions
- non-periodic scheduled messages can be queued to the front or rear of the main queue

### Dashboard Screenshots

#### Latency Graph
Expand Down
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ model through annotation-driven APIs and minimal setup.
* Use callbacks for dead-letter, discard, and related flows
* Subscribe to bootstrap and task execution events
* Monitor in-flight, queued, and scheduled messages with metrics
* Use the built-in web dashboard for queue visibility and monitoring
* Use the built-in web dashboard for queue visibility, worker activity, and message operations
* Override message ID generation with a custom `RqueueMessageIdGenerator` bean

* **Redis and platform support**
* Support Redis standalone, Sentinel, and Cluster setups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public Long rpush(String listName, V val) {
return redisTemplate.opsForList().rightPush(listName, val);
}

public Long lpush(String listName, V val) {
return redisTemplate.opsForList().leftPush(listName, val);
}

public Long addToSet(String setName, V... values) {
return redisTemplate.opsForSet().add(setName, values);
}
Expand Down Expand Up @@ -98,6 +102,23 @@ public void set(String key, V val, Duration duration) {
redisTemplate.opsForValue().set(key, val, duration.toMillis(), TimeUnit.MILLISECONDS);
}

public void putHashValue(String key, String hashKey, V val) {
redisTemplate.opsForHash().put(key, hashKey, val);
}

@SuppressWarnings("unchecked")
public Map<String, V> getHashEntries(String key) {
return (Map<String, V>) (Map<?, ?>) redisTemplate.opsForHash().entries(key);
}

public Long deleteHashValues(String key, String... hashKeys) {
return redisTemplate.opsForHash().delete(key, (Object[]) hashKeys);
}

public Boolean expire(String key, Duration duration) {
return redisTemplate.expire(key, duration.toMillis(), TimeUnit.MILLISECONDS);
}

public Boolean setIfAbsent(String lockKey, V val, Duration duration) {
boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, val);
if (result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
@Configuration
public class RqueueConfig {

@Getter
private static final String brokerId = UUID.randomUUID().toString();

private static final AtomicLong counter = new AtomicLong(1);
private final RedisConnectionFactory connectionFactory;
private final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
Expand Down Expand Up @@ -139,9 +141,26 @@ public class RqueueConfig {
@Value("${rqueue.completed.job.cleanup.interval:30000}")
private long completedJobCleanupIntervalInMs;

public static String getBrokerId() {
return brokerId;
}
@Value("${rqueue.worker.registry.enabled:true}")
private boolean workerRegistryEnabled;

@Value("${rqueue.worker.registry.worker.ttl:300}")
private long workerRegistryWorkerTtlInSeconds;

@Value("${rqueue.worker.registry.worker.heartbeat.interval:60}")
private long workerRegistryWorkerHeartbeatIntervalInSeconds;

@Value("${rqueue.worker.registry.queue.ttl:3600}")
private long workerRegistryQueueTtlInSeconds;

@Value("${rqueue.worker.registry.queue.heartbeat.interval:15}")
private long workerRegistryQueueHeartbeatIntervalInSeconds;

@Value("${rqueue.worker.registry.key.prefix:worker::}")
private String workerRegistryKeyPrefix;

@Value("${rqueue.worker.registry.queue.key.prefix:q-pollers::}")
private String workerRegistryQueueKeyPrefix;

public boolean messageInTerminalStateShouldBeStored() {
return getMessageDurabilityInTerminalStateInSecond() > 0;
Expand Down Expand Up @@ -294,6 +313,14 @@ public String getJobsKey(String messageId) {
return prefix + jobsCollectionNamePrefix + messageId;
}

public String getWorkerRegistryKey(String workerId) {
return prefix + workerRegistryKeyPrefix + workerId;
}

public String getWorkerRegistryQueueKey(String queueName) {
return prefix + workerRegistryQueueKeyPrefix + getTaggedName(queueName);
}

public String getDelDataName(String queueName) {
return prefix
+ delPrefix
Expand All @@ -307,6 +334,22 @@ public Duration getJobDurabilityInTerminalState() {
return Duration.ofSeconds(jobDurabilityInTerminalStateInSecond);
}

public Duration getWorkerRegistryWorkerTtl() {
return Duration.ofSeconds(workerRegistryWorkerTtlInSeconds);
}

public Duration getWorkerRegistryWorkerHeartbeatInterval() {
return Duration.ofSeconds(workerRegistryWorkerHeartbeatIntervalInSeconds);
}

public Duration getWorkerRegistryQueueTtl() {
return Duration.ofSeconds(workerRegistryQueueTtlInSeconds);
}

public Duration getWorkerRegistryQueueHeartbeatInterval() {
return Duration.ofSeconds(workerRegistryQueueHeartbeatIntervalInSeconds);
}

public String getLibVersion() {
if (StringUtils.isEmpty(version)) {
ClassPathResource resource =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@
import com.github.sonus21.rqueue.core.ProcessingQueueMessageScheduler;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueInternalPubSubChannel;
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.impl.UuidV4RqueueMessageIdGenerator;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.condition.MissingRqueueMessageIdGenerator;
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.spring.extension.SpringExtension;
import io.pebbletemplates.spring.reactive.PebbleReactiveViewResolver;
Expand Down Expand Up @@ -151,6 +156,12 @@ public RqueueWebConfig rqueueWebConfig() {
return new RqueueWebConfig();
}

@Bean
@Conditional(MissingRqueueMessageIdGenerator.class)
public RqueueMessageIdGenerator rqueueMessageIdGenerator() {
return new UuidV4RqueueMessageIdGenerator();
}

@Bean
public RqueueSchedulerConfig rqueueSchedulerConfig() {
return new RqueueSchedulerConfig();
Expand Down Expand Up @@ -215,6 +226,11 @@ public RqueueStringDao rqueueStringDao(RqueueConfig rqueueConfig) {
return new RqueueStringDaoImpl(rqueueConfig);
}

@Bean
public RqueueWorkerRegistry rqueueWorkerRegistry(RqueueConfig rqueueConfig) {
return new RqueueWorkerRegistryImpl(rqueueConfig);
}

@Bean
public RqueueLockManager rqueueLockManager(RqueueStringDao rqueueStringDao) {
return new RqueueLockManagerImpl(rqueueStringDao);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public class RqueueWebConfig {
@Value("${rqueue.web.max.message.move.count:1000}")
private int maxMessageMoveCount;

@Value("${rqueue.web.queue.page.size:12}")
private int queuePageSize;

@Value("${rqueue.web.worker.page.size:10}")
private int workerPageSize;

/**
* Whether queue stats should be collected or not. When this flag is disabled, metric data won't
* be available in the dashboard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -55,6 +56,9 @@ public class RqueueBeanProvider {
@Autowired(required = false)
private RqueueMetricsCounter rqueueMetricsCounter;

@Autowired(required = false)
private RqueueWorkerRegistry rqueueWorkerRegistry;

@Autowired
private RqueueMessageHandler rqueueMessageHandler;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.core;

@FunctionalInterface
public interface RqueueMessageIdGenerator {

String generate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void moveMessageWithDelay(

Long addMessage(String queueName, RqueueMessage rqueueMessage);

Long addMessageAtFront(String queueName, RqueueMessage rqueueMessage);

Boolean addToZset(String zsetName, RqueueMessage rqueueMessage, long score);

List<RqueueMessage> getAllMessages(
Expand Down
Loading