- 浏览: 100825 次
- 性别:
- 来自: 北海
最新评论
-
July01:
最近了解到一款StratoIO打印控件,功能如下:1、Html ...
IE下利用js实现打印功能 -
sunxiyuan111:
试了第三种方法 ,管用,解决了大问题谢谢
jasperreports导出pdf报表时粗体的显示问题 -
java爱好者92:
主子报表帆软报表官网上的攻略说得很详细,可以参考一下
利用iReport制作子报表 -
fengjianquan9527:
狗屁不通,毛用不管
jasperreports导出pdf报表时粗体的显示问题 -
xiegqooo:
codeloafer 写道现在activemq5.6 以上ne ...
ActiveMQ集群应用
ActiveMQ基础实践
- 博客分类:
- 企业应用
一、背景知识
1.JMS
JMS即Java Message Service,Java消息服务,是一组应用程序接口规范,是Java平台中关于面向消息中间件(MOM)的API,用于在多个应用程序之间,或分布式系统中发送和接收消息,进行异步通信。JMS是一套与平台无关的API,目前绝大部分的MOM厂商都对JMS提供了支持。
(可以下载JMS的规范文档,里面详细介绍了JMS的更多特性)
2.ActiveMQ
ActiveMQ是Apache社区的开源消息产品,是一个完全支持JMS1.1和Java EE1.4规范的JMS Provider的实现。ActiveMQ支持多种语言和协议编写的客户端,如,语言:Java、C/C++、C#、Ruby、Perl、Python、PHP,协议:OpenWire、SSL、Stomp、TCP、UDP、XMPP、REST、NIO、WS等。ActiveMQ易于与中间件服务器、Spring等进行集成,是ServiceMix和Geronimo默认的消息系统。
更多ActiveMQ的特性与应用,访问:http://activemq.apache.org/index.html
二、实践环境配置
本教程使用的中间件服务器为Tomcat,三个Tomcat分别部署三个独立的应用,一个用于生产消息,另外两个用于消费消息;一个独立部署的ActiveMQ服务器。本教程使用的技术主要有:JMS、ActiveMQ、Spring IOC和Spring JMS等。
1.ActiveMQ的安装
到Apache官网下载ActiveMQ,解压下载的Apache ActiveMQ包,到${ACTIVEMQ_HOME}/bin目录运行activemq.bat,启动ActiveMQ服务。
2.Tomcat的配置
如果是在一台机器实践本教程,需要Tomcat的服务端口,否则会发生端口占用的异常,到${TOMCAT_HOME}/conf/server.xml下修改端口。
在Tomcat容器中配置JMS管理对象,JMS管理对象包括两个:ConnectionFactory和Destination,Destination包括Queue和Topic。配置在容器中的JMS管理对象,在应用中可以通过JNDI的方式获取。当然还有其他的方式,后面会有介绍。在${TOMCAT_HOME}/conf的context.xml文件中添加以下内容(三个Tomcat都要做相同的配置):
<Resource name="jms/QueueConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Queue ConnectionFactory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100" brokerName="QueueActiveMQBroker" /> <Resource name="jms/TopicConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Topic ConnectionFactory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100" brokerName="TopicActiveMQBroker" /> <Resource name="jms/TestQueue1" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" description="Test JMS Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="testQueue1" /> <Resource name="jms/TestTopic1" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" description="Test JMS Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="testTopic1" />
说明:上面配置了两个ConnectionFactory,一个用于Queue,一个用于Topic,配置了一个队列testQueue1和一个主题testTopic1;61616是ActiveMQ默认的监听连接的端口;failover连接机制可以在ActiveMQ服务down掉并恢复后尝试去连接ActiveMQ服务,initialReconnectDelay表示重新连接延迟时间。
配置完成后,我们可以启动其中的一个Tomcat,在浏览器中输入:http://localhost:61616/admin,就会看到ActiveMQ的web管理界面,分别点击Queues和Topics,可以看到我们创建的Queue和Topic,如下图:
三、编写应用
1.编写消息生产者应用
编写一个简单的消息生产者应用并部署到Tomcat。
(1)添加依赖包
添加ActiveMQ、Spring、JMS等依赖包,如下图所示:
说明:ActiveMQ的jar包在${ACTIVEMQ_HOME}/lib目录下可以找到;servlet的jar包不要添加到工程的WebContent\WEB-INF\lib路径下,因为我们部署到Tomcat的时候不需要这个包,只是在开发的时候需要,可以新建一个文件夹存放,然后build到path中。
(2)编写消息生产者类
编写两个消息生产者类,一个发送到Queue,另一个发送到Topic。例子中使用Spring的JMS模板类进行消息的发送。
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class QueueMessageProducer { private JmsTemplate jmsTemplate; private Destination defaultDestination; public void sendMsg(final String message) { MessageCreator creator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }; jmsTemplate.send(defaultDestination, creator); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void setDefaultDestination(Destination defaultDestination) { this.defaultDestination = defaultDestination; } }
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class TopicMessageProducer { private JmsTemplate jmsTemplate; private Destination defaultDestination; public void sendMsg(final String message) { MessageCreator creator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }; jmsTemplate.send(defaultDestination, creator); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void setDefaultDestination(Destination defaultDestination) { this.defaultDestination = defaultDestination; } }
(3)编写Spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- 通过JNDI获取ConnectionFactory --> <bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" /> </bean> <bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" /> </bean> <!-- 通过JNDI获取Destination --> <bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TestQueue1" /> </bean> <bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TestTopic1" /> </bean> <!-- 配置JMS模板 --> <bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="queueConnectionFactory" /> </bean> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="topicConnectionFactory" /> </bean> <bean id="queueMessageProducer" class="com.zcl.mqproducer.service.QueueMessageProducer"> <property name="jmsTemplate" ref="queueJmsTemplate" /> <property name="defaultDestination" ref="testQueue1" /> </bean> <bean id="topicMessageProducer" class="com.zcl.mqproducer.service.TopicMessageProducer"> <property name="jmsTemplate" ref="topicJmsTemplate" /> <property name="defaultDestination" ref="testTopic1" /> </bean> </beans>
(4)编写Servlet类
编写一个Servlet类,响应客户端的请求。代码如下:
import java.io.IOException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.springframework.context.ApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; import com.zcl.mqproducer.service.QueueMessageProducer; import com.zcl.mqproducer.service.TopicMessageProducer; public class MessageServlet extends HttpServlet { private static final long serialVersionUID = 3231052945892531021L; private QueueMessageProducer queueMsgProducer; private TopicMessageProducer topicMsgProducer; public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { String param = request.getParameter("operate"); if("queueSend".equals(param)) { String message = request.getParameter("message"); queueMsgProducer.sendMsg(message); response.sendRedirect("index.jsp"); } else if("topicSend".equals(param)) { String message = request.getParameter("message"); topicMsgProducer.sendMsg(message); response.sendRedirect("index.jsp"); } else { response.sendRedirect("index.jsp"); } } public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException { doGet(request, response); } public void init() { ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext()); queueMsgProducer = (QueueMessageProducer) context.getBean("queueMessageProducer"); topicMsgProducer = (TopicMessageProducer) context.getBean("topicMessageProducer"); } }
(5)编写JSP文件和web.xml文件
编写首页:index.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>欢迎页面</title> </head> <body> <a href="sendQueueMsg.jsp">发送Queue消息</a><br><br> <a href="sendTopicMsg.jsp">发送Topic消息</a><br> </body> </html>
编写发送Queue消息的页面:sendQueueMsg.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>发送Queue消息</title> </head> <body> <form method="POST" action="message?operate=queueSend"> <table> <tr> <td><textarea style="width:95%;" name="message"></textarea></td> </tr> <tr> <td><input type="submit" value="发送Queue消息" /></td> </tr> </table> </form> </body> </html>
编写发送Topic消息的页面:sendTopicMsg.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>发送Topic消息</title> </head> <body> <form method="POST" action="message?operate=topicSend"> <table> <tr> <td><textarea style="width:95%;" name="message"></textarea></td> </tr> <tr> <td><input type="submit" value="发送Topic消息" /></td> </tr> </table> </form> </body> </html>
编写web.xml文件,加载Spring配置文件和Servlet类。
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5"> <display-name>mqproducer</display-name> <!-- 加载spring配置文件 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:/spring/applicationContext.xml</param-value> </context-param> <servlet> <servlet-name>messageServlet</servlet-name> <servlet-class>com.zcl.mqproducer.servlet.MessageServlet</servlet-class> </servlet> <servlet-mapping> <servlet-name>messageServlet</servlet-name> <url-pattern>/message</url-pattern> </servlet-mapping> <!-- spring上下文监听器 --> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <welcome-file-list> <welcome-file>index.jsp</welcome-file> </welcome-file-list> </web-app>
2.编写消息消费者应用
(1)添加依赖包
和消息生产者的依赖包相同。
(2)编写消息消费者类
编写两个消息消费者类,一个消费Queue消息,一个消费Topic消息。消费者类实现MessageListener接口。
import java.util.ArrayList; import java.util.List; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class QueueMessageConsumer implements MessageListener { private static List<String> queueMsgs = new ArrayList<String>(); public void onMessage(Message message) { if(message instanceof TextMessage) { TextMessage text = (TextMessage)message; try { handleMsg(text); } catch (JMSException e) { e.printStackTrace(); } } } public List<String> showAllQueueMsgs() { return queueMsgs; } private void handleMsg(TextMessage message) throws JMSException { queueMsgs.add(message.getText()); } }
import java.util.ArrayList;
import java.util.List; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class TopicMessageConsumer implements MessageListener { private List<String> topicMsgs = new ArrayList<String>(); public void onMessage(Message message) { if(message instanceof TextMessage) { TextMessage text = (TextMessage)message; try { handleMsg(text); } catch (JMSException e) { e.printStackTrace(); } } } public List<String> showAllTopicMsgs() { return topicMsgs; } private void handleMsg(TextMessage message) throws JMSException { topicMsgs.add(message.getText()); } }
(3)编写Spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- 通过JNDI获取ConnectionFactory --> <bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" /> </bean> <bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" /> </bean> <!-- 通过JNDI获取Destination --> <bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TestQueue1" /> </bean> <bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/jms/TestTopic1" /> </bean> <bean id="queueMessageConsumer" class="com.zcl.mqconsumer01.service.QueueMessageConsumer" /> <bean id="topicMessageConsumer" class="com.zcl.mqconsumer01.service.TopicMessageConsumer" /> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="queueConnectionFactory" /> <property name="destination" ref="testQueue1" /> <property name="messageListener" ref="queueMessageConsumer" /> </bean> <bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicConnectionFactory" /> <property name="destination" ref="testTopic1" /> <property name="messageListener" ref="topicMessageConsumer" /> </bean> </beans>
说明:上面配置了ListenerContainer,这是一个消息监听器,从指定的Queue或Topic监听消息,自动接收消息,也可以使用JmsTemplate主动从Queue或Topic获取消息。
(4)编写Servlet类
编写一个Servlet类,响应客户端的请求。代码如下:
import java.io.IOException; import java.util.List; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.springframework.context.ApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; import com.zcl.mqconsumer01.service.QueueMessageConsumer; import com.zcl.mqconsumer01.service.TopicMessageConsumer; public class MessageServlet extends HttpServlet { private static final long serialVersionUID = 1266665483325197008L; private QueueMessageConsumer queueMsgConsumer; private TopicMessageConsumer topicMsgConsumer; public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { String param = request.getParameter("operate"); if("queueReceive".equals(param)) { List<String> queueMsgs = queueMsgConsumer.showAllQueueMsgs(); request.setAttribute("queueMsgs", queueMsgs); getServletContext().getRequestDispatcher("/receiveQueueMsg.jsp").forward(request, response); } else if("topicReceive".equals(param)) { List<String> topicMsgs = topicMsgConsumer.showAllTopicMsgs(); request.setAttribute("topicMsgs", topicMsgs); getServletContext().getRequestDispatcher("/receiveTopicMsg.jsp").forward(request, response); } else { response.sendRedirect("index.jsp"); } } public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { doGet(request, response); } public void init() { ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext()); queueMsgConsumer = (QueueMessageConsumer) context.getBean("queueMessageConsumer"); topicMsgConsumer = (TopicMessageConsumer) context.getBean("topicMessageConsumer"); } }
(5)编写JSP文件和web.xml文件
编写首页:index.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>欢迎页面</title> </head> <body> <a href="message?operate=queueReceive">查看Queue接收消息</a><br><br> <a href="message?operate=topicReceive">查看Topic接收消息</a><br> </body> </html>
编写查看Queue消息界面:receiveQueueMsg.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Queue消息列表</title> </head> <body> <table> <thead> <tr> <td>Queue消息列表</td> </tr> </thead> <tbody> <c:forEach var="message" items="${queueMsgs}"> <tr> <td>${message}</td> </tr> </c:forEach> </tbody> </table> </body> </html>
编写查看Topic消息界面:receiveTopicMsg.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Topic消息列表</title> </head> <body> <table> <thead> <tr> <td>Topic消息列表</td> </tr> </thead> <tbody> <c:forEach var="message" items="${topicMsgs}"> <tr> <td>${message}</td> </tr> </c:forEach> </tbody> </table> </body> </html>
web.xml和消息生产者的工程一样。
3.部署应用
创建多一个消息消费者工程,内容和以上的消息消费者一样。把三个工程分包部署到三个Tomcat。启动ActiveMQ服务,再分别启动三个Tomcat,分别发送Queue消息和Topic消息,Queue消息一次只能有一个消费者,Topic消息是发送给所有的订阅者。
四、在应用中设置JMS管理对象
在上面的例子中,我们使用中间件服务器Tomcat配置JMS管理对象ConnectionFactory和Destination,我们也可以在应用配置JMS的管理对象,在Spring的配置文件配置。在JDBC中,针对数据库操作的重要资源Connection我们采用数据库连接池,以提高性能。同样的,在ActiveMQ中,针对Session和Connection,也有池的实现组件--activemq-pool,下面我们就使用activemq-pool来管理Connection和Session对象。
先注释之前在${TOMCAT_HOME}/conf/context.xml中配置的JMS管理对象,在应用的Spring配置文件中注释ConnectionFactory和Destination的配置,添加以下内容:
<bean id="queuePoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" /> </bean> </property> <property name="maxConnections" value="100" /> </bean> <bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" /> </bean> </property> <property name="maxConnections" value="100" /> </bean> <bean id="testQueue1" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="testQueue1" /> </bean> <bean id="testTopic1" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="testTopic1" /> </bean>
更改JmsTemplate和ListenerContainer所引用的ConnectionFactory。到${ACTIVEMQ_HOME}/lib/optional目录下复制两个jar包activemq-pool-5.6.0.jar和commons-pool-1.5.6.jar到应用的WebContent\WEB-INF\lib路径下。重启三个Tomcat就可以了。
五、关于消息的持久化
ActiveMQ的消息默认是持久化的,默认采用内存数据库kahaDB。生产一个Queue消息,如果被消费者消费了,就会从数据库中删除,如果所有的消费者尚未激活,该消息则一直保存在数据库中,直到有任意一个消费者激活,ActiveMQ就会将该消息推送给该消费者。而生产一个Topic消息,所有连接到该Topic的订阅者都将收到该消息,如果当前没有连接到该Topic的订阅者,则该消息会丢失,就算后面有订阅者重新连接到该Topic也不会收到该消息,也就是说订阅者只能消费连接到Topic后的消息,而之前发送的Topic消息则不能消费。有时候为了保证订阅者因失效而重新连接该Topic后也能消费之前发送的消息,我们需要对Topic消息的订阅者做一些处理,后面会谈到。
打开${ACTIVEMQ_HOME}/conf/activemq.xml文件,找到:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
说明ActiveMQ默认是采用kahaDB进行持久化的,我们也可以关闭消息的持久化,在<broker>节点处添加:
Persistent=”false”
我们把ActiveMQ的持久化介质改为mysql,需要在${ACTIVEMQ_HOME}/conf/activemq.xml文件的<broker>节点之外添加(可以参考${ACTIVEMQ_HOME}/conf/activemq-jdbc.xml文件):
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq_jdbc?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
这里使用的是dbcp数据源,也可以更改其它的。然后注释掉:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
添加jdbc持久化适配器:
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data/activemq_jdbc/kahadb" dataSource="#mysql-ds" />
</persistenceAdapter>
添加mysql驱动包和dbcp的依赖包到${ACTIVEMQ_HOME}/lib目录下,在mysql中创建一个数据库,如:activemq_jdbc,重新启动ActiveMQ服务,就会在mysql数据库中生成三张表,如下:
activemq_acks:ActiveMQ的签收信息;
activemq_lock:ActiveMQ的锁信息;
activemq_msgs:ActiveMQ的消息的信息。
重启消息生产者的Tomcat,关闭所有消息消费者的Tomcat,利用消息生成者发送几个消息,然后查看activemq_msgs表的数据,就会看到发送的消息,如下:
在订阅者尚未连接到发布者之前,发布者之前发布的消息不能被后来重新激活连接的订阅者消费,订阅者只能消费激活连接后发布者发布的消息。但我们可以改变这种情况,获得更高的消息可靠性。
我们需要的订阅者端设置一个客户端ID(只能针对Topic消息),如(红色部分):
<bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
<property name="useAsyncSend" value="true" />
<property name="clientID" value="consumer_02" />
</bean>
</property>
<property name="maxConnections" value="100" />
</bean>
然后在监听容器中设置一些属性,如下(红色部分):
<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="topicPoolConnectionFactory" />
<property name="destination" ref="testTopic1" />
<property name="messageListener" ref="topicMessageConsumer" />
<!-- 开启发布订阅模式 -->
<property name="pubSubDomain" value="true" />
<!-- 开启订阅持久化 -->
<property name="subscriptionDurable" value="true"/>
<property name="clientId" value="consumer_02" />
<property name="durableSubscriptionName" value="consumer_02" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
重启订阅者的Tomcat进行对比测试,就能看见效果。
六、关于消息的安全配置
以上的JMS应用都是不加入安全机制的,任何连入Internet的人,只要知道消息服务的具体地址(包括IP、端口、消息地址),就可以随意地发送和接收消息,这会造成非常严重的后果。
ActiveMQ支持可插拔的安全机制,只要在${ACTIVEMQ_HOME}/conf/activemq.xml配置即可。以下采用JAAS的安全机制去配置ActiveMQ的验证与授权。在activemq.xml的broker节点中添加如下内容:
<!-- 添加认证与授权 --> <plugins> <!-- 采用JAAS的管理机制配置角色权限 --> <jaasAuthenticationPlugin configuration="activemq-domain" /> <authorizationPlugin> <map> <authorizationMap> <authorizationEntries> <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> <authorizationEntry queue="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" /> <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" /> <authorizationEntry queue="testQueue1" read="guests" write=" users " admin="users,guests" /> <authorizationEntry topic="testTopic1" read="guests" write="users" admin="users,guests" /> </authorizationEntries> </authorizationMap> </map> </authorizationPlugin> </plugins>
说明:read,write,admin(里面的内容表示都是用户组)表示的意思:
“>”是通配符,代表所有;
ActiveMQ.Advisory.>是ActiveMQ默认创建的Destination(包括Queue和Topic);
配置中用到了三个用户组:admins,users,guests。
在${ACTIVEMQ_HOME}/conf目录下创建一个login.config文件,如下:
activemq-domain { org.apache.activemq.jaas.PropertiesLoginModule required debug=true org.apache.activemq.jaas.properties.user="users.properties" org.apache.activemq.jaas.properties.group="groups.properties"; };
在${ACTIVEMQ_HOME}/conf目录下创建一个users.properties文件,用于存放用户和密码,如:
admin=admin keven=keven zyq=zyq
在${ACTIVEMQ_HOME}/conf目录下创建一个groups.properties文件,用于存放用户组和对应的用户,如:
admins=admin users=keven guests=zyq
修改Spring配置文件,在<bean id=”connectionFactory”…>中设置访问的用户名和密码,如:
<property name="userName" value="zyq" /> <property name="password" value="zyq" />
这里设置的访问用户要和activemq.xml文件中的配置对应,如果设置不正确,在启动Tomcat后发送消息时会在ActiveMQ的控制台看到提示信息。
相关推荐
Java初学者:通过学习和实践这些项目,您将能够快速掌握SSM框架的基础知识和核心技术。 中高级开发者:这些项目将为您提供丰富的实战经验和灵感,帮助您提升技术水平和解决问题的能力。 项目经理和架构师:这些项目...
spring boot 实践学习案例,与其它组件结合如 mybatis、jpa、dubbo、redis、mongodb、memcached、kafka、rabbitmq、activemq、elasticsearch、security、shiro等 #### Spring Boot 版本 - 2.0.3.RELEASE #### 模块...
Linux面试专题及答案+ActiveMQ消息中间件面试专题+Java基础面试题+MySQL性能优化的21个最佳实践+微服务面试专题及答案+深入理解java虚拟机+设计模式面试专题及答案+开源框架面试专题及答案+并发编程及答案+Spring...
内容概要:通过带着读者基于 Redis ...阅读建议:此资源以开发报文过滤系统学习其原理和内核,不仅是代码编写实现也更注重内容上的需求分析和方案设计,所以在学习的过程要结合这些内容一起来实践,并调试对应的代码。
本章主要介绍和解决以下问题,这些也是全书的基础: HTTP协议的工作方式与HTTP网络协议栈的结构。 如何实现基于HTTP协议和TCP协议的RPC调用,它们之间有何差别,分别适应何种场景。 如何实现服务的动态注册...
MySQL性能优化的21个最佳实践 Spring面试专题及答案整理文档 一线互联网企业面试题(仅参考未整理答案) 分布式数据库面试专题系列:Memcached+Redis+MongoDB 分布式通讯面试专题系列:ActiveMQ+RabbitMQ+Kafka ...
ActiveMQ+RabbitMQ+Kafka、分布式限流面试专题系列:Nginx+zookeeper、集合、开源框架面试题系列:Spring+SpringMVC+MyBatis、23种设计模式知识要点整理、Dubbo、Dubbo服务框架面试专题及答案整理文档、java筑基...
消息中间件面试.pdf 微服务面试.pdf 数据库面试.pdf 设计模式面试.pdf 乐观锁与悲观锁.pdf 开源框架面试.pdf 多线程面试.pdf 并发编程面试(下).pdf 并发编程(上).pdf zookeeper面试.pdf ...ActiveMQ消息中间件.p
ActiveMQ消息中间件面试专题 Dubbo面试及答案(上) Dubbo面试专题及答案(下) Java并发体系知识导图笔记.xmind java后端面试题答案 Java基础面试题 JVM面试专题及答案 Kafka面试专题及答案 Linux面试专题及答案 ...
ActiveMQ消息中间件面试专题.pdf Dubbo面试专题及答案(下).pdf Dubbo面试及答案(上).pdf java后端面试题答案.pdf Java基础面试题.pdf java多线程并发编程知识导图笔记.xmind Java并发体系知识导图笔记.xmind JVM...
Java基础 多线程与并发编程 算法数据结构 SpringCloud Zookeeper Dubbo NoSQL MongoDB MQ ActiveMQ RabbitMQ RocketMQ Kafka 分布式事务 Tomcat Docker MySQL Jenkins Maven Gradle Git Swagger Linux 读书清单 ...
│ 第09节:搭建基础的开发环境.avi │ 第10节:Spring+Mybatis实现DAO.avi │ 第11节:Mybatis的分页实现.avi │ 第12节:Service的实现以及模块化.avi │ 第13节:Spring MVC实现Web层开发.avi │ 第14节:新增和...