A ForkJoinTask is a thread-like entity which is much lighter than a normal thread and can be used to perform huge number of tasks and subtasks at a lower resource cost than a normal Thread implementation. The ForkJoinPool is a collection which hosts a number of such ForkJoinTasks and can take on such heavy lifting tasks over a certain disadvantages than a ThreadPool.
- ForkJoinPool framework is special implantation of ExecutorService family.
- ForkJoinPool divides the ForkJoinTask into subtasks until task is small enough to execute and it try to execute each task in individual worker thread. At end it will join all the threads and combine result.
- Maximum number of ForkJoinPool threads created is 32767.
Features of ForkJoinPool
The major difference from ExecutorService to ForkJoinPool is work-stealing. ForkJoinPool uses work-stealing approach to complete job efficient way. ForkJoinPool uses daemon threads to run tasks.
How Work-Stealing approach Works:
- ForkJoinPoll will have 2 queues
- central worker queue
- scheduling task queue
- Initially, the Worker Thread (ForkJoinPool thread also called as Worker Thread) will take task form central worker queue and divide into subtasks.
- Every Worker Thread (ForkJoinPool thread also called as Worker Thread) will have separate queue called scheduling queue. We can add or remove to scheduling queue frond and back. It supports both FIFO and LIFO operations.
- Worker Thread ForkJoinPoolTask is divided into subtasks and added same work Thread scheduling queue as LIFO. Worker Thread Process the scheduling queue as LIFO.
- If any other worker Thread is ideal it randomly chooses the overloaded Worker Thread to steals the task from it and it will add steeled task as FIFO order.
- When Worker Thread triggers Join operation. It waits until all work steal threads to complete the tasks and combine the result.
About Worker Threads:-
All Worker threads will be maintained in commonpool. So that different processors threads will also come under commonpool.
It is easy to share tasks inside commonpool with different processors threads. So it will enable parallel processing.
Constructors:
- ForkJoinPool() – Creates a pool with number of threads available in the system.
- ForkJoinPool(int parallelism) – Creates number of Worker threads. Value must to greater than zero.
Methods:
T invoke(ForkJoinTask
void execute(ForkJoinTask<?> task) – can be used to start a task without waiting for its completion.
submit() – can be used to start a task and retun future object. Future object support below 4 methods to check given task is completed normally or abnormally.
- boolean isDone() method returns true, if a task completes in java.
- boolean isCompletedNormally() method returns true, if task completed normally without throwing any exception in java.
- boolean isCompletedAbnormally() returns true, if task completed abnormally either by cancellation or by throwing any exception in java.
- boolean isCancelled() returns true if the task was cancelled in java.
ForkJoinTask
ForkJoinPoll uses ForkJoinTask to create subtasks and run the code.
Methods:
ForkJoinTask
V join( ) – The join( ) method waits for task completion on which it is called. The method returns result of the task.
V invoke( ) – The invoke() method combines the functionality of fork() and join() methods. invoke() submits the task and waits for completion of submitted task in java. The method returns result of task in java.
invokeAll() – method submits list of tasks i.e. tasks and waits for completion of all tasks in list in java.
RecursiveAction – RecursiveAction is subclass of ForkJoinTask. This submits a task to ForkJoinPool and does not return a result in java.
Most important method of RecursiveAction is compute() method in java.
__Example: __
Array to square each and every element and sum up all squared elements.
class task extends RecursiveAction
{
int ar[],start,end, endval;
task(int[] ar, int start, int end)
{
this.ar = ar;
this.start =start;
this.end = end;
}
@Override
protected void compute() {
if(end==start)
{
System.out.println(Thread.currentThread().getName());
endval = ar[end-1] = ar[end-1]*ar[end-1];
}
else
{
task t1= new task(ar,start,(start+end)/2);
task t2 = new task(ar,((start+end)/2)+1,end);
t1.fork();
t2.fork();
t1.join();
t2.join();
endval = t1.endval + t2.endval;
}
}
}
public class forkjoinpooltest
{
public static void main(String args[])
{
int ar[] = {1,2,3,4};
ForkJoinPool fjp = new ForkJoinPool();
task t = new task(ar,1,4);
fjp.submit(t);
System.out.println("Array: " + Arrays.toString(ar));
t.join();
System.out.println("Array: " + t.endval);
}
}
RecursiveTask
RecursiveTask is subclass of ForkJoinTask. This submits a task and returns a result.
The V specifies the result type of the task.
Most important method of RecursiveAction is compute() method.
class task1 extends RecursiveTask<Integer>
{
int ar[],start,end, endval;
task1(int[] ar, int start, int end) {
this.ar = ar;
this.start =start;
this.end = end;
}
@Override
protected Integer compute() {
if(end==start) {
System.out.println(Thread.currentThread().getName());
endval = ar[end-1] = ar[end-1]*ar[end-1];
return endval;
} else {
task1 t1= new task1(ar,start,(start+end)/2);
task1 t2 = new task1(ar,((start+end)/2)+1,end);
invokeAll(t1,t2);
endval = t1.join() + t2.join();
return endval;
}
}
}
public class forkjoin
{
public static void main(String[] args) {
int ar[] = {1,2,3,4};
ForkJoinPool fjp= new ForkJoinPool();
task1 t = new task1(ar,1,4);
fjp.invoke(t);
System.out.println("Array: " + Arrays.toString(ar));
System.out.println("sum: " + t.endval);
}
}