import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class RabbitMQWithMySQLAsyncTaskExample {
private static final String RABBITMQ_HOST = "localhost";
private static final String RABBITMQ_QUEUE = "task_queue";
private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/mydb";
private static final String MYSQL_USER = "user";
private static final String MYSQL_PASSWORD = "password";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RABBITMQ_HOST);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RABBITMQ_QUEUE, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
Runnable runnable = () -> {
while (true) {
String task = taskQueue.take();
executeMySQLTask(task);
}
};
new Thread(runnable).start();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
taskQueue.put(message);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(RABBITMQ_QUEUE, true, deliverCallback, consumerTag -> { });
}
}
private static void executeMySQLTask(String task) {
try (Connection connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASSWORD);
Statement statement = connection.createStatement()) {
// 假设task是一个S
评论已关闭