Contact Us 1-800-596-4880

Aggregators Module Reference 1.1 - Mule 4

An aggregator scope is a component that receives some data, processes the data to extract some value, and then adds that value to a list of aggregated elements. After that process is complete, and depending on the aggregator scope configuration, a set of components in the flow processes the list of elements.

Aggregators are pass-through routers. The same data that an aggregator scope receives is processed by the components that follow the aggregator scope. The only modification is variable propagation if any variables are set during the execution of any of the aggregation routes.

When an aggregator scope releases the stored values, a route component (Incremental aggregation or Aggregation complete) processes the list of aggregated elements within the aggregator scope itself or in another Mule flow, through an Aggregator listener source. The process depends on the configuration of the aggregator scope used.

Configurations

There is no global configuration for the Aggregator Module because the module uses scopes. Each type of aggregator is a different scope that is configurable through its parameters. The following parameters are common in each scope:

  • Content

    The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. Because all data related to an aggregator scope is stored in an object store, the content value must be serializable. Otherwise, there the module might not work properly.

  • Object Store

    All information related to an aggregator is stored in an object store. The type of object store relates to the expected behavior of the component.

    • Persistent Object Store (Default)

      Slower but more reliable than an in-memory object store. Enables data recovery after the application restarts.

    • In-Memory Object Store

      Faster than a persistent memory store, but loses all the data if the application restarts.

Aggregators Scopes

There are size-based, time-based, and group-based aggregators.

Size-Based Aggregator

The Size based aggregator scope enables you to aggregate elements until a predefined number of elements completes the aggregation.

<aggregators:size-based-aggregator  name="sizeBasedAggregator"
                                    maxSize="10"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

If the elements reach the Max size specified, two things occur:

  • The elements in the storage are removed and the next element belongs to the new aggregation.

  • The Aggregation complete route executes with the aggregated elements.

Additionally, if an Aggregator listener is registered to the aggregator scope, the listener’s callback executes using the same set of elements.
If the Incremental aggregation route is not null, and the Max size is not reached, then the route component chain executes with all the aggregated elements, including the last element aggregated.

You can also define a Timeout for the aggregator scope. In that case, a scheduled task with that timeout as a delay is registered for execution. The time is computed from the time at which the first element arrives, and no extra task is scheduled if there is another task waiting to be executed. In the case of a timeout, the referenced listener executes only if it supports being called by timeout.

Parameters

Name Type Description Default Value Required

Name

String

The name of the aggregator. An aggregator listener can later reference the scope by name.

x

Content

Expression

The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation.

#[payload]

Max Size

Number

The total number of elements to aggregate before considering the aggregation complete.

x

Timeout

Number

A maximum time to wait for the aggregation to complete. If the timeout is reached before the total number of elements is equal to Max Size, the aggregation is not considered complete. A value of 0 is not supported, to avoid constant group timeouts.

-1(UNLIMITED)

Timeout unit

Time Unit

The time unit in which to measure the timeout.

SECONDS

Object Store

Object Store

Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored.

Default Object Store partition.

Aggregation Complete Route

Route

Components chain to execute once the aggregation is complete. The Aggregation Complete Route does not execute if the timeout is reached.

x

Incremental Aggregation Route

Route

Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated.

Raises

  • AGGREGATORS:AGGREGATOR_CONFIG

The Max size or Timeout fields have invalid values, for example, maxSize < 0.

  • AGGREGATORS:OBJECT_STORE_ACCESS

An error occurred during an attempt to access the object store used to store the aggregated values.

Time-Based Aggregator

The Time based aggregator scope enables you to aggregate elements until a specified time limit is reached.

<aggregators:time-based-aggregator  name="timeBasedAggregator"
                                    period="60"
                                    periodUnit="MINUTES"
                                    maxSize="10"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>

The period taken into account is computed from the time the first element arrives. After the aggregation is released, the timer does not start until the next element arrives.

