国产TS紫迹丝袜高跟鞋在线,一区二区三区国产自产视频免费,67pao国产人成视频,午国产午夜激无码毛片不卡

愛(ài)碼網(wǎng)專(zhuān)注于資源免費(fèi)下載

消息中間件ActiveMQ

一.簡(jiǎn)介

1.1 JMS簡(jiǎn)介

JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。

JMS是一種與廠商無(wú)關(guān)的 API,用來(lái)訪問(wèn)收發(fā)系統(tǒng)消息,它類(lèi)似于JDBC(Java Database Connectivity)。這里,JDBC 是可以用來(lái)訪問(wèn)許多不同關(guān)系數(shù)據(jù)庫(kù)的 API,而 JMS 則提供同樣與廠商無(wú)關(guān)的訪問(wèn)方法,以訪問(wèn)消息收發(fā)服務(wù)。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能夠通過(guò)消息收發(fā)服務(wù)(有時(shí)稱(chēng)為消息中介程序或路由器)從一個(gè) JMS 客戶(hù)機(jī)向另一個(gè) JMS客戶(hù)機(jī)發(fā)送消息。消息是 JMS 中的一種類(lèi)型對(duì)象,由兩部分組成:報(bào)頭和消息主體。報(bào)頭由路由信息以及有關(guān)該消息的元數(shù)據(jù)組成。消息主體則攜帶著應(yīng)用程序的數(shù)據(jù)或有效負(fù)載。根據(jù)有效負(fù)載的類(lèi)型來(lái)劃分,可以將消息分為幾種類(lèi)型,它們分別攜帶:簡(jiǎn)單文本(TextMessage)、可序列化的對(duì)象 (ObjectMessage)、屬性集合 (MapMessage)、字節(jié)流 (BytesMessage)、原始值流 (StreamMessage),還有無(wú)有效負(fù)載的消息 (Message)。

1.2 ActiveMQ簡(jiǎn)介

消息中間件ActiveMQ-第1張圖片

Apache ActiveMQ是Apache軟件基金會(huì)所研發(fā)的開(kāi)放源代碼消息中間件;由于ActiveMQ是一個(gè)純Java程序,因此只需要操作系統(tǒng)支持Java虛擬機(jī),ActiveMQ便可執(zhí)行。

1.3 ActiveMQ啟動(dòng)和運(yùn)行

主頁(yè):http://activemq.apache.org/
目前最新版本:5.15.12 (March 9, 2020)
開(kāi)發(fā)包及源碼下載地址:http://activemq.apache.orgactivemq-5111-release.htmActiveMQ
服務(wù)啟動(dòng)地址:http://127.0.0.1:8161/admin/
用戶(hù)名/密碼:admin/admin

二.ActiveMQ點(diǎn)對(duì)點(diǎn)消息實(shí)現(xiàn)

2.1 直接 Receive 方式

Session.AUTO_ACKNOWLEDGE。當(dāng)客戶(hù)成功的從receive方法返回的時(shí)候,或者從MessageListener.onMessage方法成功返回的時(shí)候,會(huì)話自動(dòng)確認(rèn)客戶(hù)收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客戶(hù)通過(guò)消息的 acknowledge 方法確認(rèn)消息。需要注意的是,在這種模式中,確認(rèn)是在會(huì)話層上進(jìn)行:確認(rèn)一個(gè)被消費(fèi)的消息將自動(dòng)確認(rèn)所有已被會(huì)話消 費(fèi)的消息。例如,如果一個(gè)消息消費(fèi)者消費(fèi)了 10 個(gè)消息,然后確認(rèn)第 5 個(gè)消息,那么所有 10 個(gè)消息都被確認(rèn)。
Session.DUPS_ACKNOWLEDGE。 該選擇只是會(huì)話遲鈍第確認(rèn)消息的提交。如果 JMS provider 失敗,那么可能會(huì)導(dǎo)致一些重復(fù)的消息。如果是重復(fù)的消息,那么 JMS provider 必須把消息頭的 JMSRedelivered 字段設(shè)置為 true。

