java - How to run outstanding tasks immediately after ExecutorService.shutdown()? -
I've got a ScheduledExecutorService
with tasks scheduled to execute in an hour. How do I get the list of outstanding tasks so that I can force them to run immediately?
I believe shutdown()
will wait an hour, and it looks as if shutdownNow()
returns a list of Runnables that cannot be run(), because the Runnable implementation checks the executor state and, when it notices that the executor has shut down, the Runnable refuses to run. See ScheduledThreadPoolExecutor.ScheduledFutureTask.run()
for the actual implementation.
Any ideas?
I've taken Mark Peters' answer, implemented the abstract methods, added thread-safety, and tried to respect the underlying ScheduledThreadPoolExecutor configuration whenever possible.
/**
 * A {@link ScheduledExecutorService} decorator whose {@link #shutdown()} runs outstanding
 * (still delayed) tasks immediately instead of waiting for their delay to elapse.
 *
 * <p>Scheduling is forwarded to the delegate. Every scheduled task is remembered until it
 * starts running or is cancelled by the caller. On {@code shutdown()} the delegate is shut
 * down, and every remembered task that the delegate cancelled as part of that shutdown is
 * executed once on a separate "immediate" executor.
 *
 * <p>Tasks handed to {@link #execute(Runnable)} are passed straight to the delegate and are
 * not tracked.
 *
 * @author Gili Tzabari
 */
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService
        implements ScheduledExecutorService {

    private final ScheduledExecutorService delegate;
    /** The delegate viewed as a ScheduledThreadPoolExecutor, or {@code null} if it is not one. */
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    /** Runs the outstanding tasks once the delegate has finished shutting down. */
    private final ExecutorService immediateService;
    /** Scheduled tasks that have neither started nor been cancelled by the caller. */
    private final ConcurrentMap<Future<?>, Callable<?>> tasks =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Creates a new RunOnShutdownScheduledExecutorService.
     *
     * @param delegate the executor to delegate to
     * @throws NullPointerException if delegate is null
     */
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate) {
        if (delegate == null) {
            throw new NullPointerException("delegate may not be null");
        }
        this.delegate = delegate;
        if (delegate instanceof ScheduledThreadPoolExecutor) {
            this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate;
            // A scheduled pool may legally have a core size of 0, but a fixed pool may not.
            int threads = Math.max(1, scheduledThreadPoolExecutor.getCorePoolSize());
            this.immediateService = Executors.newFixedThreadPool(
                    threads, scheduledThreadPoolExecutor.getThreadFactory());
        } else {
            this.scheduledThreadPoolExecutor = null;
            this.immediateService = Executors.newSingleThreadExecutor(newNamedThreadFactory());
        }
    }

    /**
     * Builds a thread factory that names threads {@code <class name>-<n>}, with n starting at 0.
     */
    private static java.util.concurrent.ThreadFactory newNamedThreadFactory() {
        final java.util.concurrent.ThreadFactory backing = Executors.defaultThreadFactory();
        final java.util.concurrent.atomic.AtomicLong sequence =
                new java.util.concurrent.atomic.AtomicLong();
        final String prefix = RunOnShutdownScheduledExecutorService.class.getName() + "-";
        return new java.util.concurrent.ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = backing.newThread(runnable);
                thread.setName(prefix + sequence.getAndIncrement());
                return thread;
            }
        };
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    /**
     * Returns true only when both the delegate and the executor that runs the outstanding
     * tasks have terminated, which is the same condition {@link #awaitTermination} waits for.
     */
    @Override
    public boolean isTerminated() {
        return delegate.isTerminated() && immediateService.isTerminated();
    }

    /**
     * Waits for the delegate and then, with whatever is left of the timeout, for the
     * outstanding tasks to finish.
     *
     * @return true if everything terminated within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // Work in nanoseconds so that coarse units do not truncate the elapsed time to zero.
        long budgetNanos = unit.toNanos(timeout);
        long start = System.nanoTime();
        if (!delegate.awaitTermination(timeout, unit)) {
            return false;
        }
        long remainingNanos = budgetNanos - (System.nanoTime() - start);
        return immediateService.awaitTermination(
                Math.max(0L, remainingNanos), TimeUnit.NANOSECONDS);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(command);
    }

    @Override
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit);
        return track(decorated, future, Executors.callable(command));
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        CallableWithFuture<V> decorated = new CallableWithFuture<>(callable);
        ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit);
        return track(decorated, future, callable);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
            long period, TimeUnit unit) {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future =
                delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit);
        return track(decorated, future, Executors.callable(command));
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
            long delay, TimeUnit unit) {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future =
                delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit);
        return track(decorated, future, Executors.callable(command));
    }

    /**
     * Records a freshly scheduled task as outstanding and wraps its future for the caller.
     *
     * <p>The delegate may start the task before this method runs. The wrapper therefore
     * raises a "started" flag before it looks up its future, and this method re-checks the
     * flag after inserting, so whichever side comes second removes the entry.
     */
    private <V> ScheduledFuture<V> track(TrackedTask decorated, ScheduledFuture<V> future,
            Callable<?> original) {
        decorated.setFuture(future);
        tasks.put(future, original);
        if (decorated.hasStarted()) {
            tasks.remove(future);
        }
        return new CleaningScheduledFuture<>(future);
    }

    /**
     * Shuts down the delegate and runs, as soon as the delegate has terminated, every task
     * that was still waiting for its delay. Does nothing if the delegate is already shut down.
     */
    @Override
    public synchronized void shutdown() {
        if (delegate.isShutdown()) {
            return;
        }
        if (scheduledThreadPoolExecutor != null) {
            // Workaround: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418
            //
            // Cancel waiting scheduled tasks, otherwise the executor won't shut down.
            scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
        delegate.shutdown();

        // Tasks cancelled by users were removed by CleaningScheduledFuture.cancel(), so a
        // cancelled future that is still tracked was cancelled by the executor's shutdown.
        final List<Callable<?>> outstandingTasks = new java.util.ArrayList<>();
        for (Map.Entry<Future<?>, Callable<?>> entry : tasks.entrySet()) {
            if (entry.getKey().isCancelled()) {
                outstandingTasks.add(entry.getValue());
            }
        }
        tasks.clear();
        if (outstandingTasks.isEmpty()) {
            immediateService.shutdown();
            return;
        }

        immediateService.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

                // Execute outstanding tasks after the delegate executor finishes shutting down.
                for (Callable<?> task : outstandingTasks) {
                    immediateService.submit(task);
                }
                immediateService.shutdown();
                return null;
            }
        });
    }

    /**
     * Stops the delegate and the executor that runs outstanding tasks, without running
     * anything further.
     *
     * @return the tasks that both executors report as never having commenced execution
     */
    @Override
    public synchronized List<Runnable> shutdownNow() {
        List<Runnable> neverRun = new java.util.ArrayList<>(delegate.shutdownNow());
        // Without this the immediate executor would never terminate and awaitTermination()
        // could never succeed after shutdownNow().
        neverRun.addAll(immediateService.shutdownNow());
        tasks.clear();
        return neverRun;
    }

    /**
     * Shared bookkeeping for task wrappers: stops tracking the task once it starts running.
     */
    private abstract class TrackedTask {
        private volatile Future<?> future;
        private volatile boolean started;

        /**
         * Associates the future returned by the delegate with this task.
         *
         * @param future the future
         */
        final void setFuture(Future<?> future) {
            this.future = future;
        }

        /** Returns true once the task has begun running at least once. */
        final boolean hasStarted() {
            return started;
        }

        /** Marks the task as started and drops it from the outstanding tasks. */
        final void untrack() {
            // The flag must be raised before the future is read; see track().
            started = true;
            Future<?> current = future;
            if (current != null) {
                tasks.remove(current);
            }
        }
    }

    /**
     * A Runnable that removes its future from the outstanding tasks when it runs.
     */
    private class CleaningRunnable extends TrackedTask implements Runnable {
        private final Runnable delegate;

        /**
         * Creates a new CleaningRunnable.
         *
         * @param delegate the Runnable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningRunnable(Runnable delegate) {
            if (delegate == null) {
                throw new NullPointerException("delegate may not be null");
            }
            this.delegate = delegate;
        }

        @Override
        public void run() {
            untrack();
            delegate.run();
        }
    }

    /**
     * A Callable that removes its future from the outstanding tasks when it runs.
     */
    private class CallableWithFuture<V> extends TrackedTask implements Callable<V> {
        private final Callable<V> delegate;

        /**
         * Creates a new CallableWithFuture.
         *
         * @param delegate the Callable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CallableWithFuture(Callable<V> delegate) {
            if (delegate == null) {
                throw new NullPointerException("delegate may not be null");
            }
            this.delegate = delegate;
        }

        @Override
        public V call() throws Exception {
            untrack();
            return delegate.call();
        }
    }

    /**
     * A ScheduledFuture that removes its future from the outstanding tasks when cancelled.
     *
     * <p>This allows us to differentiate between tasks cancelled by the user and by the
     * underlying executor. Tasks cancelled by the user are removed from "tasks".
     *
     * @param <V> the result type returned by the future
     */
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V> {
        private final ScheduledFuture<V> delegate;

        /**
         * Creates a new CleaningScheduledFuture.
         *
         * @param delegate the future to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningScheduledFuture(ScheduledFuture<V> delegate) {
            if (delegate == null) {
                throw new NullPointerException("delegate may not be null");
            }
            this.delegate = delegate;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return delegate.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o) {
            return delegate.compareTo(o);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = delegate.cancel(mayInterruptIfRunning);
            if (result) {
                // Tasks cancelled by users are removed from "tasks".
                tasks.remove(delegate);
            }
            return result;
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    }
}
Comments
Post a Comment