久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

JMS--ActiveMQ的簡單使用

 余香閣 2021-05-27

推薦閱讀:

1.SSM整合及聚合工程的搭建

2.AdminLTE介紹和zTree的簡單使用

3.MyBatis分頁組件--PageHelper

4.Shiro

5.SpringAop--系統(tǒng)日志簡例

6.Dubbo和Zookeeper

7.MyBatis的逆向工程

8.Java使用 POI 操作Excel

9.WebService

10.Echarts和Quartz簡介


一.消息中間件概述

消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數(shù)據(jù)交流,,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成,。通過提供消息傳遞和消息排隊模型,,它可以在分布式環(huán)境下擴展進程間的通信,。對于消息中間件,,常見的角色大致也就有 Producer(生產(chǎn)者),、Consumer(消費者),。消息隊列中間件是分布式系統(tǒng)中重要的組件,,主要解決應用解耦,,異步消息,流量削鋒等問題,,實現(xiàn)高性能,,高可用,可伸縮和最終一致性架構,。

1.1常見消息中間件

ActiveMQ

        ActiveMQ是 Apache 出品,最流行的,,能力強勁的開源消息總線,。ActiveMQ 是一個完全支持 JMS1.1 和J2EE 1.4 規(guī)范的 JMS Provider 實現(xiàn)。

RabbitMQ

        AMQP 協(xié)議的領導實現(xiàn),,支持多種場景。淘寶的 MySQL 集群內部有使用它進行通訊,,OpenStack 開源云平臺的通信組件,最先在金融行業(yè)得到運用,。

ZeroMQ

史上最快的消息隊列系統(tǒng),。

Kafka

        Apache 下的一個子項目 。特點:高吞吐,,在一臺普通的服務器上既可以達到 10W/s 的吞吐速率;完全的分布式系統(tǒng),。適合處理海量數(shù)據(jù)(消息丟失率較高),。

1.2應用場景

  • 異步處理

  • 應用解耦

  • 流量削峰

  • 消息通訊

二.JMS消息服務

JMS(Java Messaging Service)是 Java 平臺上有關面向消息中間件的技術規(guī)范,,它便于消息系統(tǒng)中的Java 應用程序進行消息交換,并且通過提供標準的產(chǎn)生,、發(fā)送、接收消息的接口簡化企業(yè)應用的開發(fā),。JMS 本身只定義了一系列的接口規(guī)范,,是一種與廠商無關的 API,用來訪問消息收發(fā)系統(tǒng),。它類似JDBC(java Database Connectivity):這里,,JDBC 是可以用來訪問許多不同關系數(shù)據(jù)庫的 API,而 JMS則提供同樣與廠商無關的訪問方法,,以訪問消息收發(fā)服務,。

2.1JMS消息模型

        消息中間件一般有兩種傳遞模式:點對點模式(P2P)和發(fā)布-訂閱模式(Pub/Sub),。

2.1.1點對點模型

        點對點模型(Pointer-to-Pointer):即生產(chǎn)者和消費者之間的消息往來。每個消息都被發(fā)送到特定的消息隊列,,接收者從隊列中獲取消息,。隊列保留著消息,直到他們被消費或超時,。

特點

  • 每個消息只有一個消費者(一旦被消費,就不在消息隊列中了)

  • 發(fā)送者和接收者之間沒有依賴,直接發(fā)送,不管是否有消費者

  • 接收者成功接收消息后需向隊列應答成功

2.1.2發(fā)布/訂閱模型

            發(fā)布/訂閱(Publish-Subscribe):包含三個角色:主體(Topic),,發(fā)布者(Publisher),訂閱者(Subscriber),,多個發(fā)布者將消息發(fā)送到 topic,系統(tǒng)將這些消息投遞到訂閱此 topic 的訂閱者,。發(fā)布者發(fā)送到 topic 的消息,只有訂閱了 topic 的訂閱者才會收到消息,。topic 實現(xiàn)了發(fā)布和訂閱,,當你發(fā)布一個消息,,所有訂閱這個 topic 的服務都能得到這個消息,。

特點

  • 每個消息可有有多個消費者

  • 發(fā)布者和訂閱者之間有時間上的依賴

  • 針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,,才能消費發(fā)布者的消息,,而且為了消費消息,訂閱者必須保持運行的狀態(tài)

