欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

基于生产者消费者的BatchProcessor

时间:2023-07-01

一 背景

主要用于批量处理:

二 基本架构 

三 代码

package com.xuyu.batch;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;@Slf4jpublic class BatchProcessor { private int bulkNum; private int flushInterval; private BlockingQueue itemQueue; private volatile boolean closed = false; private DoInsert doInsert; private static final int DEFAULT_BULK_NUM = 1000; private static final int DEFAULT_CAPACITY = 1024; private static final int DEFAULT_FLUSH_INTERVAL = 3; public BatchProcessor(DoInsert doInsert) { this(doInsert, DEFAULT_BULK_NUM, DEFAULT_FLUSH_INTERVAL, DEFAULT_CAPACITY); } public BatchProcessor(DoInsert doInsert, int bulkNum) { this(doInsert, bulkNum, DEFAULT_FLUSH_INTERVAL, DEFAULT_CAPACITY); } public BatchProcessor(DoInsert doInsert, int bulkNum, int flushInterval) { this(doInsert, bulkNum, flushInterval, DEFAULT_CAPACITY); } public BatchProcessor(DoInsert doInsert, int bulkNum, int flushInterval, int capacity) { if (bulkNum < 1) { bulkNum = DEFAULT_BULK_NUM; } if (capacity < 1 || capacity < bulkNum) { capacity = Math.max(DEFAULT_CAPACITY, bulkNum * 2); } if (flushInterval < 1) { flushInterval = DEFAULT_FLUSH_INTERVAL; } this.bulkNum = bulkNum; this.doInsert = doInsert; this.flushInterval = flushInterval; itemQueue = new ArrayBlockingQueue<>(capacity); //开始flash任务 this.startFlushTask(); } public boolean addItem(T item) { if (closed) { return false; } try { itemQueue.put(item); return true; } catch (InterruptedException e) { log.error("添加到队列时中断!item={}", e); } return false; } public void flushAllItem() { while (!itemQueue.isEmpty()) { List list = new ArrayList<>(bulkNum); itemQueue.drainTo(list, bulkNum); if (!list.isEmpty()) { flushToDB(list); } } log.info("flushAllItem success!"); } public void close() { this.closed = true; flushAllItem(); log.info("DbBatchInsertProcessor 成功关闭"); } private void startFlushTask() { Thread t = new Thread(() -> { int waitSecond = 0; while (true) { if (closed) { break; } if (itemQueue.size() >= bulkNum || waitSecond >= flushInterval) { //队列数量大于批量提交数或等待超过指定的时间时,进行提交 if (!itemQueue.isEmpty()) { List list = new ArrayList<>(bulkNum); itemQueue.drainTo(list, bulkNum); if (!list.isEmpty()) { flushToDB(list); } } waitSecond = 0; } else { //还没到批量提交点,进行等待 try { Thread.sleep(1000); waitSecond++; } catch (InterruptedException e) { log.error("startFlushTask 异常中断!"); } } } }); t.setName("BatchProcessor thread" + this.hashCode()); t.start(); } private void flushToDB(List list) { try { int insertRow = doInsert.batchInsert(list); log.info("{}表插入{}条记录", doInsert.tableName(), insertRow); } catch (Throwable e) { log.error("{}表批量插入时发生异常,list={}", doInsert.tableName(), e); } } interface DoInsert { int batchInsert(List list); String tableName(); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。