import java.util.*;
import java.util.concurrent.*;


public class ForEachConcurrently {

    /**
     * Invokes {@code block} once for every element of {@code items}, wrapping each
     * invocation in a task that is handed to an anonymous {@link AbstractEachConcurrently}
     * and run through {@code executor}.
     *
     * @param items    the elements to visit
     * @param executor passed straight to {@code AbstractEachConcurrently.run}
     * @param block    the body to invoke with each element
     * @throws X whatever {@code AbstractEachConcurrently.run} propagates
     */
    public static for <T, throws X> void eachConcurrently(final Iterable<T> items, 
            Executor executor, final {T ==> void throws X} block) throws X {

        new AbstractEachConcurrently<throws X>() {
            protected void submitTasks() {
                for (T element : items) {
                    // submitTask answers false once no further tasks should be offered.
                    boolean keepSubmitting = submitTask({==> block.invoke(element);});
                    if (!keepSubmitting) {
                        break;
                    }
                }
            }
        }.run(executor);
    }


    /**
     * Invokes {@code block} once for every key/value pair of {@code map}, wrapping each
     * invocation in a task that is handed to an anonymous {@link AbstractEachConcurrently}
     * and run through {@code executor}.
     *
     * @param map      the map whose entries are visited
     * @param executor passed straight to {@code AbstractEachConcurrently.run}
     * @param block    the body to invoke with each entry's key and value
     * @throws X whatever {@code AbstractEachConcurrently.run} propagates
     */
    public static for <K, V, throws X> void eachEntryConcurrently(final Map<K,V> map, 
            Executor executor, final {K, V ==> void throws X} block) throws X {

        new AbstractEachConcurrently<throws X>() {
            protected void submitTasks() {
                for (Map.Entry<K,V> pair : map.entrySet()) {
                    // submitTask answers false once no further tasks should be offered.
                    boolean keepSubmitting =
                        submitTask({==> block.invoke(pair.getKey(), pair.getValue());});
                    if (!keepSubmitting) {
                        break;
                    }
                }
            }
        }.run(executor);
    }
}


/**
 * Runs a batch of tasks through an {@link Executor} and waits for all of them.
 * <p>
 * Subclasses implement {@link #submitTasks()} and call {@link #submitTask(Task)} once
 * per task. {@link #run(Executor)} then waits for every accepted task, executes any
 * task the executor rejected in the calling thread, and rethrows the first failure
 * it recorded.
 */
abstract class AbstractEachConcurrently<throws X> {
    
    
    /**
     * A unit of work that may throw {@code X}.
     * Could use {==> void throws X} instead of this interface, but sometimes
     * nominal types are good too... BGGA gives us the choice :)
     */
    interface Task<throws X> {
        void execute() throws X;
    }
    
    
    // Created afresh by every call to run().
    private CompletionService<Throwable> completionService;
    // Tasks the executor refused; run() executes them itself once the accepted ones are done.
    private List<Task<X>> rejectedTasks;
    // Number of accepted tasks whose result run() has not yet collected.
    private int acceptedCount;
    // Set by the first task that fails; read by tasks that have not started yet.
    private volatile boolean cancelled;


    /**
     * Implement this, submitting each task for execution by calling {@link #submitTask(Task)}.
     */
    protected abstract void submitTasks();


