>Java >java지도 시간 >Java 8 동시성 튜토리얼: 동기화 및 잠금

Java 8 동시성 튜토리얼: 동기화 및 잠금

黄舟
黄舟원래의
2017-02-07 10:17:171532검색

Java8 동시성 튜토리얼의 두 번째 부분에 오신 것을 환영합니다. 이 가이드에서는 간단하고 이해하기 쉬운 코드 예제를 통해 Java 8에서 동시에 프로그래밍하는 방법을 알려줍니다. 이것은 튜토리얼 시리즈 중 두 번째입니다. 다음 15분 동안 동기화 키워드, 잠금 및 세마포어를 통해 공유 변경 가능 변수에 대한 액세스를 동기화하는 방법을 배우게 됩니다.

  • 1부: 스레드 및 실행자

  • 2부: 동기화 및 잠금

  • 3부 : 원자 연산 및 ConcurrentMap

이 게시물에 제시된 핵심 개념은 이전 버전의 Java에도 적용되지만 코드 예제는 Java 8용이며 람다 표현식과 새로운 동시성 기능에 크게 의존합니다. . 아직 람다에 익숙하지 않다면 먼저 Java 8 튜토리얼을 읽어 보시기 바랍니다.

단순화를 위해 이 튜토리얼의 코드 예제에서는 여기에 정의된 두 가지 도우미 함수 sleep(seconds) 및 stop(executor)을 사용합니다.

동기화

이전 장에서는 Executor 서비스를 통해 코드를 동시에 실행하는 방법을 배웠습니다. 이런 종류의 멀티 스레드 코드를 작성할 때 공유 변경 가능 변수에 대한 동시 액세스에 특별한 주의를 기울여야 합니다. 여러 스레드가 동시에 액세스할 수 있는 정수를 증가시키고 싶다고 가정해 보겠습니다.

count 필드를 increment() 메서드로 정의하여 개수를 1씩 늘렸습니다.

int count = 0;

void increment() {
    count = count + 1;
}

여러 스레드가 동시에 이 메서드를 호출하면 큰 문제가 발생합니다.

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 9965


10000을 카운트한 결과는 보이지 않고, 위 코드의 실제 결과는 실행될 때마다 다릅니다. 그 이유는 서로 다른 스레드에서 변경 가능한 변수를 공유하고 있으며 경쟁 조건을 생성하는 변수 액세스를 위한 동기화 메커니즘이 없기 때문입니다.

값을 늘리려면 (1) 현재 값을 읽고, (2) 값을 1씩 늘리고, (3) 변수에 새 값을 쓰는 세 단계가 필요합니다. 두 개의 스레드가 동시에 실행되는 경우 두 개의 스레드가 동시에 1단계를 실행하는 것이 가능하므로 동일한 현재 값을 읽습니다. 이로 인해 잘못된 쓰기가 발생하므로 실제 결과는 더 작아집니다. 위의 예에서 count에 대한 비동기 동시 액세스는 35개의 증분 작업을 손실하지만 코드를 직접 실행하면 다른 결과를 볼 수 있습니다.

다행히 Java는 오래전부터 동기화 키워드를 통해 스레드 동기화를 지원해 왔습니다. 카운트를 늘릴 때 위의 경합 상태를 수정하기 위해 동기화를 사용할 수 있습니다.

synchronized void incrementSync() {
    count = count + 1;
}

incrementSync()를 동시에 호출하면 10000이라는 예상 결과를 얻습니다. 더 이상 경쟁 조건이 발생하지 않으며 모든 코드 실행에서 결과가 안정적입니다.

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 10000

동기화된 키워드는 명령문 블록에서도 사용할 수 있습니다.

void incrementSync() {
    synchronized (this) {
        count = count + 1;
    }
}

Java는 내부적으로 소위 "시계"를 사용합니다. "모니터"(모니터)는 동기화를 관리하기 위해 모니터 잠금(monitor lock) 또는 내장 잠금(intrinsic lock)이라고도 합니다. 모니터는 개체에 바인딩됩니다. 예를 들어 동기화된 메서드를 사용할 때 각 메서드는 해당 개체의 동일한 모니터를 공유합니다.

모든 암시적 모니터는 재진입 기능을 구현합니다. 재진입은 잠금이 현재 스레드에 바인딩됨을 의미합니다. 스레드는 교착 상태 없이 동일한 잠금을 여러 번 안전하게 획득할 수 있습니다(예: 동일한 개체의 다른 동기화된 메서드를 호출하는 동기화된 메서드).

잠금

동시성 API는 Lock 인터페이스에 의해 지정되고 동기화된 암시적 잠금을 대체하는 데 사용되는 다양한 명시적 잠금을 지원합니다. 잠금은 세분화된 제어를 위한 여러 방법을 지원하므로 암시적 모니터보다 오버헤드가 더 큽니다.

표준 JDK에서는 다양한 잠금 구현이 제공되며 다음 장에서 이에 대해 설명합니다.

ReentrantLock

ReentrantLock 클래스는 동기화를 통해 액세스되는 암시적 모니터와 동일한 동작을 갖지만 기능이 확장된 뮤텍스 잠금입니다. 이름에서 알 수 있듯이 이 잠금은 암시적 모니터와 마찬가지로 재진입 속성을 구현합니다.

ReentrantLock을 사용한 후 위의 예를 살펴보겠습니다.

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

