Rumah >Java >javaTutorial >Analisis kod sumber prinsip pelaksanaan kolam benang Java

Analisis kod sumber prinsip pelaksanaan kolam benang Java

WBOY
WBOYke hadapan
2023-05-09 14:16:071533semak imbas

Asal usul kumpulan benang

Latar belakang: Dengan peningkatan perkakasan komputer, perisian kami mempunyai keupayaan untuk melaksanakan tugas dalam berbilang rangkaian. Apabila kita melakukan pengaturcaraan berbilang utas, kita perlu mencipta utas Jika konkurensi program adalah sangat tinggi, kita akan mencipta sejumlah besar utas, dan setiap utas akan melakukan tugas yang singkat dan kemudiannya akan menyebabkannya sangat mengurangkan prestasi sistem dan meningkatkan overhed pelayan, kerana mencipta dan memusnahkan benang memerlukan penggunaan tambahan.

Pada masa ini kita boleh menggunakan teknologi pengumpulan untuk mengoptimumkan kecacatan ini, dan kumpulan benang telah dilahirkan.

Intipati teknologi pengumpulan adalah untuk mencapai penggunaan semula sumber dan mengurangkan kos penciptaan dan pemusnahan sumber dalam senario konkurensi yang tinggi Jika bilangan mata wang adalah kecil, tiada kelebihan yang jelas (sumber sentiasa menduduki memori sistem dan tidak mempunyai peluang untuk digunakan) ).

Pengenalan kepada teknologi pengumpulan: Bilakah teknologi pengumpulan? Teknologi penyatuan ialah teknik pengaturcaraan yang boleh mengoptimumkan program dengan ketara apabila terdapat keselarasan yang tinggi dalam program dan mengurangkan overhed tambahan seperti penciptaan yang kerap dan pemusnahan sambungan dalam sistem. Teknologi pengumpulan yang sering kami hubungi termasuk kumpulan sambungan pangkalan data, kumpulan benang, kumpulan objek, dsb. Ciri teknologi pengumpulan adalah untuk mengekalkan beberapa sumber kos tinggi dalam kumpulan tertentu (memori), dan menentukan bilangan sambungan minimum, bilangan sambungan maksimum, baris gilir menyekat, peraturan limpahan dan konfigurasi lain untuk memudahkan pengurusan bersatu. Dalam keadaan biasa, ia juga akan datang dengan beberapa fungsi sokongan seperti pemantauan dan kitar semula paksa.

Teknologi pengumpulan ialah teknologi penggunaan sumber yang biasa ialah:

  • Apabila kos untuk mendapatkan sumber adalah tinggi

  • Apabila kekerapan meminta sumber adalah tinggi dan jumlah bilangan sumber yang digunakan adalah rendah

  • Apabila berhadapan dengan masalah prestasi yang melibatkan kelewatan masa pemprosesan

Pengkelasan sumber teknologi pengumpulan:

  • Sumber sistem yang dipanggil oleh sistem, seperti benang, proses, peruntukan memori, dll.

  • Sumber jauh untuk komunikasi rangkaian, seperti sambungan pangkalan data, sambungan soket, dll.

Takrifan dan penggunaan kumpulan benang

Benang Kolam itu dilahirkan untuk mengelakkan overhed tambahan untuk mencipta benang dan memusnahkan benang Oleh itu, selepas kami mentakrifkan dan mencipta kumpulan benang, kami tidak perlu membuat benang sendiri, tetapi menggunakan panggilan kumpulan benang untuk melaksanakan tugas kami. Mari kita lihat cara untuk mentakrif dan mencipta kumpulan benang.

Pilihan 1: Pelaksana (untuk pemahaman sahaja, pilihan 2 disyorkan)

Untuk mencipta kumpulan benang, anda boleh menggunakan

Pelaksana, yang menyediakan satu siri kaedah kilang untuk mencipta utas Kumpulan dan kumpulan utas yang dikembalikan semuanya melaksanakan antara muka ExecutorService. Antara muka

ExecutorService ialah antara muka subkelas antara muka Executor dan digunakan secara lebih meluas Ia menyediakan kaedah pengurusan kitaran hayat kumpulan benang dan mengembalikan Objek Masa Depan.

