サイズベースのアグリゲーターの例 - Mule 4

Aggregators Module の ​[Size based aggregator (サイズベースのアグリゲーター)]​ スコープを使用すると、定義済みのサイズ数の要素の集約が完了するまで要素を集約することができます。次の例は、最大 ​3​ 件の HTTP 要求コールを集約するようにこのスコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。集約の完了に必要な ​3​ 件のコールに達すると、アプリケーションはすべての集約されたランダム ID 番号要素を記録します。

Anypoint Studio キャンバスでのサイズベースのアグリゲーターのフローの例
Figure 1. サイズベースのアグリゲーターのフローの例

独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。

Mule アプリケーションの作成

Anypoint Studio で Mule フローを作成する手順は、次のとおりです。

  1. [Mule Palette (Mule パレット)]​ で、​[HTTP Listener (HTTP リスナー)]​ を選択してキャンバスにドラッグします。
    このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。

  2. [Connector configuration (コネクタ設定)]​ で ​[HTTP_Listener_config]​ を選択します。

  3. [Path (パス)]​ を ​/test​ に設定します。

  4. [Transform message]​ コンポーネントを ​[Listener]​ の右にドラッグします。

  5. [Transform message]​ コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに ​Source Name​ が ​"size-based"​ として設定され、​Source ID​ が ​random() as String​ として設定されます。

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "size-based",
        "Source ID": (random() as String)
    }
  6. [Size based aggregator (サイズベースのアグリゲーター)]​ スコープを ​[Transform message]​ の右にドラッグします。

  7. [名前]​ を ​sizeBasedAggregator​ に設定します。

  8. [Max Size (最大サイズ)]​ を ​3​ に設定します。

  9. [Content (コンテンツ)]​ を ​payload​ に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。

  10. [Logger]​ コンポーネントを ​[Incremental aggregation (増分集約)]​ ルートにドラッグします。

  11. [Logger]​ 設定画面で、​[Message (メッセージ)]​ を ​[Doing incremental aggregation step]​ に設定します。

  12. [Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ ルートにドラッグします。

  13. [Message (メッセージ)]​ を ​Aggregation complete​ に設定します。

  14. 別の ​[Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ 内の最初の ​[Logger]​ コンポーネントの右にドラッグします。

  15. [Message (メッセージ)]​ を次の式に設定します。この式では、すべての集約された要素が返されます。

    output application/json
    ---
    payload
  16. プロジェクトを保存します。

  17. Package Explorer​ で、​[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)]​ をクリックします。

  18. 次の curl コマンドを 3 回送信して、アプリケーションをテストします。
    curl -X POST http://localhost:8086/test
    これにより、​[Size based aggregator (サイズベースのアグリゲーター)]​ で処理する 3 個の HTTP 要求が生成されます。

  19. [Console (コンソール)]​ ビューに移動して、ロガーメッセージを参照します。

INFO  2021-06-08 11:18:44,897 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: 6ee63ec0-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-08 11:18:48,570 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: 71283f80-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-08 11:18:50,189 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/1/processors/0; event: 721efb90-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-08 11:18:50,254 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/1/processors/1; event: 721efb90-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "size-based",
    "Source ID": "0.49127866969631506"
  },
  {
    "Source Name": "size-based",
    "Source ID": "0.4250959269275998"
  },
  {
    "Source Name": "size-based",
    "Source ID": "0.4160926509813687"
  }
]

このロガーメッセージの例は、最初の 2 つの HTTP 要求で、​[Incremental aggregator (増分アグリゲーター)]​ ルートの ​[Logger]​ コンポーネントによってメッセージ ​Doing incremental aggregation step.​ が記録されることを示しています。集約が 3 番目の HTTP 要求コールに達すると、​[Aggregation complete (集約完了)]​ ルートの最初の ​[Logger]​ コンポーネントによってメッセージ ​Aggregation complete​ が記録され、2 番目の ​[Logger]​ コンポーネントによって、集約されたペイロード要素 (ランダム ID 番号) が返されます。

{
  "Source Name": "size-based",
  "Source ID": "0.49127866969631506"
},
{
  "Source Name": "size-based",
  "Source ID": "0.4250959269275998"
},
{
  "Source Name": "size-based",
  "Source ID": "0.4160926509813687"
}

サイズベースのアグリゲーターの例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd">
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-size-demoFlow" >
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "size-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:size-based-aggregator name="sizeBasedAggregator" maxSize="3">
			<aggregators:incremental-aggregation>
				<logger level="INFO" message="Doing incremental aggregation step." />
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete>
				<logger level="INFO" message="Aggregation complete" />
				<logger level="INFO" message="#[output application/json
---
payload]" />
			</aggregators:aggregation-complete>
		</aggregators:size-based-aggregator>
	</flow>
</mule>