接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.0.2</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

生产者代码

 @GetMapping("/sendSync")
    public String sendSync() throws ClientException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.enableSsl(true).build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodySync".getBytes())
                                  .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Failed to send message", e);
        }
        producer.close();
        return "success";
    }

    @GetMapping("/sendAsync")
    public String sendAsync() throws ClientException, InterruptedException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodyASync".getBytes())
                                  .build();
        producer.sendAsync(message);
        log.info("Send async message successfully, messageId");
        return "success";
    }

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {
    @Override
    public void run(final String... args) throws Exception {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String endpoints = "192.168.2.92:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                                                                     .setEndpoints(endpoints)
                                                                     .build();
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "YourConsumerGroup";
        String topic = "dataTopic2";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                                            .setClientConfiguration(clientConfiguration)
                                            .setConsumerGroup(consumerGroup)
                                            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                                            .setMessageListener(messageView -> {
                                                log.info("Consume message successfully, messageId={}", messageView.getMessageId());
                                                return ConsumeResult.SUCCESS;
                                            })
                                            .build();
        Thread.sleep(Long.MAX_VALUE);
    }
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.

java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

GitHub 加速计划 / ro / rocketmq-dashboard
1.04 K
508
下载
The state-of-the-art Dashboard of Apache RoccketMQ provides excellent monitoring capability. Various graphs and statistics of events, performance and system information of clients and application is evidently made available to the user.
最近提交(Master分支:3 个月前 )
464f57ad * fix:Fixed the issue that normal messages in version v4 are not showed * feat:support retryMaxTimes args set --------- Co-authored-by: yuanziwei <yuanziwei@xiaomi.com> Co-authored-by: yuanziwei.akai <yuanziwei.akai@bytedance.com> 2 个月前
5d08d3b1 * fix:Fixed the issue that normal messages in version v4 are not showed * feat:support unspecified topic --------- Co-authored-by: yuanziwei <yuanziwei@xiaomi.com> 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