Integrating RabbitMQ into Spring Boot using annotations only (Java config, no XML), as follows:

@Configuration
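public class RabbitMqConfig {
    // The pmsMq* fields used below (server address, virtual host, credentials,
    // queue/exchange names, durable flag) are configuration values whose
    // declarations are omitted from this listing.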
    @Bean(value = "pmsMqConnectionFactory")
    public ConnectionFactory pmsMqConnectionFactory(){
        CachingConnectionFactory connectionFactory=new CachingConnectionFactory();
        connectionFactory.setAddresses(pmsMqServerAddress);
        connectionFactory.setVirtualHost(pmsMqServerVirtualHost);
        connectionFactory.setUsername(pmsMqServerUserName);
        connectionFactory.setPassword(pmsMqServerPassword);
        return connectionFactory;
    }
    @Bean(value = "pmsConsumerQueue")
    public Queue pmsConsumerQueue(){
        Queue consumerQueue=new Queue(pmsMqConsumerQueue,pmsMqQueuePersist);

        return consumerQueue;
    }
    @Bean(value = "pmsConsumerExchange")
    public Exchange pmsConsumerExchange(){
        Exchange consumerExchange=new DirectExchange(pmsMqConsumerExchangeName,pmsMqQueuePersist,false);
        return consumerExchange;
    }
    @Bean(value = "pmsConsumerMessageBinding")
    public Binding pmsConsumerMessageBinding(@Qualifier("pmsConsumerQueue") Queue queue,
                                                  @Qualifier("pmsConsumerExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    @Bean(value = "pmsMessageListenerContainer")
    public SimpleMessageListenerContainer pmsMessageListenerContainer(@Qualifier("pmsMqConnectionFactory") ConnectionFactory connectionFactory,
                                                                      @Qualifier("pmsMqConsumer") PmsMqConsumer pmsMqConsumer,@Qualifier("pmsConsumerQueue")Queue queue){
        SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageListener(pmsMqConsumer);
        container.setQueues(queue);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }
}

Startup then fails with the following error:

2018-02-28 19:50:26.560 [WARN] [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:544] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[dev_pms2invoi_queue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:636)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:535)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1389)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
    at com.sun.proxy.$Proxy155.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:615)
    ... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dev_pms2invoi_queue' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
    ... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dev_pms2invoi_queue' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    ... 1 common frames omitted
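
The key line in the trace is the broker's channel.close reply. As a reading aid, here is a small plain-Java sketch that pulls the fields out of that message. The class name ChannelCloseReason and its methods are made up for illustration and are not part of the RabbitMQ client. The interpretation of class-id=50 / method-id=10 as queue.declare follows the AMQP 0-9-1 numbering.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a RabbitMQ client class): parses the text of a
// ShutdownSignalException message such as the one in the trace above.
public class ChannelCloseReason {
    private static final Pattern CLOSE = Pattern.compile(
            "reply-code=(\\d+), reply-text=(.*?), class-id=(\\d+), method-id=(\\d+)\\)");

    public final int replyCode;
    public final String replyText;
    public final int classId;
    public final int methodId;

    private ChannelCloseReason(int replyCode, String replyText, int classId, int methodId) {
        this.replyCode = replyCode;
        this.replyText = replyText;
        this.classId = classId;
        this.methodId = methodId;
    }

    public static ChannelCloseReason parse(String exceptionMessage) {
        Matcher m = CLOSE.matcher(exceptionMessage);
        if (!m.find()) {
            throw new IllegalArgumentException("not a channel.close message: " + exceptionMessage);
        }
        return new ChannelCloseReason(
                Integer.parseInt(m.group(1)), m.group(2),
                Integer.parseInt(m.group(3)), Integer.parseInt(m.group(4)));
    }

    // 404 on class 50 (queue), method 10 (declare): the broker was asked about
    // a queue that does not exist, which is what a passive declare reports.
    public boolean isMissingQueueOnDeclare() {
        return replyCode == 404 && classId == 50 && methodId == 10;
    }

    public static void main(String[] args) {
        String msg = "channel error; protocol method: #method<channel.close>"
                + "(reply-code=404, reply-text=NOT_FOUND - no queue 'dev_pms2invoi_queue' in vhost '/', "
                + "class-id=50, method-id=10)";
        ChannelCloseReason reason = parse(msg);
        System.out.println(reason.replyText + " / missing queue: " + reason.isMissingQueueOnDeclare());
    }
}
```

In other words, the connection and credentials are fine; the broker is simply reporting that the queue being consumed from was never created.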

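The trace shows the listener container doing a passive declare (queueDeclarePassive), which only checks that the queue exists; it fails with 404 NOT_FOUND because nothing ever created dev_pms2invoi_queue on the broker. The cause is that I defined my own ConnectionFactory for MQ. In that case a RabbitAdmin bound to that ConnectionFactory must also be defined, so that the Queue, Exchange and Binding beans actually get declared on the broker. For details see: https://stackoverflow.com/questions/49028289/no-queue-dev-pms2invoi-queue-in-vhost-when-using-rabbitmq-in-spring-boot/49035026#49035026. I therefore changed the configuration as follows: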

@Configuration
public class RabbitMqConfig {
    @Bean(value = "pmsMqConnectionFactory")
    public ConnectionFactory pmsMqConnectionFactory(){
        CachingConnectionFactory connectionFactory=new CachingConnectionFactory();
        connectionFactory.setAddresses(pmsMqServerAddress);
        connectionFactory.setVirtualHost(pmsMqServerVirtualHost);
        connectionFactory.setUsername(pmsMqServerUserName);
        connectionFactory.setPassword(pmsMqServerPassword);
        return connectionFactory;
    }
    /**
     * A custom ConnectionFactory is used, so a RabbitAdmin has to be defined for it.
     */
    @Bean(value = "pmsRabbitAdmin")
    public RabbitAdmin pmsRabbitAdmin(){
        RabbitAdmin rabbitAdmin=new RabbitAdmin(pmsMqConnectionFactory());
        return rabbitAdmin;
    }
    @Bean(value = "pmsConsumerQueue")
    public Queue pmsConsumerQueue(){
        Queue consumerQueue=new Queue(pmsMqConsumerQueue,pmsMqQueuePersist);
        consumerQueue.setAdminsThatShouldDeclare(pmsRabbitAdmin());
        return consumerQueue;
    }
    @Bean(value = "pmsConsumerExchange")
    public Exchange pmsConsumerExchange(){
        DirectExchange consumerExchange=new DirectExchange(pmsMqConsumerExchangeName,pmsMqQueuePersist,false);
        consumerExchange.setAdminsThatShouldDeclare(pmsRabbitAdmin());
        return consumerExchange;
    }
    @Bean(value = "pmsConsumerMessageBinding")
    public Binding pmsConsumerMessageBinding(@Qualifier("pmsConsumerQueue") Queue queue,
                                                  @Qualifier("pmsConsumerExchange") Exchange exchange){
        Binding binding= BindingBuilder.bind(queue).to(exchange).with("").noargs();
        binding.setAdminsThatShouldDeclare(pmsRabbitAdmin());
        return binding;
    }

    @Bean(value = "pmsMessageListenerContainer")
    public SimpleMessageListenerContainer pmsMessageListenerContainer(@Qualifier("pmsMqConnectionFactory") ConnectionFactory connectionFactory,
                                                                      @Qualifier("pmsMqConsumer") PmsMqConsumer pmsMqConsumer,@Qualifier("pmsConsumerQueue")Queue queue){
        SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageListener(pmsMqConsumer);
        container.setQueues(queue);
        container.setRabbitAdmin(pmsRabbitAdmin());
        // acknowledge messages manually (the listener must send the ack itself)
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }
}
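
The PmsMqConsumer bean injected into the container is not shown above. Because the container uses AcknowledgeMode.MANUAL, the listener has to acknowledge each delivery on the channel itself. The following is only a sketch of what such a consumer might look like, not the actual class: the handle method is a placeholder, and the import of ChannelAwareMessageListener assumes Spring AMQP 1.x (in 2.x the interface lives in org.springframework.amqp.rabbit.listener.api).

```java
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
// Assumption: Spring AMQP 1.x package; in 2.x use
// org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

// Hypothetical consumer; the bean name matches @Qualifier("pmsMqConsumer") above.
@Component("pmsMqConsumer")
public class PmsMqConsumer implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            handle(body);
            // MANUAL mode: confirm this single delivery once processing succeeded
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // reject this single delivery without requeueing it
            channel.basicNack(deliveryTag, false, false);
        }
    }

    // Placeholder for the real business logic, which is not part of this post.
    private void handle(String body) {
        System.out.println("received: " + body);
    }
}
```

Whether a failed message should be requeued (third argument of basicNack set to true) or dropped depends on the use case; the sketch drops it to avoid an endless redelivery loop.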