2.2JMS編程模型

ConnectionFactory

        創(chuàng)建Connection對象的工廠,針對兩種不同的 jms 消息模型,分別有 QueueConnectionFactory 和TopicConnectionFactory 兩種,。

Destination

        Destination 的意思是消息生產(chǎn)者的消息發(fā)送目標或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,,它的 Destination 是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,,它的 Destination 也是某個隊列或主題(即消息來源)。所以,,Destination 實際上就是兩種類型的對象:Queue、Topic,。

Connection

        Connection 表示在客戶端和 JMS 系統(tǒng)之間建立的鏈接(對 TCP/IP socket 的包裝),。Connection 可以產(chǎn)生一個或多個 Session,。

Session

        Session 是我們對消息進行操作的接口,可以通過 session 創(chuàng)建生產(chǎn)者,、消費者,、消息等,。Session 提供了事務的功能,如果需要使用 session 發(fā)送/接收多個消息時,,可以將這些發(fā)送/接收動作放到一個事務中。

Producer

        Producer(消息生產(chǎn)者):消息生產(chǎn)者由 Session 創(chuàng)建,,并用于將消息發(fā)送到 Destination。同樣,,消息生產(chǎn)者分兩種類型:QueueSender和TopicPublisher,。可以調用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息,。

Consumer

        Consumer(消息消費者):消息消費者由 Session 創(chuàng)建,,用于接收被發(fā)送到 Destination 的消息。兩種類型:QueueReceiver 和 TopicSubscriber,??煞謩e通過 session 的 createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當然,,也可以 session 的 creatDurableSubscriber 方法來創(chuàng)建持久化的訂閱者,。

MessageListener

        消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,,一旦消息到達,,將自動調用監(jiān)聽器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一種 MessageListener,。

三.消息隊列ActiveMQ

ActiveMQ 是由 Apache 出品的一款開源消息中間件,,旨在為應用程序提供高效、可擴展,、穩(wěn)定,、安全的企業(yè)級消息通信。它的設計目標是提供標準的,、面向消息的,、多語言的應用集成消息通信中間件,。ActiveMQ 實現(xiàn)了JMS 1.1 并提供了很多附加的特性,比如 JMX 管理,、主從管理,、消息組通信、消息優(yōu)先級,、延遲接收消息,、虛擬接收者、消息持久化,、消息隊列監(jiān)控等等,。

官網(wǎng):http://activemq./

解壓安裝后進入管理界面:localhost:8161  用戶名密碼均為:admin

3.1點對點模式

第一步:新建兩個Maven工程并都導入activemq坐標

<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version></dependency>

第二步:編寫消息生產(chǎn)者

