Annotation Interface Consumer


Declares a Consumer within a Flux application.

A consumer represents an isolated group of handlers that independently track and process messages from one or more message logs. It can be applied at the class or package level to group handlers together. Handlers that do not explicitly declare a Consumer are assigned according to the application's configuration, as defined via FluxCapacitorBuilder.addConsumerConfiguration(ConsumerConfiguration, MessageType...). If no specific configuration is provided, the handler will be assigned to the application's default consumer.

A consumer consists of one or more trackers—individual threads or processes that fetch and process message segments. Each tracker is responsible for a disjoint segment of the message log, allowing for parallel consumption. By default, messages are sharded into 128 segments; a consumer with threads = 2 will assign 64 segments to each tracker.
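The segment arithmetic above can be sketched in plain Java. This is only an illustration of how 128 segments divide into disjoint ranges. The actual assignment is performed by Flux, and the class name, method name, and the way an uneven remainder is spread are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Flux client: splits the segment space
// into contiguous half-open ranges [start, end), one per tracker.
final class SegmentSplitSketch {
    static final int TOTAL_SEGMENTS = 128; // default number of segments per the text above

    static List<int[]> split(int trackers) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < trackers; i++) {
            // Integer arithmetic keeps the ranges disjoint and covers every segment.
            int start = (int) ((long) i * TOTAL_SEGMENTS / trackers);
            int end = (int) ((long) (i + 1) * TOTAL_SEGMENTS / trackers);
            ranges.add(new int[]{start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // With threads = 2, each tracker ends up with 64 segments.
        for (int[] r : split(2)) {
            System.out.println("[" + r[0] + ", " + r[1] + ") -> " + (r[1] - r[0]) + " segments");
        }
    }
}
```

Because the ranges never overlap, two trackers of the same consumer never process the same segment, which is what makes parallel consumption safe.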

This annotation offers fine-grained control over message processing characteristics including concurrency, batching, backpressure, result publication, and handler exclusivity.

Terminology

  • Consumer: Named group of handlers with isolated message tracking state.
  • Tracker: A processing thread assigned to a specific segment of the message log.
  • Handler: Method annotated with @HandleEvent, @HandleCommand, etc., which processes messages.

Example:


 @Consumer(name = "audit", threads = 3, passive = true)
 class AuditHandler {
     @HandleCommand
     void on(AuthenticateUser command) {
         // log for auditing; result will not be published due to passive = true
     }
 }
 
  • Required Element Summary

    Required Elements
    Modifier and Type | Required Element | Description
    String | name | The unique name of the consumer.
  • Optional Element Summary

    Optional Elements
    Modifier and Type | Optional Element | Description
    Class<? extends BatchInterceptor>[] | batchInterceptors | Interceptors applied at the batch level across all messages in a poll cycle.
    boolean | clientControlledIndex | If true, the consumer will not rely on Flux's internal tracking index.
    ChronoUnit | durationUnit | Time unit for maxWaitDuration().
    Class<? extends ErrorHandler> | errorHandler | Error handler invoked when a message processing error occurs.
    boolean | exclusive | Determines whether handlers assigned to this consumer are excluded from other consumers.
    boolean | filterMessageTarget | If true, only messages explicitly targeted at this application instance will be processed.
    Class<? extends FlowRegulator> | flowRegulator | Regulates message flow and backpressure behavior.
    Class<? extends HandlerInterceptor>[] | handlerInterceptors | Interceptors applied to individual handler method invocations.
    boolean | ignoreSegment | If true, this consumer will bypass the default segment-based sharding applied by the Flux platform and attempt to process all message segments.
    int | maxFetchSize | Maximum number of messages to fetch in a batch.
    long | maxIndexExclusive | Optional exclusive upper bound for message processing.
    long | maxWaitDuration | Maximum time to wait before fetching a new batch, when none are available.
    long | minIndex | Optional minimum message index from which this consumer should begin processing.
    boolean | passive | Indicates that this consumer should process messages without publishing result messages.
    boolean | singleTracker | If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.
    boolean | storePositionManually | Whether this consumer is taking manual control over storing its position in the log.
    int | threads | The number of tracker threads to allocate for this consumer.
    String | typeFilter | Optional regular expression used to filter message payload types on the Flux platform.
  • Element Details

    • name

      String name
      The unique name of the consumer. Required. This isolates its tracking tokens from other consumers.
    • threads

      int threads
      The number of tracker threads to allocate for this consumer. Each thread runs one tracker, which processes its own disjoint share of the message log's segments. Default is 1.
      Default:
      1
    • maxFetchSize

      int maxFetchSize
      Maximum number of messages to fetch in a batch. Default is 1024.
      Default:
      1024
    • maxWaitDuration

      long maxWaitDuration
      Maximum time to wait before fetching a new batch, when none are available. See durationUnit() for the time unit. Default is 60 (seconds).
      Default:
      60L
    • durationUnit

      ChronoUnit durationUnit
      Time unit for maxWaitDuration(). Default is seconds.
      Default:
      SECONDS
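As a small illustration of how maxWaitDuration and durationUnit combine into one wait time, the sketch below uses only java.time. The class and method names are hypothetical, and how the Flux client converts these values internally is not stated here.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the Flux client.
final class MaxWaitSketch {
    // Combines the numeric value and its unit into a single Duration.
    static Duration maxWait(long maxWaitDuration, ChronoUnit durationUnit) {
        return Duration.of(maxWaitDuration, durationUnit);
    }

    public static void main(String[] args) {
        // The documented defaults: 60 with unit SECONDS.
        System.out.println(maxWait(60L, ChronoUnit.SECONDS).toMillis() + " ms");
        // A shorter wait, e.g. maxWaitDuration = 500 with durationUnit = MILLIS.
        System.out.println(maxWait(500L, ChronoUnit.MILLIS).toMillis() + " ms");
    }
}
```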
    • handlerInterceptors

      Class<? extends HandlerInterceptor>[] handlerInterceptors
      Interceptors applied to individual handler method invocations.
      Default:
      {}
    • batchInterceptors

      Class<? extends BatchInterceptor>[] batchInterceptors
      Interceptors applied at the batch level across all messages in a poll cycle.
      Default:
      {}
    • errorHandler

      Class<? extends ErrorHandler> errorHandler
      Error handler invoked when a message processing error occurs. Default is LoggingErrorHandler which logs errors and allows message tracking and processing to continue.
      Default:
      io.fluxcapacitor.javaclient.tracking.LoggingErrorHandler.class
    • flowRegulator

      Class<? extends FlowRegulator> flowRegulator
      Regulates message flow and backpressure behavior. Default is NoOpFlowRegulator.
      Default:
      io.fluxcapacitor.javaclient.tracking.NoOpFlowRegulator.class
    • filterMessageTarget

      boolean filterMessageTarget
      If true, this consumer only processes messages explicitly targeted at this application instance. Typically used for tracking Result or WebResponse messages.
      Default:
      false
    • ignoreSegment

      boolean ignoreSegment
      If true, this consumer will bypass the default segment-based sharding applied by the Flux platform and attempt to process all message segments.

      By default, Flux shards messages across consumers using a routing key present in the message payload, or the message ID if no routing key is specified. However, some handlers may require a custom sharding strategy, for instance sharding based on a different property in the payload.

      Setting ignoreSegment = true allows such handlers to override Flux's internal routing and apply their own logic. A common pattern is to use the @RoutingKey annotation on a handler method to specify a custom property:

      
       @HandleEvent
       @RoutingKey("organisationId")
       void handle(CreateUser event) {
           // process based on organisationId instead of the default routing key
       }
       
      Default:
      false
    • singleTracker

      boolean singleTracker
      If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.

      Although multiple tracker threads may be configured (via threads()), only one tracker will be assigned all segments. Other trackers will remain idle and receive no segment assignments.

      This setting is useful when:

      • Messages must be processed strictly in global index order by a single process.
      • No suitable routing key exists for meaningful partitioning.
      • Handler logic requires a holistic or stateful view of all messages across the log.

      In contrast to regular segmented consumers, this mode disables concurrent processing across trackers but ensures strict ordering.

      Default:
      false
    • clientControlledIndex

      boolean clientControlledIndex
      If true, the consumer will not rely on Flux's internal tracking index. Instead, the application itself is responsible for determining which messages to process.

      This is typically used in combination with ignoreSegment() set to true to ensure that all application instances receive every message—rather than a sharded subset.

      This mode is useful for scenarios where message delivery must be broadcast to all instances. For example, a WebSocket endpoint that pushes updates to connected clients may need to observe the full message stream, ensuring that each client sees every relevant update.

      When false (the default), Flux tracks message indices and distributes segments to consumer trackers for balanced parallel processing.
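A broadcast-style consumer of the kind described above might be declared as in the following sketch. The consumer name, handler class, and PriceUpdated payload are hypothetical. The import paths are assumed from the io.fluxcapacitor.javaclient.tracking package named elsewhere on this page, and the client library must be on the classpath.

```java
import io.fluxcapacitor.javaclient.tracking.Consumer;
import io.fluxcapacitor.javaclient.tracking.handling.HandleEvent;

// Every application instance should observe the full event stream, so the
// consumer ignores segment assignment and manages its own index.
@Consumer(name = "price-broadcast", ignoreSegment = true, clientControlledIndex = true)
class PriceUpdateBroadcaster {

    // Hypothetical event payload used only for this illustration.
    record PriceUpdated(String productId, long priceInCents) {}

    @HandleEvent
    void on(PriceUpdated event) {
        // Push the update to the WebSocket sessions connected to this instance.
    }
}
```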

      Default:
      false
    • storePositionManually

      boolean storePositionManually
      Whether this consumer is taking manual control over storing its position in the log.

      When true, the consumer is responsible for explicitly storing its position after processing one or more message batches. This allows for greater control — for example, when handling long-running workflows that span multiple batches, or when committing position should be deferred until post-processing is complete.

      When false (the default), the position is automatically updated after each message batch is processed, ensuring progress is recorded and avoiding reprocessing on restart.

      Note: Even with manual position tracking enabled, the consumer will continue to receive "new" messages as long as the tracking process remains active. However, its persisted position will not be updated unless explicitly stored.

      Default:
      false
    • exclusive

      boolean exclusive
      Determines whether handlers assigned to this consumer are excluded from other consumers.

      If true (default), a handler will only be active in this consumer. If false, the same handler may be active in multiple consumers simultaneously. This enables advanced scenarios such as parallel replays alongside live processing.

      Default:
      true
    • passive

      boolean passive
      Indicates that this consumer should process messages without publishing result messages.

      When true, return values from request handlers (e.g., @HandleCommand, @HandleQuery, @HandleWebRequest) are ignored and not appended to the result log. This is useful for secondary consumers that perform side-effects or projections without impacting the result flow.

      Default:
      false
    • minIndex

      long minIndex
      Optional minimum message index from which this consumer should begin processing.

      If set to a non-negative value, only messages at or above this index will be processed. If negative (the default), the consumer will start processing from the current end of the message log – i.e., it will only receive new messages from this point forward.

      Default:
      -1L
    • maxIndexExclusive

      long maxIndexExclusive
      Optional exclusive upper bound for message processing. Messages at or above this index will not be processed. Ignored if negative.
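Taken together, minIndex and maxIndexExclusive describe a half-open index window. The predicate below is a minimal sketch of that rule as documented here, not the platform's implementation. The class name and the logEndAtStart parameter (the index at the end of the log when the consumer first starts) are assumptions introduced for illustration.

```java
// Hypothetical helper, not part of the Flux client.
final class IndexWindowSketch {

    static boolean inWindow(long index, long minIndex, long maxIndexExclusive, long logEndAtStart) {
        // A negative minIndex means "start from the current end of the log".
        long lower = minIndex >= 0 ? minIndex : logEndAtStart;
        // A negative maxIndexExclusive means "no upper bound".
        boolean belowUpper = maxIndexExclusive < 0 || index < maxIndexExclusive;
        return index >= lower && belowUpper;
    }

    public static void main(String[] args) {
        // Replay window [100, 200): index 100 is included, index 200 is not.
        System.out.println(inWindow(100, 100, 200, 0));
        System.out.println(inWindow(200, 100, 200, 0));
    }
}
```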
      Default:
      -1L
    • typeFilter

      String typeFilter
      Optional regular expression used to filter message payload types on the Flux platform.

      When specified, this filter is applied server-side to restrict the messages delivered to the consumer based on the fully qualified type name of the payload.

      If left empty (the default), all message types are delivered to the client, and filtering is performed locally by the handlers. This is typically the preferred approach, as it avoids tightly coupling consumer configuration to type naming and allows for greater flexibility.

      Example: typeFilter = ".*\\.CreateUser$|.*\\.UpdateUser$" matches any CreateUser or UpdateUser message types, regardless of package. This is useful for selectively tracking a set of message types without tying the filter to specific namespaces.
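The example expression can be tried locally with java.util.regex, as sketched below. Whether the platform requires a full match or a partial match is not stated here. This sketch assumes a full match against the fully qualified type name, and the class name and sample type names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Flux client.
final class TypeFilterSketch {
    // Same expression as in the example above, written as a Java string literal.
    static final Pattern FILTER = Pattern.compile(".*\\.CreateUser$|.*\\.UpdateUser$");

    static boolean accepts(String payloadType) {
        // Assumption: the whole type name must match the expression.
        return FILTER.matcher(payloadType).matches();
    }

    public static void main(String[] args) {
        System.out.println(accepts("com.example.users.CreateUser"));        // accepted
        System.out.println(accepts("org.other.UpdateUser"));                // accepted
        System.out.println(accepts("com.example.users.CreateUserCommand")); // rejected
    }
}
```

Because the expression requires a dot before the simple class name, a type declared without a package would not match it.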

      Default:
      ""