RocketMQ发送各种消息(同步,异步,单项,延迟,批量,顺序,事务,带标签,带key)
以下是使用RocketMQ发送不同类型消息的示例代码。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 2. 指定Namesrv地址
producer.setNamesrvAddr("localhost:9876");
// 3. 启动生产者
producer.start();
try {
// 4. 发送同步消息
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 5. 发送异步消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System..out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
// 6. 发送单向消息
producer.sendOneway(msg);
// 7. 发送延时消息
Message delayMsg = new Message("TopicTest", "TagA", "OrderID002", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
delayMsg.setDelayTimeLevel(3); // 设置延时级别
producer.send(delayMsg);
// 8. 发送批量消息
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message batchMsg = new Message("TopicTest", "TagA", "OrderID00" + i, ("Hello world " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
messages.add(batchMsg);
}
producer.send(messages);
// 9. 发送有序消息
Message orderlyMsg = new Message("TopicTest", "TagA", "OrderID002", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
orderlyMsg.setFlag(Message.FLAG_ORDERLY);
producer.send(orderlyMsg);
// 10. 发送带Tag的消息
Message tagMsg = new Message("TopicTest", "TagB", "OrderID003", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(tagMsg);
// 11. 发送带Key的消息
Message keyMsg = new Message("
评论已关闭