org.semanticweb.elk.util.concurrent.computation
Class ConcurrentComputation<I,F extends InputProcessorFactory<I,?>>

java.lang.Object
  extended by org.semanticweb.elk.util.concurrent.computation.ConcurrentComputation<I,F>
Type Parameters:
I - the type of the input to be processed.
F - the type of the factory for the input processors

public class ConcurrentComputation<I,F extends InputProcessorFactory<I,?>>
extends Object

An class for concurrent processing of a number of tasks. The input for the tasks are submitted, buffered, and processed by concurrent workers using the the InputProcessor objects created by the supplied InputProcessorFactory. The implementation is loosely based on a producer-consumer framework with one producer and many consumers. The processing of the input should start by calling the start() method, following by submit(Object) method for submitting input to be processed. The workers will always wait for new input, unless interrupted or finish() method is called. If finish() is called then no further input can be submitted and the workers will terminate when all input has been processed or they are interrupted earlier, whichever is earlier.

Author:
"Yevgeny Kazakov"

Field Summary
protected  BlockingQueue<I> buffer
          the internal buffer for queuing input
protected  ComputationExecutor executor
          the executor used internally to run the jobs
protected  boolean finishRequested
          true if the finish of computation was requested using the function finish()
protected  F inputProcessorFactory
          the factory for the input processor engines
protected  boolean interrupted
          true if the computation has been interrupted
protected  int maxWorkers
          maximum number of concurrent workers
protected  Runnable worker
          the worker instance used to process the jobs
 
Constructor Summary
ConcurrentComputation(F inputProcessorFactory, ComputationExecutor executor, int maxWorkers)
          Creating a ConcurrentComputation instance.
ConcurrentComputation(F inputProcessorFactory, ComputationExecutor executor, int maxWorkers, int bufferCapacity)
          Creating a ConcurrentComputation instance.
 
Method Summary
 void finish()
          Marks the end of the input and requests all workers to terminate when all currently submitted input has been processed.
 Iterable<I> interrupt()
          Request all currently running workers to stop; no input can be submitted after calling this method.
 boolean start()
          Starts the workers to process the input.
 boolean submit(I input)
          Submitting a new input for processing.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

inputProcessorFactory

protected final F extends InputProcessorFactory<I,?> inputProcessorFactory
the factory for the input processor engines


maxWorkers

protected final int maxWorkers
maximum number of concurrent workers


executor

protected final ComputationExecutor executor
the executor used internally to run the jobs


buffer

protected final BlockingQueue<I> buffer
the internal buffer for queuing input


finishRequested

protected volatile boolean finishRequested
true if the finish of computation was requested using the function finish()


interrupted

protected volatile boolean interrupted
true if the computation has been interrupted


worker

protected final Runnable worker
the worker instance used to process the jobs

Constructor Detail

ConcurrentComputation

public ConcurrentComputation(F inputProcessorFactory,
                             ComputationExecutor executor,
                             int maxWorkers,
                             int bufferCapacity)
Creating a ConcurrentComputation instance.

Parameters:
inputProcessorFactory - the factory for input processors
executor - the executor used internally to run the jobs
maxWorkers - the maximal number of concurrent workers processing the jobs
bufferCapacity - the size of the buffer for scheduled jobs; if the buffer is full, submitting new jobs will block until new space is available

ConcurrentComputation

public ConcurrentComputation(F inputProcessorFactory,
                             ComputationExecutor executor,
                             int maxWorkers)
Creating a ConcurrentComputation instance.

Parameters:
inputProcessorFactory - the factory for input processors
executor - the executor used internally to run the jobs
maxWorkers - the maximal number of concurrent workers processing the jobs
Method Detail

start

public boolean start()
Starts the workers to process the input.

Returns:
true if the operation was successful

submit

public boolean submit(I input)
               throws InterruptedException
Submitting a new input for processing. Submitted input jobs are first buffered, and then concurrently processed by workers. If the buffer is full, the method blocks until new space is available.

Parameters:
input - the input to be processed
Returns:
true if the input has been successfully submitted for computation; the input cannot be submitted, e.g., if finish() has been called
Throws:
InterruptedException - thrown if interrupted during waiting for space to be available

interrupt

public Iterable<I> interrupt()
                      throws InterruptedException
Request all currently running workers to stop; no input can be submitted after calling this method. When this method returns, no worker should be running.

Returns:
the submitted inputs that were not processed by the workers
Throws:
InterruptedException

finish

public void finish()
            throws InterruptedException
Marks the end of the input and requests all workers to terminate when all currently submitted input has been processed. After calling this method, no new input can be submitted anymore, i.e., calling submit(Object) will always return false. The method blocks until all workers have been stopped. If interrupted while blocked, this method can be called again in order to complete the termination request.

Throws:
InterruptedException - if interrupted during waiting for finish request


Copyright © 2011-2013 Department of Computer Science, University of Oxford. All Rights Reserved.