Thursday, August 30, 2007

Jump In The Pool, The Threads Are Fine

example source code, overview, java part I

10,000 soldiers may be more powerful than one, but if they march single file towards enemy fire, they are no more effective.

Threads are the same. Marching them in regiments versus a line adds power and efficiency at the same time.


Thread Pooling
Java 5.0 (JRE 1.5) introduced the java.util.concurrent package, which provides an API for concurrent processing and efficient thread pooling. For details on the API, please refer to the Java/Sun publications on JSR 166. The Distributed Computing Laboratory at Emory University has further documentation, as well as a back-ported version of the package for use with older JVM versions.

To apply the thread pool model to the ArrayOfThreads object we created in the previous post, an ExecutorService is used to invoke Callable objects (Callable was introduced in the concurrent package). AThread is a Callable rather than a simple subclass of Thread, whose run() method has no return value. I used Callable even in the earlier examples because the end goal is to gather data back from the execution of each task.

Consequently, after the tasks are submitted for processing, the concurrency framework provides a mechanism to retrieve the data returned by each one: the Future object. The major difference this makes in the implementation is that the Future handles the waiting and gathering of each task's data, so the main application thread does not have to wait for one task to complete before submitting the next. It blocks only when it calls get() on a Future whose task has not finished yet.
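To make the Callable/Future relationship concrete, here is a minimal sketch. The class names SquareTask and CallableSketch, and the squaring logic, are hypothetical stand-ins; the real AThread source is not reproduced in this post.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for AThread: a task that returns a value.
class SquareTask implements Callable<Integer> {
    private final int input;

    SquareTask(int input) {
        this.input = input;
    }

    // Unlike Thread.run(), which returns void, call() hands back a result.
    @Override
    public Integer call() {
        return input * input;
    }
}

class CallableSketch {
    // Submits one task to a small pool and returns its result.
    static int squareInPool(int value) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // submit() returns immediately; the work happens on a pool thread.
            Future<Integer> future = pool.submit(new SquareTask(value));
            // get() blocks only here, where the result is actually needed.
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squareInPool(7)); // prints 49
    }
}
```

The pool is created and shut down inside one method only to keep the sketch self-contained; a real application would normally keep the pool around for reuse.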


...

/* code snippet from ArrayOfThreadsPooled.java
* @see java.util.concurrent.Executors
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.Future
*/
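// Total number of tasks, and the number of worker threads in the pool.
int NUM_THRDS = 100, NUM_THRDS_POOLED = 100;
// Fixed-size pool: at most NUM_THRDS_POOLED tasks run at the same time.
ExecutorService tpes = Executors.newFixedThreadPool(NUM_THRDS_POOLED);
// One Future per submitted task, used later to collect its result.
Future<?>[] futures = new Future<?>[NUM_THRDS];
AThread[] calculators = new AThread[NUM_THRDS];
Object[] status = new Object[NUM_THRDS];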

...

Later on, a for loop instantiates each AThread and submits it to the pool (where i is the integer loop index).

calculators[i] = new AThread(i);
futures[i] = tpes.submit(calculators[i]);

With this first implementation, ArrayOfThreadsPooled, the impracticality of running multiple threads is removed: the total processing time drops to about a hundredth of what it was, since all 100 threads execute simultaneously (see output example 1).
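A rough way to see this effect for yourself is to time the same batch of tasks on pools of different sizes. The sketch below is not the ArrayOfThreadsPooled source; the class name PoolTimingSketch and the use of Thread.sleep() to simulate work are assumptions made for illustration, and the exact timings will vary by machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolTimingSketch {
    // Runs taskCount sleeping tasks on a pool of poolSize threads and
    // returns the elapsed wall-clock time in milliseconds.
    static long timeRun(int taskCount, int poolSize, final long sleepMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        long start = System.nanoTime();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                futures.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws InterruptedException {
                        Thread.sleep(sleepMillis); // simulated work (assumption)
                        return id;
                    }
                }));
            }
            for (Future<Integer> f : futures) {
                f.get(); // wait for every task to finish
            }
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1000000L;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread:    " + timeRun(100, 1, 20) + " ms");
        System.out.println("100 threads: " + timeRun(100, 100, 20) + " ms");
    }
}
```

Because the simulated work is only a sleep, the speed-up here is close to ideal; tasks that compete for CPU or a back-end system will scale less cleanly.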


Don't Be A Resource Hog
However, scaling is still an issue: the business systems involved in processing, and the application server itself, will probably not appreciate the execution of 10,000 threads at once. To limit the load each user generates, ArrayOfThreadsPooledv2 illustrates that the fixed thread pool does not need to be the same size as the number of Callables, Futures, et al. The NUM_THRDS_POOLED value can therefore be set lower or higher than 100.

For example, I found that my application worked with a total of 15 threads running at a time (see output example 2). To better illustrate the benefit, the AThread class provided can simulate variable processing times. With variance in execution times, the thread pool will stop and start threads as needed, adding even more efficiency compared with executing x threads at a time with identical run times (see output example 3).
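The sketch below shows a pool that is smaller than the number of tasks, with run times that vary from task to task. It is an illustration only: the class name BoundedPoolSketch, the sleep durations and the concurrency counters are assumptions, not part of ArrayOfThreadsPooledv2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {
    // Runs taskCount tasks of varying length on poolSize threads and
    // returns the highest number of tasks observed running at once.
    static int peakConcurrency(int taskCount, int poolSize)
            throws InterruptedException, ExecutionException {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                futures.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws InterruptedException {
                        int now = running.incrementAndGet();
                        // Record a new peak if this is the most tasks seen so far.
                        int seen;
                        while (now > (seen = peak.get())) {
                            if (peak.compareAndSet(seen, now)) {
                                break;
                            }
                        }
                        try {
                            // Variable run time: 5, 10, 15 or 20 ms (assumption).
                            Thread.sleep(5 + (id % 4) * 5);
                        } finally {
                            running.decrementAndGet();
                        }
                        return id;
                    }
                }));
            }
            for (Future<Integer> f : futures) {
                f.get(); // wait for every task to finish
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrent tasks: " + peakConcurrency(100, 15));
    }
}
```

All 100 tasks are accepted right away, but the pool never runs more than 15 of them at once; the rest wait in the pool's queue until a worker thread becomes free.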


Back 2 The Future
Through the Future get() method wrapped with exception handling, we can retrieve the returned object/data from the Callable later on.

Since the Futures take care of getting the data, as a developer you get the benefit of all the threads starting at once while still having a way to identify which response goes with which request. With this information, you can then sort, analyze, manipulate or otherwise report the data however you see fit.
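As a sketch of what that retrieval might look like, the example below collects each result into a status array at the same index as its request, and stores the underlying exception when a task fails. The class name FutureResultsSketch, the "result-n" strings and the deliberately failing task are hypothetical; they are not taken from the ArrayOfThreadsPooled source.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureResultsSketch {
    // Submits taskCount tasks (one of which fails on purpose) and returns
    // an array where status[i] holds the outcome of task i.
    static Object[] collect(int taskCount, final int failingIndex) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Future<?>[] futures = new Future<?>[taskCount];
        Object[] status = new Object[taskCount];
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                futures[i] = pool.submit(new Callable<String>() {
                    @Override
                    public String call() {
                        if (id == failingIndex) {
                            throw new IllegalStateException("task " + id + " failed");
                        }
                        return "result-" + id;
                    }
                });
            }
            for (int i = 0; i < taskCount; i++) {
                try {
                    // futures[i] belongs to request i, so the index ties them together.
                    status[i] = futures[i].get();
                } catch (ExecutionException e) {
                    // The task itself threw; keep the original cause.
                    status[i] = e.getCause();
                } catch (InterruptedException e) {
                    // The waiting thread was interrupted; restore the flag and stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            pool.shutdown();
        }
        return status;
    }

    public static void main(String[] args) {
        Object[] status = collect(5, 2);
        for (int i = 0; i < status.length; i++) {
            System.out.println(i + " -> " + status[i]);
        }
    }
}
```

Storing the exception in the status array is just one option; logging it or retrying the task may suit a real application better.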

Some further enhancements would be to find the optimum number of threads needed to support all requests from your client base, and to instantiate the ExecutorService once as a static instance defined globally for the application.

public static int NUM_THRDS_CONCURRENT = 60;
public static ExecutorService tpes =
Executors.newFixedThreadPool(NUM_THRDS_CONCURRENT);

Each instance of ArrayOfThreadsPooledv2 that is executed would therefore submit its required Callable objects to the shared pool.

futures[i] = ArrayOfThreadsPooledv2.tpes.submit(calculators[i]);

If no other users are on, the response may be as quick as the first iteration of ArrayOfThreadsPooled, while providing the stability and respect for resources of the second implementation.
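Here is a minimal sketch of the shared-pool idea, assuming a hypothetical class SharedPoolSketch whose handleRequest() method plays the role of one ArrayOfThreadsPooledv2 instance. The field names mirror the snippet above; everything else is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedPoolSketch {
    // One pool for the whole application, shared by every request.
    static final int NUM_THRDS_CONCURRENT = 60;
    static final ExecutorService tpes =
            Executors.newFixedThreadPool(NUM_THRDS_CONCURRENT);

    // Handles one "request": submits its tasks to the shared pool and
    // returns their results in submission order.
    static List<String> handleRequest(final int requestId, int taskCount)
            throws InterruptedException, ExecutionException {
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            futures.add(tpes.submit(new Callable<String>() {
                @Override
                public String call() {
                    return requestId + ":" + taskId;
                }
            }));
        }
        List<String> results = new ArrayList<String>();
        for (Future<String> f : futures) {
            results.add(f.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        try {
            System.out.println(handleRequest(1, 3)); // prints [1:0, 1:1, 1:2]
            System.out.println(handleRequest(2, 2)); // prints [2:0, 2:1]
        } finally {
            // A shared pool keeps its threads alive, so shut it down
            // when the application stops.
            tpes.shutdown();
        }
    }
}
```

Since the pool's worker threads are not daemon threads, forgetting the shutdown() call would keep the JVM from exiting on its own.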

With the source code and detail to this point, I hope those of you on the journey towards creating a multi-threaded application in Java have found enough useful information to utilize the full power of Java's great addition. Keep in mind that further enhancements exist in JRE 1.6.

Keep evolving development!
