Spring JMSでトランザクションを設定する3種類の方法について

Spring Frameworkトランザクション対応のJMSメッセージ送受信を行う方法について簡単にまとめます。

前提知識

JMSのメッセージ送受信については

の3通りがあります。トランザクションなしで送受信する場合は、送信や受信が直ちに行われますが、Session生成時のアクナレッジモードの指定により、ACKを送信するタイミングを制御することができます。ローカルトランザクションの場合は、特定のJMS Sessionに対して複数のメッセージの送信や受信処理をトランザクショナルに処理できます。ただし、JTAを利用する場合と異なり、複数のSessionやデータベースのアクセスをひとつのトランザクションとして処理することはできません。この3種類はもちろん要件によって適切に使い分ける必要がありますが、通常Java EEのサーバー上でのメッセージ送受信ではローカルかJTAかいずれかのトランザクション内でメッセージを送受信するケースが多いと思います。

XAを使うべきかどうか

理論上、データベースとJMSをひとつのトランザクションで処理できると、障害発生時でも容易に整合性を保つことができ便利なように思われます。しかし、XAを使うには2フェーズコミットが必要であり、そのためにはトランザクションログ書き込みのためファイルIOが発生し、メッセージの量によってはどうしても性能が大きく劣化する可能性があります。また、トランザクションマネージャーであるアプリケーションサーバーとJMS実装との相性の問題などにぶつかる可能性も高いです。さらに、Active MQなどアプリケーションサーバーとは独立したJMSプロバイダーをXA対応で利用するためにはJCAのリソースプールとしてアプリケーションサーバーに正しく組み込む必要があります。このように、XAを使いこなすには技術的なハードルは相当高いと思います。したがって、検証に十分時間を割くことができなければ、あえてローカルトランザクションを使う方が実際的であることが多いと考えます。
ActiveMQ
この場合、以下がポイントになります。

  • JMSのトランザクション境界がDBのトランザクション境界の外側になるように設定する
  • 可能ならメッセージ送信側でDB更新を避け、受信側でのみDB更新を行うようにする。この場合、DBのトランザクションがコミット後にJMSのトランザクションロールバックされると、メッセージの再送機能によりメッセージが重複受信される可能性があります。しかし、DBの状態(注文などの状態やメッセージID)を見て重複メッセージかどうか判断し、無視するよう対処すれば問題ありません。(Idempotent Consumerパターン)送信側でどうしてもDB更新を行いたい場合は、定期的に状態を監視して不整合をリカバリーするか、きちんと例外発生時に処理するなどの対処が必要です。

Spring JMSにおけるトランザクション設定

Spring JMSではトランザクション内でメッセージを送受信する方法として以下の3通りの設定方法があります。

A)JtaTransactionManagerを利用してXAのグローバルトランザクションに参加

XA対応の接続ファクトリーを設定しておけば、JTAトランザクション境界の中でメッセージを送受信すると自動的にグローバルトランザクションに参加してくれます。AOPを組み合わせることで通常はサービスのメソッド呼び出しが自動的にトランザクション境界となるように設定されますから、アプリケーションロジックで特に意識しなくても自動的に参加できます。また、MDPを使う場合もDefaultMessageListenerContainerに対してJtaTransactionManagerをインジェクションしておけば、トランザクション内で非同期にメッセージを受信する動作となります。*1JBoss Messagingを使う場合の設定例を以下に示します。

	<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" />

	<jee:jndi-lookup id="connectionFactory" jndi-name="java:/JmsXA" />
	
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="*" propagation="REQUIRED"/>
		</tx:attributes>
	</tx:advice>

	
	<bean id="desitinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver" />
	
	<bean id="sampleMessageSender" class="jbosstest.service.impl.mao.JmsSampleMessageSender">
		<property name="jmsTemplate" >
			<bean class="org.springframework.jms.core.JmsTemplate">
				<property name="connectionFactory" ref="connectionFactory" />
				<property name="defaultDestinationName" value="/queue/TestQueue" />
				<property name="destinationResolver" ref="desitinationResolver" />
			</bean>
		</property>
	</bean>
	
	<bean id="sampleMessageReceiver" class="jbosstest.service.impl.mao.JmsSampleMessageReceiver" />
	
	<bean id="taskExecutor" class="org.springframework.jca.work.jboss.JBossWorkManagerTaskExecutor" />
	
	<jms:listener-container 
		cache="consumer" 
		connection-factory="connectionFactory"
		destination-type="queue"
		destination-resolver="desitinationResolver"
		transaction-manager="transactionManager"	
		task-executor="taskExecutor">
		<jms:listener destination="/queue/TestQueue"  ref="sampleMessageReceiver" method="handleMessage"/>
	</jms:listener-container>
	
	<aop:config>
		<aop:advisor pointcut="execution(* *..service..*.*(..)) and @target(org.springframework.stereotype.Service)" advice-ref="txAdvice" />
	</aop:config>
