Java in Practice: Bulk-Inserting Millions of Rows in Spring Boot with ThreadPoolTaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;

@Service
public class BulkInsertService {

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void bulkInsert(List<DataObject> dataList) throws InterruptedException {
        int batchSize = 1000; // rows per batch; tune for your database and row size
        int batchCount = (int) Math.ceil((double) dataList.size() / batchSize);
        CountDownLatch latch = new CountDownLatch(batchCount);

        for (int i = 0; i < batchCount; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, dataList.size());
            List<DataObject> batchData = dataList.subList(start, end);
            // submit each batch to the shared, Spring-managed thread pool
            threadPoolTaskExecutor.submit(new InsertTask(batchData, latch));
        }
        latch.await(); // wait until every batch has been inserted
    }

    private class InsertTask implements Runnable {
        private final List<DataObject> data;
        private final CountDownLatch latch;

        InsertTask(List<DataObject> data, CountDownLatch latch) {
            this.data = data;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                dbBatchInsert(data); // batch insert for this slice of the data
            } catch (Exception e) {
                // handle/log the failure; consider collecting failed batches for retry
            } finally {
                latch.countDown(); // this batch is done, successfully or not
            }
        }
    }

    // placeholder for the actual database batch insert
    private void dbBatchInsert(List<DataObject> data) {
        // implement the batch insert here (e.g. MyBatis batch mode or JDBC batching)
    }
}
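If the data store is a relational database reachable through Spring's JdbcTemplate, dbBatchInsert could be sketched as below. This is only one possible implementation, not part of the original example; the data_object table, its id and name columns, and the DataObject getters are assumptions made for illustration.

import org.springframework.jdbc.core.JdbcTemplate;

    @Resource
    private JdbcTemplate jdbcTemplate;

    // Sketch only: assumes a data_object(id, name) table and matching DataObject getters.
    private void dbBatchInsert(List<DataObject> data) {
        String sql = "INSERT INTO data_object (id, name) VALUES (?, ?)";
        // JdbcTemplate sends the prepared statement as one JDBC batch of data.size() rows
        jdbcTemplate.batchUpdate(sql, data, data.size(), (ps, item) -> {
            ps.setLong(1, item.getId());
            ps.setString(2, item.getName());
        });
    }

For MySQL, adding rewriteBatchedStatements=true to the JDBC URL typically makes such batched inserts noticeably faster.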
This example shows how ThreadPoolTaskExecutor can be used for bulk data insertion. The data list is first split into batches, and a CountDownLatch ensures the calling thread resumes only after every batch has been inserted. Each batch is submitted to the thread pool as a separate task, so the inserts run in parallel, which can significantly improve throughput when loading large amounts of data.
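The service assumes a ThreadPoolTaskExecutor bean is already defined in the application context. A minimal configuration sketch is shown below; the pool sizes, queue capacity, and thread-name prefix are illustrative values to be tuned against the database's write capacity, not settings from the original article.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);          // illustrative; insert tasks are mostly I/O-bound
        executor.setMaxPoolSize(16);          // illustrative upper bound
        executor.setQueueCapacity(2000);      // queue pending batches instead of one thread per batch
        executor.setThreadNamePrefix("bulk-insert-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

A bounded pool with a queue keeps the number of concurrent database connections under control even when a single call produces hundreds of batches.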