消息队列应用场景异步处理_java培训
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 定义一个任务类,实现Runnable接口
class MessageTask implements Runnable {
private BlockingQueue<String> queue;
public MessageTask(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String message = queue.take(); // 阻塞获取队列中的消息
// 处理消息的业务逻辑
processMessage(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新中断线程
}
}
private void processMessage(String message) {
// 实现具体的消息处理逻辑
System.out.println("处理消息: " + message);
}
}
public class MessageQueueExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5); // 创建线程池
BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 创建消息队列
// 启动多个消费者线程处理消息
for (int i = 0; i < 5; i++) {
executor.execute(new MessageTask(queue));
}
// 生产者线程模拟发送消息
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000); // 模拟发送消息之间的延时
queue.put("消息 " + i); // 发送消息到队列
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 关闭线程池
executor.shutdown();
}
}
这段代码定义了一个简单的消息队列应用场景,其中包含了生产者、消费者模型和使用了BlockingQueue
来实现异步消息处理。生产者线程每隔一秒发送一条消息到队列中,消费者线程不断从队列中获取消息并处理。这个例子展示了如何使用Java多线程和队列来实现异步处理,是一个很好的教学材料。
评论已关闭