Annotation Interface Consumer
Declares a Consumer within a Flux application.
A consumer represents an isolated group of handlers that independently track and process messages from one or more
message logs. It can be applied at the class or package level to group handlers together. Handlers that do not
explicitly declare a Consumer are assigned according to the application's configuration, as defined via
FluxCapacitorBuilder.addConsumerConfiguration(ConsumerConfiguration, MessageType...). If no specific
configuration is provided, the handler will be assigned to the application's default consumer.
A consumer consists of one or more trackers—individual threads or processes that fetch and process message
segments. Each tracker is responsible for a disjoint segment of the message log, allowing for parallel consumption.
By default, messages are sharded into 128 segments; a consumer with threads = 2 will assign 64 segments to
each tracker.
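The segment arithmetic above can be sketched in plain Java. This is a minimal illustration that assumes segments are divided into contiguous, near-equal ranges; the class SegmentSplitSketch and its split method are hypothetical names, not part of the Flux client, and the assignment actually performed by the platform may differ.

```java
final class SegmentSplitSketch {
    // Segment count stated in the text above.
    static final int TOTAL_SEGMENTS = 128;

    /**
     * Splits [0, TOTAL_SEGMENTS) into one half-open range {start, end} per tracker.
     * Assumption: ranges are contiguous and as even as possible.
     */
    static int[][] split(int trackers) {
        if (trackers < 1 || trackers > TOTAL_SEGMENTS) {
            throw new IllegalArgumentException("trackers must be between 1 and " + TOTAL_SEGMENTS);
        }
        int[][] ranges = new int[trackers][];
        for (int i = 0; i < trackers; i++) {
            int start = i * TOTAL_SEGMENTS / trackers;
            int end = (i + 1) * TOTAL_SEGMENTS / trackers;
            ranges[i] = new int[] {start, end};
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Print the range and segment count for each of two trackers.
        for (int[] range : split(2)) {
            System.out.println("[" + range[0] + ", " + range[1] + ") holds "
                    + (range[1] - range[0]) + " segments");
        }
    }
}
```

With two trackers each range holds 64 segments, matching the example in the text. With a tracker count that does not divide 128 evenly, such as three, the ranges differ in size by at most one segment.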
This annotation offers fine-grained control over message processing characteristics including concurrency, batching, backpressure, result publication, and handler exclusivity.
Terminology
- Consumer: Named group of handlers with isolated message tracking state.
- Tracker: A processing thread assigned to a specific segment of the message log.
- Handler: Method annotated with @HandleEvent, @HandleCommand, etc., which processes messages.
Example:
@Consumer(name = "audit", threads = 3, passive = true)
class AuditHandler {
    @HandleCommand
    void on(AuthenticateUser command) {
        // log for auditing; result will not be published due to passive = true
    }
}
-
Required Element Summary
Required Elements (Modifier and Type, Required Element, Description):
- String name: The unique name of the consumer.
-
Optional Element Summary
Optional Elements (Modifier and Type, Optional Element, Description):
- Class<? extends BatchInterceptor>[] batchInterceptors: Interceptors applied at the batch level across all messages in a poll cycle.
- boolean clientControlledIndex: If true, the consumer will not rely on Flux's internal tracking index.
- ChronoUnit durationUnit: Unit for maxWaitDuration().
- Class<? extends ErrorHandler> errorHandler: Error handler invoked when a message processing error occurs.
- boolean exclusive: Determines whether handlers assigned to this consumer are excluded from other consumers.
- boolean filterMessageTarget: If true, only messages explicitly targeted at this application instance will be processed.
- Class<? extends FlowRegulator> flowRegulator: Regulates message flow and backpressure behavior.
- Class<? extends HandlerInterceptor>[] handlerInterceptors: Interceptors applied to individual handler method invocations.
- boolean ignoreSegment: If true, this consumer will bypass the default segment-based sharding applied by the Flux platform and attempt to process all message segments.
- int maxFetchSize: Maximum number of messages to fetch in a batch.
- long maxIndexExclusive: Optional exclusive upper bound for message processing.
- long maxWaitDuration: Maximum time to wait before fetching a new batch, when none are available.
- long minIndex: Optional minimum message index from which this consumer should begin processing.
- boolean passive: Indicates that this consumer should process messages without publishing result messages.
- boolean singleTracker: If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.
- boolean storePositionManually: Whether this consumer is taking manual control over storing its position in the log.
- int threads: The number of tracker threads to allocate for this consumer.
- String typeFilter: Optional regular expression used to filter message payload types on the Flux platform.
-
Element Details
-
name
String name
The unique name of the consumer. Required. This isolates its tracking tokens from other consumers.
-
threads
int threads
The number of tracker threads to allocate for this consumer. Each thread processes a unique segment of the message log. Default is 1.
Default: 1
-
maxFetchSize
int maxFetchSize
Maximum number of messages to fetch in a batch. Default is 1024.
Default: 1024
-
maxWaitDuration
long maxWaitDuration
Maximum time to wait before fetching a new batch, when none are available. See durationUnit() for the time unit. Default is 60 (seconds).
Default: 60L
-
durationUnit
ChronoUnit durationUnit
Unit for maxWaitDuration(). Default is ChronoUnit.SECONDS.
Default: SECONDS
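The two elements maxWaitDuration and durationUnit together describe a single wait period. A minimal sketch of how they combine, using only the JDK, is shown here; the class MaxWaitSketch and its maxWait method are hypothetical names used for illustration and say nothing about how the Flux client converts these values internally.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

final class MaxWaitSketch {
    /**
     * Combines an amount and a unit into a Duration.
     * Note: Duration.of rejects units with an estimated duration, such as ChronoUnit.MONTHS.
     */
    static Duration maxWait(long maxWaitDuration, ChronoUnit durationUnit) {
        return Duration.of(maxWaitDuration, durationUnit);
    }

    public static void main(String[] args) {
        // The documented defaults: 60 with ChronoUnit.SECONDS.
        Duration defaultWait = maxWait(60L, ChronoUnit.SECONDS);
        System.out.println(defaultWait.toMillis() + " ms");
    }
}
```

Under the documented defaults the wait period is 60 seconds, that is 60,000 milliseconds. Changing only durationUnit, for example to ChronoUnit.MILLIS, changes the meaning of the same numeric maxWaitDuration.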
-
handlerInterceptors
Class<? extends HandlerInterceptor>[] handlerInterceptors
Interceptors applied to individual handler method invocations.
Default: {}
-
batchInterceptors
Class<? extends BatchInterceptor>[] batchInterceptors
Interceptors applied at the batch level across all messages in a poll cycle.
Default: {}
-
errorHandler
Class<? extends ErrorHandler> errorHandler
Error handler invoked when a message processing error occurs. Default is LoggingErrorHandler, which logs errors and allows message tracking and processing to continue.
Default: io.fluxcapacitor.javaclient.tracking.LoggingErrorHandler.class
-
flowRegulator
Class<? extends FlowRegulator> flowRegulator
Regulates message flow and backpressure behavior. Default is NoOpFlowRegulator.
Default: io.fluxcapacitor.javaclient.tracking.NoOpFlowRegulator.class
-
filterMessageTarget
boolean filterMessageTarget
If true, only messages explicitly targeted at this application instance will be processed. This is typically used when tracking Result or WebResponse messages, so that the consumer only receives results intended for this application instance.
Default: false
-
ignoreSegment
boolean ignoreSegment
If true, this consumer will bypass the default segment-based sharding applied by the Flux platform and attempt to process all message segments.
By default, Flux shards messages across consumers using a routing key present in the message payload, or the message ID if no routing key is specified. However, some handlers may require a custom sharding strategy, for instance sharding based on a different property in the payload.
Setting ignoreSegment = true allows such handlers to override Flux's internal routing and apply their own logic. A common pattern is to use the @RoutingKey annotation on a handler method to specify a custom property:
@HandleEvent
@RoutingKey("organisationId")
void handle(CreateUser event) {
    // process based on organisationId instead of the default routing key
}
Default: false
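To make the idea of custom sharding concrete, here is a minimal plain-Java sketch of how a value such as organisationId could be mapped onto a segment and compared against the range owned by a tracker. Everything in it is an assumption made for illustration: the class CustomShardingSketch, its methods, and the use of String.hashCode as the hash function are hypothetical and do not describe the hashing or routing that Flux performs.

```java
final class CustomShardingSketch {
    // Segment count taken from the consumer description; treated here as a fixed constant.
    static final int TOTAL_SEGMENTS = 128;

    /**
     * Maps a routing value onto a segment in [0, TOTAL_SEGMENTS).
     * Assumption: String.hashCode stands in for the platform's own hash function.
     */
    static int segmentOf(String routingValue) {
        return Math.floorMod(routingValue.hashCode(), TOTAL_SEGMENTS);
    }

    /** Returns true if the value's segment falls inside the half-open range [start, end). */
    static boolean ownedBy(String routingValue, int start, int end) {
        int segment = segmentOf(routingValue);
        return segment >= start && segment < end;
    }

    public static void main(String[] args) {
        String organisationId = "org-42"; // hypothetical routing value
        System.out.println("segment = " + segmentOf(organisationId));
        System.out.println("owned by first half = " + ownedBy(organisationId, 0, 64));
    }
}
```

The point of the sketch is that the same routing value always lands in the same segment, so exactly one tracker range claims it. Math.floorMod is used instead of the % operator because hashCode can be negative.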
-
singleTracker
boolean singleTracker
If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.
Although multiple tracker threads may be configured (via threads()), only one tracker will be assigned all segments. Other trackers will remain idle and receive no segment assignments.
This setting is useful when:
- Messages must be processed strictly in global index order by a single process.
- No suitable routing key exists for meaningful partitioning.
- Handler logic requires a holistic or stateful view of all messages across the log.
In contrast to regular segmented consumers, this mode disables concurrent processing across trackers but ensures strict ordering.
Default: false
-
clientControlledIndex
boolean clientControlledIndex
If true, the consumer will not rely on Flux's internal tracking index. Instead, the application itself is responsible for determining which messages to process.
This is typically used in combination with ignoreSegment() set to true to ensure that all application instances receive every message, rather than a sharded subset.
This mode is useful for scenarios where message delivery must be broadcast to all instances. For example, a WebSocket endpoint that pushes updates to connected clients may need to observe the full message stream, ensuring that each client sees every relevant update.
When false (the default), Flux tracks message indices and distributes segments to consumer trackers for balanced parallel processing.
Default: false
-
storePositionManually
boolean storePositionManually
Whether this consumer is taking manual control over storing its position in the log.
When true, the consumer is responsible for explicitly storing its position after processing one or more message batches. This allows for greater control, for example when handling long-running workflows that span multiple batches, or when committing the position should be deferred until post-processing is complete.
When false (the default), the position is automatically updated after each message batch is processed, ensuring progress is recorded and avoiding reprocessing on restart.
Note: Even with manual position tracking enabled, the consumer will continue to receive "new" messages as long as the tracking process remains active. However, its persisted position will not be updated unless explicitly stored.
Default: false
-
exclusive
boolean exclusive
Determines whether handlers assigned to this consumer are excluded from other consumers.
If true (default), a handler will only be active in this consumer. If false, the same handler may be active in multiple consumers simultaneously. This enables advanced scenarios such as parallel replays alongside live processing.
Default: true
-
passive
boolean passive
Indicates that this consumer should process messages without publishing result messages.
When true, return values from request handlers (e.g., @HandleCommand, @HandleQuery, @HandleWebRequest) are ignored and not appended to the result log. This is useful for secondary consumers that perform side-effects or projections without impacting the result flow.
Default: false
-
minIndex
long minIndex
Optional minimum message index from which this consumer should begin processing.
If set to a non-negative value, only messages at or above this index will be processed. If negative (the default), the consumer will start processing from the current end of the message log, i.e., it will only receive new messages from this point forward.
Default: -1L
-
maxIndexExclusive
long maxIndexExclusive
Optional exclusive upper bound for message processing. Messages at or above this index will not be processed. Ignored if negative.
Default: -1L
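Taken together, minIndex and maxIndexExclusive describe a half-open index window. The following plain-Java sketch shows that window as a predicate; the class IndexBoundsSketch and its withinBounds method are hypothetical names. One simplification is made deliberately: a negative minIndex is treated here as "no lower bound", whereas in a real consumer it means that processing starts at the current end of the log, which a stateless predicate cannot express.

```java
final class IndexBoundsSketch {
    /**
     * Returns true if index lies in [minIndex, maxIndexExclusive).
     * A negative bound is treated as absent (simplification, see the note above).
     */
    static boolean withinBounds(long index, long minIndex, long maxIndexExclusive) {
        boolean atOrAboveMin = minIndex < 0 || index >= minIndex;
        boolean belowMax = maxIndexExclusive < 0 || index < maxIndexExclusive;
        return atOrAboveMin && belowMax;
    }

    public static void main(String[] args) {
        // The lower bound is inclusive, the upper bound is exclusive.
        System.out.println(withinBounds(10L, 10L, 20L));
        System.out.println(withinBounds(20L, 10L, 20L));
    }
}
```

A bounded window like this is what makes a replay over a fixed slice of the log possible: the lower bound is inclusive and the upper bound is exclusive, so adjacent windows do not overlap.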
-
typeFilter
String typeFilter
Optional regular expression used to filter message payload types on the Flux platform.
When specified, this filter is applied server-side to restrict the messages delivered to the consumer based on the fully qualified type name of the payload.
If left empty (the default), all message types are delivered to the client, and filtering is performed locally by the handlers. This is typically the preferred approach, as it avoids tightly coupling consumer configuration to type naming and allows for greater flexibility.
Example: typeFilter = ".*\\.CreateUser$|.*\\.UpdateUser$" matches any CreateUser or UpdateUser message types, regardless of package. This is useful for selectively tracking a set of message types without tying the filter to specific namespaces.
Default: ""
-
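The behaviour of the example typeFilter expression can be checked locally with java.util.regex before it is put into an annotation. This is a sketch under assumptions: the class TypeFilterSketch and the sample type names are hypothetical, and it assumes the expression is matched against the whole fully qualified type name; how the Flux platform applies the filter server-side is not specified here.

```java
import java.util.regex.Pattern;

final class TypeFilterSketch {
    // The expression from the example above; "\\." in Java source is a literal dot in the regex.
    static final Pattern FILTER = Pattern.compile(".*\\.CreateUser$|.*\\.UpdateUser$");

    /** Returns true if the whole type name matches the filter (assumed matching mode). */
    static boolean accepts(String fullyQualifiedTypeName) {
        return FILTER.matcher(fullyQualifiedTypeName).matches();
    }

    public static void main(String[] args) {
        // Hypothetical type names used only to exercise the expression.
        String[] samples = {
            "com.example.users.CreateUser",
            "org.other.api.UpdateUser",
            "com.example.users.DeleteUser",
            "com.example.users.CreateUserGroup"
        };
        for (String sample : samples) {
            System.out.println(sample + " accepted: " + accepts(sample));
        }
    }
}
```

The leading `\\.` and trailing `$` matter: without them, a type such as CreateUserGroup or a class whose name merely ends in CreateUser could slip through. Note that the expression requires a package separator, so a type in the default package would not match.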