本文共 14944 字,大约阅读时间需要 49 分钟。
引入jar包
org.springframework.boot spring-boot-starter-amqp
1在resource下创建rabbitmq.properties
#是访问port不是15672,15672是api和管理界面的portspring.rabbitmq.addresses=localhost:5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456#如果要进行消息回调,则这里必须要设置为truespring.rabbitmq.publisherconfirms=true
2创建rabbitmq对象RabbitMq
package com.demo.model;import lombok.Getter;import lombok.Setter;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;/** * Created by huguoju on 2017/3/2. * rabbitmq配置文件 */@Configuration@Getter@Setter@ConfigurationProperties(locations = "classpath:rabbitmq/rabbitmq.properties",prefix = "spring.rabbitmq")public class RabbitMq{ private String addresses; private String username; private String password; private Boolean publisherconfirms;}
3生产者配置
3.1通用性基础配置
package com.demo.rabbitmq.sender;import com.demo.model.RabbitMq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageListener;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;import org.springframework.messaging.converter.MappingJackson2MessageConverter;/** * Created by huguoju on 2017/3/2. * 创建消息生产者 */@Configuration@Slf4jpublic class AmqpConfig { @Autowired private RabbitMq rabbitMq; /** * 连接rabbitmq * @return */ @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory=new CachingConnectionFactory(); connectionFactory.setAddresses(rabbitMq.getAddresses()); connectionFactory.setUsername(rabbitMq.getUsername()); connectionFactory.setPassword(rabbitMq.getPassword()); /** * 对于每一个RabbitTemplate只支持一个ReturnCallback。 * 对于返回消息,模板的mandatory属性必须被设定为true, * 它同样要求CachingConnectionFactory的publisherReturns属性被设定为true。 * 如果客户端通过调用setReturnCallback(ReturnCallback callback)注册了RabbitTemplate.ReturnCallback,那么返回将被发送到客户端。 * 这个回调函数必须实现下列方法: *void returnedMessage(Message message, intreplyCode, String replyText,String exchange, String routingKey); */ // connectionFactory.setPublisherReturns(true); /** * 同样一个RabbitTemplate只支持一个ConfirmCallback。 * 对于发布确认,template要求CachingConnectionFactory的publisherConfirms属性设置为true。 * 如果客户端通过setConfirmCallback(ConfirmCallback callback)注册了RabbitTemplate.ConfirmCallback,那么确认消息将被发送到客户端。 * 这个回调函数必须实现以下方法: * void confirm(CorrelationData correlationData, booleanack); */ connectionFactory.setPublisherConfirms(rabbitMq.getPublisherconfirms()); return connectionFactory; } /** * rabbitAdmin代理类 * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } /** * 创建rabbitTemplate 消息模板类 * prototype原型模式:每次获取Bean的时候会有一个新的实例 * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 * @return */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory()); // rabbitTemplate.setMandatory(true);//返回消息必须设置为true rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//数据转换为json存入消息队列 // rabbitTemplate.setReplyAddress(replyQueue().getName()); // rabbitTemplate.setReplyTimeout(100000000); //发布确认 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { //消息发送到queue时就执行 @Override public void confirm(CorrelationData correlationData, boolean b, String s) { log.debug(correlationData+"//"); if (!b){ log.debug("发送到queue失败"); throw new RuntimeException("send error " + s); } } }); return rabbitTemplate; }}3.2创建exchange
package com.demo.rabbitmq.sender;/** * Created by huguoju on 2017/3/2. * exchange交换机配置 */public interface RabbitMqExchange { final String CONTRACT_FANOUT = "CONTRACT_FANOUT"; final String CONTRACT_TOPIC = "CONTRACT_TOPIC"; final String CONTRACT_DIRECT = "CONTRACT_DIRECT";}3.3创建queue
package com.demo.rabbitmq.sender;/** * Created by huguoju on 2017/3/2. * 消息队列配置 */public interface RabbitMqQueue { final String CONTRACE_SELF ="CONTRACT_SELF"; final String CONTRACE_TENANT ="CONTRACT_TENANT";}
3.4针对rabbitmq服务性的配置,配置queue和交换机并绑定
package com.demo.rabbitmq.sender;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Created by huguoju on 2017/3/2. * 交换机配置并绑定queue */@Configurationpublic class ContractExchangeConfig { @Autowired private RabbitAdmin rabbitAdmin; /** * 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 * @return */// @Bean// FanoutExchange contractFanoutExchange(){// FanoutExchange fanoutExchange=new FanoutExchange(RabbitMqExchange.CONTRACT_FANOUT);// rabbitAdmin.declareExchange(fanoutExchange);// return fanoutExchange;// } /** * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs” * 默认:, durable = true, autoDelete = false * @return */ @Bean TopicExchange contractTopicExchangeDurable(){ TopicExchange contractTopicExchange=new TopicExchange(RabbitMqExchange.CONTRACT_TOPIC); rabbitAdmin.declareExchange(contractTopicExchange); return contractTopicExchange; } /** * 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog * @return */ @Bean DirectExchange contractDirectExchange(){ DirectExchange contractDirectExchange=new DirectExchange(RabbitMqExchange.CONTRACT_DIRECT); rabbitAdmin.declareExchange(contractDirectExchange); return contractDirectExchange; } @Bean Queue queueContract(){ Queue queue=new Queue(RabbitMqQueue.CONTRACE_SELF,true); rabbitAdmin.declareQueue(queue); return queue; } @Bean Queue queueTenant(){ Queue queue=new Queue(RabbitMqQueue.CONTRACE_TENANT,true); rabbitAdmin.declareQueue(queue); return queue; }// @Bean// Binding bindingExchangeContract(Queue queueContract,FanoutExchange exchange){// Binding binding=BindingBuilder.bind(queueContract).to(exchange);// rabbitAdmin.declareBinding(binding);// return binding;// } @Bean Binding bindingExchangeContract(Queue queueContract,TopicExchange exchange){ Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF); rabbitAdmin.declareBinding(binding); return binding; }// @Bean// Binding bindingExchangeContract(Queue queueContract,DirectExchange exchange){// Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);// rabbitAdmin.declareBinding(binding);// return binding;// } @Bean Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange) { Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT); rabbitAdmin.declareBinding(binding); return binding; }// @Bean// Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange) {// Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);// rabbitAdmin.declareBinding(binding);// return binding;// }}3.5创建消息体
package com.demo.rabbitmq.sender;import lombok.Builder;import lombok.Data;import lombok.Getter;import java.util.Date;import java.util.List;/** * Created by huguoju on 2017/3/3.
*不能用@Builder,因为json反编译的时候需要set方法,builder没有set方法 * 合同消息载体 *///@Builder//@Getter@Datapublic class ContractRabbitMq { private String id; private String name; private ListtestList; private Date createDate;}
package com.demo.rabbitmq.sender;import lombok.Builder;import lombok.Getter;/** * Created by huguoju on 2017/3/3. * tenant消息载体 */@Builder@Getterpublic class TenantRabbitMq { private String id; private String name;}4消费者配置,实际使用时应该和生产者不在一个项目里,这里只是演示,所有放在了一个项目里,很多公用的文件在实际开发中可以打jar用
4.1消息的监听的代理类
package com.demo.rabbitmq.consumer;import com.demo.model.RabbitMq;import com.demo.rabbitmq.sender.RabbitMqExchange;import com.demo.rabbitmq.sender.RabbitMqQueue;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.converter.MappingJackson2MessageConverter;import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;/** * Created by huguoju on 2017/3/3. * 接收方配置 * 消息的监听的代理类 */@Configuration@EnableRabbitpublic class ConsumerConfig implements RabbitListenerConfigurer { @Autowired ReceiverService receiverService; @Autowired private RabbitMq rabbitMq; @Autowired private ConnectionFactory connectionFactory; @Bean public DefaultMessageHandlerMethodFactory handlerMethodFactory(){ DefaultMessageHandlerMethodFactory factory=new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; }// @Bean// public SimpleMessageListenerContainer messageContainer() {// SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// container.setQueues(queueContract());// // container.setExposeListenerChannel(true);// container.setMaxConcurrentConsumers(1);// container.setConcurrentConsumers(1);// container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认// container.setMessageListener(new MessageListener() {//// @Override// public void onMessage(Message message) {// byte[] body = message.getBody();// System.out.println("receive msg : " + new String(body));// // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费// }// });// return container;// } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){ SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory()); }}4.2消费者监听
package com.demo.rabbitmq.consumer;import com.demo.rabbitmq.sender.ContractRabbitMq;import com.demo.rabbitmq.sender.RabbitMqQueue;import com.demo.rabbitmq.sender.TenantRabbitMq;import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * Created by huguoju on 2017/3/3. */@Componentpublic class ReceiverService { @RabbitListener(queues = RabbitMqQueue.CONTRACE_SELF) @RabbitHandler public void receiveContractQueue(ContractRabbitMq contract) { ObjectMapper objectMapper=new ObjectMapper(); try { System.out.println("Received contract<" + objectMapper.writeValueAsString(contract) + ">"); } catch (IOException e) { e.printStackTrace(); } } @RabbitListener(queues = RabbitMqQueue.CONTRACE_TENANT) public void receiveTenantQueue(TenantRabbitMq tenant) { ObjectMapper objectMapper=new ObjectMapper(); try { System.out.println("Received contract<" + objectMapper.writeValueAsString(tenant) + ">"); } catch (IOException e) { e.printStackTrace(); } }}以上就完成了。
创建测试controller
package com.demo.controller;import com.demo.rabbitmq.sender.ContractRabbitMq;import com.demo.service.rabbitMq.ContractRabbitmqService;import com.google.common.collect.Lists;import io.swagger.annotations.Api;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RestController;import java.util.Date;/** * Created by huguoju on 2017/3/6. */@RestController@RequestMapping("rabbitmq")@Api(value = "测试rabbitmq",tags = "测试rabbitmq")public class RabbitMqController { @Autowired public ContractRabbitmqService contractRabbitmqService; @RequestMapping(value = "contract/topic",method = {RequestMethod.POST,RequestMethod.GET}) public void contractTopic(){ ContractRabbitMq mq=new ContractRabbitMq(); mq.setId("15"); mq.setName("测试"); mq.setTestList(Lists.newArrayList("111","222")); mq.setCreateDate(new Date()); contractRabbitmqService.sendContractRabbitmqTopic(mq); }}
关于springboot一些例子
https://my.oschina.net/didispace/blog/752981
转载地址:http://jqpws.baihongyu.com/