一.簡(jiǎn)介
1.1 JMS簡(jiǎn)介
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
JMS是一種與廠商無(wú)關(guān)的 API,用來(lái)訪問(wèn)收發(fā)系統(tǒng)消息,它類(lèi)似于JDBC(Java Database Connectivity)。這里,JDBC 是可以用來(lái)訪問(wèn)許多不同關(guān)系數(shù)據(jù)庫(kù)的 API,而 JMS 則提供同樣與廠商無(wú)關(guān)的訪問(wèn)方法,以訪問(wèn)消息收發(fā)服務(wù)。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能夠通過(guò)消息收發(fā)服務(wù)(有時(shí)稱(chēng)為消息中介程序或路由器)從一個(gè) JMS 客戶(hù)機(jī)向另一個(gè) JMS客戶(hù)機(jī)發(fā)送消息。消息是 JMS 中的一種類(lèi)型對(duì)象,由兩部分組成:報(bào)頭和消息主體。報(bào)頭由路由信息以及有關(guān)該消息的元數(shù)據(jù)組成。消息主體則攜帶著應(yīng)用程序的數(shù)據(jù)或有效負(fù)載。根據(jù)有效負(fù)載的類(lèi)型來(lái)劃分,可以將消息分為幾種類(lèi)型,它們分別攜帶:簡(jiǎn)單文本(TextMessage)、可序列化的對(duì)象 (ObjectMessage)、屬性集合 (MapMessage)、字節(jié)流 (BytesMessage)、原始值流 (StreamMessage),還有無(wú)有效負(fù)載的消息 (Message)。
1.2 ActiveMQ簡(jiǎn)介
Apache ActiveMQ是Apache軟件基金會(huì)所研發(fā)的開(kāi)放源代碼消息中間件;由于ActiveMQ是一個(gè)純Java程序,因此只需要操作系統(tǒng)支持Java虛擬機(jī),ActiveMQ便可執(zhí)行。
1.3 ActiveMQ啟動(dòng)和運(yùn)行
主頁(yè):http://activemq.apache.org/
目前最新版本:5.15.12 (March 9, 2020)
開(kāi)發(fā)包及源碼下載地址:http://activemq.apache.orgactivemq-5111-release.htmActiveMQ
服務(wù)啟動(dòng)地址:http://127.0.0.1:8161/admin/
用戶(hù)名/密碼:admin/admin
二.ActiveMQ點(diǎn)對(duì)點(diǎn)消息實(shí)現(xiàn)
2.1 直接 Receive 方式
Session.AUTO_ACKNOWLEDGE。當(dāng)客戶(hù)成功的從receive方法返回的時(shí)候,或者從MessageListener.onMessage方法成功返回的時(shí)候,會(huì)話自動(dòng)確認(rèn)客戶(hù)收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客戶(hù)通過(guò)消息的 acknowledge 方法確認(rèn)消息。需要注意的是,在這種模式中,確認(rèn)是在會(huì)話層上進(jìn)行:確認(rèn)一個(gè)被消費(fèi)的消息將自動(dòng)確認(rèn)所有已被會(huì)話消 費(fèi)的消息。例如,如果一個(gè)消息消費(fèi)者消費(fèi)了 10 個(gè)消息,然后確認(rèn)第 5 個(gè)消息,那么所有 10 個(gè)消息都被確認(rèn)。
Session.DUPS_ACKNOWLEDGE。 該選擇只是會(huì)話遲鈍第確認(rèn)消息的提交。如果 JMS provider 失敗,那么可能會(huì)導(dǎo)致一些重復(fù)的消息。如果是重復(fù)的消息,那么 JMS provider 必須把消息頭的 JMSRedelivered 字段設(shè)置為 true。
生產(chǎn)者:
package main.activeMq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; /** * 消息生產(chǎn)者 * * @author wangjian * @date 2020/5/8 22:26 */ public class Producer { // 默認(rèn)的用戶(hù)名,密碼,連接地址 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 發(fā)送消息數(shù)量 private static final int SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; // 連接工廠 Connection connection = null; // 連接 Session session; // 會(huì)話:接收或者發(fā)送消息的線程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生產(chǎn)者 // 實(shí)例化連接工廠(URL:failover://tcp://localhost:61616) connectionFactory = new ActiveMQConnectionFactory(Producer.USERNAME, Producer.PASSWORD, Producer.URL); try { // 通過(guò)連接工廠獲取連接 connection = connectionFactory.createConnection(); // 啟動(dòng) connection.start(); // 創(chuàng)建session,true:開(kāi)啟事務(wù) session = connection.createSession(true, SENDNUM); // 創(chuàng)建消息隊(duì)列 destination = session.createQueue("firstQueue"); // 點(diǎn)對(duì)點(diǎn) // destination = session.createTopic("firstTopic"); // 發(fā)布訂閱 // 創(chuàng)建消息生產(chǎn)者 messageProducer = session.createProducer(destination); sendMessage(session, messageProducer); session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } /** * 發(fā)送消息 * * @param session * @param producer */ public static void sendMessage(Session session, MessageProducer producer) { for (int i = 0; i < SENDNUM; i++) { try { TextMessage textMessage = session.createTextMessage("ActiveMQ 發(fā)送的消息:" + i); System.out.println("發(fā)送消息:" + "ActiveMQ 發(fā)送的消息" + i); producer.send(textMessage); } catch (JMSException e) { e.printStackTrace(); } } } }
消費(fèi)者:
package main.activeMq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; /** * 消費(fèi)者 * * @author wangjian * @date 2020/5/8 22:53 */ public class Consumer { // 默認(rèn)的用戶(hù)名,密碼,連接地址 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; // 連接工廠 Connection connection = null; // 連接 Session session; // 會(huì)話:接收或者發(fā)送消息的線程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息消費(fèi)者 // 實(shí)例化連接工廠 connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.URL); try { // 創(chuàng)建連接 connection = connectionFactory.createConnection(); // 啟動(dòng)連接 connection.start(); // 創(chuàng)建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("firstQueue"); MessageConsumer sessionConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) sessionConsumer.receive(); if (textMessage != null) { System.out.println("收到的消息:" + textMessage.getText()); } else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }
2.2 使用 Listener 監(jiān)聽(tīng)方式
Listener監(jiān)聽(tīng):
package main.activeMq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息監(jiān)聽(tīng) * * @author wangjian * @date 2020/5/8 23:01 */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
消費(fèi)者:
package main.activeMq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; /** * 消費(fèi)者:監(jiān)聽(tīng)方式 * @author wangjian * @date 2020/5/8 23:03 */ public class Consumer2 { // 默認(rèn)的用戶(hù)名,密碼,連接地址 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; // 連接工廠 Connection connection = null; // 連接 Session session; // 會(huì)話:接收或者發(fā)送消息的線程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息消費(fèi)者 // 實(shí)例化連接工廠 connectionFactory = new ActiveMQConnectionFactory(Consumer2.USERNAME, Consumer2.PASSWORD, Consumer2.URL); try { // 創(chuàng)建連接 connection = connectionFactory.createConnection(); // 啟動(dòng)連接 connection.start(); // 創(chuàng)建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建消息的連接隊(duì)列 destination = session.createQueue("firstQueue"); // 創(chuàng)建消息消費(fèi)者 MessageConsumer sessionConsumer = session.createConsumer(destination); // 注冊(cè)消息監(jiān)聽(tīng) sessionConsumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); } } }
三.ActiveMQ 發(fā)布-訂閱消息模式實(shí)現(xiàn)
必須先訂閱,后發(fā)布
destination = session.createTopic("firstTopic"); // 發(fā)布訂閱