CyclicBarrier简单模拟订单对账流程

某对账系统流程图
代码
java
/**
* 主要针对CyclicBarrier的使用进行模拟
*/
public class CyclicBarrierDemo {
//预先设定好模拟数据量
private static final int TICKET_NUM = 20;
//模拟数据库中的订单和发货单数据
private static final List<Integer> ORDER = Collections.synchronizedList(new ArrayList<>());
private static final List<Integer> SENDER = Collections.synchronizedList(new ArrayList<>());
//假设订单库中每个订单都需要去比较是否相同,模拟需要被比较的数据的队列
private static final List<Integer> DIFFER_ORDER = Collections.synchronizedList(new ArrayList<>());
private static final List<Integer> DIFFER_SENDER = Collections.synchronizedList(new ArrayList<>());
//给订单和发货单计数
private static final AtomicInteger ORDER_COUNT = new AtomicInteger(TICKET_NUM);
private static final AtomicInteger SENDER_COUNT = new AtomicInteger(TICKET_NUM);
//为什么用Pool而不直接传Runnable,若直接传R,是同步执行,必须要一个订单比较完后才能执行下一个比较
//即barrierAction执行完后才能执行下一次barrierAction
//为什么POOL的大小置为一,因为check()方法线程不安全,置为1保证,每次只有一个线程去队列中取数据
private static final ThreadPoolExecutor POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
private final CyclicBarrier notify = new CyclicBarrier(2, () -> {
POOL.execute(this::check);
});
//初始化订单和发货单
static {
for (int i = 0; i < 20; i++) {
ORDER.add(i);
SENDER.add(new Random().nextInt(TICKET_NUM));
}
System.out.println("初始化订单数据: ");
System.out.println(ORDER);
System.out.println("初始化发货单数据: ");
System.out.println(SENDER);
}
public void check() {
int orderNum = DIFFER_ORDER.remove(0);
int senderNum = DIFFER_SENDER.remove(0);
if (orderNum == senderNum) {
System.out.println(orderNum + " == " + senderNum + " 订单和发货单无差异,无需处理...");
} else {
System.out.println(orderNum + " != " + senderNum + ", Order = " + orderNum + " 订单和发货单存在差异,需要调用对账方法...");
}
}
public void checkAll() {
//有未校验订单时,将未校验订单放入比较队列
new Thread(() -> {
while (ORDER_COUNT.get() != 0) {
try {
Thread.sleep((long) (Math.random() * 300));
DIFFER_ORDER.add(ORDER.get(TICKET_NUM - 1 - ORDER_COUNT.decrementAndGet()));
notify.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
//有未校验发货单时,将未校验订单放入比较队列
new Thread(() -> {
while (SENDER_COUNT.get() > 0) {
try {
Thread.sleep((long) (Math.random() * 300));
DIFFER_SENDER.add(SENDER.get(TICKET_NUM - 1 - SENDER_COUNT.decrementAndGet()));
notify.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.checkAll();
}
}
Run

运行截图