Spring Boot (12): Integrating RabbitMQ
Published: 2019-05-27


Add the dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

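1. Create rabbitmq.properties under resources (the class in step 2 loads it from classpath:rabbitmq/rabbitmq.properties)

# The client port is 5672, not 15672; 15672 is the port of the HTTP API and the management UI
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# Must be true if you want publisher confirm callbacks
spring.rabbitmq.publisherconfirms=true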
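To make the addresses format concrete, here is a minimal plain-Java sketch that reads the same key with java.util.Properties and splits the value into host and port. The class AddressParser and its parse method are hypothetical names used only for illustration; Spring Boot performs its own binding and does not use this code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Spring or of the original project.
final class AddressParser {

    /** Splits a comma-separated "host:port" list such as the spring.rabbitmq.addresses value. */
    static List<String[]> parse(String addresses) {
        List<String[]> result = new ArrayList<>();
        for (String address : addresses.split(",")) {
            String[] hostAndPort = address.trim().split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got: " + address);
            }
            Integer.parseInt(hostAndPort[1]); // fails fast on a non-numeric port
            result.add(hostAndPort);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("spring.rabbitmq.addresses=localhost:5672\n"));
        for (String[] hostAndPort : parse(props.getProperty("spring.rabbitmq.addresses"))) {
            System.out.println("host=" + hostAndPort[0] + " port=" + hostAndPort[1]);
        }
    }
}
```

If the management port 15672 were configured here by mistake, the value would still parse, but the AMQP client would fail to connect, which is why the comment in the properties file is worth keeping.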

2. Create the RabbitMq configuration class

package com.demo.model;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Created by huguoju on 2017/3/2.
 * RabbitMQ configuration properties.
 * Note: the locations attribute of @ConfigurationProperties is only available up to
 * Spring Boot 1.4; it was removed in 1.5, where @PropertySource can be used instead.
 */
@Configuration
@Getter
@Setter
@ConfigurationProperties(locations = "classpath:rabbitmq/rabbitmq.properties", prefix = "spring.rabbitmq")
public class RabbitMq {
    private String addresses;
    private String username;
    private String password;
    private Boolean publisherconfirms;
}

3. Producer configuration

3.1 General base configuration

package com.demo.rabbitmq.sender;

import com.demo.model.RabbitMq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * Created by huguoju on 2017/3/2.
 * Message producer configuration.
 */
@Configuration
@Slf4j
public class AmqpConfig {

    @Autowired
    private RabbitMq rabbitMq;

    /**
     * Connects to RabbitMQ.
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(rabbitMq.getAddresses());
        connectionFactory.setUsername(rabbitMq.getUsername());
        connectionFactory.setPassword(rabbitMq.getPassword());
        /*
         * Each RabbitTemplate supports only one ReturnCallback.
         * For returned messages, the template's mandatory property must be true,
         * and the CachingConnectionFactory's publisherReturns property must be true as well.
         * If the client registers a RabbitTemplate.ReturnCallback through
         * setReturnCallback(ReturnCallback callback), returns are delivered to the client.
         * The callback must implement:
         * void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
         */
        // connectionFactory.setPublisherReturns(true);
        /*
         * Likewise, each RabbitTemplate supports only one ConfirmCallback.
         * For publisher confirms, the template requires the CachingConnectionFactory's
         * publisherConfirms property to be true.
         * If the client registers a RabbitTemplate.ConfirmCallback through
         * setConfirmCallback(ConfirmCallback callback), confirms are delivered to the client.
         * The callback must implement:
         * void confirm(CorrelationData correlationData, boolean ack, String cause);
         */
        connectionFactory.setPublisherConfirms(rabbitMq.getPublisherconfirms());
        return connectionFactory;
    }

    /**
     * RabbitAdmin, used to declare exchanges, queues and bindings.
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates the RabbitTemplate used to send messages.
     * Prototype scope: every lookup of the bean returns a new instance.
     * Because a callback is set on the template, it should be a prototype; with a singleton,
     * the callback would be whichever one was set last.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // rabbitTemplate.setMandatory(true); // must be true to receive returned messages
        // Convert payloads to JSON before they are put on the queue
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // rabbitTemplate.setReplyAddress(replyQueue().getName());
        // rabbitTemplate.setReplyTimeout(100000000);
        // Publisher confirms
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            // Invoked when the broker acks or nacks a published message
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                log.debug(correlationData + "//");
                if (!b) {
                    log.debug("failed to deliver the message to the queue");
                    throw new RuntimeException("send error " + s);
                }
            }
        });
        return rabbitTemplate;
    }
}
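The confirm callback receives the CorrelationData that was supplied when the message was sent, so an application can match each ack or nack to the message it published. The following is a minimal plain-Java sketch of that bookkeeping, with no Spring classes involved. ConfirmTracker and all of its methods are hypothetical; in a real project, onConfirm would be called from RabbitTemplate.ConfirmCallback.confirm with the id taken from the CorrelationData.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that tracks which published messages are still unconfirmed.
final class ConfirmTracker {

    private final Map<String, Object> pending = new ConcurrentHashMap<>();
    private final Map<String, String> failed = new ConcurrentHashMap<>();

    /** Call before publishing; remembers the (non-null) payload under its correlation id. */
    void beforeSend(String correlationId, Object payload) {
        pending.put(correlationId, payload);
    }

    /** Mirrors the arguments of a confirm callback: correlation id, ack flag, failure cause. */
    void onConfirm(String correlationId, boolean ack, String cause) {
        Object payload = pending.remove(correlationId);
        if (payload == null) {
            return; // unknown id or already handled
        }
        if (!ack) {
            failed.put(correlationId, cause == null ? "unknown" : cause);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    boolean hasFailed(String correlationId) {
        return failed.containsKey(correlationId);
    }

    String failureCause(String correlationId) {
        return failed.get(correlationId);
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.beforeSend("msg-1", "first payload");
        tracker.beforeSend("msg-2", "second payload");
        tracker.onConfirm("msg-1", true, null);
        tracker.onConfirm("msg-2", false, "simulated nack");
        System.out.println("pending=" + tracker.pendingCount()
                + " msg-2 failed=" + tracker.hasFailed("msg-2"));
    }
}
```

Recording a failure instead of throwing from the callback is a design choice of this sketch: the callback runs outside the thread that sent the message, so an exception thrown there does not reach the sender.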
3.2 Define the exchange names

package com.demo.rabbitmq.sender;

/**
 * Created by huguoju on 2017/3/2.
 * Exchange names.
 */
public interface RabbitMqExchange {
    final String CONTRACT_FANOUT = "CONTRACT_FANOUT";
    final String CONTRACT_TOPIC = "CONTRACT_TOPIC";
    final String CONTRACT_DIRECT = "CONTRACT_DIRECT";
}
3.3 Define the queue names

package com.demo.rabbitmq.sender;

/**
 * Created by huguoju on 2017/3/2.
 * Queue names.
 */
public interface RabbitMqQueue {
    final String CONTRACE_SELF = "CONTRACT_SELF";
    final String CONTRACE_TENANT = "CONTRACT_TENANT";
}

3.4 Broker-side configuration: declare the queues and exchanges and bind them

package com.demo.rabbitmq.sender;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by huguoju on 2017/3/2.
 * Declares the exchanges and binds the queues to them.
 */
@Configuration
public class ContractExchangeConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * Fanout: routing keys are ignored. You simply bind queues to the exchange, and every
     * message sent to the exchange is forwarded to all queues bound to it. This is much like
     * a subnet broadcast, where every host in the subnet receives a copy of the message.
     * Fanout is the fastest exchange type for forwarding messages.
     */
//    @Bean
//    FanoutExchange contractFanoutExchange(){
//        FanoutExchange fanoutExchange=new FanoutExchange(RabbitMqExchange.CONTRACT_FANOUT);
//        rabbitAdmin.declareExchange(fanoutExchange);
//        return fanoutExchange;
//    }

    /**
     * Topic: the routing key is matched against a pattern, so the queue is bound with a pattern.
     * "#" matches zero or more words and "*" matches exactly one word. Therefore "audit.#"
     * matches "audit.irs.corporate", whereas "audit.*" matches only keys such as "audit.irs".
     * Defaults: durable = true, autoDelete = false
     */
    @Bean
    TopicExchange contractTopicExchangeDurable() {
        TopicExchange contractTopicExchange = new TopicExchange(RabbitMqExchange.CONTRACT_TOPIC);
        rabbitAdmin.declareExchange(contractTopicExchange);
        return contractTopicExchange;
    }

    /**
     * Direct: the routing key is matched exactly. If a queue is bound to the exchange with the
     * routing key "dog", only messages with the key "dog" are forwarded to it; "dog.puppy" and
     * "dog.guard" are not.
     */
    @Bean
    DirectExchange contractDirectExchange() {
        DirectExchange contractDirectExchange = new DirectExchange(RabbitMqExchange.CONTRACT_DIRECT);
        rabbitAdmin.declareExchange(contractDirectExchange);
        return contractDirectExchange;
    }

    @Bean
    Queue queueContract() {
        Queue queue = new Queue(RabbitMqQueue.CONTRACE_SELF, true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    Queue queueTenant() {
        Queue queue = new Queue(RabbitMqQueue.CONTRACE_TENANT, true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

//    @Bean
//    Binding bindingExchangeContract(Queue queueContract,FanoutExchange exchange){
//        Binding binding=BindingBuilder.bind(queueContract).to(exchange);
//        rabbitAdmin.declareBinding(binding);
//        return binding;
//    }

    @Bean
    Binding bindingExchangeContract(Queue queueContract, TopicExchange exchange) {
        Binding binding = BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

//    @Bean
//    Binding bindingExchangeContract(Queue queueContract,DirectExchange exchange){
//        Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);
//        rabbitAdmin.declareBinding(binding);
//        return binding;
//    }

    @Bean
    Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange) {
        Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

//    @Bean
//    Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange) {
//        Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);
//        rabbitAdmin.declareBinding(binding);
//        return binding;
//    }
}
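The topic-exchange wildcard rules described in the comments above can be sketched in a few lines of plain Java. This is only an illustration of the matching semantics ("#" for zero or more words, "*" for exactly one word), not the broker's actual implementation; TopicMatcher is a hypothetical class name.

```java
// Illustrative matcher for topic binding patterns; words are separated by dots.
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // "*" or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate"));
        System.out.println(matches("audit.*", "audit.irs"));
        System.out.println(matches("audit.*", "audit.irs.corporate"));
    }
}
```

In the configuration above, the queues are bound with the literal keys CONTRACT_SELF and CONTRACT_TENANT, which contain no wildcards, so the topic exchange behaves like a direct exchange for those bindings.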
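3.5 Create the message payload classes

package com.demo.rabbitmq.sender;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;

import java.util.Date;
import java.util.List;

/**
 * Created by huguoju on 2017/3/3.
 * Contract message payload.
 * @Builder cannot be used here: JSON deserialization needs setter methods, and a builder
 * does not generate them (with @Builder alone the class also has no no-argument constructor).
 */
//@Builder
//@Getter
@Data
public class ContractRabbitMq {
    private String id;
    private String name;
    private List<String> testList;
    private Date createDate;
}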
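
package com.demo.rabbitmq.sender;

import lombok.Builder;
import lombok.Getter;

/**
 * Created by huguoju on 2017/3/3.
 * Tenant message payload.
 * Note: this class uses @Builder and @Getter only, so the deserialization caveat described
 * for ContractRabbitMq applies to it as well when it is consumed as JSON.
 */
@Builder
@Getter
public class TenantRabbitMq {
    private String id;
    private String name;
}
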
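4. Consumer configuration. In real use the consumer should not live in the same project as the producer; they share one project here only for demonstration. In real development, many of the shared files can be packaged as a jar.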

4.1 Listener configuration class

package com.demo.rabbitmq.consumer;

import com.demo.model.RabbitMq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

/**
 * Created by huguoju on 2017/3/3.
 * Receiver-side configuration for the message listeners.
 */
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {

    @Autowired
    ReceiverService receiverService;

    @Autowired
    private RabbitMq rabbitMq;

    @Autowired
    private ConnectionFactory connectionFactory;

    // Converts the JSON payload into the listener method's parameter type
    @Bean
    public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

//    @Bean
//    public SimpleMessageListenerContainer messageContainer() {
//        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
//        container.setQueues(queueContract());
//      //  container.setExposeListenerChannel(true);
//        container.setMaxConcurrentConsumers(1);
//        container.setConcurrentConsumers(1);
//        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement
//        container.setMessageListener(new MessageListener() {
//
//            @Override
//            public void onMessage(Message message) {
//                byte[] body = message.getBody();
//                System.out.println("receive msg : " + new String(body));
//               // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // confirm the message was consumed
//            }
//        });
//        return container;
//    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
    }
}
4.2 Consumer listeners

package com.demo.rabbitmq.consumer;

import com.demo.rabbitmq.sender.ContractRabbitMq;
import com.demo.rabbitmq.sender.RabbitMqQueue;
import com.demo.rabbitmq.sender.TenantRabbitMq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Created by huguoju on 2017/3/3.
 */
@Component
public class ReceiverService {

    // Listens on the contract queue and prints each received message as JSON
    @RabbitListener(queues = RabbitMqQueue.CONTRACE_SELF)
    @RabbitHandler
    public void receiveContractQueue(ContractRabbitMq contract) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            System.out.println("Received contract<" + objectMapper.writeValueAsString(contract) + ">");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Listens on the tenant queue and prints each received message as JSON
    @RabbitListener(queues = RabbitMqQueue.CONTRACE_TENANT)
    public void receiveTenantQueue(TenantRabbitMq tenant) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            System.out.println("Received tenant<" + objectMapper.writeValueAsString(tenant) + ">");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
That completes the setup.

Create a test controller

package com.demo.controller;

import com.demo.rabbitmq.sender.ContractRabbitMq;
import com.demo.service.rabbitMq.ContractRabbitmqService;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * Created by huguoju on 2017/3/6.
 */
@RestController
@RequestMapping("rabbitmq")
@Api(value = "Test rabbitmq", tags = "Test rabbitmq")
public class RabbitMqController {

    @Autowired
    public ContractRabbitmqService contractRabbitmqService;

    // Builds a sample contract message and publishes it through the service
    @RequestMapping(value = "contract/topic", method = {RequestMethod.POST, RequestMethod.GET})
    public void contractTopic() {
        ContractRabbitMq mq = new ContractRabbitMq();
        mq.setId("15");
        mq.setName("test");
        mq.setTestList(Lists.newArrayList("111", "222"));
        mq.setCreateDate(new Date());
        contractRabbitmqService.sendContractRabbitmqTopic(mq);
    }
}
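The controller calls ContractRabbitmqService.sendContractRabbitmqTopic, but the article does not show that service. What follows is a guess at its shape, written in plain Java so it can be run without a broker. The Sender interface is a hypothetical stand-in for the three-argument RabbitTemplate.convertAndSend(exchange, routingKey, payload); the class name ContractSenderSketch is also an assumption, and the author's real service may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sending service; not the author's implementation.
final class ContractSenderSketch {

    /** Stand-in for RabbitTemplate.convertAndSend(exchange, routingKey, payload). */
    interface Sender {
        void convertAndSend(String exchange, String routingKey, Object payload);
    }

    // Same values as RabbitMqExchange.CONTRACT_TOPIC and RabbitMqQueue.CONTRACE_SELF in the article
    static final String CONTRACT_TOPIC = "CONTRACT_TOPIC";
    static final String CONTRACT_SELF = "CONTRACT_SELF";

    private final Sender sender;

    ContractSenderSketch(Sender sender) {
        this.sender = sender;
    }

    /** Publishes to the topic exchange with the routing key the contract queue was bound with. */
    void sendContractRabbitmqTopic(Object contract) {
        sender.convertAndSend(CONTRACT_TOPIC, CONTRACT_SELF, contract);
    }

    public static void main(String[] args) {
        // A recording sender replaces the broker so the routing arguments can be inspected
        List<String> sent = new ArrayList<>();
        ContractSenderSketch service = new ContractSenderSketch(
                (exchange, routingKey, payload) -> sent.add(exchange + "/" + routingKey + "/" + payload));
        service.sendContractRabbitmqTopic("contract-15");
        System.out.println(sent);
    }
}
```

In the Spring project, the Sender would presumably be the prototype-scoped RabbitTemplate bean from section 3.1, and the routing key has to match the one used in bindingExchangeContract for the message to reach the contract queue.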

Some Spring Boot examples:

https://my.oschina.net/didispace/blog/752981

Reposted from: http://jqpws.baihongyu.com/
