A thread pool is one of the most commonly used techniques for improving performance and for handling complex, time-consuming operations efficiently. As outlined by Oracle (and previously Sun Microsystems), thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.
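To make the point about bounding resources concrete, here is a minimal sketch. The class and method names are illustrative and not from the original project. It uses the JDK's Executors.newFixedThreadPool factory, which is backed by a ThreadPoolExecutor, to run 1,000 tasks and counts how many distinct threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
    /** Runs taskCount tiny tasks on a fixed pool and returns how many distinct threads ran them. */
    static int distinctWorkers(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            // Each task records the name of the pooled thread that executed it
            pool.execute(() -> workers.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workers.size();
    }

    public static void main(String[] args) {
        // 1,000 tasks, yet only 4 threads are ever created to run them
        // prints "Distinct worker threads: 4"
        System.out.println("Distinct worker threads: " + distinctWorkers(4, 1_000));
    }
}
```

The per-task cost is only a queue hand-off, and the thread count stays fixed no matter how many tasks arrive.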
JDK 5.0 introduced a new mechanism for controlling thread execution, implemented in the java.util.concurrent package. It gives us the java.util.concurrent.ThreadPoolExecutor class, which makes implementing a thread pool easy and thorough: ThreadPoolExecutor not only provides the feature out of the box, but also maintains some basic statistics, such as the number of completed tasks.
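As a quick illustration of those built-in statistics, the sketch below runs a batch of no-op tasks and then reads the counters that ThreadPoolExecutor maintains on its own: getTaskCount(), getCompletedTaskCount() and getLargestPoolSize(). The demo class is hypothetical and not part of the wrapper described later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatisticsDemo {
    /** Runs taskCount (>= 1) no-op tasks, waits for the pool to terminate and returns its completed-task count. */
    static long completedTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(taskCount));
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { /* real work would go here */ });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Counters maintained by ThreadPoolExecutor itself; no bookkeeping code of our own
        System.out.println("taskCount=" + pool.getTaskCount()
                + ", completedTaskCount=" + pool.getCompletedTaskCount()
                + ", largestPoolSize=" + pool.getLargestPoolSize());
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        // prints "taskCount=50, completedTaskCount=50, largestPoolSize=2"
        completedTasks(50);
    }
}
```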
I am going to illustrate an implementation that I coded for several migration projects to handle and migrate large volumes of data. Thread pooling was an automatic choice for me, as I had tight migration windows and time constraints.
I have a simple wrapper over the java.util.concurrent.ThreadPoolExecutor class, which acts as a helper for consuming applications to initialize a thread pool with the desired settings and to add tasks (Runnable implementations) to the pool.
//Java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//Apache Logger (log4j 1.x)
import org.apache.log4j.Logger;

/**
 * A generic helper that initializes a bounded thread pool and executes
 * Runnable tasks on it.
 *
 * @author Somesh Mukherjee
 */
public class BatchThreadPoolExecutor {
    private static final Logger log = Logger.getLogger(BatchThreadPoolExecutor.class);
    private final ThreadPoolExecutor threadPool;
    private final ArrayBlockingQueue<Runnable> queue;

    /**
     * Default constructor (must not be used; the pool settings are mandatory).
     */
    @Deprecated
    public BatchThreadPoolExecutor() {
        throw new UnsupportedOperationException(
                "Must assign proper values for poolSize (int), maxPoolSize (int), keepAliveTime (long), queueSize (int)");
    }

    /**
     * ThreadPool initializer constructor.
     *
     * @param poolSize      core number of threads kept in the pool (must be > 0)
     * @param maxPoolSize   maximum number of threads, added only once the queue is full (must be >= poolSize)
     * @param keepAliveTime seconds an idle thread above poolSize waits for work before terminating (must be > 0)
     * @param queueSize     capacity of the queue holding tasks that wait for a free thread (must be > 0)
     * @throws IllegalArgumentException if any of the values above is out of range
     */
    public BatchThreadPoolExecutor(int poolSize,
                                   int maxPoolSize,
                                   long keepAliveTime,
                                   int queueSize) {
        if (poolSize <= 0
                || maxPoolSize < poolSize
                || keepAliveTime <= 0L
                || queueSize <= 0) {
            throw new IllegalArgumentException(
                    "Must assign proper values for poolSize (int), maxPoolSize (int), keepAliveTime (long), queueSize (int)");
        }
        log.debug("Initializing BatchThreadPoolExecutor...");
        queue = new ArrayBlockingQueue<Runnable>(queueSize);
        threadPool = new ThreadPoolExecutor(poolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                queue);
    }

    /**
     * Submits an individual task to the pool. A pool thread runs it immediately,
     * or after it has waited in the queue.
     *
     * @param task the work to execute
     * @throws java.util.concurrent.RejectedExecutionException if the pool has been shut down,
     *         or if the queue is full and maxPoolSize threads are already busy
     */
    public void runTask(Runnable task) {
        log.debug("Beginning runTask, threadPool.getTaskCount():" + threadPool.getTaskCount());
        log.debug("runTask, Starting queue.size():" + queue.size());
        threadPool.execute(task);
        log.debug("runTask, Finishing queue.size():" + queue.size());
    }

    /**
     * Shuts down the thread pool. Previously submitted tasks are executed,
     * but no new tasks will be accepted. Invocation has no additional effect if already shut down.
     */
    public void shutDown() {
        log.debug("Shutting down BatchThreadPoolExecutor...");
        threadPool.shutdown();
    }

    /**
     * A utility method to find out whether all pooled tasks have completed following
     * shutDown(), or whether the pool still has active tasks.
     *
     * @return true if the pool has been shut down and all of its tasks have finished
     */
    public boolean isTerminated() {
        // Read the state once so the log message and the return value always agree
        boolean terminated = threadPool.isTerminated();
        if (terminated) {
            log.debug("$$$$ The Thread Pool has Terminated... $$$$");
        } else {
            log.debug("###### The Thread Pool has NOT Terminated yet. Active Jobs in the pool: " + threadPool.getActiveCount());
        }
        return terminated;
    }
}
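Independently of the wrapper's own guard clause, the ThreadPoolExecutor constructor validates its arguments and throws IllegalArgumentException when corePoolSize is negative, maximumPoolSize is zero or less than corePoolSize, or keepAliveTime is negative. The hypothetical helper below simply probes a few combinations. Note that the JDK accepts a keepAliveTime of 0, so rejecting it is the wrapper's own policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolArgumentCheckDemo {
    /** Returns true if ThreadPoolExecutor accepts the settings, false if its constructor rejects them. */
    static boolean accepts(int corePoolSize, int maxPoolSize, long keepAliveSeconds) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                    keepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
            pool.shutdown(); // nothing was submitted, so the pool terminates at once
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("10/20, keepAlive 60 -> " + accepts(10, 20, 60L)); // valid
        System.out.println("10/20, keepAlive 0  -> " + accepts(10, 20, 0L));  // valid for the JDK
        System.out.println("20/10, keepAlive 60 -> " + accepts(20, 10, 60L)); // max < core
        System.out.println("-1/10, keepAlive 60 -> " + accepts(-1, 10, 60L)); // negative core size
        System.out.println("10/20, keepAlive -1 -> " + accepts(10, 20, -1L)); // negative keep-alive
    }
}
```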
Before getting into the code contained within the above wrapper class, I will include one more snippet, which shows how to initialize the pool and submit tasks using the above wrapper.
// Assume interimDataSetlist is the list containing the data to be processed concurrently
if (interimDataSetlist != null && !interimDataSetlist.isEmpty()) {
    Iterator iterator = interimDataSetlist.iterator();
    BatchThreadPoolExecutor mtpe = new BatchThreadPoolExecutor(Integer.parseInt(this.getBatchedThreadPoolSize()),
            Integer.parseInt(this.getBatchedThreadPoolMaxPoolSize()),
            Long.parseLong(this.getBatchedThreadPoolKeepAliveTime()),
            Integer.parseInt(this.getBatchedThreadPoolQueueSize()));
    while (iterator.hasNext()) {
        data = iterator.next();
        if (null != data) {
            // Enqueue the Runnable task here: the createProcessor helper simply creates an instance of a class that implements Runnable and overrides run() with the desired processing
            mtpe.runTask(this.createProcessor(data, this.getProcessorBeanName()));
        }
    }
    // The tasks are now submitted and the executor takes care of running them. At most poolSize threads run at once while the
    // queue has room; extra threads (up to maxPoolSize) are started only when the queue is full
    mtpe.shutDown();
    interimDataSetlist = null;
    // Wait for the pooled tasks to finish before invoking the next batch of tasks
    while (true) {
        if (mtpe.isTerminated()) {
            log.debug("Thread pool of Image upload jobs finished! Would initiate the next set if available.");
            break;
        } else {
            // Pooled tasks still in progress; wait for the batch to finish before triggering the next set of jobs
            Thread.sleep(busyWaitMS);
        }
    }
}
In the code snippet above, notice line #4, where the wrapper class is instantiated with the settings for the required thread pool. Here is a quick note on what each attribute means:
- poolSize - Core pool size: the number of threads the pool keeps for running tasks concurrently. Each submitted task gets a new thread until this many threads exist.
- maxPoolSize - Maximum number of threads the pool may create; poolSize must be less than or equal to this. Let's assume poolSize is 10, maxPoolSize is 20 and queueSize is 50. The first 10 tasks each get a new core thread and run concurrently. Further tasks go into the waiting queue until a thread becomes free. Only when the queue is full (50 waiting tasks) does the executor create additional threads, up to 20 in total. Once 20 threads are busy and the queue is full, any further task is rejected: with the default policy, execute() throws java.util.concurrent.RejectedExecutionException. So the number of tasks that can be outstanding at any moment without rejection is maxPoolSize + queueSize (70 here), not maxPoolSize.
- keepAliveTime - How long an idle thread beyond the core pool size waits for a new task before it is terminated. It does not apply to queued tasks, which simply wait until a thread is free. Our wrapper uses seconds as the unit.
- queueSize - The fixed capacity of the pool's java.util.concurrent.ArrayBlockingQueue, which holds tasks waiting for a free thread.
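To see how poolSize, maxPoolSize and queueSize interact, here is a small self-contained sketch with hypothetical values poolSize=2, maxPoolSize=4 and queueSize=2. Every task blocks on a latch, so no thread frees up while we submit, which makes the outcome deterministic: 2 tasks start core threads, 2 wait in the queue, 2 more start extra threads, and the 7th is rejected by the default AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    /**
     * Submits taskCount blocking tasks to a pool with poolSize=2, maxPoolSize=4, queueSize=2.
     * Returns {accepted, rejected, threads in pool, tasks in queue} measured while saturated.
     */
    static int[] submit(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // hold the thread until we have finished submitting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < taskCount; i++) {
            try {
                pool.execute(blockingTask);
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++; // queue full and maxPoolSize threads busy: default AbortPolicy throws
            }
        }
        int[] snapshot = {accepted, rejected, pool.getPoolSize(), pool.getQueue().size()};
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] r = submit(7);
        // prints "accepted=6, rejected=1, poolSize=4, queued=2"
        System.out.println("accepted=" + r[0] + ", rejected=" + r[1]
                + ", poolSize=" + r[2] + ", queued=" + r[3]);
    }
}
```

If throwing is not acceptable for overflow tasks, the six-argument ThreadPoolExecutor constructor takes a RejectedExecutionHandler; passing new ThreadPoolExecutor.CallerRunsPolicy() makes the submitting thread run the overflow task itself, which also slows down submission naturally.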
Once the ThreadPoolExecutor is initialized, we call runTask (line #12) on the wrapper class for each record. runTask accepts a Runnable implementation and passes it to the executor's execute() method; it does not call run() itself. A pool worker thread later invokes the processor's run() method, either immediately or once the task leaves the queue.
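The difference between handing a task to execute() and calling run() directly can be checked with a minimal sketch like the following (the class name is illustrative). The task records the name of the thread that runs it; that name belongs to a pool worker (with the default thread factory, something like pool-1-thread-1), not to the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerThreadDemo {
    /** Hands one task to execute() and returns the name of the thread that ran it. */
    static String runnerOfSubmittedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        AtomicReference<String> runner = new AtomicReference<>();
        // execute() only hands the Runnable over; a pool worker thread later calls its run() method
        pool.execute(() -> runner.set(Thread.currentThread().getName()));
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("caller=" + Thread.currentThread().getName()
                + ", runner=" + runnerOfSubmittedTask());
    }
}
```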
After all the processor tasks are submitted, I immediately shut down the executor (line #17) so that it stops accepting new tasks. Note that shutdown() does not suspend or cancel tasks that were already submitted: running and queued tasks still complete, but any further call to runTask is rejected.
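Here is a minimal sketch of these shutdown() semantics, using a single-threaded pool and hypothetical 20 ms tasks. Five slow tasks are submitted, shutdown() is called while most of them are still queued, and a sixth submission is attempted afterwards.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    /** Returns {tasks completed, tasks rejected} for 5 tasks submitted before shutdown() and 1 after. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        AtomicInteger completed = new AtomicInteger();
        Runnable slowTask = () -> {
            try {
                Thread.sleep(20); // simulate work so tasks are still queued at shutdown
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(slowTask);
        }
        pool.shutdown(); // stop accepting new tasks; running and queued tasks still complete
        int rejected = 0;
        try {
            pool.execute(slowTask);
        } catch (RejectedExecutionException e) {
            rejected++; // default AbortPolicy rejects anything submitted after shutdown()
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejected};
    }

    public static void main(String[] args) {
        int[] result = run();
        // prints "completed=5, rejected=1"
        System.out.println("completed=" + result[0] + ", rejected=" + result[1]);
    }
}
```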
The loop starting at line #20 waits until all the processor tasks submitted to the pool have finished, by polling the executor's termination state through the wrapper's isTerminated() method and sleeping between checks.
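Polling with Thread.sleep works, but ThreadPoolExecutor also offers awaitTermination(long, TimeUnit), which blocks until all tasks have completed after a shutdown request, the timeout elapses, or the current thread is interrupted. The sketch below shows that alternative on a plain ThreadPoolExecutor. The batch contents and pool sizes are placeholders, and using it through the wrapper would require exposing a similar method, which the class above does not have.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {
    /** Runs a batch of taskCount (>= 1) tasks and blocks until all have finished; returns how many ran. */
    static int processBatch(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(taskCount));
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(processed::incrementAndGet); // stand-in for a real processor task
        }
        pool.shutdown();
        try {
            // Blocks until every task has finished (true) or the timeout elapses (false); no sleep loop needed
            while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                System.out.println("Batch still running, active tasks: " + pool.getActiveCount());
            }
        } catch (InterruptedException e) {
            pool.shutdownNow(); // caller gave up: interrupt running tasks and drop queued ones
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        // prints "Processed 100 tasks; next batch can start now"
        System.out.println("Processed " + processBatch(100) + " tasks; next batch can start now");
    }
}
```

The thread waiting on the batch sleeps inside the executor and wakes as soon as the last task finishes, rather than up to busyWaitMS later.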
I hope this article helps you build the thread pool you are looking for in Java. If you have any questions or need clarification, please drop a comment below and I will try to provide more information or a solution. If you notice any discrepancies, please let me know.