【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ消息发送的全流程涉及客户端的发送请求、网络通信、服务端的处理和响应。以下是发送流程的简化描述和代码实例:
客户端发送请求:
客户端使用
DefaultMQProducer
发送消息,调用send
方法。
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();
Message msg = new Message("topic", "tag", "message body".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
序列化请求:
客户端将请求消息序列化成字节流,准备发送。
网络通信:
客户端使用
Netty
客户端发送请求到Broker。
public void sendMessage(final String addr, final CommandCustomHeader customHeader, final byte[] body,
final SendCallback sendCallback, final long timeoutMillis) throws InterruptedException, RemotingException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, customHeader);
request.setBody(body);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
服务端处理请求:
Broker接收到请求后,根据请求类型处理消息发送。
服务端响应:
Broker处理完毕后,将结果响应给客户端。
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
// ... 处理请求
SendResult sendResult = this.brokerController.getBroker2Client().sendMessage(msg.getHeader().getQueueId(), msg, timeoutMillis);
// ... 构建响应命令
return null;
}
客户端处理响应:
客户端接收到响应后,解析响应数据,并通知发送结果给发送者。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand response) {
// ... 解析响应
sendResult = SendStatus.valueOf(response.getCode());
// ... 回调通知
}
以上流程是消息发送的大致过程,省略了一些细节,如消息追踪、高可用处理、网络异常处理等。实际的RocketMQ源码会更复杂,涉及更多细节。
评论已关闭