The aggregator also enables an Incremental aggregation route to be executed every time a new element arrives, unless a Max size is set. If that is the case, the Incremental aggregation route executes every time except when the size of the aggregated elements is equal to the Max size. If an Aggregator listener is present at that moment, the listener callback is also executed.

Parameters

Name Type Description Default Value Required

Name

String

The name of the aggregator. An aggregator listener can later reference the scope by name.

x

Content

Expression

The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation.

#[payload]

Period

Number

A time period to wait before considering the aggregation complete.

x

Period unit

Time Unit

The time unit in which to measure the time period.

SECONDS

Max Size

Number

The total number of elements to aggregate before considering the aggregation complete.

-1(UNLIMITED)

Object Store

Object Store

Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored.

Default Object Store partition.

Incremental Aggregation Route

Route

Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated.

Raises

  • AGGREGATORS:AGGREGATOR_CONFIG

The Period or Max size fields have invalid values, for example, Period = 0.

  • AGGREGATORS:OBJECT_STORE_ACCESS

An error occurred during an attempt to access the object store used to store the aggregated values.

Group-Based Aggregator

The Group based aggregator scope enables you to aggregate elements into groups by group ID.

<aggregators:group-based-aggregator name="groupBasedAggregator"
                                    groupId="#[correlationId]"
                                    groupSize="#[itemSequenceInfo.sequenceSize]"
                                    evictionTime="180"
                                    evictionTimeUnit="SECONDS"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

If the elements reach the Max size specified for the group, two things occur:

  • The elements in that group are removed from storage. The group is marked as complete and every new element that arrives at that group raises an exception.

  • The Aggregation complete route executes with the aggregated elements of that particular group.

Every time a new element reaches the aggregator, an ID is resolved. If a group with that ID already exists in the aggregator, the value is added to that group. Otherwise, a new group with that ID is created and the received element is the first element in that group’s aggregation.

Additionally, if an Aggregator listener is registered to the aggregator scope, the listener’s callback executes with the same set of elements.
If the Incremental aggregation route is not null, and the Max size is not reached, then the route component chain executes with all the aggregated elements, including the last element aggregated.

The Group based aggregator scope introduces some important concepts:

  • Group timeout
    Specifies when a group must be released because all the necessary elements for the group did not arrive within the expected time. If a group has timed out but is not yet evicted, it rejects attempts to add any new values

  • Group eviction
    Specifies when a group is removed from the aggregator, regardless of whether it was completed or timed out. If a new element with that group’s ID is received by the aggregator, the group is created again.

Lastly, when elements that reach group-based aggregators come from a sequence that is split (by a ForEach component for example), each element is assigned a different sequenceNumber. In that case, the elements are sorted in increasing order prior to the aggregation release.

Parameters

Name Type Description Default Value Required

Name

String

The name of the aggregator. An aggregator listener can later reference the scope by name.

x

Content

Expression

The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation.

#[payload]

Group Id

Expression

The expression to evaluate for every new message received to obtain the ID for the group in which it should be aggregated..

#[correlationId]

Group Size

Number

The maximum size to assign to the group with the group ID resolved. All messages with the same group ID must have the same group size. If not, only the first resolved group size is considered correct. A warning is logged for every group that does not match.

#[itemSequenceInfo.sequenceSize]

Eviction Time

Number

The time to remember a group ID after it is completed or timed out (0 means: don’t remember, -1: remember forever)

180

Eviction Time Unit

Time Unit

The time unit for the Eviction Time.

SECONDS

Timeout

Number

The maximum time to wait for the aggregation of a group to complete. If the timeout is reached before the total number of elements in that group is equal to the group’s size, the aggregation is considered complete. To avoid constant group timeouts, a value of 0 is not supported.

-1(UNLIMITED)

Timeout unit

Time Unit

The time unit in which to measure the timeout.

SECONDS

Object Store

Object Store

Either a name to reference a global object store or a definition of a private object store where the aggregated elements are stored.

Default Object Store partition

Aggregation Complete Route

Route

Components chain to execute once the aggregation is complete.