生產(chǎn)者

	package main.activeMq;
	
	import org.apache.activemq.ActiveMQConnection;
	import org.apache.activemq.ActiveMQConnectionFactory;
	
	import javax.jms.Connection;
	import javax.jms.ConnectionFactory;
	import javax.jms.Destination;
	import javax.jms.JMSException;
	import javax.jms.MessageProducer;
	import javax.jms.Session;
	import javax.jms.TextMessage;


	/**
	 * 消息生產(chǎn)者
	 *
	 * @author wangjian
	 * @date 2020/5/8 22:26
	 */
	public class Producer {

    // 默認(rèn)的用戶(hù)名,密碼,連接地址
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    // 發(fā)送消息數(shù)量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;  // 連接工廠
        Connection connection = null;  // 連接
        Session session; // 會(huì)話:接收或者發(fā)送消息的線程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生產(chǎn)者
        // 實(shí)例化連接工廠(URL:failover://tcp://localhost:61616)
        connectionFactory = new ActiveMQConnectionFactory(Producer.USERNAME, Producer.PASSWORD, Producer.URL);
        try {
            // 通過(guò)連接工廠獲取連接
            connection = connectionFactory.createConnection();
            // 啟動(dòng)
            connection.start();
            // 創(chuàng)建session,true:開(kāi)啟事務(wù)
            session = connection.createSession(true, SENDNUM);
            // 創(chuàng)建消息隊(duì)列
            destination = session.createQueue("firstQueue"); // 點(diǎn)對(duì)點(diǎn)
            // destination = session.createTopic("firstTopic");  // 發(fā)布訂閱
            // 創(chuàng)建消息生產(chǎn)者
            messageProducer = session.createProducer(destination);
            sendMessage(session, messageProducer);
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param producer
     */
    public static void sendMessage(Session session, MessageProducer producer) {
        for (int i = 0; i < SENDNUM; i++) {
            try {
                TextMessage textMessage = session.createTextMessage("ActiveMQ 發(fā)送的消息:" + i);
                System.out.println("發(fā)送消息:" + "ActiveMQ 發(fā)送的消息" + i);
                producer.send(textMessage);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    }	
	

消費(fèi)者

package main.activeMq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * 消費(fèi)者
 *
 * @author wangjian
 * @date 2020/5/8 22:53
 */
public class Consumer {
// 默認(rèn)的用戶(hù)名,密碼,連接地址
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;


public static void main(String[] args) {
    ConnectionFactory connectionFactory;  // 連接工廠
    Connection connection = null;  // 連接
    Session session; // 會(huì)話:接收或者發(fā)送消息的線程
    Destination destination; // 消息的目的地
    MessageConsumer messageConsumer; // 消息消費(fèi)者
    // 實(shí)例化連接工廠
    connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.URL);
    try {
        // 創(chuàng)建連接
        connection = connectionFactory.createConnection();
        // 啟動(dòng)連接
        connection.start();
        // 創(chuàng)建session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("firstQueue");
        MessageConsumer sessionConsumer = session.createConsumer(destination);
        while (true) {
            TextMessage textMessage = (TextMessage) sessionConsumer.receive();
            if (textMessage != null) {
                System.out.println("收到的消息:" + textMessage.getText());
            } else {
                break;
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
}

2.2 使用 Listener 監(jiān)聽(tīng)方式

Listener監(jiān)聽(tīng)

package main.activeMq;
	
	import javax.jms.JMSException;
	import javax.jms.Message;
	import javax.jms.MessageListener;
	import javax.jms.TextMessage;
	
	/**
	 * 消息監(jiān)聽(tīng)
	 *
	 * @author wangjian
	 * @date 2020/5/8 23:01
	 */
	public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    }

消費(fèi)者

package main.activeMq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/**
 * 消費(fèi)者:監(jiān)聽(tīng)方式
 * @author wangjian
 * @date 2020/5/8 23:03
 */
public class Consumer2 {
   // 默認(rèn)的用戶(hù)名,密碼,連接地址
   private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
   private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
   private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;


   public static void main(String[] args) {
       ConnectionFactory connectionFactory;  // 連接工廠
       Connection connection = null;  // 連接
       Session session; // 會(huì)話:接收或者發(fā)送消息的線程
       Destination destination; // 消息的目的地
       MessageConsumer messageConsumer; // 消息消費(fèi)者
       // 實(shí)例化連接工廠
       connectionFactory = new ActiveMQConnectionFactory(Consumer2.USERNAME, Consumer2.PASSWORD, Consumer2.URL);
       try {
           // 創(chuàng)建連接
           connection = connectionFactory.createConnection();
           // 啟動(dòng)連接
           connection.start();
           // 創(chuàng)建session
           session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           // 創(chuàng)建消息的連接隊(duì)列
           destination = session.createQueue("firstQueue");
           // 創(chuàng)建消息消費(fèi)者
           MessageConsumer sessionConsumer = session.createConsumer(destination);
           // 注冊(cè)消息監(jiān)聽(tīng)
           sessionConsumer.setMessageListener(new Listener());
       } catch (JMSException e) {
           e.printStackTrace();
       }
   }
   }

三.ActiveMQ 發(fā)布-訂閱消息模式實(shí)現(xiàn)

必須先訂閱,后發(fā)布

destination = session.createTopic("firstTopic");  // 發(fā)布訂閱

本文鏈接:http://fangxuan.com.cn/article/115.html

網(wǎng)友評(píng)論

熱門(mén)文章
隨機(jī)文章
熱門(mén)標(biāo)簽
側(cè)欄廣告位