Class DataReceiverImpl
- java.lang.Object
-
- org.hobbit.core.rabbit.DataReceiverImpl
-
- All Implemented Interfaces:
Closeable,AutoCloseable,DataReceiver
public class DataReceiverImpl extends Object implements DataReceiver
Implementation of theDataReceiverinterface.Use the internal
DataReceiverImpl.Builderclass for creating instances of theDataReceiverImplclass. Note that the createdDataReceiverImplwill either use a givenRabbitQueueor create a new one. In both cases the receiver will become the owner of the queue, i.e., if theDataReceiverImplinstance is closed the queue will be closed as well.Internally, the receiver uses an own thread to consume incoming messages. These messages are forwarded to the given
DataHandlerinstance. Note that this forwarding is based on anExecutorServicethe called methodDataHandler.handleData(byte[])should be thread safe since it might be called in parallel.The
DataReceiverImplowns recources that need to be freed if its work is done. This can be achieved by closing the receiver. In most cases, this should be done using thecloseWhenFinished()method which waits until all incoming messages are processed and all streams are closed. Note that using theclose()method leads to a direct shutdown of the queue which could lead to data loss and threads getting stuck.- Author:
- Michael Röder (roeder@informatik.uni-leipzig.de)
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static classDataReceiverImpl.Builderprotected classDataReceiverImpl.MsgProcessingTaskprotected classDataReceiverImpl.MsgReceivingTask
-
Field Summary
Fields Modifier and Type Field Description private DataHandlerdataHandlerprivate static intDEFAULT_MAX_PARALLEL_PROCESSED_MESSAGESprivate interrorCountprivate ExecutorServiceexecutorprivate static org.slf4j.LoggerLOGGERprotected RabbitQueuequeueprivate TerminatableRunnablereceiverTaskprivate ThreadreceiverThread
-
Constructor Summary
Constructors Modifier Constructor Description protectedDataReceiverImpl(RabbitQueue queue, DataHandler handler, int maxParallelProcessedMsgs)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description static DataReceiverImpl.Builderbuilder()Returns a newly createdDataReceiverImpl.Builder.protected RunnablebuildMsgProcessingTask(com.rabbitmq.client.QueueingConsumer.Delivery delivery)This factory method creates a runnable task that processes the given message.protected TerminatableRunnablebuildMsgReceivingTask(com.rabbitmq.client.QueueingConsumer consumer)This factory method creates a runnable task that uses the given consumer to receive incoming messages.voidclose()A rude way to close the receiver.voidcloseWhenFinished()This method waits for the data receiver to finish its work and closes the incoming queue as well as the internal thread pool after that.DataHandlergetDataHandler()intgetErrorCount()Returns the number of errors that have been encountered while receiving data.protected ExecutorServicegetExecutor()RabbitQueuegetQueue()voidincreaseErrorCount()
-
-
-
Field Detail
-
LOGGER
private static final org.slf4j.Logger LOGGER
-
DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
- See Also:
- Constant Field Values
-
queue
protected RabbitQueue queue
-
errorCount
private int errorCount
-
dataHandler
private DataHandler dataHandler
-
executor
private ExecutorService executor
-
receiverTask
private TerminatableRunnable receiverTask
-
receiverThread
private Thread receiverThread
-
-
Constructor Detail
-
DataReceiverImpl
protected DataReceiverImpl(RabbitQueue queue, DataHandler handler, int maxParallelProcessedMsgs) throws IOException
- Throws:
IOException
-
-
Method Detail
-
getDataHandler
public DataHandler getDataHandler()
- Specified by:
getDataHandlerin interfaceDataReceiver
-
increaseErrorCount
public void increaseErrorCount()
- Specified by:
increaseErrorCountin interfaceDataReceiver
-
getErrorCount
public int getErrorCount()
Description copied from interface:DataReceiverReturns the number of errors that have been encountered while receiving data. If this number is not 0 the receiver can not guarantee that all data has been received correctly.- Specified by:
getErrorCountin interfaceDataReceiver- Returns:
- the number of errors encountered during the receiving of data
-
getQueue
public RabbitQueue getQueue()
- Specified by:
getQueuein interfaceDataReceiver
-
getExecutor
protected ExecutorService getExecutor()
-
closeWhenFinished
public void closeWhenFinished()
This method waits for the data receiver to finish its work and closes the incoming queue as well as the internal thread pool after that.- Specified by:
closeWhenFinishedin interfaceDataReceiver
-
close
public void close()
A rude way to close the receiver. Note that this method directly closes the incoming queue and only notifies the internal consumer to stop its work but won't wait for the handler threads to finish their work.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable
-
builder
public static DataReceiverImpl.Builder builder()
Returns a newly createdDataReceiverImpl.Builder.- Returns:
- a new
DataReceiverImpl.Builderinstance
-
buildMsgReceivingTask
protected TerminatableRunnable buildMsgReceivingTask(com.rabbitmq.client.QueueingConsumer consumer)
This factory method creates a runnable task that uses the given consumer to receive incoming messages.- Parameters:
consumer- the consumer that can be used to receive messages- Returns:
- a Runnable instance that will handle incoming messages as soon as it will be executed
-
buildMsgProcessingTask
protected Runnable buildMsgProcessingTask(com.rabbitmq.client.QueueingConsumer.Delivery delivery)
This factory method creates a runnable task that processes the given message.- Parameters:
delivery- the message that should be processed- Returns:
- a Runnable instance that will process the message as soon as it will be executed
-
-