B1)JmsTemplate、DefaultMessageListenerContainerなどのsessionTransactedプロパティをtrueに設定してローカルトランザクションに参加

sessionTransactedプロパティをtrueに設定すると、内部的にJMSのSessionを生成する際にSessionローカルのトランザクションが有効になります。JTAやデータベースなどの別のトランザクションの境界内では、自動的にそのトランザクションの外側でJMSのトランザクションをコミットしてくれるようになります。(JMSがJTAに参加する場合はsessionTransactedフラグは使われない想定のため、デフォルトのfalseのままにしておくべきです。ただし、Atomikosをトランザクションマネージャーとして使う場合に限りsessionTransactedを同時にtrueにしないといけないようです。)JBoss Messagingを使う場合の設定例を以下に示します。この場合、受信側の設定でタグを利用できいため、明示的にDefaultMessageListenerContainerを宣言しています。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" />

	<jee:jndi-lookup id="jbossConnectionFactory" jndi-name="java:/ConnectionFactory" />
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="jbossConnectionFactory" />
	</bean>
	
	
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="*" propagation="REQUIRED"/>
		</tx:attributes>
	</tx:advice>

	<bean id="desitinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver" />
	
	<bean id="sampleMessageSender" class="jbosstest.service.impl.mao.JmsSampleMessageSender">
		<property name="jmsTemplate" >
			<bean class="org.springframework.jms.core.JmsTemplate">
				<property name="connectionFactory" ref="connectionFactory" />
				<property name="defaultDestinationName" value="/queue/TestQueue" />
				<property name="destinationResolver" ref="desitinationResolver" />
				<property name="sessionTransacted" value="true" />
			</bean>
		</property>
	</bean>
	
	<bean id="sampleMessageReceiver" class="jbosstest.service.impl.mao.JmsSampleMessageReceiver" />
	
	<bean id="taskExecutor" class="org.springframework.jca.work.jboss.JBossWorkManagerTaskExecutor" />
	
<!--
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="cacheLevelName" value="CACHE_CONSUMER"/>
		<property name="sessionTransacted" value="true" />
		<property name="taskExecutor" ref="taskExecutor" />
		<property name="destinationResolver" ref="desitinationResolver" />
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destinationName" value="/queue/TestQueue" />
		<property name="pubSubDomain" value="false" />
		<property name="messageListener" ref="sampleMessageReceiver" />		
	</bean> -->
	
	<jms:listener-container 
		cache="consumer" 
		connection-factory="connectionFactory"
		destination-type="queue"
		destination-resolver="desitinationResolver"
		acknowledge=="transacted"	
		task-executor="taskExecutor">
		<jms:listener destination="/queue/TestQueue"  ref="sampleMessageReceiver" method="handleMessage"/>
	</jms:listener-container>

	<aop:config>
		<aop:advisor pointcut="execution(* *..service..*.*(..)) and @target(org.springframework.stereotype.Service)" advice-ref="txAdvice" />
	</aop:config>

JmsAccessorのJavaDOC

B2)JmsTransactionManagerを使用してローカルトランザクションに参加

