import System.Threading.ThreadPool;
import System.Threading.WaitHandle;
import System.Runtime.Remoting.Messaging.AsyncResult;
import System.IAsyncResult;
import System.AsyncCallback;
// Delegate type for one unit of work: takes a String argument and returns a String.
// Instances are started with BeginInvoke and completed with EndInvoke.
// NOTE(review): the "@delegate" doc-comment tag below looks like a compiler
// directive rather than plain documentation -- keep it exactly as written.
/** @delegate */
public delegate String taskDelegate(String x);
/**
 * Demonstrates running many tasks on the .NET thread pool through
 * asynchronous delegate invocation (BeginInvoke / EndInvoke), using the
 * wait handle of each call to block the caller until every task has finished.
 */
public class Example
{
    /** Minimum value enforced for the pool's worker and completion-port thread maximums. */
    public static int NUM_THRDS_CONCURRENT = 60;

    /**
     * Initializations to be done only once by the application.
     * Reads the current thread pool maximums and raises either of them to
     * NUM_THRDS_CONCURRENT when it is lower; maximums that are already
     * large enough are left untouched.
     */
    static
    {
        int workerThreads = 0, completionPortThreads = 0;
        ThreadPool.GetMaxThreads(workerThreads, completionPortThreads);

        // Only call SetMaxThreads when at least one limit actually has to grow.
        boolean changePoolSize = false;
        if (workerThreads < NUM_THRDS_CONCURRENT)
        {
            workerThreads = NUM_THRDS_CONCURRENT;
            changePoolSize = true;
        }
        if (completionPortThreads < NUM_THRDS_CONCURRENT)
        {
            completionPortThreads = NUM_THRDS_CONCURRENT;
            changePoolSize = true;
        }
        if (changePoolSize)
        {
            ThreadPool.SetMaxThreads(workerThreads, completionPortThreads);
        }
    }

    /** Creates an instance; there is no per-instance state to set up. */
    public Example()
    {
        super();
    }

    /** Entry point: prints a start banner with the current time and runs the demo once. */
    public static void main() {
        Example instance = new Example();
        System.out.println("Processing started " + System.DateTime.get_Now().ToString());
        instance.execute();
    }

    /**
     * Starts 100 asynchronous invocations of task(), blocks until each one has
     * signaled its wait handle, pauses for five more seconds, and then closes
     * the wait handles.
     */
    public void execute()
    {
        int NUM_THRDS = 100;
        WaitHandle _handles[] = new WaitHandle[NUM_THRDS];

        for (int i = 0; i < NUM_THRDS; i++) {
            taskDelegate worker = new taskDelegate(task);
            // The delegate is also passed as the async state object.
            IAsyncResult pending = worker.BeginInvoke("Thread #" + i,
                                                      new AsyncCallback(aCallback),
                                                      worker);
            _handles[i] = pending.get_AsyncWaitHandle();
        }

        // use wait handles to ensure threads complete before continuing
        // in this example we are just printing data,
        // but this is useful if after async calls you need to sort/manipulate results.
        for (int i = 0; i < NUM_THRDS; i++)
        {
            _handles[i].WaitOne();
        }

        // NOTE(review): presumably this pause gives the completion callbacks,
        // which print the results, time to finish after the handles have been
        // signaled -- confirm before shortening or removing it.
        try
        {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e)
        {
            // Deliberately ignored: an interrupt only cuts the pause short.
        }

        // Wait handles wrap operating-system resources; release them explicitly
        // now that nothing waits on them any more instead of leaving it to the GC.
        for (int i = 0; i < NUM_THRDS; i++)
        {
            _handles[i].Close();
        }
    }

    /**
     * Asynchronous callback: recovers the delegate that was invoked from the
     * async result, calls EndInvoke on it and prints the returned string.
     * Parameter ar is the result object handed to the callback; it is cast to
     * AsyncResult in order to reach the originating delegate.
     */
    public void aCallback(IAsyncResult ar) {
        AsyncResult aResult = (AsyncResult)ar;
        taskDelegate worker = (taskDelegate)(aResult.get_AsyncDelegate());
        System.out.println(worker.EndInvoke(ar));
    }

    /**
     * The work item run by each asynchronous call: sleeps for two seconds and
     * returns the trimmed input followed by ": " and the current date/time.
     * Parameter x is the label to echo back; it is dereferenced, so it must not be null.
     */
    public String task(String x)
    {
        try {
            Thread.sleep(2000L);
        } catch(InterruptedException e) {
            // Deliberately ignored: an interrupt only cuts the simulated work short.
        }
        return (x.trim() + ": " + System.DateTime.get_Now().ToString());
    }
}
I tried to combine other examples and practices here, such as using a static initializer to set the size of the pool (or to set up any other data/objects that should be initialized only once), but the remainder of the code is straight from the documentation on the different .NET classes mentioned above. Note the use of the AsyncWaitHandles; I have found them to be a good way to signal the calling thread when all the async activities have completed. Alternatively, you can initialize a counter to the total number of threads and decrement it (i--) within the AsyncCallback method to track how many threads are still running.
1 comment:
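For issuing the wait command on the async threads (i.e. the WaitOne() call executed in a loop), the code can be simplified through use of the static (class) method WaitAll(WaitHandle[]) from the WaitHandle class. Note that WaitAll limits how many handles it accepts (commonly 64; a NotSupportedException is thrown beyond that), so with the 100 handles used in this example the array would have to be waited on in smaller batches.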
WaitHandle.WaitAll(_handles);
Another simplification uses the for-each construct:
for (WaitHandle h : _handles) h.WaitOne();
The above construct reads: for each WaitHandle in the array _handles, assign it to h and execute the loop body.
The former solution works if the loop is used only for issuing the wait command to the async threads; however, sometimes you may need to do some other work for each handle, in which case the latter for-each approach may be useful.
Post a Comment