/** * 生產(chǎn)消費者模式:生產(chǎn)者 * @author Mr.song * @date 2019/05/24 20:50 */public class QueueProducerTest {
public static void main(String[] args) throws Exception { //1.獲取連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.從工廠獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.通過連接獲取會話: 參數(shù)1-是否支持事務, 參數(shù)2-消息的確認模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用會話創(chuàng)建隊列的目的地 Queue queue = session.createQueue("queue-demo"); //6.創(chuàng)建消息的生產(chǎn)這對象 MessageProducer producer = session.createProducer(queue); //7.創(chuàng)建消息內容(使用會話對象創(chuàng)建) TextMessage textMessage = session.createTextMessage("activeMQ的生產(chǎn)消費模型第一個消息來了"); //8.發(fā)送消息 producer.send(queue,textMessage); //9.釋放資源 producer.close(); session.close(); connection.close(); }}

第三步:編寫消息消費者

/** * ActiveMQ的生產(chǎn)消費模式-消費者 * @author Mr.song * @date 2019/05/25 15:15 */public class QueueConsumerTest { public static void main(String[] args) throws JMSException { //1.創(chuàng)建連接工廠 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.創(chuàng)建連接 Connection connection = factory.createConnection(); //3.啟動連接 connection.start(); //4.創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建消息目的地 Queue queue = session.createQueue("queue-demo"); //6.創(chuàng)建消息的消費者 MessageConsumer consumer = session.createConsumer(queue); //7.使用消費者接受消息:采用監(jiān)聽器輪詢接受消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //將message進行轉換 try { TextMessage textMessage = (TextMessage) message; System.out.println("1號消費者:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.如需,可釋放資源// consumer.close();// session.close();// connection.close(); }}

Tips:  創(chuàng)建session的兩個參數(shù):

  • 第一個 : 是否使用事務

  • 第二個 : 消息的確認模式

    • AUTO_ACKNOWLEDGE = 1 自動確認

    • CLIENT_ACKNOWLEDGE = 2 客戶端手動確

    • DUPS_OK_ACKNOWLEDGE = 3 自動批量確認

    • SESSION_TRANSACTED = 0 事務提交并確認

第四步:運行測試

3.2發(fā)布訂閱模式

第一步:新建兩個Maven工程并都導入activemq坐標

<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version></dependency>

第二步:編寫消息生產(chǎn)者

/** * 發(fā)布訂閱模式的發(fā)布者 * @author Mr.song * @date 2019/05/25 15:36 */public class TopicProduceTest {
public static void main(String[] args) throws JMSException { //1.獲取連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.開啟連接 connection.start(); //4.獲取會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建消息隊列的目的地,創(chuàng)建的是發(fā)布訂閱模型的隊列 Topic topic = session.createTopic("topic-demo"); //6.創(chuàng)建消息的生產(chǎn)者對象 MessageProducer producer = session.createProducer(topic); //7.創(chuàng)建消息內容 TextMessage textMessage = session.createTextMessage("ActiveMQ的發(fā)布訂閱模型消息來了"); //8.發(fā)送消息,指定發(fā)布到哪個隊列 producer.send(topic,textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); }}

第三步:編寫消息消費者

/** * ActiveMQ發(fā)布訂閱模式的消費者 * @author Mr.song * @date 2019/05/25 15:49 */public class TopicConsumerTest { public static void main(String[] args) throws JMSException { //1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3.開啟連接 connection.start(); //4.創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建消費者目的地 Topic topic = session.createTopic("topic-demo"); //6.創(chuàng)建消費者 MessageConsumer consumer = session.createConsumer(topic); //7.使用消費者接受消息:使用監(jiān)聽器進行輪詢 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //進行消息轉換 try { TextMessage textMessage = (TextMessage) message; System.out.println("訂閱到了消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.如果需要,可以關閉資源// consumer.close();// session.close();// connection.close(); }}

第四步:運行測試

四.Spring整合JMS

ActiveMQ可以通過Spring的配置文件方式很容易嵌入到Spring應用中。

4.1點對點模式/發(fā)布訂閱模式

第一步:創(chuàng)建Maven工程并導入相關坐標

<!-- activemq start --><dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version></dependency><dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version></dependency><!-- activemq end --><!-- spring 與 mq整合 start --><dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.0.4.RELEASE</version></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.0.4.RELEASE</version></dependency><dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.13</version></dependency><!-- spring 與 mq整合 end --><dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.0.4.RELEASE</version></dependency>

第二步:編寫消息監(jiān)聽器

/** * 生成消費模式,消息監(jiān)聽器 * @author Mr.song * @date 2019/05/25 16:20 */@Componentpublic class QueueListener implements MessageListener { /** * 獲取到消息進行相關的處理 * @param message */ @Override public void onMessage(Message message) { try {//1.消息轉型 MapMessage mapMessage = (MapMessage) message; String phone = mapMessage.getString("phone"); String code = mapMessage.getString("code"); System.out.println("消費者端得到的手機號及驗證碼是:"+phone+"=="+code); } catch (JMSException e) { e.printStackTrace(); } }}
//==================================================/** * 發(fā)布訂閱模式,消息監(jiān)聽器 * @author Mr.song * @date 2019/05/25 16:25 */@Componentpublic class TopicListener implements MessageListener { /** * 獲取到消息進行相關的處理 * @param message */ @Override public void onMessage(Message message) { try {//1.消息轉型和獲取 MapMessage mapMessage = (MapMessage) message; String phone = mapMessage.getString("phone"); String code = mapMessage.getString("code"); System.out.println("訂閱者獲得的手機號和驗證碼:"+phone+"=="+code); } catch (JMSException e) { e.printStackTrace(); } }}

第三步:編寫消息發(fā)布者配置文件(applicaitionContext-mq.xml)

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www./schema/beans" xmlns:xsi="http://www./2001/XMLSchema-instance" xmlns:amq="http://activemq./schema/core" xsi:schemaLocation="http://www./schema/beans http://www./schema/beans/spring-beans.xsd http://activemq./schema/core http://activemq./schema/core/activemq-core.xsd">
<!-- 1.配置連接工廠,ActiveMQ的連接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"/> <!-- 2.配置Spring支持會話緩存的連接工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 注入供應商的連接工廠 --> <property name="targetConnectionFactory" ref="amqConnectionFactory"/> <!-- 設置session緩存的大小: 100個會話 --> <property name="sessionCacheSize" value="100"/> </bean>
<!--=================== 通過配置,選擇點對點/發(fā)布訂閱模式 ======================= --> <!-- 3.配置Spring提供的jms模板 : 點對點模式 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 注入連接工廠,Spring的那個 --> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定是否是發(fā)布訂閱模型:false即是點對點模式 --> <property name="pubSubDomain" value="false"/> </bean>
<!-- 3.配置Spring提供的jms模板 : 發(fā)布訂閱模式 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 注入工廠連接 --> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定是否是發(fā)布訂閱模型:true即是發(fā)布訂閱模式 --> <property name="pubSubDomain" value="true"/> </bean></beans>

第四步:編寫消息消費者配置文件(applicationContext-listener.xml)

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www./schema/beans" xmlns:xsi="http://www./2001/XMLSchema-instance" xmlns:context="http://www./schema/context" xmlns:amq="http://activemq./schema/core" xmlns:jms="http://www./schema/jms" xsi:schemaLocation="http://www./schema/beans http://www./schema/beans/spring-beans.xsd http://www./schema/context http://www./schema/context/spring-context.xsd http://www./schema/jms http://www./schema/jms/spring-jms.xsd http://activemq./schema/core http://activemq./schema/core/activemq-core.xsd">
<!-- 1.配置Spring容器啟動時要掃描的包 --> <context:component-scan base-package="cn.dintalk.listener"/>
<!-- 2.配置連接工廠:ActiveMQ的連接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"/> <!-- 3.配置Spring支持會話緩存的連接工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 注入連接工廠:供應商提供的 --> <property name="targetConnectionFactory" ref="amqConnectionFactory"/> <!-- 設置會話緩存大小: 100個會話 --> <property name="sessionCacheSize" value="100"/> </bean>
<!-- ================== 配置不同模式下的消息監(jiān)聽器 =================== --> <!-- 4.配置生產(chǎn)消費模式的監(jiān)聽器 --> <jms:listener-container destination-type="queue"> <!-- 配置監(jiān)聽器類,和消息目的地 --> <jms:listener destination="spring-queue" ref="queueListener"/> </jms:listener-container> <!-- 5.配置發(fā)布訂閱模式的監(jiān)聽器 --> <jms:listener-container destination-type="topic"> <jms:listener destination="spring-topic" ref="topicListener"/> </jms:listener-container></beans>

第五步:編寫消息生產(chǎn)者(兩種模式都有)

/** * Spring整合ActiveMQ的 生產(chǎn)/發(fā)布測試類 * @author Mr.song * @date 2019/05/25 16:34 */@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("classpath:applicationContext-mq.xml")public class SpringActiveMQProducerTest {
@Autowired //點對點模式的jms模板 private JmsTemplate jmsQueueTemplate; @Autowired //發(fā)布訂閱模式的jms模板 private JmsTemplate jmsTopicTemplate; // 1.點對點模式 @Test public void testQueueProducer(){ jmsQueueTemplate.send("spring-queue", new MessageCreator() { /** * 消息生成器 * @param session * @return * @throws JMSException */ @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("phone","12345678901"); mapMessage.setString("code","6542"); return mapMessage; } }); } //2.發(fā)布訂閱模式 @Test public void testTopicProducer(){ jmsTopicTemplate.send("spring-topic", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("phone","12345678901"); mapMessage.setString("code","5648"); return mapMessage; } }); }}

第六步:編寫消息消費者(注冊監(jiān)聽器)(兩種模式都有)

/** * @author Mr.song * @date 2019/05/25 17:42 */public class SpringActiveMQConsumerTest { //1.加載配置文件,注冊消息監(jiān)聽器 public static void main(String[] args) { ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("classpath:applicationContext-listener.xml"); ac.start(); }}

第七步:運行測試

    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多