微服务中间件之RocketMQ
RocketMQ是一种分布式消息中间件,常用于处理大量的数据流。以下是一个使用RocketMQ发送和接收消息的简单示例。
首先,确保你已经安装并运行了RocketMQ。
以下是一个使用RocketMQ发送消息的Java代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建一个生产者,并指定一个组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 指定Namesrv地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建一个消息,并指定Topic,Tag和消息体
Message message = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
}
}
以下是一个使用RocketMQ接收消息的Java代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建一个消费者,并指定一个组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在这两个示例中,你需要替换localhost:9876
为你的RocketMQ NameServer地址,并且确保Topic名称与生产者和消费者订阅的名称相匹配。这两个类可以独立运行,一个用于发送消息,一个用于接收消息。
评论已关闭