    /**
     * Submits a task for execution.
     * @param task the task to execute
     * @return true if the caller may keep submitting tasks; false once a task has
     *         failed, in which case tasks that have not started yet are skipped
     */
    protected boolean submitTask(final Task<X> task) {
        /*
         * Attempt to submit the task to the CompletionService
         * for execution in another Thread.
         */
        try {
            /* This was a closure rather than anon. class originally,
             * but it crashed the prototype compiler
             */
            Callable<Throwable> callable = 
                new Callable<Throwable>() {
                    public Throwable call() {
                        Throwable result = null;
                        // Skip the work entirely if an earlier task already failed.
                        if (!cancelled) {
                            try {
                                task.execute();
                            }
                            catch (Throwable ex) {
                                /* The failure is returned as a value rather than thrown,
                                 * so that run() can rethrow it from the calling thread.
                                 */
                                cancelled = true;
                                result = ex;
                            }
                        }
                        return result;
                    }
                };
            
            completionService.submit(callable);
            
            // Getting this far means the task was accepted.
            acceptedCount++;
        }
        catch (RejectedExecutionException ex) {
            /*
             * If the task was rejected, add it to the List of
             * rejected tasks which we may try to execute directly
             * in this Thread at a later point.
             */
            rejectedTasks.add(task);
        }

        return !cancelled;
    }
    
    
    /**
     * Attempts to execute a number of tasks using the provided Executor, falling back to synchronous
     * task execution for any that were rejected.
     * By the time this method completes, all tasks that will execute will have completed.
     * <p>
     * If the calling thread is interrupted while waiting, the wait continues until every
     * accepted task has reported back, and the interrupt status is restored afterwards.
     *
     * @param executor the executor that accepted tasks are submitted to
     * @throws X if submission or a task failed with an exception of that type;
     *           runtime exceptions and errors are rethrown unchanged
     */
    public void run(Executor executor) throws X {

        completionService = new ExecutorCompletionService<Throwable>(executor);
        rejectedTasks = new ArrayList<Task<X>>();
        // Start from a clean slate so that an instance can be run more than once.
        acceptedCount = 0;
        cancelled = false;
        
        Throwable throwable = null;
        try {
            /* Attempt to submit all the tasks to the CompletionService.
             * If any could not be accepted, they will be held in rejectedTasks.
             */
            submitTasks();
        }
        catch (Throwable ex) {
            /* If an exception is raised during submission of the tasks, that's
             * the one we should report, regardless of any exceptions that may
             * be raised during execution of any successfully submitted tasks.
             */
            throwable = ex;
        }

        try {
            /* Wait for all the accepted tasks to complete, storing the first
             * Throwable result encountered. Interruption is remembered rather than
             * acted upon: re-setting the flag inside the loop would make every later
             * take() fail immediately, and run() would return with tasks still running.
             */
            boolean interrupted = false;
            try {
                while (acceptedCount > 0) {
                    Future<Throwable> finished;
                    try {
                        finished = completionService.take();
                    }
                    catch (InterruptedException ex) {
                        interrupted = true;
                        continue;
                    }
                    // Only count a task as collected once its Future is in hand.
                    acceptedCount--;

                    Throwable result;
                    while (true) {
                        try {
                            result = finished.get();
                            break;
                        }
                        catch (InterruptedException ex) {
                            // The Future is already complete, so simply ask again.
                            interrupted = true;
                        }
                        catch (ExecutionException ex) {
                            // The Callable catches every Throwable, so this is unexpected.
                            throw new AssertionError(ex);
                        }
                    }

                    if (throwable == null) {
                        throwable = result;
                    }
                }
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }

            /* Unless an exception has been recorded, execute any tasks which the
             * CompletionService was unable to accept.
             */
            if (throwable == null) {
                for (Task<X> task : rejectedTasks) {
                    try {
                        task.execute();
                    }
                    catch (Throwable ex) {
                        // Record the exception and bail out.
                        throwable = ex;
                        break;
                    }
                }
            }
        }
        finally {
            /* If there's a value in throwable, then it's the result of one of the following:
             * 1) A problem occurred submitting a task
             * 2) A task was executed but threw an exception
             * 3) A task was executed and triggered a non-local transfer, which can
             *    now be restarted from the current (correct) thread.
             */
            if (throwable != null) {
                
                // Handle nonlocal transfers
                if (throwable instanceof UnmatchedTransfer)
                    throw ((UnmatchedTransfer) throwable).transfer();

                // Handle RuntimeExceptions (just for clarity really)
                if (throwable instanceof RuntimeException)
                    throw (RuntimeException) throwable;

                // Errors are never an X, so keep them away from the unchecked cast below.
                if (throwable instanceof Error)
                    throw (Error) throwable;

                // Look away now
                @SuppressWarnings("unchecked")
                X x = (X) throwable;
                throw x;
            }
        }
    }
}