`
bh三少
  • 浏览: 100825 次
  • 性别: Icon_minigender_1
  • 来自: 北海
社区版块
存档分类
最新评论

ActiveMQ基础实践

阅读更多

一、背景知识

1.JMS

JMSJava Message ServiceJava消息服务,是一组应用程序接口规范,是Java平台中关于面向消息中间件(MOM)的API,用于在多个应用程序之间,或分布式系统中发送和接收消息,进行异步通信。JMS是一套与平台无关的API,目前绝大部分的MOM厂商都对JMS提供了支持。

(可以下载JMS的规范文档,里面详细介绍了JMS的更多特性)

2.ActiveMQ

ActiveMQApache社区的开源消息产品,是一个完全支持JMS1.1Java EE1.4规范的JMS Provider的实现。ActiveMQ支持多种语言和协议编写的客户端,如,语言:JavaC/C++C#RubyPerlPythonPHP,协议:OpenWireSSLStompTCPUDPXMPPRESTNIOWS等。ActiveMQ易于与中间件服务器、Spring等进行集成,是ServiceMixGeronimo默认的消息系统。

更多ActiveMQ的特性与应用,访问:http://activemq.apache.org/index.html

二、实践环境配置

    本教程使用的中间件服务器为Tomcat,三个Tomcat分别部署三个独立的应用,一个用于生产消息,另外两个用于消费消息;一个独立部署的ActiveMQ服务器。本教程使用的技术主要有:JMSActiveMQSpring IOCSpring JMS等。

1.ActiveMQ的安装

    Apache官网下载ActiveMQ,解压下载的Apache ActiveMQ包,到${ACTIVEMQ_HOME}/bin目录运行activemq.bat,启动ActiveMQ服务。

2.Tomcat的配置

如果是在一台机器实践本教程,需要Tomcat的服务端口,否则会发生端口占用的异常,到${TOMCAT_HOME}/conf/server.xml下修改端口。

    Tomcat容器中配置JMS管理对象,JMS管理对象包括两个:ConnectionFactoryDestinationDestination包括QueueTopic。配置在容器中的JMS管理对象,在应用中可以通过JNDI的方式获取。当然还有其他的方式,后面会有介绍。在${TOMCAT_HOME}/confcontext.xml文件中添加以下内容(三个Tomcat都要做相同的配置):

<Resource name="jms/QueueConnectionFactory"
		auth="Container"
		type="org.apache.activemq.ActiveMQConnectionFactory"
		description="JMS Queue ConnectionFactory"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100"
		brokerName="QueueActiveMQBroker" />
<Resource name="jms/TopicConnectionFactory"
		auth="Container"
		type="org.apache.activemq.ActiveMQConnectionFactory"
		description="JMS Topic ConnectionFactory"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100"
		brokerName="TopicActiveMQBroker" />
<Resource name="jms/TestQueue1"
		auth="Container"
		type="org.apache.activemq.command.ActiveMQQueue"
		description="Test JMS Queue"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		physicalName="testQueue1" />
<Resource name="jms/TestTopic1"
		auth="Container"
		type="org.apache.activemq.command.ActiveMQTopic"
		description="Test JMS Topic"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		physicalName="testTopic1" />

说明:上面配置了两个ConnectionFactory,一个用于Queue,一个用于Topic,配置了一个队列testQueue1和一个主题testTopic161616ActiveMQ默认的监听连接的端口;failover连接机制可以在ActiveMQ服务down掉并恢复后尝试去连接ActiveMQ服务,initialReconnectDelay表示重新连接延迟时间。
    配置完成后,我们可以启动其中的一个Tomcat,在浏览器中输入:http://localhost:61616/admin,就会看到ActiveMQweb管理界面,分别点击QueuesTopics,可以看到我们创建的QueueTopic,如下图:

三、编写应用

1.编写消息生产者应用

    编写一个简单的消息生产者应用并部署到Tomcat

(1)添加依赖包

    添加ActiveMQSpringJMS等依赖包,如下图所示:

说明:ActiveMQjar包在${ACTIVEMQ_HOME}/lib目录下可以找到;servletjar包不要添加到工程的WebContent\WEB-INF\lib路径下,因为我们部署到Tomcat的时候不需要这个包,只是在开发的时候需要,可以新建一个文件夹存放,然后buildpath中。

(2)编写消息生产者类

    编写两个消息生产者类,一个发送到Queue,另一个发送到Topic。例子中使用SpringJMS模板类进行消息的发送。

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class QueueMessageProducer {

	private JmsTemplate jmsTemplate;
	
	private Destination defaultDestination;
	
	public void sendMsg(final String message) {
		MessageCreator creator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(defaultDestination, creator);
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public void setDefaultDestination(Destination defaultDestination) {
		this.defaultDestination = defaultDestination;
	}
}
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TopicMessageProducer {

	private JmsTemplate jmsTemplate;
	
	private Destination defaultDestination;
	
	public void sendMsg(final String message) {
		MessageCreator creator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(defaultDestination, creator);
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void setDefaultDestination(Destination defaultDestination) {
		this.defaultDestination = defaultDestination;
	}
}

 (3)编写Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans					http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
						http://activemq.apache.org/schema/core
				http://activemq.apache.org/schema/core/activemq-core.xsd">
	
	<!-- 通过JNDI获取ConnectionFactory -->
	<bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" />
	</bean>
	<bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" />
	</bean>
	
	<!-- 通过JNDI获取Destination -->
	<bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestQueue1" />
	</bean>
	<bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestTopic1" />
	</bean>
	
	<!-- 配置JMS模板 -->
	<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="queueConnectionFactory" />
	</bean>
	<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="topicConnectionFactory" />
	</bean>
	
	<bean id="queueMessageProducer" class="com.zcl.mqproducer.service.QueueMessageProducer">
		<property name="jmsTemplate" ref="queueJmsTemplate" />
		<property name="defaultDestination" ref="testQueue1" />
	</bean>
	<bean id="topicMessageProducer" class="com.zcl.mqproducer.service.TopicMessageProducer">
		<property name="jmsTemplate" ref="topicJmsTemplate" />
		<property name="defaultDestination" ref="testTopic1" />
	</bean>
</beans>

 (4)编写Servlet

    编写一个Servlet类,响应客户端的请求。代码如下:

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import com.zcl.mqproducer.service.QueueMessageProducer;
import com.zcl.mqproducer.service.TopicMessageProducer;

public class MessageServlet extends HttpServlet {

	private static final long serialVersionUID = 3231052945892531021L;

	private QueueMessageProducer queueMsgProducer;
	
	private TopicMessageProducer topicMsgProducer;
	
	public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
		String param = request.getParameter("operate");
		if("queueSend".equals(param)) {
			String message = request.getParameter("message");
			queueMsgProducer.sendMsg(message);
			response.sendRedirect("index.jsp");
		} else if("topicSend".equals(param)) {
			String message = request.getParameter("message");
			topicMsgProducer.sendMsg(message);
			response.sendRedirect("index.jsp");
		} else {
			response.sendRedirect("index.jsp");
		}
	}
	
	public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
		doGet(request, response);
	}
	
	public void init() {
		ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
		queueMsgProducer = (QueueMessageProducer) context.getBean("queueMessageProducer");
		topicMsgProducer = (TopicMessageProducer) context.getBean("topicMessageProducer");
	}
}

 (5)编写JSP文件和web.xml文件

编写首页:index.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>欢迎页面</title>
	</head>
	<body>
		<a href="sendQueueMsg.jsp">发送Queue消息</a><br><br>
		<a href="sendTopicMsg.jsp">发送Topic消息</a><br>
	</body>
</html>

 

 编写发送Queue消息的页面:sendQueueMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>发送Queue消息</title>
	</head>
	<body>
		<form method="POST" action="message?operate=queueSend">
			<table>
				<tr>
					<td><textarea style="width:95%;" name="message"></textarea></td>
				</tr>
				<tr>
					<td><input type="submit" value="发送Queue消息" /></td>
				</tr>
			</table>
		</form>
	</body>
</html>

 

 编写发送Topic消息的页面:sendTopicMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>发送Topic消息</title>
	</head>
	<body>
		<form method="POST" action="message?operate=topicSend">
			<table>
				<tr>
					<td><textarea style="width:95%;" name="message"></textarea></td>
				</tr>
				<tr>
					<td><input type="submit" value="发送Topic消息" /></td>
				</tr>
			</table>
		</form>
	</body>
</html>

 

 编写web.xml文件,加载Spring配置文件和Servlet类。

 

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
  <display-name>mqproducer</display-name>
  <!-- 加载spring配置文件 -->
	<context-param>
		<param-name>contextConfigLocation</param-name>
		<param-value>classpath:/spring/applicationContext.xml</param-value>
	</context-param>
	
	 <servlet>
  		<servlet-name>messageServlet</servlet-name>
  		<servlet-class>com.zcl.mqproducer.servlet.MessageServlet</servlet-class>
  	</servlet>
  	<servlet-mapping>
  		<servlet-name>messageServlet</servlet-name>
  		<url-pattern>/message</url-pattern>
  	</servlet-mapping>
	
	<!-- spring上下文监听器 -->
	<listener>
		<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
	</listener>
  
  	<welcome-file-list>
    	<welcome-file>index.jsp</welcome-file>
  	</welcome-file-list>
</web-app>

 

2.编写消息消费者应用

(1)添加依赖包

    和消息生产者的依赖包相同。

(2)编写消息消费者类

    编写两个消息消费者类,一个消费Queue消息,一个消费Topic消息。消费者类实现MessageListener接口。

 

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueMessageConsumer implements MessageListener {

	private static List<String> queueMsgs = new ArrayList<String>();
	
	public void onMessage(Message message) {
		if(message instanceof TextMessage) {
			TextMessage text = (TextMessage)message;
			try {
				handleMsg(text);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	public List<String> showAllQueueMsgs() {
		return queueMsgs;
	}
	
	private void handleMsg(TextMessage message) throws JMSException {
		queueMsgs.add(message.getText());
	}
}

 import java.util.ArrayList;

import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageConsumer implements MessageListener {
	
	private List<String> topicMsgs = new ArrayList<String>();

	public void onMessage(Message message) {
		if(message instanceof TextMessage) {
			TextMessage text = (TextMessage)message;
			try {
				handleMsg(text);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	public List<String> showAllTopicMsgs() {
		return topicMsgs;
	}
	
	private void handleMsg(TextMessage message) throws JMSException {
		topicMsgs.add(message.getText());
	}
}

 (3)编写Spring配置文件

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://activemq.apache.org/schema/core
			http://activemq.apache.org/schema/core/activemq-core.xsd">
	
	<!-- 通过JNDI获取ConnectionFactory -->
	<bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" />
	</bean>
	<bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" />
	</bean>
	
	<!-- 通过JNDI获取Destination -->
	<bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestQueue1" />
	</bean>
	<bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestTopic1" />
	</bean>
	
	<bean id="queueMessageConsumer" class="com.zcl.mqconsumer01.service.QueueMessageConsumer" />
	<bean id="topicMessageConsumer" class="com.zcl.mqconsumer01.service.TopicMessageConsumer" />
	
	<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="queueConnectionFactory" />
		<property name="destination" ref="testQueue1" />
		<property name="messageListener" ref="queueMessageConsumer" />
	</bean>
	<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicConnectionFactory" />
		<property name="destination" ref="testTopic1" />
		<property name="messageListener" ref="topicMessageConsumer" />
	</bean>
</beans>

 

 说明:上面配置了ListenerContainer,这是一个消息监听器,从指定的QueueTopic监听消息,自动接收消息,也可以使用JmsTemplate主动从QueueTopic获取消息。

(4)编写Servlet

    编写一个Servlet类,响应客户端的请求。代码如下:

 

import java.io.IOException;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import com.zcl.mqconsumer01.service.QueueMessageConsumer;
import com.zcl.mqconsumer01.service.TopicMessageConsumer;

public class MessageServlet extends HttpServlet {

	private static final long serialVersionUID = 1266665483325197008L;
	
	private QueueMessageConsumer queueMsgConsumer;
	
	private TopicMessageConsumer topicMsgConsumer;

	public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
		String param = request.getParameter("operate");
		if("queueReceive".equals(param)) {
			List<String> queueMsgs = queueMsgConsumer.showAllQueueMsgs();
			request.setAttribute("queueMsgs", queueMsgs);
			getServletContext().getRequestDispatcher("/receiveQueueMsg.jsp").forward(request, response);
		} else if("topicReceive".equals(param)) {
			List<String> topicMsgs = topicMsgConsumer.showAllTopicMsgs();
			request.setAttribute("topicMsgs", topicMsgs);
			getServletContext().getRequestDispatcher("/receiveTopicMsg.jsp").forward(request, response);
		} else {
			response.sendRedirect("index.jsp");
		}
	}
	
	public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
		doGet(request, response);
	}
	
	public void init() {
		ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
		queueMsgConsumer = (QueueMessageConsumer) context.getBean("queueMessageConsumer");
		topicMsgConsumer = (TopicMessageConsumer) context.getBean("topicMessageConsumer");
	}
}

 (5)编写JSP文件和web.xml文件

编写首页:index.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>欢迎页面</title>
	</head>
	<body>
		<a href="message?operate=queueReceive">查看Queue接收消息</a><br><br>
		<a href="message?operate=topicReceive">查看Topic接收消息</a><br>
	</body>
</html>

 

 编写查看Queue消息界面:receiveQueueMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>Queue消息列表</title>
	</head>
	<body>
		<table>
			<thead>
				<tr>
					<td>Queue消息列表</td>
				</tr>
			</thead>
			<tbody>
				<c:forEach var="message" items="${queueMsgs}">
				<tr>
					<td>${message}</td>
				</tr>
				</c:forEach>
			</tbody>
		</table>
	</body>
</html>

 

 编写查看Topic消息界面:receiveTopicMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>Topic消息列表</title>
	</head>
	<body>
		<table>
			<thead>
				<tr>
					<td>Topic消息列表</td>
				</tr>
			</thead>
			<tbody>
				<c:forEach var="message" items="${topicMsgs}">
				<tr>
					<td>${message}</td>
				</tr>
				</c:forEach>
			</tbody>
		</table>
	</body>
</html>

 

         web.xml和消息生产者的工程一样。

3.部署应用

    创建多一个消息消费者工程,内容和以上的消息消费者一样。把三个工程分包部署到三个Tomcat。启动ActiveMQ服务,再分别启动三个Tomcat,分别发送Queue消息和Topic消息,Queue消息一次只能有一个消费者,Topic消息是发送给所有的订阅者。

四、在应用中设置JMS管理对象

 

在上面的例子中,我们使用中间件服务器Tomcat配置JMS管理对象ConnectionFactoryDestination,我们也可以在应用配置JMS的管理对象,在Spring的配置文件配置。在JDBC中,针对数据库操作的重要资源Connection我们采用数据库连接池,以提高性能。同样的,在ActiveMQ中,针对SessionConnection,也有池的实现组件--activemq-pool,下面我们就使用activemq-pool来管理ConnectionSession对象。

 

    先注释之前在${TOMCAT_HOME}/conf/context.xml中配置的JMS管理对象,在应用的Spring配置文件中注释ConnectionFactoryDestination的配置,添加以下内容:

 

<bean id="queuePoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>
<bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>
	
<bean id="testQueue1" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="testQueue1" />
</bean>
<bean id="testTopic1" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="testTopic1" />
</bean>

 

更改JmsTemplateListenerContainer所引用的ConnectionFactory。到${ACTIVEMQ_HOME}/lib/optional目录下复制两个jaractivemq-pool-5.6.0.jarcommons-pool-1.5.6.jar到应用的WebContent\WEB-INF\lib路径下。重启三个Tomcat就可以了。

五、关于消息的持久化

 

ActiveMQ的消息默认是持久化的,默认采用内存数据库kahaDB。生产一个Queue消息,如果被消费者消费了,就会从数据库中删除,如果所有的消费者尚未激活,该消息则一直保存在数据库中,直到有任意一个消费者激活,ActiveMQ就会将该消息推送给该消费者。而生产一个Topic消息,所有连接到该Topic的订阅者都将收到该消息,如果当前没有连接到该Topic的订阅者,则该消息会丢失,就算后面有订阅者重新连接到该Topic也不会收到该消息,也就是说订阅者只能消费连接到Topic后的消息,而之前发送的Topic消息则不能消费。有时候为了保证订阅者因失效而重新连接该Topic后也能消费之前发送的消息,我们需要对Topic消息的订阅者做一些处理,后面会谈到。

打开${ACTIVEMQ_HOME}/conf/activemq.xml文件,找到:

<persistenceAdapter>

            <kahaDB directory="${activemq.data}/kahadb"/>

 

</persistenceAdapter>

 

说明ActiveMQ默认是采用kahaDB进行持久化的,我们也可以关闭消息的持久化,在<broker>节点处添加:

 

Persistent=”false”

我们把ActiveMQ的持久化介质改为mysql,需要在${ACTIVEMQ_HOME}/conf/activemq.xml文件的<broker>节点之外添加(可以参考${ACTIVEMQ_HOME}/conf/activemq-jdbc.xml文件):

 

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<property name="url" value="jdbc:mysql://localhost:3306/activemq_jdbc?relaxAutoCommit=true"/>
		<property name="username" value="root"/>
		<property name="password" value="root"/>
		<property name="maxActive" value="200"/>
		<property name="poolPreparedStatements" value="true"/>
</bean>

 这里使用的是dbcp数据源,也可以更改其它的。然后注释掉:

 

<persistenceAdapter>

            <kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

添加jdbc持久化适配器:

 

<persistenceAdapter>
     <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data/activemq_jdbc/kahadb" dataSource="#mysql-ds" />
</persistenceAdapter>

 

添加mysql驱动包和dbcp的依赖包到${ACTIVEMQ_HOME}/lib目录下,在mysql中创建一个数据库,如:activemq_jdbc,重新启动ActiveMQ服务,就会在mysql数据库中生成三张表,如下:

 

activemq_acksActiveMQ的签收信息;

activemq_lock:ActiveMQ的锁信息;

activemq_msgs:ActiveMQ的消息的信息。

 

重启消息生产者的Tomcat,关闭所有消息消费者的Tomcat,利用消息生成者发送几个消息,然后查看activemq_msgs表的数据,就会看到发送的消息,如下:

 

 

在订阅者尚未连接到发布者之前,发布者之前发布的消息不能被后来重新激活连接的订阅者消费,订阅者只能消费激活连接后发布者发布的消息。但我们可以改变这种情况,获得更高的消息可靠性。

 

   我们需要的订阅者端设置一个客户端ID(只能针对Topic消息),如(红色部分):

 

<bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
				<property name="useAsyncSend" value="true" />
				<property name="clientID" value="consumer_02" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>

 

 然后在监听容器中设置一些属性,如下(红色部分):

 

<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicPoolConnectionFactory" />
		<property name="destination" ref="testTopic1" />
		<property name="messageListener" ref="topicMessageConsumer" />
		<!-- 开启发布订阅模式 -->
		<property name="pubSubDomain" value="true" />
		<!-- 开启订阅持久化 -->
		<property name="subscriptionDurable" value="true"/>
		<property name="clientId" value="consumer_02" />
		<property name="durableSubscriptionName" value="consumer_02" />
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

 

 重启订阅者的Tomcat进行对比测试,就能看见效果。

六、关于消息的安全配置

 

以上的JMS应用都是不加入安全机制的,任何连入Internet的人,只要知道消息服务的具体地址(包括IP、端口、消息地址),就可以随意地发送和接收消息,这会造成非常严重的后果。

 

        ActiveMQ支持可插拔的安全机制,只要在${ACTIVEMQ_HOME}/conf/activemq.xml配置即可。以下采用JAAS的安全机制去配置ActiveMQ的验证与授权。在activemq.xmlbroker节点中添加如下内容:

 

<!-- 添加认证与授权 -->
<plugins>
	<!-- 采用JAAS的管理机制配置角色权限 -->
	<jaasAuthenticationPlugin configuration="activemq-domain" />
	<authorizationPlugin>
		<map>
			<authorizationMap>
				<authorizationEntries>
					<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
					<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
					<authorizationEntry queue="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" />
					<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" />
					<authorizationEntry queue="testQueue1" read="guests" write=" users " admin="users,guests" />
					<authorizationEntry topic="testTopic1" read="guests" write="users" admin="users,guests" />
				</authorizationEntries>
			</authorizationMap>
		</map>
	</authorizationPlugin>
</plugins>

 

 说明:read,write,admin(里面的内容表示都是用户组)表示的意思:

 

“>”是通配符,代表所有;

ActiveMQ.Advisory.>ActiveMQ默认创建的Destination(包括QueueTopic);

 

配置中用到了三个用户组:admins,users,guests

    在${ACTIVEMQ_HOME}/conf目录下创建一个login.config文件,如下:

 

activemq-domain {
	org.apache.activemq.jaas.PropertiesLoginModule required
	debug=true
	org.apache.activemq.jaas.properties.user="users.properties"
	org.apache.activemq.jaas.properties.group="groups.properties";
};

 

      ${ACTIVEMQ_HOME}/conf目录下创建一个users.properties文件,用于存放用户和密码,如:

 

admin=admin
keven=keven
zyq=zyq

 

   在${ACTIVEMQ_HOME}/conf目录下创建一个groups.properties文件,用于存放用户组和对应的用户,如:

 

admins=admin
users=keven
guests=zyq

 

修改Spring配置文件,在<bean id=”connectionFactory”…>中设置访问的用户名和密码,如:

 

<property name="userName" value="zyq" />
<property name="password" value="zyq" />

 这里设置的访问用户要和activemq.xml文件中的配置对应,如果设置不正确,在启动Tomcat后发送消息时会在ActiveMQ的控制台看到提示信息。

 

 

 

 

  • 大小: 8.3 KB
  • 大小: 28.8 KB
  • 大小: 61.8 KB
  • 大小: 14.6 KB
  • 大小: 53.6 KB
  • 大小: 21.5 KB
分享到:
评论

相关推荐

    互联网项目练习,使用ssm,fastDFS,activemq,freemarke

    Java初学者:通过学习和实践这些项目,您将能够快速掌握SSM框架的基础知识和核心技术。 中高级开发者:这些项目将为您提供丰富的实战经验和灵感,帮助您提升技术水平和解决问题的能力。 项目经理和架构师:这些项目...

    spring boot 实践学习案例,与其它组件整合

    spring boot 实践学习案例,与其它组件结合如 mybatis、jpa、dubbo、redis、mongodb、memcached、kafka、rabbitmq、activemq、elasticsearch、security、shiro等 #### Spring Boot 版本 - 2.0.3.RELEASE #### 模块...

    Java面试题和学习笔记

    Linux面试专题及答案+ActiveMQ消息中间件面试专题+Java基础面试题+MySQL性能优化的21个最佳实践+微服务面试专题及答案+深入理解java虚拟机+设计模式面试专题及答案+开源框架面试专题及答案+并发编程及答案+Spring...

    Java基于Redis分布式消息队的报文过滤系统的设计与实现

    内容概要:通过带着读者基于 Redis ...阅读建议:此资源以开发报文过滤系统学习其原理和内核,不仅是代码编写实现也更注重内容上的需求分析和方案设计,所以在学习的过程要结合这些内容一起来实践,并调试对应的代码。

    大型分布式网站架构与实践

     本章主要介绍和解决以下问题,这些也是全书的基础:  HTTP协议的工作方式与HTTP网络协议栈的结构。  如何实现基于HTTP协议和TCP协议的RPC调用,它们之间有何差别,分别适应何种场景。  如何实现服务的动态注册...

    Java架构面试资料合集Spring面试专题及答案MySQL面试Redis面试资料.zip

    MySQL性能优化的21个最佳实践 Spring面试专题及答案整理文档 一线互联网企业面试题(仅参考未整理答案) 分布式数据库面试专题系列:Memcached+Redis+MongoDB 分布式通讯面试专题系列:ActiveMQ+RabbitMQ+Kafka ...

    Java程序员面试题全.zip

    ActiveMQ+RabbitMQ+Kafka、分布式限流面试专题系列:Nginx+zookeeper、集合、开源框架面试题系列:Spring+SpringMVC+MyBatis、23种设计模式知识要点整理、Dubbo、Dubbo服务框架面试专题及答案整理文档、java筑基...

    Java、数据库、spring框架等面试题及答案

    消息中间件面试.pdf 微服务面试.pdf 数据库面试.pdf 设计模式面试.pdf 乐观锁与悲观锁.pdf 开源框架面试.pdf 多线程面试.pdf 并发编程面试(下).pdf 并发编程(上).pdf zookeeper面试.pdf ...ActiveMQ消息中间件.p

    重磅-史上最全的Java面试文档总结(jvm,mybatis,mysql优化算法)等总结文档大合集(300份).zip

    ActiveMQ消息中间件面试专题 Dubbo面试及答案(上) Dubbo面试专题及答案(下) Java并发体系知识导图笔记.xmind java后端面试题答案 Java基础面试题 JVM面试专题及答案 Kafka面试专题及答案 Linux面试专题及答案 ...

    Java架构面试笔试专题资料及经验(含答案)和学习笔记.zip

    ActiveMQ消息中间件面试专题.pdf Dubbo面试专题及答案(下).pdf Dubbo面试及答案(上).pdf java后端面试题答案.pdf Java基础面试题.pdf java多线程并发编程知识导图笔记.xmind Java并发体系知识导图笔记.xmind JVM...

    java8源码-Blog:个人博客,知识积累!

    Java基础 多线程与并发编程 算法数据结构 SpringCloud Zookeeper Dubbo NoSQL MongoDB MQ ActiveMQ RabbitMQ RocketMQ Kafka 分布式事务 Tomcat Docker MySQL Jenkins Maven Gradle Git Swagger Linux 读书清单 ...

    网络架构师148讲视频课程

    │ 第09节:搭建基础的开发环境.avi │ 第10节:Spring+Mybatis实现DAO.avi │ 第11节:Mybatis的分页实现.avi │ 第12节:Service的实现以及模块化.avi │ 第13节:Spring MVC实现Web层开发.avi │ 第14节:新增和...

Global site tag (gtag.js) - Google Analytics