x

Incremental Aggregation Route

Route

Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated.

Raises

  • AGGREGATORS:GROUP_COMPLETED

An error occurred during an attempt to add a new element to an already completed group that was not yet evicted.

  • AGGREGATORS:GROUP_TIMED_OUT

An error occurred during an attempt to add a new element to a group that timed out but had not yet been evicted.

  • AGGREGATORS:NO_GROUP_ID

The expression that resolves to the group ID returns null.

  • AGGREGATORS:NO_GROUP_SIZE

The expression that resolves to the group size returns null.

  • AGGREGATORS:AGGREGATOR_CONFIG

The Group size or Timeout fields have invalid values, for example, groupSize < 0.

  • AGGREGATORS:OBJECT_STORE_ACCESS

An error occurred during an attempt to access the object store used to store the aggregated values.

Sources

Aggregator Listener

An Aggregator listener is a source for listening to elements triggered by an aggregator scope.

<aggregators:aggregator-listener aggregatorName="exampleAggregator" includeTimedOutGroups="false">

An Aggregator listener references only aggregator scopes that are inside a flow. Aggregator scopes declared in a subflow are not visible to Aggregator listeners.

After the aggregator scope referenced by the listener completes an aggregation, the listener is triggered with a list of all the elements.
Because the Aggregator listener is a source, it is located in a different flow than the aggregator. The listener cannot access the context from the aggregator’s flow, and therefore cannot access the flow’s variables.

Although you can use aggregation listeners for any kind of aggregator, it is important for time-driven asynchronous aggregations, which do not execute an aggregator route and can reach components only in flows with an aggregator listener as the source.

Parameters

Name Type Description Default Value Required

Aggregator Name

String

The name of the aggregator to listen to. After that aggregator releases its elements, the listener is executed. Each listener can reference only one aggregator, and each aggregator can be referenced by at most one listener.

x

Include Timed Out Groups

Boolean

Indicates whether the listener should be triggered when a group is released due to a timeout.

false

Aggregation Attributes

Each time a message goes through an aggregation, some attributes with information about the aggregation are added to the message.

Name Type Description

Aggregation ID

String

The ID from the group in which the element is aggregated. If the aggregation strategy does not aggregate by group, this field is an autogenerated value that is kept until the aggregation is released (as with group-based and time-based aggregators).

First Item Arrival Time

Date

The time when the first value is aggregated.

Last Item Arrival Time

Date

The time when the last value is aggregated.

Is Aggregation Complete

Boolean

True if the aggregation is complete, False otherwise.

Redelivery Policy

Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.

Field Type Description Default Value Required

Max Redelivery Count

Number

Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error.

Use Secure Hash

Boolean

If true, Mule uses a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

Secure hashing algorithm to use if the <b>Use Secure Hash<b/> field is true. If the payload of the message is a Java object, Mule ignores this value and returns the value that the payload’s hashCode() returned.

Id Expression

String

One or more expressions that determine when a message was redelivered. This property can be set only if the <b>Use Secure Hash</b> field is false.

Object Store

Object Store

Configures the object store that stores the redelivery counter for each message.

System Properties

Property Description Default Value Example

mule.aggregator.executor.failOnStartIfNoQuorum

Currently a Mule app deployment fails, if the quorum constraint is not met. This system property sends the quorum poll to the background to deploy the Mule app.

True

-M-Dmule.aggregator.executor.failOnStartIfNoQuorum=false

mule.aggregator.executor.delayForQuorum

Configures the time (in milliseconds) between quorum polls, and the property takes effect if the mule.aggregator.executor.failOnStartIfNoQuorum property has a value of false.

100

-M-Dmule.aggregator.executor.delayForQuorum=200

mule.aggregatorsSchedulingPeriod

Configures the scheduling period (in milliseconds) to determine if Size-Based or Time-Based aggregators reached the configured size. The module triggers a GET call every 1 second.

-M-Dmule.aggregatorsSchedulingPeriod=1

View on GitHub