RocketMQ Source Code Analysis Series 00 - Sending and Receiving Messages: Hello World

Posted by Yano on November 30, 2022

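RocketMQ official website

RocketMQ 5.0 is a cloud-native real-time data processing platform for "messages, events, and streams", covering integrated cloud, edge, and device data processing scenarios. This series introduces how to use RocketMQ and focuses on its design philosophy, architecture, and source code.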

Installation

First download the latest source package from the official website, then build it locally.

$ unzip rocketmq-all-5.0.0-source-release.zip
$ cd rocketmq-all-5.0.0-source-release/
$ mvn -Prelease-all -DskipTests clean install -U
$ cd distribution/target/rocketmq-5.0.0/rocketmq-5.0.0

Start NameServer

### start namesrv
$ nohup sh bin/mqnamesrv &
 
### verify namesrv 
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

The startup log looks like this:

2022-12-04 15:07:18 INFO main - name server address updated. NEW : [192.168.100.5:9876] , OLD: null
2022-12-04 15:07:18 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2022-12-04 15:07:18 INFO main - Try to start service thread:org.apache.rocketmq.namesrv.routeinfo.BatchUnregistrationService started:false lastThread:null
2022-12-04 15:07:18 INFO FileWatchService - FileWatchService service started
2022-12-04 15:07:18 INFO main - The Name Server boot success. serializeType=JSON
2022-12-04 15:07:18 INFO NettyClientScan_thread_1 - createChannel: begin to connect remote host[192.168.100.5:9876] asynchronously

Start Broker and Proxy

### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### verify broker
$ tail -f ~/logs/rocketmqlogs/broker_default.log 
The broker[broker-a,192.169.1.2:10911] boot success...

Send and receive messages

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

The command line prints:

SendResult [sendStatus=SEND_OK, msgId=7F00000122EF5B37E0D212B514EC03E5, offsetMsgId=C0A8640500002A9F000000000003AA27, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=249]
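The offsetMsgId in this output is not an opaque token. For a broker with an IPv4 address it is 32 hex characters that pack the broker IP (4 bytes), the broker port (4 bytes), and the CommitLog offset (8 bytes). The sketch below decodes the value printed above under that layout assumption; OffsetMsgIdSketch and decode are illustrative names, not RocketMQ APIs.

```java
import java.nio.ByteBuffer;

final class OffsetMsgIdSketch {

    /** Decodes a 32-hex-char offsetMsgId (IPv4 broker assumed) into "ip:port@commitLogOffset". */
    static String decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters for an IPv4 offsetMsgId");
        }
        // Convert each pair of hex characters into one byte.
        byte[] raw = new byte[16];
        for (int i = 0; i < raw.length; i++) {
            raw[i] = (byte) Integer.parseInt(offsetMsgId.substring(2 * i, 2 * i + 2), 16);
        }
        ByteBuffer buffer = ByteBuffer.wrap(raw); // ByteBuffer is big-endian by default
        // Bytes 0-3: broker IPv4 address, one unsigned octet per byte.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buffer.get() & 0xFF);
        }
        int port = buffer.getInt();              // bytes 4-7: broker port
        long commitLogOffset = buffer.getLong(); // bytes 8-15: offset in the CommitLog
        return ip + ":" + port + "@" + commitLogOffset;
    }

    public static void main(String[] args) {
        // Prints 192.168.100.5:10911@240167
        System.out.println(decode("C0A8640500002A9F000000000003AA27"));
    }
}
```

The decoded address and offset agree with the storeHost and commitLogOffset fields in the consumer output that follows.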

Start a Consumer to consume the messages:

ConsumeMessageThread_please_rename_unique_group_name_4_10 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=241, queueOffset=249, sysFlag=0, bornTimestamp=1670137857260, bornHost=/192.168.100.5:52993, storeTimestamp=1670137857261, storeHost=/192.168.100.5:10911, msgId=C0A8640500002A9F000000000003AA27, commitLogOffset=240167, bodyCRC=1102156316, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=250, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1670137898415, UNIQ_KEY=7F00000122EF5B37E0D212B514EC03E5, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 55], transactionId='null'}]] 
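The body field in this output is the message payload printed as an array of signed bytes, which is hard to read at a glance. A minimal sketch, using only the JDK, that turns such a printed array back into text; BodyBytesSketch and decode are illustrative names, and UTF-8 is assumed as the payload encoding.

```java
import java.nio.charset.StandardCharsets;

final class BodyBytesSketch {

    /** Parses text such as "[72, 101, 108]" (the way Java prints a byte array) into a UTF-8 string. */
    static String decode(String printedArray) {
        String inner = printedArray.trim();
        if (inner.startsWith("[") && inner.endsWith("]")) {
            inner = inner.substring(1, inner.length() - 1);
        }
        if (inner.trim().isEmpty()) {
            return "";
        }
        String[] parts = inner.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Byte.parseByte accepts the signed range -128..127, matching how Java prints bytes.
            bytes[i] = Byte.parseByte(parts[i].trim());
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Prints Hello RocketMQ 997
        System.out.println(decode(
                "[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 55]"));
    }
}
```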

Send and receive messages with the Java SDK

  1. Add the client dependency to the pom file:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.0</version>
</dependency> 
  2. Create a topic named YanoTestTopic with the command-line tool:
sh bin/mqadmin updatetopic -n localhost:9876 -b localhost:10911  -t YanoTestTopic
  3. Write the sending program: set the tag to tag1 and send a random UUID string as the message body.
String endpoint = "localhost:8081";
String topic = "YanoTestTopic";

@Test
public void send() throws Exception {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints(endpoint).build();
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys(UUID.randomUUID().toString())
            .setTag("tag1")
            .setBody(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
            .build();

    // Send the message
    SendReceipt sendReceipt = producer.send(message);

    log.info("send message {}, body {}", message, transfer(message.getBody()));
    log.info("messageId {}", sendReceipt.getMessageId());
}
  4. Listen for messages and print the message body:
@Test
public void push() throws Exception {
    // Initialize the PushConsumer; it needs a consumer group, connection settings, and subscriptions
    ClientServiceProvider.loadService().newPushConsumerBuilder()
            // Endpoint address: set this to the Proxy's address-and-port list, typically xxx:8081;xxx:8081
            .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(endpoint).build())
            // Set the consumer group
            .setConsumerGroup("yano")
            // Subscription filter rule; * subscribes to messages with any tag
            .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
            // Set the message listener
            .setMessageListener(messageView -> {
                // Process the message and return the consume result
                log.info("Consume message={}, body {}", messageView, transfer(messageView.getBody()));
                return ConsumeResult.SUCCESS;
            }).build();
    Thread.sleep(Long.MAX_VALUE);
}
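The producer above tags its message with tag1, while the consumer subscribes with the expression *, which accepts every tag. To make the semantics of a TAG filter expression concrete, here is a simplified model in plain Java: * matches any tag, and several tags can be combined with ||. This is only a sketch of the matching rule, not the broker's or the client's actual implementation, and TagFilterSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

final class TagFilterSketch {

    /** Returns true if a message carrying messageTag would pass the given TAG expression. */
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if ("*".equals(trimmed)) {
            return true; // wildcard: subscribe to all tags
        }
        if (messageTag == null) {
            return false; // an untagged message cannot match a concrete tag list
        }
        // "tag1 || tag2" means: accept a message whose tag is tag1 or tag2.
        Set<String> accepted = Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "tag1"));            // true
        System.out.println(matches("tag1 || tag2", "tag2")); // true
        System.out.println(matches("tag2", "tag1"));         // false
    }
}
```

Replacing "*" in the consumer with "tag1" would, under this model, still deliver the message sent by the producer above, while "tag2" would filter it out.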

Complete code:

package com.example.rocketmqlearn;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.UUID;

@Slf4j
public class SendMessageTest {

    String endpoint = "localhost:8081";
    String topic = "YanoTestTopic";

    @Test
    public void send() throws Exception {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints(endpoint).build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys(UUID.randomUUID().toString())
                .setTag("tag1")
                .setBody(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
                .build();

        // Send the message
        SendReceipt sendReceipt = producer.send(message);

        log.info("send message {}, body {}", message, transfer(message.getBody()));
        log.info("messageId {}", sendReceipt.getMessageId());
    }

    @Test
    public void push() throws Exception {
        // Initialize the PushConsumer; it needs a consumer group, connection settings, and subscriptions
        ClientServiceProvider.loadService().newPushConsumerBuilder()
                // Endpoint address: set this to the Proxy's address-and-port list, typically xxx:8081;xxx:8081
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(endpoint).build())
                // Set the consumer group
                .setConsumerGroup("yano")
                // Subscription filter rule; * subscribes to messages with any tag
                .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
                // Set the message listener
                .setMessageListener(messageView -> {
                    // Process the message and return the consume result
                    log.info("Consume message={}, body {}", messageView, transfer(messageView.getBody()));
                    return ConsumeResult.SUCCESS;
                }).build();
        Thread.sleep(Long.MAX_VALUE);
    }

    private String transfer(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }
}
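One detail of the transfer helper is worth knowing: Charset.decode reads from the buffer's current position and advances it to the limit, so decoding the same ByteBuffer instance twice yields an empty string the second time. The sketch below demonstrates this with a plain JDK buffer and shows a non-consuming variant based on duplicate(). BodyDecodeSketch and peek are illustrative names, and nothing is assumed here about whether getBody() returns a fresh buffer on every call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BodyDecodeSketch {

    /** Same logic as the transfer helper above: consumes the buffer's remaining bytes. */
    static String transfer(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    /** Decodes a duplicate view, leaving the position of the original buffer untouched. */
    static String peek(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

        System.out.println(peek(body));       // hello
        System.out.println(body.remaining()); // 5, peek did not move the position

        System.out.println(transfer(body));   // hello
        System.out.println(body.remaining()); // 0, transfer consumed the buffer

        System.out.println(transfer(body).isEmpty()); // true, nothing left to decode
    }
}
```

If the same buffer has to be logged more than once, decoding a duplicate as in peek avoids the surprise.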