锁可以通过lock()来获取,通过unlock()来释放。把你的代码包装在try-finally代码块中来确保异常情况下的解锁非常重要。这个方法是线程安全的,就像同步副本那样。如果另一个线程已经拿到锁了,再次调用lock()会阻塞当前线程,直到锁被释放。在任意给定的时间内,只有一个线程可以拿到锁。

锁对细粒度的控制支持多种方法,就像下面的例子那样:

executor.submit(() -> {
    lock.lock();
    try {
        sleep(1);
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});

stop(executor);

在第一个任务拿到锁的一秒之后,第二个任务获得了锁的当前状态的不同信息。

Locked: true
Held by me: false
Lock acquired: false

tryLock()方法是lock()方法的替代,它尝试拿锁而不阻塞当前线程。在访问任何共享可变变量之前,必须使用布尔值结果来检查锁是否已经被获取。

ReadWriteLock

ReadWriteLock接口规定了锁的另一种类型,包含用于读写访问的一对锁。读写锁的理念是,只要没有任何线程写入变量,并发读取可变变量通常是安全的。所以读锁可以同时被多个线程持有,只要没有线程持有写锁。这样可以提升性能和吞吐量,因为读取比写入更加频繁。

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
});

上面的例子在暂停一秒之后,首先获取写锁来向映射添加新的值。在这个任务完成之前,两个其它的任务被启动,尝试读取映射中的元素,并暂停一秒:

Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
        sleep(1);
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

当你执行这一代码示例时,你会注意到两个读任务需要等待写任务完成。在释放了写锁之后,两个读任务会同时执行,并同时打印结果。它们不需要相互等待完成,因为读锁可以安全同步获取,只要没有其它线程获取了写锁。

StampedLock

Java 8 自带了一种新的锁,叫做StampedLock,它同样支持读写锁,就像上面的例子那样。与ReadWriteLock不同的是,StampedLock的锁方法会返回表示为long的标记。你可以使用这些标记来释放锁,或者检查锁是否有效。此外,StampedLock支持另一种叫做乐观锁(optimistic locking)的模式。

让我们使用StampedLock代替ReadWriteLock重写上面的例子:

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep(1);
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

通过readLock() 或 writeLock()来获取读锁或写锁会返回一个标记,它可以在稍后用于在finally块中解锁。要记住StampedLock并没有实现重入特性。每次调用加锁都会返回一个新的标记,并且在没有可用的锁时阻塞,即使相同线程已经拿锁了。所以你需要额外注意不要出现死锁。

就像前面的ReadWriteLock例子那样,两个读任务都需要等待写锁释放。之后两个读任务同时向控制台打印信息,因为多个读操作不会相互阻塞,只要没有线程拿到写锁。

下面的例子展示了乐观锁:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(1);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(2);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep(2);
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor);

乐观的读锁通过调用tryOptimisticRead()获取,它总是返回一个标记而不阻塞当前线程,无论锁是否真正可用。如果已经有写锁被拿到,返回的标记等于0。你需要总是通过lock.validate(stamp)检查标记是否有效。

执行上面的代码会产生以下输出:

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false

乐观锁在刚刚拿到锁之后是有效的。和普通的读锁不同的是,乐观锁不阻止其他线程同时获取写锁。在第一个线程暂停一秒之后,第二个线程拿到写锁而无需等待乐观的读锁被释放。此时,乐观的读锁就不再有效了。甚至当写锁释放时,乐观的读锁还处于无效状态。

所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。

有时,将读锁转换为写锁而不用再次解锁和加锁十分实用。StampedLock为这种目的提供了tryConvertToWriteLock()方法,就像下面那样:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                System.out.println("Could not convert to write lock");
                stamp = lock.writeLock();
            }
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor);

第一个任务获取读锁,并向控制台打印count字段的当前值。但是如果当前值是零,我们希望将其赋值为23。我们首先需要将读锁转换为写锁,来避免打破其它线程潜在的并发访问。tryConvertToWriteLock()的调用不会阻塞,但是可能会返回为零的标记,表示当前没有可用的写锁。这种情况下,我们调用writeLock()来阻塞当前线程,直到有可用的写锁。

信号量

除了锁之外,并发 API 也支持计数的信号量。不过锁通常用于变量或资源的互斥访问,信号量可以维护整体的准入许可。这在一些不同场景下,例如你需要限制你程序某个部分的并发访问总数时非常实用。

下面是一个例子,演示了如何限制对通过sleep(5)模拟的长时间运行任务的访问:

ExecutorService executor = Executors.newFixedThreadPool(10);

Semaphore semaphore = new Semaphore(5);

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep(5);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
}

IntStream.range(0, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor);

执行器可能同时运行 10 个任务,但是我们使用了大小为5的信号量,所以将并发访问限制为5。使用try-finally代码块在异常情况中合理释放信号量十分重要。

执行上述代码产生如下结果:

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore

信号量限制对通过sleep(5)模拟的长时间运行任务的访问,最大5个线程。每个随后的tryAcquire()调用在经过最大为一秒的等待超时之后,会向控制台打印不能获取信号量的结果。

这就是我的系列并发教程的第二部分。以后会放出更多的部分,所以敬请等待吧。像以前一样,你可以在Github上找到这篇文档的所有示例代码,所以请随意fork这个仓库,并自己尝试它。

我希望你能喜欢这篇文章。如果你还有任何问题,在下面的评论中向我反馈。你也可以在 Twitter 上关注我来获取更多开发相关的信息。

以上就是Java 8 并发教程:同步和锁的内容,更多相关内容请关注PHP中文网(m.sbmmt.com)!


성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.