Thursday, September 06, 2007

Concurrent Threading Series (J# Revisited)

After discussing the .NET approach to threading and asynchronous processing, I figured I would circle back to my goal of showing how threading would be accomplished in J# in the same manner as in our Java application example. Here is what the code looks like (mostly adapted from the various Microsoft references on the MSDN web site). The interesting thing I found is that the use of IAsyncResult is slightly different in J# and requires the use of an additional class, System.Runtime.Remoting.Messaging.AsyncResult.


import System.Threading.ThreadPool;
import System.Threading.WaitHandle;
import System.Runtime.Remoting.Messaging.AsyncResult;
import System.IAsyncResult;
import System.AsyncCallback;

/** @delegate */
public delegate String taskDelegate(String x);

public class Example
{
    public static int NUM_THRDS_CONCURRENT = 60;

    /**
     * Initializations to be done only once by the application.
     * Set up the thread pool globally.
     */
    static
    {
        int workerThreads = 0, completionPortThreads = 0;
        ThreadPool.GetMaxThreads(workerThreads, completionPortThreads);
        // If the current pool size is less than our maximum, change the pool size
        boolean changePoolSize = false;
        if (workerThreads < NUM_THRDS_CONCURRENT)
        {
            workerThreads = NUM_THRDS_CONCURRENT;
            changePoolSize = true;
        }
        if (completionPortThreads < NUM_THRDS_CONCURRENT)
        {
            completionPortThreads = NUM_THRDS_CONCURRENT;
            changePoolSize = true;
        }
        if (changePoolSize) ThreadPool.SetMaxThreads(workerThreads, completionPortThreads);
    }

    public Example()
    {
        super();
    }

    public static void main(String[] args)
    {
        Example instance = new Example();
        System.out.println("Processing started " + System.DateTime.get_Now().ToString());
        instance.execute();
        instance = null;
    }

    public void execute()
    {
        int NUM_THRDS = 100;
        taskDelegate tasks[] = new taskDelegate[NUM_THRDS];
        IAsyncResult ars[] = new IAsyncResult[NUM_THRDS];
        WaitHandle _handles[] = new WaitHandle[NUM_THRDS];

        // Start every task asynchronously and keep its wait handle
        for (int i = 0; i < NUM_THRDS; i++)
        {
            tasks[i] = new taskDelegate(task);
            ars[i] = tasks[i].BeginInvoke("Thread #" + i,
                                          new AsyncCallback(aCallback),
                                          tasks[i]
                                         );
            _handles[i] = ars[i].get_AsyncWaitHandle();
        }

        // Use wait handles to ensure threads complete before continuing.
        // In this example we are just printing data, but this is useful
        // if after the async calls you need to sort/manipulate results.
        for (int i = 0; i < NUM_THRDS; i++)
        {
            _handles[i].WaitOne();
        }

        // Pause so that any callbacks still running can finish printing
        try
        {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e)
        {
            // exception handling
        }

        tasks = null;
        ars = null;
    }

    /* Asynchronous callback method used to call EndInvoke and process results */
    public void aCallback(IAsyncResult ar)
    {
        // The delegate was passed as the state object in BeginInvoke
        taskDelegate t = (taskDelegate)ar.get_AsyncState();
        // Cast to AsyncResult to reach the delegate the call was made on
        AsyncResult aResult = (AsyncResult)ar;
        taskDelegate temp = (taskDelegate)(aResult.get_AsyncDelegate());
        System.out.println(temp.EndInvoke(ar));
        t = null; temp = null; aResult = null;
    }

    /* The work done by each asynchronous call: sleep, then return a timestamped label */
    public String task(String x)
    {
        try
        {
            Thread.sleep(2000L);
        }
        catch (InterruptedException e)
        {
            // exception handling
        }
        return (x.trim() + ": " + System.DateTime.get_Now().ToString());
    }
}

I tried to work in other practices here, such as using a static initializer to set the size of the pool (or to set up any other data or objects that should be initialized only once), but the remainder of the code is straight from the documentation on the different .NET classes mentioned above. Note the use of the AsyncWaitHandles: I have found them to be a good way to signal the calling thread when all of the async activities have completed. Alternatively, you can start a counter at the total number of threads and decrement it (i--) within the AsyncCallback method to track how many threads are still running. That decrement has to be thread-safe, since the callbacks run concurrently on pool threads.
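For comparison, the counter idea can be sketched on the Java side of this series. This is only a rough illustration, not the J# code above: the class name CountdownExample, the method runAll, and the ": done" result strings are made up for the example, and a CountDownLatch stands in for the hand-rolled i-- counter because it handles the thread-safe decrement for you.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical Java analogue of the counter approach; all names are illustrative.
class CountdownExample {
    static final int NUM_THRDS_CONCURRENT = 60;

    // Runs numTasks small tasks and returns once every one of them has finished.
    static List<String> runAll(int numTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(NUM_THRDS_CONCURRENT);
        // Starts at the total number of tasks, like the counter described above.
        CountDownLatch remaining = new CountDownLatch(numTasks);
        List<String> results = new CopyOnWriteArrayList<>();
        try {
            for (int i = 0; i < numTasks; i++) {
                final String name = "Thread #" + i;
                pool.execute(() -> {
                    try {
                        results.add(name + ": done");
                    } finally {
                        // Thread-safe equivalent of i-- in the callback.
                        remaining.countDown();
                    }
                });
            }
            // Blocks the calling thread until the count reaches zero.
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(100).size() + " tasks completed");
    }
}
```

The countDown() call sits in a finally block so that a task that throws still decrements the counter; otherwise the calling thread would wait forever.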

1 comment:

kccjr2 said...

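To issue the wait command on the async threads (i.e. the WaitOne() calls executed in the loop), the code can be simplified by using the static (class) method WaitAll(WaitHandle[]) from the WaitHandle class. Note that WaitAll accepts at most 64 handles per call, so the 100 handles in the post would have to be passed in smaller batches.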

WaitHandle.WaitAll(_handles);

Another simplification uses the for-each construct:

for (WaitHandle h : _handles) h.WaitOne();

The above construct reads: for each WaitHandle in the array _handles, assign it to h and perform the task.

The former solution works if the loop is used only to issue the wait command to the async threads; however, sometimes you may need to do some other task for each handle, in which case the latter for-each approach may be useful.
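The same trade-off shows up on the Java side, and a small sketch may make it concrete. The names here (WaitStyles, makeTasks, waitAllAtOnce, waitOneByOne) and the pool size of 4 are invented for the illustration. ExecutorService.invokeAll plays the role of a single wait-for-everything call, while a for-each loop over the futures leaves room for extra per-item work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical comparison of the two waiting styles; all names are illustrative.
class WaitStyles {
    static List<Callable<String>> makeTasks(int n) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            tasks.add(() -> "Thread #" + id);
        }
        return tasks;
    }

    // One call that returns only after every task has completed.
    static List<String> waitAllAtOnce(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = pool.invokeAll(makeTasks(n));
            List<String> out = new ArrayList<>();
            for (Future<String> f : futures) {
                out.add(f.get()); // already complete, so this does not block
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // For-each over the futures: wait on each in turn and do extra work per item.
    static List<String> waitOneByOne(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> task : makeTasks(n)) {
                futures.add(pool.submit(task));
            }
            List<String> out = new ArrayList<>();
            int index = 0;
            for (Future<String> f : futures) {
                // get() blocks until this particular task is done.
                out.add("[" + index++ + "] " + f.get());
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

In waitOneByOne the extra per-item work is just the index prefix, but that is the spot where sorting, logging, or other handling of each result would go.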