Maksudnya, kami mencipta kumpulan benang melalui Pelaksana, dapatkan

dan laksanakan tugas tak segerak melalui ExecutorService (laksanakan antara muka Runnable) ExecutorService

Pelaksana boleh mencipta beberapa jenis Kumpulan benang:

  • Cipta kumpulan benang yang boleh disimpan dalam cache Jika bilangan benang kumpulan benang berlebihan, sumber benang yang berlebihan akan dikitar semula selepas 60 saat buku tugasan bertambah , jika benang tidak mencukupi, benang baharu akan dibuat. newCachedThreadPool

  • newFixedThreadPool mencipta kumpulan benang panjang tetap yang boleh mengawal bilangan maksimum urutan serentak yang akan menunggu dalam baris gilir.

  • Buat kumpulan benang panjang tetap untuk menyokong pelaksanaan tugas berjadual dan berkala. newScheduledThreadPool

  • Buat kumpulan benang satu-benang dan hanya gunakan satu-satunya utas untuk melaksanakan tugasan, memastikan tugasan diselesaikan mengikut susunan yang diserahkan. newSingleThreadExecutor

Pilihan 2: ThreadPoolExecutor

Dalam spesifikasi pembangunan Alibaba, adalah ditetapkan bahawa kumpulan benang tidak dibenarkan dibuat melalui Pelaksana, tetapi dicipta melalui ThreadPoolExecutor.

Faedah: Ia membolehkan pelajar yang menulis mengetahui dengan lebih jelas peraturan pengendalian kumpulan benang dan mengelakkan risiko keletihan sumber.

Tujuh parameter ThreadPoolExecutor:

(1)

Bilangan utas teras, utas teras akan sentiasa dikekalkan dan tidak akan dimusnahkan. corePoolSize

(2)

Bilangan maksimum utas Apabila utas teras tidak dapat memenuhi keperluan tugas, sistem akan mencipta utas baharu untuk melaksanakan tugas. maximumPoolSize

(3)

Masa hidup, berapa lama benang selain daripada benang teras melahu akan dimusnahkan. keepAliveTime

(4)

mewakili unit masa untuk kemandirian benang. timeUnit

(5)

Menyekat baris gilirBlockingQueue

  • Jika tugasan yang dilaksanakan melebihi bilangan maksimum utas, ia boleh disimpan dalam baris gilir apabila terdapat sumber percuma dalam kumpulan benang Anda boleh mengalih keluar tugas daripada baris gilir dan meneruskan pelaksanaan.

  • Jenis baris gilir adalah seperti berikut: LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue.

(6)threadFactory Kilang benang digunakan untuk mencipta benang boleh disesuaikan Contohnya, kita boleh menentukan nama kumpulan benang, yang sangat membantu semasa menyelesaikan masalah jstack.

(7)rejectedExecutionHandler Dasar penolakan,

Apabila semua urutan (bilangan maksimum urutan) sibuk dan baris gilir tugasan penuh dengan tugasan, dasar penolakan akan dilaksanakan.

JDK memberi kita empat strategi penolakan, yang kita semua mesti biasa dengan

  • AbortPolicy: buang tugas, Dan buang pengecualian RejectedExecutionException. Lalai

  • DiscardPolicy: Buang tugas terbaharu tanpa membuang pengecualian.

  • DiscardOldestPolicy: Buang tugasan dengan masa giliran paling lama, iaitu tugasan tertua.

  • CallerRuns: Tugas diproses oleh pemanggil (urutan yang menyerahkan tugas tak segerak).

Prinsip pelaksanaan pool thread

Untuk melaksanakan pool thread, kita perlu mengambil berat tentang kelas ThreadPoolExecutor, kerana Executors juga membuat pool thread melalui objek ThreadPoolExecutor baharu .

Melihat perhubungan warisan kelas ThreadPoolExecutor, anda boleh melihat sebab kumpulan benang yang dicipta oleh Executors mengembalikan hasil ExecutorService, kerana ThreadPoolExecutor ialah kelas pelaksanaan antara muka ExecutorService dan intipati penciptaan kumpulan benang oleh Pelaksana adalah untuk mencipta objek ThreadPoolExecutor.

