トランザクションメッセージソース

操作と同様に、メッセージソースではトランザクションがサポートされています。メッセージソースの例としては、JMS や VM Connector リスナーがあります。どちらのリスナーも、キューからメッセージを取得してフローにプッシュします。メッセージ処理が成功すると、トランザクションがコミットされます。失敗すると、トランザクションがロールバックされ、メッセージがキューに戻ります。

操作と同様に、メッセージソースはトランザクションになるように ​TransactionalConnection​ インターフェースを実装する接続種別で動作する必要があります。接続で ​XATransactionalConnection​ が実装されている場合、XA トランザクションも自動的にサポートされます。

例: トランザクションメッセージソース
public class TransactionalSource extends Source<Serializable, Void> {

  @Connection
  private ConnectionProvider<MyTransactionalConnection> connectionProvider;

  ....
}

メッセージソースのトランザクションアクション

メッセージソースがトランザクションの場合、​transactionalAction​ という名前の合成パラメーターが自動的に追加されます。そのパラメーターは、次の値を取ることができる ​enum​ 型です。

  • ALWAYS_BEGIN​: 呼び出しごとに新しいトランザクションが作成されます。

  • NONE: ソースがトランザクションを開始せずに、フローで開いているトランザクションに参加しないことを示します。

メッセージソースのトランザクション種別

ソースで XA トランザクションがサポートされている場合、​transactionType​ パラメーターが追加されます。このパラメーターは、次の値を取ることができます。

  • LOCAL: 通常のトランザクションを開始します。

  • XA: 代わりに XA トランザクションを開始します。

トランザクションの処理

SDK では、トランザクションは接続を介してモデル化されます。そのため、トランザクションごとに異なる接続インスタンスが必要になります。これは、メッセージソースでは並列性がサポートされている必要がありますが、同じ接続で異なる 2 つのトランザクションを同時に提供できないためです。

そのため、SDK ドキュメントの他の場所で使用されている HTTP リスナーサンプルとは異なるモデルが必要になります。そこで、VM Connector の ​<vm:listener />​ を使用する次の簡略化された例を考えてみましょう。

public VMListener extends Source<Serializable, VMMessageAttributes> {

	@Connection
	private ConnectionProvider<QueueSession> sessionProvider;

	@Override
  public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
		while(notStopped()) {
			QueueSession session =  sessionProvider.connect(); (1)
      CallbackContext ctx = callback.createContext(); (2)
			TransactionHandle status = ctx.bindConnection(session); (3)

			try {
				callback.handle(session.poll(), ctx); (4)
			} catch (Exception e) {
				status.rollback();
			}
		}
	}

	@OnSuccess
	public void onSuccess(SourceCallbackContext context) {
		handleSuccess(context.getConnection()); (5)
	}

	@OnError
	public void onError(SourceCallbackContext context, Error error) {
		handleError(context, error);
	}
}
1 複数の接続が取得されます。
2 接続ごとに新しい​CallbackContext​ が作成されます。
3 作成された ​SourceCallbackContext​ の ​bindConnection()​ メソッドを介して各接続がコンテキストに登録されます。
4 コンテキストが ​handle​ メソッドに渡されます。
5 接続は後で ​SourceCallbackContext.getConnection()​ メソッドを介してすべての ​onSuccess​、​onError​、​onTerminate methods​ で使用できるようになります。

接続はコンテキストにバインドされているため、Runtime は必要に応じて自動的に接続を終了します。接続がトランザクションで、ソースもトランザクションになるように設定されている場合、​bindConnection()​ メソッドをコールすると、その接続で自動的にトランザクションが開始されます。また、接続がトランザクションの場合、Runtime は ​onSuccess()​ メソッドの後に自動的にトランザクションをコミットしたり、​onError()​ メソッドの後にロールバックしたりします。

このケースでは、トランザクションはキューからメッセージをポーリングする前に開始する必要があります。接続の取得直後に ​bindConnection()​ をコールすることをお勧めします。

カスタムトランザクション処理

コネクタによっては、カスタムトランザクション処理を提供することが必要になる場合があります。たとえば、エラーの場合にエラー応答をパブリッシュしたいとします。この場合、ロールバックされると応答が宛先に到達しなくなるため、トランザクションをコミットする必要があります。

例: カスタムトランザクション処理
  @OnError
  public void onError(SourceCallbackContext context, Error error) {
    ctx.getConnection().publish(buildErrorResponse(error)); (1)
    ctx.getTransactionHandle().commit(); (2)
  }
1 onSuccess()​ および ​onError()​ メソッドを実行するとき、接続はまだオープン状態です。
2 コンテキストには、手動でトランザクションを操作できる ​TransactionHandle​ が設定されています。トランザクションを手動で解決する場合、Runtime でそのことが考慮され、後で自動的に解決されなくなります。最も一般的なユースケースでは、この作業を行う必要はありません。