?
本文檔使用 PHP中文網(wǎng)手冊 發(fā)布
雖然JMS一般都和異步處理相關,但它也可以同步的方式使用消息??芍剌d的 receive(..)
方法提供了這種功能。在同步接收中,接收線程被阻塞直至獲得一個消息,有可能出現(xiàn)線程被無限阻塞的危險情況。屬性 receiveTimeout 指定了接收器可等待消息的延時時間。
類似于EJB世界里流行的消息驅動Bean(MDB),消息驅動POJO(MDP)作為JMS消息的接收器。MDP的一個約束(但也請看下面的有關 javax.jms.MessageListener
類的討論)是它必須實現(xiàn) javax.jms.MessageListener
接口。另外當你的POJO將以多線程的方式接收消息時必須確保你的代碼是線程-安全的。
以下是MDP的一個簡單實現(xiàn):
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ExampleListener implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { try { System.out.println(((TextMessage) message).getText()); } catch (JMSException ex) { throw new RuntimeException(ex); } } else { throw new IllegalArgumentException("Message must be of type TextMessage"); } } }
一旦你實現(xiàn)了 MessageListener
后就可以創(chuàng)建一個消息偵聽容器。
請看下面例子是如何定義和配置一個隨Sping發(fā)行的消息偵聽容器的(這個例子用 DefaultMessageListenerContainer
)
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="jmsexample.ExampleListener" /> <!-- and this is the message listener container --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="destination"/> <property name="messageListener" ref="messageListener" /> </bean>
關于各個消息偵聽容器實現(xiàn)的特色請參閱相關的Spring Javadoc文檔。
SessionAwareMessageListener
接口是一個Spring專門用來提供類似于JMS MessageListener
的接口,也提供了從接收 Message
來訪問JMS Session
的消息處理方法。
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的MDP可以響應所有接收到的消息(使用 onMessage(Message, Session)
方法提供的 Session
)那么你可以選擇讓你的MDP實現(xiàn)這個接口(優(yōu)先于標準的JMS MessageListener
接口)。所有隨Spring發(fā)行的支持MDP的消息偵聽容器都支持 MessageListener
或 SessionAwareMessageListener
接口的實現(xiàn)。要注意的是實現(xiàn)了 SessionAwareMessageListener
接口的類通過接口和Spring有了耦合。是否選擇使用它完全取決于開發(fā)者或架構師。
請注意 SessionAwareMessageListener
接口的 'onMessage(..)'
方法會拋出 JMSException
異常。和標準JMS MessageListener
接口相反,當使用 SessionAwareMessageListener
接口時,客戶端代碼負責處理任何拋出的異常。
MessageListenerAdapter
類是Spring的異步支持消息類中的不變類(final class):簡而言之,它允許你幾乎將 任意 一個類做為MDP顯露出來(當然有某些限制)。
如果你使用JMS 1.0.2 API,你將使用和 MessageListenerAdapter
一樣功能的類 MessageListenerAdapter102
。
考慮如下接口定義。注意雖然這個接口既不是從 MessageListener
也不是從 SessionAwareMessageListener
繼承來得,但通過 MessageListenerAdapter
類依然可以當作一個MDP來使用。同時也請注意各種消息處理方法是如何根據(jù)他們可以接收并處理消息的內容來進行強類型匹配的。
public interface MessageDelegate { void handleMessage(String message); void handleMessage(Map message); void handleMessage(byte[] message); void handleMessage(Serializable message); }
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特別請注意,上面的 MessageDelegate
接口(上文中 DefaultMessageDelegate
類)的實現(xiàn)完全 不 依賴于JMS。它是一個真正的POJO,我們可以通過如下配置把它設置成MDP。
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="jmsexample.DefaultMessageDelegate"/> </constructor-arg> </bean> <!-- and this is the message listener container... --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="destination"/> <property name="messageListener" ref="messageListener" /> </bean>
下面是另外一個只能處理接收JMS TextMessage
消息的MDP示例。注意消息處理方法是如何實際調用 'receive'
(在 MessageListenerAdapter
中默認的消息處理方法的名字是 'handleMessage'
)的,但是它是可配置的(你下面就將看到)。注意 'receive(..)'
方法是如何使用強制類型來只接收和處理JMS TextMessage
消息的。
public interface TextMessageDelegate { void receive(TextMessage message); }
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
輔助的 MessageListenerAdapter
類配置文件類似如下:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
請注意,如果上面的 'messageListener'
收到一個不是 TextMessage
類型的JMS Message
,將會產(chǎn)生一個 IllegalStateException
異常(隨之產(chǎn)生的其他異常只被捕獲而不處理)。
MessageListenerAdapter
還有一個功能就是如果處理方法返回一個非空值,它將自動返回一個響應 消息
。
請看下面的接口及其實現(xiàn):
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果上面的 DefaultResponsiveTextMessageDelegate
和 MessageListenerAdapter
聯(lián)合使用,那么任意從執(zhí)行 'receive(..)'
方法返回的非空值都將(缺省情況下)轉換成一個 TextMessage
。這個返回的 TextMessage
將被發(fā)送到原來的 Message
中JMS Reply-To屬性定義的 目的地
(如果存在),或者是 MessageListenerAdapter
設置(如果配置了)的缺省 目的地
;如果沒有定義 目的地
,那么將產(chǎn)生一個 InvalidDestinationException
異常(此異常將不會只被捕獲而不處理,它 將沿著調用堆棧上傳)。
在消息監(jiān)聽器的調用中使用事務只需要重新配置監(jiān)聽器容器
通過監(jiān)聽器容器定義中的 sessionTransacted
標記可以輕松的激活本地資源事務。每次消息監(jiān)聽器的調用都在激活的JMS事務中執(zhí)行,執(zhí)行失敗時,消息接收將發(fā)生回滾。這個本地事務還將包含響應信息的發(fā)送(通過 SessionAwareMessageListener
),但其它資源的操作(例如訪問數(shù)據(jù)庫)是獨立的。經(jīng)常會發(fā)生類似于數(shù)據(jù)庫處理已提交但消息處理提交失敗的情況,因此需要在監(jiān)聽器的實現(xiàn)中進行重復消息的檢測。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
當參與外部管理的事務時,你需要使用支持外來事務的監(jiān)聽器容器:通常是 DefaultMessageListenerContainer
來配置事務管理器。
參與XA事務時,消息監(jiān)聽器容器需要配置 JtaTransactionManager
(默認會委托給J2EE服務器事務子系統(tǒng))。注意以下JMS ConnectionFactory需要具有XA能力并注冊JTA事務協(xié)調器?。▍⒖寄闼褂玫腏2EE服務器中JNDI資源的配置。)這樣,消息接收就像數(shù)據(jù)庫訪問一樣作為同一個事務的一部分(具有統(tǒng)一提交的語義,僅僅增加了XA事務日志的額外開銷)。
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后你只需要把它添加到早先配置好的容器中。這個容器將處理剩下的事情。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>