Java并发编程——ForkJoinPool之WorkQueue

源码 2024-9-29 13:57:29 60 0 来自 中国
一、ForkJoinPool

ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。核心头脑是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理处罚汇总到一个结果上(即join),非常像MapReduce处理处罚原理。同时,它提供根本的线程池功能,支持设置最大并发线程数,支持任务列队,支持线程池克制,支持线程池利用环境监控,也是AbstractExecutorService的子类,紧张引入了“工作盗取”机制,在多CPU盘算机上处理处罚性能更佳。其广泛用在java8的stream中。
从图中可以看出ForkJoinPool要先实行完子任务才气实行上一层任务,以是ForkJoinPool恰当在有限的线程数下完成有父子关系的任务场景,比如:快速排序,二分查找,矩阵乘法,线性时间选择等场景,以及数组和聚集的运算。
Fork/Join Pool接纳精良的设计、代码实现和硬件原子操纵机制等多种思绪包管其实行性能。此中包罗(但不限于):盘算资源共享、高性能队列、克制伪共享、工作盗取机制等。
二、与ThreadPoolExecutor原生线程池的区别

ForkJoinPool和ThreadPoolExecutor都实现了Executor和ExecutorService接口,都可以通过构造函数设置线程数,threadFactory,可以查察ForkJoinPool.makeCommonPool()方法的源码查察通用线程池的构造细节。
在内部结构上我以为两个线程池最大的区别是在工作队列的设计上,如下图
ThreadPoolExecutor:

2.png ForkJoinPool:

3.png 图上细节画的不严谨,但大抵能看出区别:

  • ForkJoinPool每个线程都有自己的队列
  • ThreadPoolExecutor共用一个队列
利用ForkJoinPool可以在有限的线程数下来完成非常多的具有父子关系的任务,比如利用4个线程来完成凌驾2000万个任务。
ForkJoinPool最恰当盘算密集型任务,而且最好黑白壅闭任务,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的差别利用场景和留意事项。
以是ForkJoinPool是ThreadPoolExecutor线程池的一种增补,是对盘算密集型场景的增强。
三、工作盗取的实现原理

ForkJoinPool类中的WorkQueue正是实现工作盗取的队列,javadoc中的表明如下:
4.png 大意是大多数操纵都发生在工作盗取队列中(在嵌套类工作队列中)。这些是特殊情势的Deques,紧张有push,pop,poll操纵。
Deque是双端队列(double ended queue缩写),头部和尾部任何一端都可以举行插入,删除,获取的操纵,即支持FIFO(队列)也支持LIFO(栈)次序。
Deque接口的实现最常见的是LinkedList,除此另有ArrayDeque、ConcurrentLinkedDeque等。
工作盗取模式紧张分以下几个步骤:

  • 1、每个线程都有自己的双端队列。
  • 2、当调用fork方法时,将任务放进队列头部,线程以LIFO次序,利用push/pop方式处理处罚队列中的任务。
  • 3、假如自己队列里的任务处理处罚完后,会从其他线程维护的队列尾部利用poll的方式盗取任务,以到达充实利用CPU资源的目的。
  • 4、从尾部盗取可以减少同原线程的竞争。
  • 5、当队列中剩末了一个任务时,通过cas办理原线程和盗取线程的竞争。
流程大抵如下所示:

工作盗取便是ForkJoinPool线程池的上风所在,在一样平常的线程池比如ThreadPoolExecutor中,假如一个线程正在实行的任务由于某种缘故因由无法继续运行,那么该线程会处于等候状态,包罗singleThreadPool、fixedThreadPool、cachedThreadPool这几种线程池。
而在ForkJoinPool中,那么线程会主动探求其他尚未被实行的任务然后盗取过来实行,减少线程等候时间。
JDK8中的并行流(parallelStream)功能是基于ForkJoinPool实现的,别的另有java.util.concurrent.CompletableFuture异步回调future,内部利用的线程池也是ForkJoinPool。
四、ForkJoinPool分析