DBのトランザクションマネージャ(JpaTransactionManagerやJtaTransactionManagerなど)とは別に、JmsTransactionManagerをBeanとして登録しておき、AOPトランザクション境界を設定することができます。この場合、先に説明したようにJMSのトランザクション境界が外側になるようにしなくてはならないため、各TransactionManagerのorder属性を正しく指定することで両者のAOPインターセプターかかる順番を正しく設定する必要があります。ここが非常に間違えやすいところなのですが、orderの値が小さいほど外側になりますから、JmsTransactionManagerのorder属性をより小さな値に設定することが重要です。MDPを使う場合も、JtaTransactionManagerの場合と同様にDefaultMessageListenerContainerに対してJmsTransactionManagerをインジェクションすれば、ローカルトランザクション内でメッセージを受信できます。JBoss Messagingを使う場合の設定例を以下に示します。

	<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" />
	<jee:jndi-lookup id="jbossConnectionFactory" jndi-name="java:/ConnectionFactory" />
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="jbossConnectionFactory" />
	</bean>
	
	<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager" >
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="*" propagation="REQUIRED"/>
		</tx:attributes>
	</tx:advice>

	<tx:advice id="txAdviceForJms" transaction-manager="jmsTransactionManager">
		<tx:attributes>
			<tx:method name="*" propagation="REQUIRED"/>
		</tx:attributes>
	</tx:advice>
	
	<aop:config>
		<aop:advisor pointcut="execution(* *..service..*.*(..)) and @target(org.springframework.stereotype.Service)" advice-ref="txAdvice" order="2"/>
		<aop:advisor pointcut="execution(* *..service..*.*(..)) and @target(org.springframework.stereotype.Service)" advice-ref="txAdviceForJms" order="1"/>
	</aop:config>

	<bean id="desitinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver" />
	
	<bean id="sampleMessageSender" class="jbosstest.service.impl.mao.JmsSampleMessageSender">
		<property name="jmsTemplate" >
			<bean class="org.springframework.jms.core.JmsTemplate">
				<property name="connectionFactory" ref="connectionFactory" />
				<property name="defaultDestinationName" value="/queue/TestQueue" />
				<property name="destinationResolver" ref="desitinationResolver" />
			</bean>
		</property>
	</bean>
	
	<bean id="sampleMessageReceiver" class="jbosstest.service.impl.mao.JmsSampleMessageReceiver" />
	
	<bean id="taskExecutor" class="org.springframework.jca.work.jboss.JBossWorkManagerTaskExecutor" />
	
	<jms:listener-container 
		cache="consumer" 
		connection-factory="connectionFactory"
		destination-type="queue"
		destination-resolver="desitinationResolver"
		transaction-manager="jmsTransactionManager"	
		task-executor="taskExecutor">
		<jms:listener destination="/queue/TestQueue"  ref="sampleMessageReceiver" method="handleMessage"/>
	</jms:listener-container>


		
それぞれの方法の使い分け

AかBかはグローバルトランザクションを使うかどうかの違いです。問題はローカルトランザクションを使う場合のB1とB2の使い分けです。B1の方法のメリットは、設定が単純なことですが、トランザクション内で正しくメッセージを送受信するためには、特定のSessionオブジェクトを使いまわす必要があります。特にメッセージ受信時にそのトランザクション内で送信を行う場合などの複数の送受信操作をひとつのトランザクション内で行いたい場合に、注意が必要になります。(メッセージ受信コンポーネントでSessionAwareMessageListenerを実装すれば、メッセージ受信時のSessionオブジェクトをパラメーターとして受け取ることができますので、そのSessionを使って送信を行うことが可能です。)一方、B2の方法はローカルトランザクションではありながら、SpringのデータベーストランザクションJTAトランザクションと同様に、スレッドごとにSessionをリソースとして自動的にバインドしてくれるため、トランザクションの境界さえ正しく設定しておけば、JmsTemplateを使う限り正しくSessionを共有してくれることです。また、AOPトランザクション境界を自由に設定できるところも便利な時があるかもしれません。単純な受信だけ、送信だけのトランザクションであれば大げさかもしれませんが、ローカルトランザクションでかつ複数の送り先に対して送受信の連携が必要になるケースでは便利だと思います。

*1:実際には非同期ではなく、タイムアウトつきの同期受信処理を反復することで非同期受信を動作なっている。