java - How to run outstanding tasks immediately after ExecutorService.shutdown()? -


I've got a ScheduledExecutorService with tasks that are scheduled to execute in an hour. How do I get the list of outstanding tasks so that I can force them to run immediately?

I believe shutdown() will wait an hour, and it looks as if shutdownNow() returns a list of Runnables that cannot be run(), because the Runnable implementation checks the executor state and, when it notices that the executor has shut down, refuses to run. See ScheduledThreadPoolExecutor.ScheduledFutureTask.run() for the actual implementation.

any ideas?

I've taken Mark Peters' answer, implemented the abstract methods, added thread-safety, and tried to respect the underlying ScheduledThreadPoolExecutor configuration whenever possible.

/**
 * A {@link ScheduledExecutorService} decorator whose {@link #shutdown()} runs outstanding
 * delayed tasks immediately instead of waiting for their delay to elapse.
 * <p>
 * Scheduled tasks are forwarded to a delegate executor. On {@code shutdown()} the delegate is shut
 * down, and every tracked task that the delegate cancelled as part of that shutdown is re-submitted
 * to a second, non-scheduled executor, where it runs as soon as the delegate has terminated. Tasks
 * cancelled by the user (through the returned future) are never run. A periodic task is tracked
 * only until its first execution starts, so at most one execution is forced at shutdown.
 * <p>
 * Note that the futures handed out by the {@code schedule*} methods report the delegate's view: a
 * task that is forced to run at shutdown still appears as cancelled through its future.
 * <p>
 * The class only depends on {@code java.util} and {@code java.util.concurrent}. It is intended to
 * be safe for use from multiple threads.
 *
 * @author Gili Tzabari
 */
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService
    implements ScheduledExecutorService
{
    private final ScheduledExecutorService delegate;
    /** The delegate, if it is a {@code ScheduledThreadPoolExecutor}; {@code null} otherwise. */
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    /** Runs the outstanding tasks once the delegate has terminated. */
    private final ExecutorService immediateService;
    /**
     * Tasks that were scheduled but have neither started running nor been cancelled by the user.
     * Entries are keyed by the decorator handed to the delegate, so that a task can be registered
     * <em>before</em> it is scheduled (the future does not exist yet at that point).
     */
    private final ConcurrentMap<Object, PendingTask> tasks = new ConcurrentHashMap<>();
    /** Set once shutdown() or shutdownNow() has been invoked; guarded by {@code this}. */
    private boolean shutdownStarted;

    /**
     * Creates a new RunOnShutdownScheduledExecutorService.
     *
     * @param delegate the executor to delegate to
     * @throws NullPointerException if delegate is null
     */
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate)
    {
        Objects.requireNonNull(delegate, "delegate may not be null");

        this.delegate = delegate;
        if (delegate instanceof ScheduledThreadPoolExecutor)
        {
            this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate;
            // Mirror the delegate's configuration. A core pool size of 0 is legal for the delegate
            // but newFixedThreadPool() rejects it, hence the lower bound of one thread.
            this.immediateService = Executors.newFixedThreadPool(
                Math.max(1, scheduledThreadPoolExecutor.getCorePoolSize()),
                scheduledThreadPoolExecutor.getThreadFactory());
        }
        else
        {
            this.scheduledThreadPoolExecutor = null;
            this.immediateService = Executors.newSingleThreadExecutor(
                new NamedThreadFactory(RunOnShutdownScheduledExecutorService.class.getName()));
        }
    }

    @Override
    public boolean isShutdown()
    {
        return delegate.isShutdown();
    }

    /**
     * @return true only if both the delegate and the executor that runs the outstanding tasks have
     *         terminated
     */
    @Override
    public boolean isTerminated()
    {
        return delegate.isTerminated() && immediateService.isTerminated();
    }

    /**
     * Waits for the delegate and then, with whatever is left of the timeout, for the outstanding
     * tasks that were forced to run at shutdown.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    {
        long before = System.nanoTime();
        if (!delegate.awaitTermination(timeout, unit))
        {
            return false;
        }
        // Work in nanoseconds: converting the elapsed time to a coarse unit would truncate it and
        // over-estimate the time left.
        long elapsed = System.nanoTime() - before;
        long timeLeft = Math.max(0L, unit.toNanos(timeout) - elapsed);
        return immediateService.awaitTermination(timeLeft, TimeUnit.NANOSECONDS);
    }

    /** Forwards the command to the delegate; such commands are not tracked for shutdown. */
    @Override
    public void execute(Runnable command)
    {
        delegate.execute(command);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    {
        TrackedRunnable decorated = new TrackedRunnable(command);
        PendingTask pending = register(decorated, Executors.callable(command));
        try
        {
            return attach(decorated, pending, delegate.schedule(decorated, delay, unit));
        }
        catch (RuntimeException e)
        {
            // The delegate never accepted the task, so it must not be run at shutdown
            tasks.remove(decorated);
            throw e;
        }
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
    {
        TrackedCallable<V> decorated = new TrackedCallable<>(callable);
        PendingTask pending = register(decorated, callable);
        try
        {
            return attach(decorated, pending, delegate.schedule(decorated, delay, unit));
        }
        catch (RuntimeException e)
        {
            // The delegate never accepted the task, so it must not be run at shutdown
            tasks.remove(decorated);
            throw e;
        }
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
        TimeUnit unit)
    {
        TrackedRunnable decorated = new TrackedRunnable(command);
        PendingTask pending = register(decorated, Executors.callable(command));
        try
        {
            return attach(decorated, pending,
                delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit));
        }
        catch (RuntimeException e)
        {
            // The delegate never accepted the task, so it must not be run at shutdown
            tasks.remove(decorated);
            throw e;
        }
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
        TimeUnit unit)
    {
        TrackedRunnable decorated = new TrackedRunnable(command);
        PendingTask pending = register(decorated, Executors.callable(command));
        try
        {
            return attach(decorated, pending,
                delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit));
        }
        catch (RuntimeException e)
        {
            // The delegate never accepted the task, so it must not be run at shutdown
            tasks.remove(decorated);
            throw e;
        }
    }

    /**
     * Shuts down the delegate and runs, as soon as the delegate has terminated, every tracked task
     * that the delegate cancelled because of the shutdown. Invoking this method more than once,
     * or after {@link #shutdownNow()}, has no additional effect.
     */
    @Override
    public synchronized void shutdown()
    {
        if (shutdownStarted)
        {
            return;
        }
        shutdownStarted = true;
        if (scheduledThreadPoolExecutor != null)
        {
            // Workaround: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418
            //
            // Cancel waiting scheduled tasks, otherwise the executor won't shut down
            scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
        delegate.shutdown();

        // A tracked task whose future is cancelled at this point was cancelled by the delegate,
        // not by the user: user cancellations remove the entry (see CleaningScheduledFuture) and
        // are serialized with this method through the same monitor.
        final List<Callable<?>> outstandingTasks = new ArrayList<>();
        for (PendingTask pending : tasks.values())
        {
            Future<?> future = pending.future;
            if (future != null && future.isCancelled())
            {
                outstandingTasks.add(pending.task);
            }
        }
        tasks.clear();
        if (outstandingTasks.isEmpty())
        {
            immediateService.shutdown();
            return;
        }

        immediateService.submit(new Callable<Void>()
        {
            @Override
            public Void call() throws Exception
            {
                delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

                // Execute outstanding tasks after the delegate executor finishes shutting down
                for (Callable<?> task : outstandingTasks)
                {
                    immediateService.submit(task);
                }
                immediateService.shutdown();
                return null;
            }
        });
    }

    /**
     * Stops both the delegate and the executor that runs outstanding tasks; no outstanding task
     * is forced to run.
     *
     * @return the tasks that never commenced execution, as reported by both executors
     */
    @Override
    public synchronized List<Runnable> shutdownNow()
    {
        shutdownStarted = true;
        tasks.clear();
        List<Runnable> neverRun = new ArrayList<>(delegate.shutdownNow());
        // Without this the helper executor would never terminate and awaitTermination() could
        // never succeed after shutdownNow().
        neverRun.addAll(immediateService.shutdownNow());
        return neverRun;
    }

    /**
     * Starts tracking a task. Must be invoked before the decorator is handed to the delegate so
     * that the entry exists even if the task starts running right away.
     *
     * @param key  the decorator that will be handed to the delegate
     * @param task the work to run at shutdown if the task is still outstanding
     * @return the tracking entry
     */
    private PendingTask register(Object key, Callable<?> task)
    {
        PendingTask pending = new PendingTask(task);
        tasks.put(key, pending);
        return pending;
    }

    /**
     * Records the delegate's future on the tracking entry and wraps it for the caller.
     *
     * @param key     the decorator the task is tracked under
     * @param pending the tracking entry
     * @param future  the future returned by the delegate
     * @return a future that stops tracking the task when the user cancels it
     */
    private <V> ScheduledFuture<V> attach(Object key, PendingTask pending, ScheduledFuture<V> future)
    {
        pending.future = future;
        return new CleaningScheduledFuture<>(future, key);
    }

    /**
     * Tracking entry: the work to run at shutdown plus the delegate's future for it.
     */
    private static final class PendingTask
    {
        private final Callable<?> task;
        /** Null until the delegate has returned from its schedule call. */
        private volatile Future<?> future;

        PendingTask(Callable<?> task)
        {
            this.task = task;
        }
    }

    /**
     * Names threads {@code <prefix>-<n>}, with n counting up from 0, on top of the default thread
     * factory.
     */
    private static final class NamedThreadFactory implements ThreadFactory
    {
        private final ThreadFactory backing = Executors.defaultThreadFactory();
        private final AtomicInteger count = new AtomicInteger();
        private final String prefix;

        NamedThreadFactory(String prefix)
        {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r)
        {
            Thread thread = backing.newThread(r);
            thread.setName(prefix + "-" + count.getAndIncrement());
            return thread;
        }
    }

    /**
     * A Runnable that stops being tracked as soon as it starts running.
     */
    private class TrackedRunnable implements Runnable
    {
        private final Runnable delegate;

        /**
         * Creates a new TrackedRunnable.
         *
         * @param delegate the Runnable to delegate to
         * @throws NullPointerException if delegate is null
         */
        TrackedRunnable(Runnable delegate)
        {
            this.delegate = Objects.requireNonNull(delegate, "delegate may not be null");
        }

        @Override
        public void run()
        {
            tasks.remove(this);
            delegate.run();
        }
    }

    /**
     * A Callable that stops being tracked as soon as it starts running.
     */
    private class TrackedCallable<V> implements Callable<V>
    {
        private final Callable<V> delegate;

        /**
         * Creates a new TrackedCallable.
         *
         * @param delegate the Callable to delegate to
         * @throws NullPointerException if delegate is null
         */
        TrackedCallable(Callable<V> delegate)
        {
            this.delegate = Objects.requireNonNull(delegate, "delegate may not be null");
        }

        @Override
        public V call() throws Exception
        {
            tasks.remove(this);
            return delegate.call();
        }
    }

    /**
     * A ScheduledFuture that stops tracking its task when the user cancels it.
     * <p>
     * This is what allows shutdown() to differentiate between tasks cancelled by the user and
     * tasks cancelled by the underlying executor: tasks cancelled by the user are removed from
     * "tasks".
     *
     * @param <V> the result type returned by the future
     */
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V>
    {
        private final ScheduledFuture<V> future;
        private final Object key;

        /**
         * Creates a new CleaningScheduledFuture.
         *
         * @param future the future to delegate to
         * @param key    the key the task is tracked under
         * @throws NullPointerException if future is null
         */
        CleaningScheduledFuture(ScheduledFuture<V> future, Object key)
        {
            this.future = Objects.requireNonNull(future, "delegate may not be null");
            this.key = key;
        }

        @Override
        public long getDelay(TimeUnit unit)
        {
            return future.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o)
        {
            return future.compareTo(o);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            // Cancel and untrack atomically with respect to shutdown(); otherwise shutdown()
            // could observe the cancelled future while the task is still tracked and run a task
            // that the user cancelled.
            synchronized (RunOnShutdownScheduledExecutorService.this)
            {
                boolean result = future.cancel(mayInterruptIfRunning);
                if (result)
                {
                    // Tasks cancelled by users are removed from "tasks"
                    tasks.remove(key);
                }
                return result;
            }
        }

        @Override
        public boolean isCancelled()
        {
            return future.isCancelled();
        }

        @Override
        public boolean isDone()
        {
            return future.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException
        {
            return future.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
            TimeoutException
        {
            return future.get(timeout, unit);
        }
    }
}

Comments

Popular posts from this blog

linux - Using a Cron Job to check if my mod_wsgi / apache server is running and restart -

actionscript 3 - TweenLite does not work with object -

jQuery Ajax Render Fragments OR Whole Page -