• 技术文章 >Java >java教程

    Java中使用ThreadPoolExecutor并行执行独立的单线程任务的详细介绍

    黄舟黄舟2017-03-23 11:06:44原创2017

    Java SE 5.0中引入了任务执行框架,这是简化多线程程序设计开发的一大进步。使用这个框架可以方便地管理任务:管理任务的生命周期以及执行策略。

    在这篇文章中,我们通过一个简单的例子来展现这个框架所带来的灵活与简单。

    基础

    执行框架引入了Executor接口来管理任务的执行。Executor是一个用来提交Runnable任务的接口。这个接口将任务提交与任务执行隔离起来:拥有不同执行策略的executor都实现了同一个提交接口。改变执行策略不会影响任务的提交逻辑。

    如果你要提交一个Runnable对象来执行,很简单:

    Executor exec = …;
    exec.execute(runnable);

    线程池

    如前所述,executor如何去执行提交的runnable任务并没有在Executor接口中规定,这取决于你所用的executor的具体类型。这个框架提供了几种不同的executor,执行策略针对不同的场景而不同。

    你可能会用到的最常见的executor类型就是线程池executor,也就是ThreadPoolExecutor类(及其子类)的实例。ThreadPoolExecutor管理着一个线程池和一个工作队列,线程池存放着用于执行任务的工作线程。

    你肯定在其他技术中也了解过“池”的概念。使用“池”的一个最大的好处就是减少资源创建的开销,用过并释放后,还可以重用。另一个间接的好处是你可以控制使用资源的多少。比如,你可以调整线程池的大小达到你想要的负载,而不损害系统的资源。

    这个框架提供了一个工厂类,叫Executors,来创建线程池。使用这个工程类你可以创建不同特性的线程池。尽管底层的实现常常是一样的(ThreadPoolExecutor),但工厂类可以使你不必使用复杂的构造函数就可以快速地设置一个线程池。工程类的工厂方法有:

    这仅仅是一个开端。Executor还有一些其他用法已超出了这篇文章的范围,我强烈推荐你研究以下内容:

    ExecutorService接口特别重要,因为它提供了关闭线程池的方法,并确保清理了不再使用的资源。令人欣慰的是,ExecutorService接口相当简单、一目了然,我建议全面地学习下它的文档。

    大致来说,当你向ExecutorService发送了一个shutdown()消息后,它就不会接收新提交的任务,但是仍在队列中的任务会被继续处理完。你可以使用isTerminated()来查询ExecutorService终止状态,或使用awaitTermination(…)方法来等待ExecutorService终止。如果传入一个最大超时时间作为参数,awaitTermination方法就不会永远等待。

    警告: 对JVM进程永远不会退出的理解上,存在着一些错误和迷惑。如果你不关闭executorService,只是销毁了底层的线程,JVM就不会退出。当最后一个普通线程(非守护线程)退出后,JVM也会退出。

    配置ThreadPoolExecutor

    如果你决定不使用Executor的工厂类,而是手动创建一个 ThreadPoolExecutor,你需要使用构造函数来创建并配置。下面是这个类使用最广泛的一个构造函数:

    public ThreadPoolExecutor(
        int corePoolSize,
        int maxPoolSize,
        long keepAlive,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler);

    如你所见,你可以配置以下内容:

    限制队列中任务数

    限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

    这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

    解决方法是:

    默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

    什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

    示例:并行执行独立的单线程任务

    最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

    当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

    运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,我们有了食谱的所有原料,我们需要做的仅仅是把程序建立起来并调节。在写代码前,我们必须了解下程序的负载。我列一下我检测到的内容:

    我需要这样一个线程池,它的大小在程序运行的时候通过负载配置来设置。我倾向于根据负载策略创建一个固定大小的线程池。由于线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其他资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

    int cpus = Runtime.getRuntime().availableProcessors();
    int maxThreads = cpus * scaleFactor;
    maxThreads = (maxThreads > 0 ? maxThreads : 1);

    然后我需要使用阻塞队列创建一个ThreadPoolExecutor,可以限制提交的任务数。为什么?是这样,扫描算法执行很快,很快就产生庞大数量需要处理的文件。数量有多庞大呢?很难预测,因为变动太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

    而且,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。为什么?因为当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会停止,转而去处理一个文件,处理结束后马上又会扫描目录。

    下面是创建executor的代码:

    ExecutorService executorService =
        new ThreadPoolExecutor(
            maxThreads, // core thread pool size
            maxThreads, // maximum thread pool size
            1, // time to wait before resizing pool
            TimeUnit.MINUTES, 
            new ArrayBlockingQueue<Runnable>(maxThreads, true),
            new ThreadPoolExecutor.CallerRunsPolicy());

    下面是程序的框架(极其简化版):

    // scanning loop: fake scanning
    while (!dirsToProcess.isEmpty()) {
        File currentDir = dirsToProcess.pop();
    
        // listing children
        File[] children = currentDir.listFiles();
    
        // processing children
        for (final File currentFile : children) {
            // if it's a directory, defer processing
            if (currentFile.isDirectory()) {
                dirsToProcess.add(currentFile);
                continue;
            }
    
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // if it's a file, process it
                        new ConvertTask(currentFile).perform();
                    } catch (Exception ex) {
                        // error management logic
                    }
                }
            });
        }
    }
    
    // ...
    // wait for all of the executor threads to finish
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            // pool didn't terminate after the first try
            executorService.shutdownNow();
        }
    
        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            // pool didn't terminate after the second try
        }
    } catch (InterruptedException ex) {
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }

    总结

    看到了吧,Java并发API非常简单易用,十分灵活,也很强大。真希望我多年前可以多花点功夫写一个这样简单的程序。这样我就可以在几小时内解决由传统单线程组件所引发的扩展性问题。

    以上就是Java中使用ThreadPoolExecutor并行执行独立的单线程任务的详细介绍的详细内容,更多请关注php中文网其它相关文章!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:详解为任务关键型Java应用优化垃圾回收(下) 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • 一文详解怎么实现微服务鉴权• Java中Map集合体系的基本使用和常用API• Java数据结构常见排序算法(总结分享)• 一起来分析java设计模式之单例• 一文搞懂Java线程池实现原理
    1/1

    PHP中文网