4.1 ForkJoinPool成员变量

// 用来配置ctl在控制线程数量利用private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign//控制线程池数量(ctl & ADD_WORKER) != 0L 时创建线程,// 也就是当ctl的第16位不为0时,可以继续创建线程volatile long ctl;                   // main pool control//全局锁控制,全局运行状态volatile int runState;               // lockable status//config二进制情势的低16位体现parallelism,//config二进制情势的第高16位体现mode,1体现异步模式, 利用先辈先出队列, 0体现同步模式, 利用先辈后出栈//低16位体现workerQueue在pool中的索引,高16位体现mode, 有FIFI LIFL final int config;  // parallelism, mode    //天生workerQueue索引的紧张依据int indexSeed;         // to generate worker index  //工作者队列数组,内部线程ForkJoinWorkerThread启动时会注册一个WorkerQueue对象到这个数组中volatile WorkQueue[] workQueues;     // main registry //工作者线程线程工厂,创建ForkJoinWorkerThread的计谋final ForkJoinWorkerThreadFactory factory;  //在线程因未捕非常而退出时,java假造机将回调的非常处理处罚计谋final UncaughtExceptionHandler ueh;  // per-worker UEH //工作者线程名的前缀final String workerNamePrefix;       // to create worker name string  //实行器全部线程盗取的任务总数,也作为监督runState的锁volatile AtomicLong stealCounter;    // also used as sync monitor//通用的实行器,它在静态块中初始化static final ForkJoinPool common; 五、WorkQueue

5.1 类结构及其成员变量

5.1.1 类结构和表明

WorkQueue是ForkJoinPool的核心内部类,是一个Contented修饰的静态内部类。
/** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. * Performance on most platforms is very sensitive to placement of * instances of both WorkQueues and their arrays -- we absolutely * do not want multiple WorkQueue instances or multiple queue * arrays sharing cache lines. The @Contended annotation alerts * JVMs to try to keep instances apart. */@sun.misc.Contendedstatic final class WorkQueue {}其表明大意为:
workQUeue是一个支持任务盗取和外部提交任务的队列,着实现参考ForkJoinPool形貌的算法。在大多数平台上的性能对工作队列及其数组的实例都非常敏感。我们不渴望多个工作队列的实例和多个队列数组共享缓存。@Contented表明用来提示jvm将workQueue在实行的时间与其他对象举行区别。
@Contented,现实上就是接纳内存对齐的方式克制伪共享,包管WorkQueue在实行的时间,其前后不会有其他对象干扰。
注:JVM 添加 -XX:-RestrictContended 参数后 @sun.misc.Contended 注解才有用)
5.1.2 MAXIMUM_QUEUE_CAPACITY

MAXIMUM_QUEUE_CAPACITY表明如下:
/** * Maximum size for queue arrays. Must be a power of two less * than or equal to 1 << (31 - width of array entry) to ensure * lack of wraparound of index calculations, but defined to a * value a bit less than this to help users trap runaway * programs before saturating systems. */static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64MMAXIMUM_QUEUE_CAPACITY是队列支持的最大容量,必须是2的幂小于或便是1<<(31-数组项的宽度),但界说为一个略小于此值的值,以资助用户在饱和体系之前捕获失控的步调。
5.1.3 成员变量