Analisis kod sumber prinsip pelaksanaan kolam benang Java

Mari kita lihat kod sumber ThreadPoolExecutor yang pertama ialah pembolehubah dan pemalar yang ditakrifkan dalam ThreadPoolExecutor:

    // 复合类型变量 是一个原子整数  控制状态(运行状态|线程池活跃线程数量)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 容量
    // 运行状态存储在高位3位
    private static final int RUNNING    = -1 << COUNT_BITS;  // 接受新任务,并处理队列任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 不接受新任务,但会处理队列任务
    private static final int STOP       =  1 << COUNT_BITS;  // 不接受新任务,不会处理队列任务,中断正在处理的任务
    private static final int TIDYING    =  2 << COUNT_BITS;  // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状       态,将会执行terminated()钩子方法
    private static final int TERMINATED =  3 << COUNT_BITS;  // terminated()方法已经完成
    // 设置 ctl 参数方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    /**
     * 阻塞队列
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * Lock 锁.
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * 工人们
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    /**
     * 等待条件支持等待终止
     */
    private final Condition termination = mainLock.newCondition();
    /**
     * 最大的池大小.
     */
    private int largestPoolSize;
    /**
     * 完成任务数
     */
    private long completedTaskCount;
    /**
     * 线程工厂
     */
    private volatile ThreadFactory threadFactory;
    /**
     * 拒绝策略
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 存活时间
     */
    private volatile long keepAliveTime;
    /**
     * 允许核心线程数
     */
    private volatile boolean allowCoreThreadTimeOut;
    /**
     * 核心线程数
     */
    private volatile int corePoolSize;
    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;
    /**
     * 默认拒绝策略
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * shutdown and shutdownNow权限
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

constructor ,, menyokong empat pembina dengan minimum lima parameter dan maksimum tujuh parameter:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Pekerja, yang melaksanakan tugas dalam kumpulan benang, kumpulan benang berfungsi melalui pekerja ini, dan di sana ialah pekerja teras (Benang Teras) dan pekerja sementara (dicipta buat sementara waktu apabila tidak cukup orang, dan akan diberhentikan kerja jika mereka melahu),

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 工人的本质就是个线程
        final Thread thread;
        // 第一件工作任务
        Runnable firstTask;
      volatile long completedTasks;
        /**
         * 构造器
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** 工作  */
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

kaedah teras, laksanakan tugas melalui kumpulan benang (ini juga merupakan utas Prinsip operasi kumpulan):

  • Semak tugasan

  • Dapatkan status kumpulan utas semasa

  • Nilai sama ada bilangan pekerja yang bekerja adalah kurang daripada bilangan pekerja teras

  • Jika kurang, ambil orang dan aturkan kerja

  • Jika tidak kurang, nilaikan tugasan tempat menunggu Sama ada jadual sudah penuh

  • Jika tidak, tugasan akan diletakkan di tempat menunggu

  • Jika jadual penuh, lihat jika pengambilan dibenarkan, dan pengambilan dibenarkan Kemudian ambil pekerja sementara

  • Jika semuanya gagal, thread pool tidak boleh menerima tugas baharu dan mula melaksanakan dasar penolakan mengikut dasar penolakan yang dipersetujui oleh bos

  •     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
ditakrifkan oleh kelas induk abstraknya melihat dengan jelas perbezaan antara serah dan laksanakan Dengan memanggil serah, kami akan mencipta

dan mengembalikan Masa Depan Di sini kami boleh menukar jenis nilai pulangan , memaklumkan kaedah serah, dan ia akan mengembalikan nilai melalui kekangan generik. submit()

public abstract class AbstractExecutorService implements ExecutorService {
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
}
RunnableFuture

addWorker() ialah cara untuk merekrut orang:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 判断状态,及任务列表
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Cara mendapatkan tugas:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

Cara membenarkan pekerja bekerja, menetapkan tugas dan menjalankan tugas:

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Atas ialah kandungan terperinci Analisis kod sumber prinsip pelaksanaan kolam benang Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam