In general development, I often see that many students only use some basic methods in dealing with the Java concurrent development model. For example, Volatile, synchronized. Advanced concurrency packages like Lock and atomic are not often used by many people. I think most of the reasons are caused by the lack of attributes of the principle. In busy development work, who can accurately grasp and use the correct concurrency model?
So based on this idea recently, I plan to organize this part of the concurrency control mechanism into an article. It is not only a memory of the knowledge I have mastered, but also I hope that the content mentioned in this article can help most developers.
Parallel program development inevitably involves issues such as multi-threading, multi-task collaboration and data sharing. In the JDK, multiple ways are provided to achieve concurrency control between multiple threads. For example, commonly used ones: internal locks, reentrant locks, read-write locks and semaphores.
In Java, each thread has a working memory area, which stores the values of variables in the main memory shared by all threads. copy. When a thread executes, it operates on these variables in its own working memory.
In order to access a shared variable, a thread usually first acquires the lock and clears its working memory area. This ensures that the shared variable is correctly loaded from the shared memory area of all threads into the thread's working memory area. , when the thread is unlocked, the values of the variables in the working memory area are guaranteed to be associated to the shared memory.
When a thread uses a variable, regardless of whether the program uses thread synchronization operations correctly, the value it obtains must be the value stored in the variable by itself or other threads. For example, if two threads store different values or object references into the same shared variable, then the value of the variable will belong to either this thread or that thread, and the value of the shared variable will not belong to both threads. A combination of reference values.
A variable is an address that a Java program can access. It includes not only basic type variables, reference type variables, but also array type variables. Variables stored in the main memory area can be shared by all threads, but it is impossible for one thread to access the parameters or local variables of another thread, so developers do not have to worry about the thread safety of local variables.
Since each thread has its own working memory area, when a thread changes the data in its own working memory , may not be visible to other threads. To this end, you can use the volatile keyword to allow all threads to read and write variables in memory, thereby making volatile variables visible across multiple threads.
Variables declared as volatile can achieve the following guarantees:
1、其他线程对变量的修改,可以及时反应在当前线程中; 2、确保当前线程对volatile变量的修改,能及时写回到共享内存中,并被其他线程所见; 3、使用volatile声明的变量,编译器会保证其有序性。
Synchronization The keyword synchronized is one of the most commonly used synchronization methods in the Java language. In early versions of the JDK, the performance of synchronized was not very good, and it was suitable for situations where lock competition was not particularly intense. In JDK6, the gap between synchronized and unfair locks has been narrowed. More importantly, synchronized is more concise and clear, and the code is more readable and maintainable.
Method to lock an object:
public synchronized void method(){}
When the method() method is called, the calling thread must first obtain the current object. If the current object is locked Held by other threads, the calling thread will wait. After the violation is completed, the object lock will be released. The above method is equivalent to the following writing:
public void method(){ synchronized(this){ // do something … } }
Secondly, using synchronized can also be used Construct a synchronization block. Compared with the synchronization method, the synchronization block can control the scope of the synchronization code more precisely. A small synchronization code is very fast in and out of the lock, allowing the system to have higher throughput.
public void method(Object o){ // before synchronized(o){ // do something ... } // after }
synchronized can also be used for static functions:
public synchronized static void method(){}
Be sure to pay attention here, the synchronized lock is added to On the current Class object, therefore, all calls to this method must obtain the lock of the Class object.
Although synchronized can ensure the thread safety of objects or code segments, using synchronized alone is not enough to control thread interactions with complex logic. In order to achieve interaction between multiple threads, you also need to use the wait() and notify() methods of the Object object.
Typical usage:
synchronized(obj){ while(<?>){ obj.wait(); // 收到通知后,继续执行。 } }
Before using the wait() method, you need to obtain the object lock. When the wait() method is executed, the current thread may release the exclusive lock of obj for use by other threads.
When the thread waiting on obj receives obj.notify(), it can regain the exclusive lock of obj and continue running. Note that the notify() method randomly awakensa thread waiting for the current object.
The following is an implementation of a blocking queue:
public class BlockQueue{ private List list = new ArrayList(); public synchronized Object pop() throws InterruptedException{ while (list.size()==0){ this.wait(); } if (list.size()>0){ return list.remove(0); } else{ return null; } } public synchronized Object put(Object obj){ list.add(obj); this.notify(); } }
synchronized with wait(), notify() should be mastered by Java developers basic skills.
try { if (lock.tryLock(5, TimeUnit.SECONDS)) { //如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行 // lock.lockInterruptibly();可以响应中断事件 try { //操作 } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); //当前线程被中断时(interrupt),会抛InterruptedException }
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); public Object handleRead() throws InterruptedException { try { readLock.lock(); Thread.sleep(1000); return value; }finally{ readLock.unlock(); } } public Object handleRead() throws InterruptedException { try { writeLock.lock(); Thread.sleep(1000); return value; }finally{ writeLock.unlock(); } }
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, { /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); // 生成与Lock绑定的Condition notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); // 通知 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 如果队列为空 notEmpty.await(); // 则消费者队列要等待一个非空的信号 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); // 通知put() 线程队列已有空闲空间 return x; } // other code }
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair){} // 可以指定是否公平
public void acquire() throws InterruptedException {} //尝试获得一个准入的许可。若无法获得,则线程会等待,知道有线程释放一个许可或者当前线程被中断。 public void acquireUninterruptibly(){} // 类似于acquire(),但是不会响应中断。 public boolean tryAcquire(){} // 尝试获取,如果成功则为true,否则false。这个方法不会等待,立即返回。 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // 尝试等待多长时间 public void release() //用于在现场访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
public class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); // 申请一个许可 // 同时只能有100个线程进入取得可用项, // 超过100个则需要等待 return getNextAvailableItem(); } public void putItem(Object x) { // 将给定项放回池内,标记为未被使用 if (markAsUnused(x)) { available.release(); // 新增了一个可用项,释放一个许可,请求资源的线程被激活一个 } } // 仅作示例参考,非真实数据 protected Object[] items = new Object[MAX_AVAILABLE]; // 用于对象池复用对象 protected boolean[] used = new boolean[MAX_AVAILABLE]; // 标记作用 protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else { return false; } } } return false; } }
public class TestNum { // 通过匿名内部类覆盖ThreadLocal的initialValue()方法,指定初始值 private static ThreadLocal seqNum = new ThreadLocal() { public Integer initialValue() { return 0; } }; // 获取下一个序列值 public int getNextNum() { seqNum.set(seqNum.get() + 1); return seqNum.get(); }public static void main(String[] args) { TestNum sn = new TestNum(); //3个线程共享sn,各自产生序列号 TestClient t1 = new TestClient(sn); TestClient t2 = new TestClient(sn); TestClient t3 = new TestClient(sn); t1.start(); t2.start(); t3.start(); } private static class TestClient extends Thread { private TestNum sn; public TestClient(TestNum sn) { = sn; } public void run() { for (int i = 0; i < 3; i++) { // 每个线程打出3个序列值 System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn[" + sn.getNextNum() + "]"); } } } }
thread[Thread-0] –> sn[1] thread[Thread-1] –> sn[1] thread[Thread-2] –> sn[1] thread[Thread-1] –> sn[2] thread[Thread-0] –> sn[2] thread[Thread-1] –> sn[3] thread[Thread-2] –> sn[2] thread[Thread-0] –> sn[3] thread[Thread-2] –> sn[3]
public synchronized void syncMehod(){ beforeMethod(); mutexMethod(); afterMethod(); }
public void syncMehod(){ beforeMethod(); synchronized(this){ mutexMethod(); } afterMethod(); }
public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, { /* Lock held by take, poll, etc / private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 不能有两个线程同时读取数据 try { while (count.get() == 0) { // 如果当前没有可用数据,一直等待put()的通知 notEmpty.await(); } x = dequeue(); // 从头部移除一项 c = count.getAndDecrement(); // size减1 if (c > 1) notEmpty.signal(); // 通知其他take()操作 } finally { takeLock.unlock(); // 释放锁 } if (c == capacity) signalNotFull(); // 通知put()操作,已有空余空间 return x; } public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 不能有两个线程同时put数据 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { // 队列满了 则等待 notFull.await(); } enqueue(node); // 加入队列 c = count.getAndIncrement();// size加1 if (c + 1 < capacity) notFull.signal(); // 如果有足够空间,通知其他线程 } finally { putLock.unlock();// 释放锁 } if (c == 0) signalNotEmpty();// 插入成功后,通知take()操作读取数据 } // other code }
public void syncMehod(){ synchronized(lock){ method1(); } synchronized(lock){ method2(); } } JVM整合后的形式: public void syncMehod(){ synchronized(lock){ method1(); method2(); } }
非阻塞同步方式其实在前面的ThreadLocal中已经有所体现,每个线程拥有各自独立的变量副本,因此在并行计算时,无需相互等待。这里笔者主要推荐一种更为重要的、基于比较并交换(Compare And Swap)CAS算法的无锁并发控制方法。
public class TestAtomic { private static final int MAX_THREADS = 3; private static final int TASK_COUNT = 3; private static final int TARGET_COUNT = 100 * 10000; private AtomicInteger acount = new AtomicInteger(0); private int count = 0; synchronized int inc() { return ++count; } synchronized int getCount() { return count; } public class SyncThread implements Runnable { String name; long startTime; TestAtomic out; public SyncThread(TestAtomic o, long startTime) { this.out = o; this.startTime = startTime; } @Override public void run() { int v =; while (v < TARGET_COUNT) { v =; } long endTime = System.currentTimeMillis(); System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } public class AtomicThread implements Runnable { String name; long startTime; public AtomicThread(long startTime) { this.startTime = startTime; } @Override public void run() { int v = acount.incrementAndGet(); while (v < TARGET_COUNT) { v = acount.incrementAndGet(); } long endTime = System.currentTimeMillis(); System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } @Test public void testSync() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); SyncThread sync = new SyncThread(this, startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(sync); } Thread.sleep(10000); } @Test public void testAtomic() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); AtomicThread atomic = new AtomicThread(startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(atomic); } Thread.sleep(10000); } }
testSync(): SyncThread spend:201ms, v=1000002 SyncThread spend:201ms, v=1000000 SyncThread spend:201ms, v=1000001 testAtomic(): AtomicThread spend:43ms, v=1000000 AtomicThread spend:44ms, v=1000001 AtomicThread spend:46ms, v=1000002
以上就是Java并发控制机制详解 的内容,更多相关内容请关注PHP中文网(!