JAVA Concurrent Programming Bounded Cache Implementation
1. Base class of bounded cache
package cn.xf.cp.ch14; /** * *功能:有界缓存实现基类 *时间:下午2:20:00 *文件 *@author Administrator * * @param <V> */ public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count; public BaseBoundedBuffer(int capacity) { //初始化数组 this.buf = (V[]) new Object[capacity]; } //放入一个数据,final方法无法被重写 protected synchronized final void doPut(V v) { buf[tail] = v; if(++tail == buf.length) { tail = 0; } //插入一个方法,总量++ ++count; } /** * 取出一个数据 * @return */ protected synchronized final V doTake() { V v = buf[head]; buf[head] = null; if(++head == buf.length) { head = 0; } --count; return v; } //通过对count的判断,来确定数组是否是满的 public synchronized final boolean isFull() { return count == buf.length; } public synchronized final boolean isEmpty() { return count == 0; } }
2. Determination of prerequisites Perform operations
package cn.xf.cp.ch14; /** * *功能:对插入和获取元素操作进行先行检查,然后执行操作,校验不通过不予操作 *时间:下午2:33:41 *文件 *@author Administrator * * @param <V> */ public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> { public GrumpyBoundedBuffer(int size) { super(size); } public synchronized void put(V v) throws Exception { //如果是满的队列,就无法插入新的元素 if(this.isFull()) { throw new Exception("队列超出"); } this.doPut(v); } //同理,队列为空的就无法取出新的元素 public synchronized V take() throws Exception { if(this.isEmpty()) { throw new Exception("队列中无元素"); } return this.doTake(); } }
3. Implement simple blocking through polling and sleep
package cn.xf.cp.ch14; /** * *功能:通过轮询与休眠来实现简单的阻塞 *时间:下午2:55:54 *文件 *@author Administrator * * @param <V> */ public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> { //2s private static final long SLEEP_GRANULARITY = 2000; public SleepyBoundedBuffer(int capacity) { super(capacity); } //放入队列的时候 public void put(V v) throws InterruptedException { while(true) { //这里不对循环上锁,不然这个锁就无法释放了,不对休眠上锁,休眠上锁,在休眠的时候别人也无法操作,永远都不可能有元素出去 synchronized (this) { //如果队列不是满的,那么就放入元素 if(!this.isFull()) { this.doPut(v); return; } } //否则休眠,退出cpu占用 Thread.sleep(SLEEP_GRANULARITY); } } public V take() throws InterruptedException { while(true) { //这里不对循环上锁,不然这个锁就无法释放了,不对休眠上锁,休眠上锁,在休眠的时候别人也无法操作,永远都不可能有新的元素进来 synchronized(this) { //如果数组部位空,那么就可以取出数据 if(!this.isEmpty()) { return this.doTake(); } //如果队列为空,休眠几秒再试 } Thread.sleep(SLEEP_GRANULARITY); } } }
4. Conditional queue
package cn.xf.cp.ch14; /** * *功能:使用条件队列 *时间:下午3:32:04 *文件 *@author Administrator * * @param <V> */ public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { public BoundedBuffer(int capacity) { super(capacity); } /** * 放入数据元素 * @param v * @throws InterruptedException */ public synchronized void put(V v) throws InterruptedException { while(this.isFull()) { //这里挂起程序,会释放锁 this.wait(); } //如果队列不为满的,那么程序被唤醒之后从新获取锁 this.doPut(v); //执行结束,唤醒其他队列 this.notifyAll(); } public synchronized V take() throws InterruptedException { while(this.isEmpty()) { this.wait(); } V v = this.doTake(); this.notifyAll(); return v; } }
Thank you for reading, I hope it can help everyone, thank you for your support of this site!
For more detailed explanations on the implementation of bounded cache in JAVA concurrent programming, please pay attention to the PHP Chinese website!