java - Application hangs for few minutes even after all the threads are done -
i converted working producer/consumer example thread/runnable executor/callable/blockingqueues , using poison pill termination pattern.
if run program below, hang few minutes though every thread has completed. jstack shows numerous threads blocked on queue not seemingly related application.
"pool-1-thread-10" prio=5 tid=10b08d000 nid=0x10d91c000 waiting on condition [10d91b000] java.lang.thread.state: timed_waiting (parking) @ sun.misc.unsafe.park(native method) - parking wait <7f3113510> (a java.util.concurrent.synchronousqueue$transferstack) @ java.util.concurrent.locks.locksupport.parknanos(locksupport.java:198) @ java.util.concurrent.synchronousqueue$transferstack.awaitfulfill(synchronousqueue.java:424) @ java.util.concurrent.synchronousqueue$transferstack.transfer(synchronousqueue.java:323) @ java.util.concurrent.synchronousqueue.poll(synchronousqueue.java:874) @ java.util.concurrent.threadpoolexecutor.gettask(threadpoolexecutor.java:945) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:907) @ java.lang.thread.run(thread.java:680)
i can not figure out why application hangs. appreciated. thank you
import java.util.arraylist; import java.util.collection; import java.util.list; import java.util.random; import java.util.concurrent.callable; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.future; import java.util.concurrent.linkedblockingqueue; public class producersconsumers { private linkedblockingqueue<item> queue = new linkedblockingqueue<item>(); private static final executorservice executorpool = executors.newcachedthreadpool(); private random randgenerator = new random(system.currenttimemillis()); private class item { private boolean done = false; private string message; private item(boolean done) { this.done = done; } private item(string message) { this.message = message; } public boolean isdone() { return done; } public string getmessage() { return message; } } private class producer implements callable<long> { private final int id; private integer numofmessages; private producer(int id, int numofmessages) { this.id = id; this.numofmessages = numofmessages; } @override public long call() throws exception { long totaltime = 0; while (numofmessages > 0) { string message; synchronized (numofmessages) { long starttime = system.nanotime(); int msglength = randgenerator.nextint(20000); stringbuilder sb = new stringbuilder(msglength); (int = 0; < msglength; a++) { sb.append((char) ('a' + randgenerator.nextint(26))); } message = sb.tostring(); long endtime = system.nanotime(); totaltime += endtime - starttime; } numofmessages--; queue.put(new item(message)); } system.out.println("-------------producer " + id + " done."); queue.put(new item(true)); return totaltime; } } private class consumer implements callable<long> { private string monitor = "monitor"; private final int id; private consumer(int id) { this.id = id; } @override public long call() throws exception { long totaltime = 0; while (true) { item item = queue.take(); if (item.isdone()) { break; } synchronized (monitor) { long starttime = system.nanotime(); stringbuilder sb = new stringbuilder(item.getmessage()); sb = sb.reverse(); string message = sb.tostring(); long endtime = system.nanotime(); totaltime += endtime - starttime; } } system.out.println("+++++++++++++consumer " + id + " done."); return totaltime; } } public void begin(int threadcount) throws interruptedexception, executionexception { collection<producer> producers = new arraylist<producer>(); (int = 0; < threadcount; i++) { producers.add(new producer(i, randgenerator.nextint(5))); } collection<consumer> consumers = new arraylist<consumer>(); (int = 0; < threadcount; i++) { consumers.add(new consumer(i)); } try { long starttime = system.nanotime(); list<future<long>> producerfuturelist = executorpool.invokeall(producers); list<future<long>> consumerfuturelist = executorpool.invokeall(consumers); long producertotaltime = 0; long consumertotaltime = 0; (future<long> future : producerfuturelist) { producertotaltime += future.get(); } (future<long> future : consumerfuturelist) { consumertotaltime += future.get(); } long mainthreadtotaltime = system.nanotime() - starttime; system.out.println("producertotaltime " + producertotaltime); system.out.println("consumertotaltime " + consumertotaltime); system.out.println("mainthreadtotaltime " + mainthreadtotaltime); system.out.println("difference " + (producertotaltime + consumertotaltime - mainthreadtotaltime)); } catch (interruptedexception e) { e.printstacktrace(); //to change body of catch statement use file | settings | file templates. throw e; } catch (executionexception e) { e.printstacktrace(); //to change body of catch statement use file | settings | file templates. throw e; } } public static void main(string[] args) throws executionexception, interruptedexception { producersconsumers prodcon = new producersconsumers(); prodcon.begin(20); } }
you should close executorservice when done it. call executorpool.shutdown() @ end of program.
Comments
Post a Comment