成员变量区如下:
@sun.misc.Contendedstatic final class WorkQueue {    //队列的初始容量    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;    // 64M 队列的最大容量    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M    // Instance fields    volatile int scanState;    // versioned, <0: inactive; odd:scanning    int stackPred;             // pool stack (ctl) predecessor    int nsteals;               // number of steals    int hint;                  // randomization and stealer index hint    int config;                // pool index and mode    volatile int qlock;        // 1: locked, < 0: terminate; else 0    volatile int base;         // index of next slot for poll    int top;                   // index of next slot for push    ForkJoinTask<?>[] array;   // the elements (initially unallocated)    final ForkJoinPool pool;   // the containing pool (may be null)    final ForkJoinWorkerThread owner; // owning thread or null if shared    volatile Thread parker;    // == owner during call to park; else null    volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer}

  • scanState:它可以看作是乐观锁的版本号,别的它另有此其他功能,它为负数时,体现工作者线程非运动,它为奇数是体现,正在扫描(预备盗取)任务,它为偶数是体现正在实行任务。
  • stackPred:体现在线程池栈当前工作线程的前驱线程的索引。在唤醒线程时常用到此属性。
  • nsteals:体现owner线程盗取的任务数。
  • hint:任务盗取时的随机定位种子。
  • config:低16位体现,当前WorkerQueue对象在外部类的数组属性workQueues中的索引(下标) 。高16位体现当前WorkerQueue对象的模式。对于内部任务,若构造方法配置为异步模式就将WorkQueue当作先辈先出的队列,反之将WorkQueue当作后进先出的栈。对于外部任务,将WorkQueue视为共享队列。
  • qlock:初始值为0,”=1“时体现当前WorkerQueue对象被锁住,” < 0“时 体现当前WorkerQueue对象已停止,队列中的其他未完成任务将不再被实行。
  • base:体现下次对任务数组array举行poll出队操纵(盗取任务)的槽位索引(队尾)。
  • top:体现下次任务数组array举行push入栈操纵(添加任务)的槽位索引(栈顶)。
  • array:非学紧张的属性,这用是生存任务的数组(容器)。
  • pool:与之关联的ForkJoinPool实行器,它大概为空。若为空,就利用静态变量common作为实行器。
  • owner:当前队列对应的工作者线程,它一样平常不为空。若从外部提交任务时,当前WorkerQueue对象体现共享队列,owner为空。
  • parker:壅闭的线程。在被壅闭的时间,它便是owner,其他时间它为空。
  • currentJoin:体现当前正在join的任务,紧张在awaitJoin方法利用。
  • currentSteal:体现当前被盗取的任务,紧张在helpStealer方法中利用。
5.2 构造函数

WorkQueue就一个构造函数:
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {    this.pool = pool;    this.owner = owner;    // Place indices in the center of array (that is not yet allocated)    base = top = INITIAL_QUEUE_CAPACITY >>> 1;}在这个构造函数中,只会指定pool和owoner,假如该队列是共享队列,那么owoner此时是空的。别的,base和top两个指针分别都指向了数组的中值,这个值是初始化容量右移一位。
那么团结前面的代码,现实上初始化的时间,数组的长度为8192,那么base=top=4096。
这个数组在构造函数被调用之后初始化如下:

6.png 5.3 紧张的方法

5.3.1 push

当ForkJoinWorkerThread须要向双端队列中放入一个新的待实行子任务时,会调用WorkQueue中的push方法。来看看这个方法的紧张实行过程(请留意,源代码来自JDK1.8,它和JDK1.7中的实现有显著差别):
/** * Pushes a task. Call only by owner in unshared queues.  (The * shared-queue version is embedded in method externalPush.) * * @param task the task. Caller must ensure non-null. * @throws RejectedExecutionException if array cannot be resized */final void push(ForkJoinTask<?> task) {    ForkJoinTask<?>[] a; ForkJoinPool p;    int b = base, s = top, n;    // 请留意,在实行task.fork时,触发push环境下,array不会为null    // 由于在这之前workqueue中的array已经完成了初始化(在工作线程初始化时就完成了)        if ((a = array) != null) {    // ignore if queue removed        //m为最高为位置的index        int m = a.length - 1;     // fenced write for task visibility        // U常量是java底层的sun.misc.Unsafe操纵类        // 这个类提供硬件级别的原子操纵        // putOrderedObject方法在指定的对象a中,指定的内存偏移量的位置,赋予一个新的元素              U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);        // putOrderedInt方法对当前指定的对象中的指定字段,举行赋值操纵        // 这里的代码意义是将workQueue对象自己中的top标示的位置 + 1,            U.putOrderedInt(this, QTOP, s + 1);        //假如n小于便是1则 且poll不为空 则触发worker盗取大概产生新的worker        if ((n = s - b) <= 1) {            if ((p = pool) != null)                // signalWork方法的意义在于,在当前运动的工作线程过少的环境下,创建新的工作线程                p.signalWork(p.workQueues, this);        }        //假如n大于便是了m 则阐明须要扩容了, array的剩余空间不敷了        else if (n >= m)            growArray();    }}这个push方法是提供给工作队列自己push任务来利用的,共享队列push任务是在外部externalPush和externalSubmit等方法来举行初始化和push。
这里须要留意的是,当队列中的任务数小于1的时间,才会调用signalWork,这个地方一开始并不明白,现实上,我们须要留意的是,这个方法是专门提供给工作队列来利用的,那么这个条件满意的时间,阐明工作队列空闲。假如这个条件不满意,那么工作队列中有许多任务须要工作队列来处理处罚,就不会触发对这个队列的盗取操纵。
5.3.2 growArray

这是扩容的方法。现实上这个方法有两个作用,起首是初始化,其次是判断,是否须要扩容,假如须要扩容则容量加倍。
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */final ForkJoinTask<?>[] growArray() {    //旧的数组 oldA    ForkJoinTask<?>[] oldA = array;    //假如oldA不为空,则size就为oldA的长度*2,反之阐明数组没有被初始化,那么长度就应该为初始化的长度8192    int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;    //假如size比答应的最大容量还大,那么此时会抛出非常    if (size > MAXIMUM_QUEUE_CAPACITY)        throw new RejectedExecutionException("Queue capacity exceeded");    int oldMask, t, b;    //array a 为根据size new出来的一个新的数组    ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];    //假如oldA不为空且其长度大于便是0为有用数组,且top-base大于0 阐明不为空    if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&        (t = top) - (b = base) > 0) {        //按size界说掩码        int mask = size - 1;        //从旧的数组中poll全部task,然后push到新的array中        do { // emulate poll from old array, push to new array            ForkJoinTask<?> x;            //接纳unsafe操纵            int oldj = ((b & oldMask) << ASHIFT) + ABASE;            int j    = ((b &    mask) << ASHIFT) + ABASE;            //现实上直接举行的内存对象copy,如许服从比循环调用push和poll要高许多            x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);            //判断  x不为空 则利用unsafe举行操纵            if (x != null &&                U.compareAndSwapObject(oldA, oldj, x, null))                U.putObjectVolatile(a, j, x);        } while (++b != t);    }    //返回新的数组    return a;}须要留意的是,这个方法一旦调用举行扩容之后,无论是来自于外部push操纵触发,照旧有工作线程worker触发,都将被锁定,之后,不能移动top指针,但是base指针是可以移动的。这也就是说,一旦处于扩容的过程中,就不能新增task,但是可以从base举行斲丧,这就只支持FIFO。因此同步模式将在此时被壅闭。
5.3.3 pop

同样,pop操纵也仅限于工作线程,对于共享对立中则不答应利用pop方法。这个方法将按LIFO后进先出的方式从队列中。
/** * Takes next task, if one exists, in LIFO order.  Call only * by owner in unshared queues. */final ForkJoinTask<?> pop() {    ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;    //假如array不为空切长度大于0    if ((a = array) != null && (m = a.length - 1) >= 0) {        //循环,s为top的指针减1,即top减1之后要大于0 也就是说要存在task        for (int s; (s = top - 1) - base >= 0;) {            //盘算unsafe的偏移量 得到s的位置            long j = ((m & s) << ASHIFT) + ABASE;            //假如这个索引处的对象为空,则退出            if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)                break;            //反之用usafe的方法将这个值取走,之后返回,并更新top的指针            if (U.compareAndSwapObject(a, j, t, null)) {                U.putOrderedInt(this, QTOP, s);                return t;            }        }    }    return null;}pop方法,这是仅限于owoner调用的方法,将从top指针处取出task。这个方法对于整个队列是LIFO的方式。
5.3.4 poll

poll方法将从队列中按FIFO的方式取出task。
/** * Takes next task, if one exists, in FIFO order. */final ForkJoinTask<?> poll() {    ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;    //判断 base-top小于0阐明存在task 切array不为空    while ((b = base) - top < 0 && (a = array) != null) {        //盘算出unsafe操纵的索引 现实上就是拿到b        int j = (((a.length - 1) & b) << ASHIFT) + ABASE;        //之后拿到这个task 用volatile的方式        t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);        //之后假如base和b相等        if (base == b) {           //假如拿到的task不为空            if (t != null) {                //那么将这个位置的元素移除 base+1 然后返回t                if (U.compareAndSwapObject(a, j, t, null)) {                    base = b + 1;                    return t;                }            }            //在上述操纵之后,假如base比top小1阐明已经为空了 直接退出循环            else if (b + 1 == top) // now empty                break;        }    }    //默认返回null    return null;}5.3.5 pollAt

这个方法将接纳FIFO的方式,从 队列中获得task。
/** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions * appear in ForkJoinPool methods scan and helpStealer. */final ForkJoinTask<?> pollAt(int b) {    ForkJoinTask<?> t; ForkJoinTask<?>[] a;    //数组不为空    if ((a = array) != null) {        //盘算索引b的位置        int j = (((a.length - 1) & b) << ASHIFT) + ABASE;        //假云云处的task不为空,则将此处理为null然后将对象task返回        if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&            base == b && U.compareAndSwapObject(a, j, t, null)) {            base = b + 1;            return t;        }    }    return null;}通常环境下,b指的是队列的base指针。那么从底部获取元素就能实现FIFO。特殊的版本出现在scan和helpStealer中用于对工作队列的盗取操纵的实现。
5.3.6 nextLocalTask

/** * Takes next task, if one exists, in order specified by mode. */final ForkJoinTask<?> nextLocalTask() {    return (config & FIFO_QUEUE) == 0 ? pop() : poll();}这个方法中对之前的MODE会起作用,假如是FIFO则用pop方法,反之则用poll方法获得下一个task。
5.3.7 peek

/** * Returns next task, if one exists, in order specified by mode. */final ForkJoinTask<?> peek() {    ForkJoinTask<?>[] a = array; int m;    //判断数组的正当性    if (a == null || (m = a.length - 1) < 0)        return null;    //根据mode决定从top照旧base处获得task    int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;    int j = ((i & m) << ASHIFT) + ABASE;    //返回获得的task    return (ForkJoinTask<?>)U.getObjectVolatile(a, j);}peek则根据之前的mode界说,从队列的前面大概背面取得task。
5.3.8 tryUnpush

/** * Pops the given task only if it is at the current top. * (A shared version is available only via FJP.tryExternalUnpush)*/final boolean tryUnpush(ForkJoinTask<?> t) {    ForkJoinTask<?>[] a; int s;    //判断数组的正当性    if ((a = array) != null && (s = top) != base &&        //将top位置的task与t比力,假如相等则将其改为null        U.compareAndSwapObject        (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {        //将top减1        U.putOrderedInt(this, QTOP, s);        //返回操纵乐成        return true;    }    //默认返回失败    return false;}这个方法是将之前push的任务撤回。这个操纵仅仅只有task位于top的时间操能乐成。
5.3.9 runTask

在之前的文章分析外部提交task的时间,就提到了这个方法。现实上是runWorker调用的。
也就是说,线程在启动之后,一旦worker获取到task,就会运行。
/** * Executes the given task and any remaining local tasks. */final void runTask(ForkJoinTask<?> task) {    //task不为空    if (task != null) {        //扫描状态标志为busy 那么阐明当前的worker正在处理处罚本地任务   此时这个操纵会将scanState改为0        scanState &= ~SCANNING; // mark as busy        //实行这个task        (currentSteal = task).doExec();        //开释已实行任务的内存        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC        //实行其他本地的task        execLocalTasks();        ForkJoinWorkerThread thread = owner;        //增长增长steals的次数        if (++nsteals < 0)      // collect on overflow            transferStealCount(pool);        //将scanState改为1 如许就变得活泼可以被其他worker scan        scanState |= SCANNING;        //假如thread不为null阐明为worker线程 则调用后续的exec方法        if (thread != null)            thread.afterTopLevelExec();    }}5.3.10 execLocalTasks

调用这个方法,运行队列中的全部task,假如接纳了LIFO模式,则调用pollAndExecAll,这是别的一种实现方法。直到将队列都实行到empty
/** * Removes and executes all local tasks. If LIFO, invokes * pollAndExecAll. Otherwise implements a specialized pop loop * to exec until empty. */final void execLocalTasks() {    int b = base, m, s;    //拿到数组    ForkJoinTask<?>[] a = array;    //假如b-s小于0阐明存在task,a不为空,切a的长度大于0 这均是检测方法的正当性    if (b - (s = top - 1) <= 0 && a != null &&        (m = a.length - 1) >= 0) {        //假如没有接纳FIFO的mode  那么肯定是LIFO 则从top处开始        if ((config & FIFO_QUEUE) == 0) {           //开始循环            for (ForkJoinTask<?> t;;) {               //从top开始取出task                if ((t = (ForkJoinTask<?>)U.getAndSetObject                     (a, ((m & s) << ASHIFT) + ABASE, null)) == null)                    break;                //修改top                U.putOrderedInt(this, QTOP, s);                //实行task                t.doExec();                //假如没有任务的了 则退出                if (base - (s = top - 1) > 0)                    break;            }        }        else           //FIFO的方式调用pollAndExecAll            pollAndExecAll();    }}5.3.11 pollAndExecAll

此方法将用poll,FIFO的方式获得task并实行。
final void pollAndExecAll() {    for (ForkJoinTask<?> t; (t = poll()) != null;)        t.doExec();}可见,当通过workQueue中调用runTask的方法的时间,会将这个队列的scanState状态修改为0,之后将这个队列中的全部task根据界说的mode全部斲丧完毕。
5.3.12 tryRemoveAndExec

从表明中可知,这个方法仅仅供awaitJoin方法调用,在await的过程中,将task从workQueue中移除并实行。
/** * If present, removes from queue and executes the given task, * or any other cancelled task. Used only by awaitJoin. * * @return true if queue empty and task not known to be done */final boolean tryRemoveAndExec(ForkJoinTask<?> task) {    ForkJoinTask<?>[] a; int m, s, b, n;    //判断数组的正当性 task不能为空    if ((a = array) != null && (m = a.length - 1) >= 0 &&        task != null) {        //循环  n为task的数量,必须大于0        while ((n = (s = top) - (b = base)) > 0) {            //死循环 从top遍历到base            for (ForkJoinTask<?> t;;) {      // traverse from s to b                long j = ((--s & m) << ASHIFT) + ABASE;                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)                    return s + 1 == top;     // shorter than expected                //假如task处于top位置                else if (t == task) {                    boolean removed = false;                    if (s + 1 == top) {      // pop                        //pop的方式获取task  然后更换为null                        if (U.compareAndSwapObject(a, j, task, null)) {                            U.putOrderedInt(this, QTOP, s);                            removed = true;                        }                    }                    //用emptytask代替                    else if (base == b)      // replace with proxy                        removed = U.compareAndSwapObject(                            a, j, task, new EmptyTask());                    //假如remove乐成 则实行这个task                    if (removed)                        task.doExec();                    break;                }                //假如task的status为负数 切 top=s=1                else if (t.status < 0 && s + 1 == top) {                    //移除                    if (U.compareAndSwapObject(a, j, t, null))                        U.putOrderedInt(this, QTOP, s);                    break;                  // was cancelled                }                if (--n == 0)                    return false;            }            if (task.status < 0)                return false;        }    }    return true;}5.3.13 popCC

假如pop CountedCompleter。这方法支持共享和worker的队列,但是仅仅通过helpComplete调用。
CountedCompleter是jdk1.8中新增的一个ForkJoinTask的一个实现类。
/** * Pops task if in the same CC computation as the given task, * in either shared or owned mode. Used only by helpComplete. */final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {    int s; ForkJoinTask<?>[] a; Object o;    //判断队列数组正当性    if (base - (s = top) < 0 && (a = array) != null) {        //从top处开始        long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;        //假如获的的task不为null        if ((o = U.getObjectVolatile(a, j)) != null &&            //且为CountedCompleter对象            (o instanceof CountedCompleter)) {            //转换为CountedCompleter            CountedCompleter<?> t = (CountedCompleter<?>)o;            //死循环            for (CountedCompleter<?> r = t;;) {                //假如task与获得的r相等为同一对象                if (r == task) {                    //假如mode小于0                     if (mode < 0) { // must lock                         //cas的方式加锁                        if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {                            //将这个对象扫除 并修改top后解锁                            if (top == s && array == a &&                                U.compareAndSwapObject(a, j, t, null)) {                                U.putOrderedInt(this, QTOP, s - 1);                                U.putOrderedInt(this, QLOCK, 0);                                //返回t                                return t;                            }                            //解锁                            U.compareAndSwapInt(this, QLOCK, 1, 0);                        }                    }                    else if (U.compareAndSwapObject(a, j, t, null)) {                        U.putOrderedInt(this, QTOP, s - 1);                        return t;                    }                    break;                }                else if ((r = r.completer) == null) // try parent                    break;            }        }    }    return null;}5.3.14 pollAndExecCC

pollAndExecCC 。盗取并运行与给定任务雷同CountedCompleter盘算任务(假如存在),并且可以在不发生争用的环境下实行该任务。否则,返回一个校验和/控制值,供helpComplete方法利用。
/** * Steals and runs a task in the same CC computation as the * given task if one exists and can be taken without * contention. Otherwise returns a checksum/control value for * use by method helpComplete. * * @return 1 if successful, 2 if retryable (lost to another * stealer), -1 if non-empty but no matching task found, else * the base index, forced negative. */final int pollAndExecCC(CountedCompleter<?> task) {    int b, h; ForkJoinTask<?>[] a; Object o;    //判断array的正当性    if ((b = base) - top >= 0 || (a = array) == null)        h = b | Integer.MIN_VALUE;  // to sense movement on re-poll    else {        //从base开始获得task        long j = (((a.length - 1) & b) << ASHIFT) + ABASE;        if ((o = U.getObjectVolatile(a, j)) == null)            h = 2;                  // retryable        else if (!(o instanceof CountedCompleter))            h = -1;                 // unmatchable        else {            CountedCompleter<?> t = (CountedCompleter<?>)o;            //死循环            for (CountedCompleter<?> r = t;;) {                if (r == task) {                    if (base == b &&                        U.compareAndSwapObject(a, j, t, null)) {                        base = b + 1;                        t.doExec();                        h = 1;      // success                    }                    else                        h = 2;      // lost CAS                    break;                }                else if ((r = r.completer) == null) {                    h = -1;         // unmatched                    break;                }            }        }    }    return h;}externalPush方法中的“q = ws[m & r & SQMASK]”代码非常紧张。我们大抵来分析一下作者的意图,起首m是ForkJoinPool中的WorkQueue数组长度减1,比方当前WorkQueue数组巨细为16,那么m的值就为15;r是一个线程独立的随机数天生器,关于java.util.concurrent.ThreadLocalRandom类的功能和利用方式可拜见别的资料;而SQMASK是一个常量,值为126 (0x7e)。以下是一种大概的盘算过程和盘算结果:

现实上任何数和126举行“与”运算,其结果只大概是0大概偶数,即0、2、4、6、8。也就是说以上代码中从名为“ws”的WorkQueue数组中,取出的元素只大概是第0个大概第偶数个队列。
结论就是偶数是外部任务,奇数是须要拆解归并的任务。
ForkJoinWorkerThread须要从双端队列中取出下一个待实行子任务,就会根据设定的asyncMode调用双端队列的差别方法,代码概要如下所示:
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {    for (ForkJoinTask<?> t;;) {        WorkQueue q; int b;        // 该方法试图从“w”这个队列获取下一个待处理处罚子任务        if ((t = w.nextLocalTask()) != null)            return t;        // 假如没有获取到,则利用findNonEmptyStealQueue方法        // 随机得到一个元素非空,并且可以举行任务盗取的存在于ForkJoinPool中的别的队列        // 这个队列被记为“q”           if ((q = findNonEmptyStealQueue()) == null)            return null;        // 试图从“q”这个队列base位处取出待实行任务          if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)            return t;    }}六、总结

本文对workQueue的源码举行了分析,我们须要留意的是,对于workQueue,界说了三个操纵,分别是push,poll和pop。

  • push

紧张是操纵top指针,将top举行移动。

  • poll
    假如top和base不等,则阐明队列有值,可以斲丧,那么poll就从base指针处开始斲丧。这个方法实现了队列的FIFO。
斲丧之后对base举行移动。

  • pop
    同样,还可以从top开始斲丧,这就是pop。这个方法现实上实现了对队列的LIFO。
11.png 斲丧之后将top减1。
以上就是这三个方法对应的操纵。但是我们还须要留意的是,在全部的unsafe操纵中,通过cas举行设置大概获得task的时间,另有一个掩码。这个非常紧张。
我们可以看在push方法中:
int m = a.length - 1; U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);在扩容的方法growArray中我们可以知道。每次扩容都是接纳左移的方式来举行,如许就包管了数组的长度为2的幂。
在这里,m=a.length-1,那就阐明,m现实上其二进制格式将会有用位都为1,这个数字就可以做为掩码。当m再与s取&盘算的时间。可以想象,s大于m的部分将被去除,只会生存比m小的部分。那么现实上,这就等价于,当我们不停再push元素到数组中的时间,现实上就从数组的索引底部开始:

参考上面这个过程,也就是说,现实上这个数组,base和top现实指向的index并不紧张。只有二者的相对位移才是紧张的。这有点雷同与RingBuffer的数据结构,但是照旧有所差别。也就是说这个数组现实上是不会被浪费的。之前有许多不明白的地方,为什么top减去base大概出现负数。那么如许现实上就会导致负数的产生。
如许的话,假如我们接纳异步模式,asyncMode为true的时间,workQueue则会接纳FIFO_QUEUE的model,如许workQueue自己就利用的时poll方法。反之假如利用LIFO_QUEUE的同步模式,则workQueue利用pop方法。默认环境下接纳同步模式。同步的时间workQueue的指针都围绕在数组的初始化的中心位置颠簸。而共享队列则会不停循环。
至此,我们分析了workQueue的源码,对其内部实现的双端队列自己的操纵举行了分析。为什么作者会自己实现一个Deque,而不是利用juc中已存在的容器。这就是由于这个队列全程都是接纳Unsafe来实现的,在开篇作者也说了,须要@Contented修饰,就是为了克制缓存的伪代共享。如许来实现一个高效的Deque,以供ForkJoinPool来操纵。
这与学习ConcurrentHashMap等容器的源码一样,可以看出作者为了性能的优化,接纳了许多独特的方式来实现。这些地方都是我们值得学习和鉴戒之处。这也是ForkJoin性能高效的关键。在作者的论文中也可以看出,java的实现,由于抽象在jvm之上,性能比c/c++的实现要低许多。这也是作者尽大概将性能做到最优的缘故因由之一。
参考:
https://blog.csdn.net/Xiaowu_First/article/details/122407019
https://blog.csdn.net/tyrroo/article/details/81483608
https://www.cnblogs.com/juniorMa/articles/14234472.html
https://www.cnblogs.com/maoyx/p/13991828.html
https://blog.csdn.net/dhaibo1986/article/details/108801254
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-11-21 23:26, Processed in 0.199048 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表