pom文件添加依赖

<!-- Spring Boot starter for ActiveMQ; no version is given here, so it is presumably managed by the Spring Boot parent/BOM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ connection-pool support. -->
<!-- NOTE(review): Spring Boot 2.1+ builds its pooled factory from org.messaginghub:pooled-jms; confirm which pool library your Boot version expects. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.17.1</version>
</dependency>

配置 application.yml

server:
  port: 8080  # port the Spring Boot application listens on

spring:
  activemq:
    broker-url: tcp://192.168.140.xx:30005  # IP and port of your ActiveMQ broker
    user: admin  # ActiveMQ username
    password: admin  # ActiveMQ password
    pool:
      enabled: true  # turn on the connection pool
      max-connections: 10  # maximum number of pooled connections

在 SpringBoot 的启动类上添加注解 @EnableJms

@SpringBootApplication@EnableJmspublicclassMqconverterApplication{publicstaticvoidmain(String[] args){SpringApplication.run(MqconverterApplication.class, args);}}

创建 activeMQ 配置类

mportorg.apache.activemq.ActiveMQConnectionFactory;importorg.apache.activemq.command.ActiveMQQueue;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.JmsListenerContainerFactory;importorg.springframework.jms.config.SimpleJmsListenerContainerFactory;importjavax.jms.ConnectionFactory;importjavax.jms.Queue;/**  * 配置类  */@ConfigurationpublicclassActiveMQConfig{@Value("${spring.activemq.broker-url}")privateString brokerUrl;@Value("${spring.activemq.user}")privateString userName;@Value("${spring.activemq.password}")privateString password;@BeanpublicConnectionFactoryconnectionFactory(){returnnewActiveMQConnectionFactory(userName, password, brokerUrl);}/**      * 在 Queue 模式中,对消息的监听需要对containerFactory进行配置      */@Bean("queueListener")publicJmsListenerContainerFactory<?>queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){SimpleJmsListenerContainerFactory factory=newSimpleJmsListenerContainerFactory();         factory.setConnectionFactory(connectionFactory);         factory.setPubSubDomain(false);return factory;}/**      * 在 topic 模式中,对消息的监听需要对containerFactory进行配置      */@Bean("topicListener")publicJmsListenerContainerFactory<?>topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){SimpleJmsListenerContainerFactory factory=newSimpleJmsListenerContainerFactory();         factory.setConnectionFactory(connectionFactory);         factory.setPubSubDomain(true);return factory;}}

pubSubDomain:为true 代表 topic 模式,false 代表队列模式,默认是false

springboot 使用 ActiveMQ

生产者

@RestControllerpublicclassActiveMQProducerController{/**      * 队列模式      * @param msg 消息      * @return      */@PostMapping("/queue/{msg}")publicStringsendQueue(@PathVariable("msg")String msg){ActiveMQQueue queue=newActiveMQQueue("testq");         jmsMessagingTemplate.convertAndSend(queue, msg);return"success";}/**      * topic 模式      * @param msg 消息      * @return      */@PostMapping("/topic/{msg}")publicStringsendTopic(@PathVariable("msg")String msg){ActiveMQTopic topic=newActiveMQTopic("test");         jmsMessagingTemplate.convertAndSend(topic, msg);return"success";}}

消费者

importcom.njc.mqconverter.property.KafkaTopicProperty;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassActiveMQConsumer{@AutowiredKafkaTemplate kafkaTemplate;/**      * queue模式的消费者      */@JmsListener(destination="testq", containerFactory="queueListener")publicvoidreadActiveQueue(String message){String topic=KafkaTopicProperty.getSap();         kafkaTemplate.send(topic, message);         log.info("activeMQ 消费者接收:{}", message);}/**      * topic 模式的消费者      */@JmsListener(destination="test", containerFactory="topicListener")publicvoidreadActivTopic1(String message){         log.info("activeMQ topic 消费者1接收:{}", message);}/**      * topic 模式的消费者      */@JmsListener(destination="test", containerFactory="topicListener")publicvoidreadActivTopic2(String message){         log.info("activeMQ topic 消费者2接收:{}", message);}}

监听者的队列模式可以不配 containerFactory ,但topic的需要配置