Package org.hobbit.core.components
Class AbstractTaskGenerator
- java.lang.Object
-
- org.hobbit.core.components.AbstractComponent
-
- org.hobbit.core.components.AbstractCommandReceivingComponent
-
- org.hobbit.core.components.AbstractPlatformConnectorComponent
-
- org.hobbit.core.components.AbstractTaskGenerator
-
- All Implemented Interfaces:
Closeable,AutoCloseable,CommandReceivingComponent,Component,GeneratedDataReceivingComponent,PlatformConnector
- Direct Known Subclasses:
AbstractSequencingTaskGenerator
public abstract class AbstractTaskGenerator extends AbstractPlatformConnectorComponent implements GeneratedDataReceivingComponent
This abstract class implements basic functions that can be used to implement a task generator. The following environment variables are expected:- Author:
- Michael Röder (roeder@informatik.uni-leipzig.de)
-
-
Field Summary
Fields Modifier and Type Field Description protected com.rabbitmq.client.QueueingConsumerconsumerprotected DataReceiverdataGenReceiverprivate static intDEFAULT_MAX_PARALLEL_PROCESSED_MESSAGESDefault value of themaxParallelProcessedMsgsattribute.private intgeneratorIdThe id of this generator.private static org.slf4j.LoggerLOGGERprivate intmaxParallelProcessedMsgsThe maximum number of incoming messages that are processed in parallel.private longnextTaskIdThe task id that will be assigned to the next task generated by this generator.private intnumberOfGeneratorsThe number of task generators created by the benchmark controller.protected booleanrunFlagprotected DataSendersender2EvalStoreprotected DataSendersender2Systemprivate SemaphorestartTaskGenMutexMutex used to wait for the start signal after the component has been started and initialized.private SemaphoreterminateMutexMutex used to wait for the terminate signal.-
Fields inherited from class org.hobbit.core.components.AbstractCommandReceivingComponent
cmdChannel, cmdQueueFactory, cmdResponseTimeout, DEFAULT_CMD_RESPONSE_TIMEOUT, defaultContainerType
-
Fields inherited from class org.hobbit.core.components.AbstractComponent
connectionFactory, incomingDataQueueFactory, NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ, outgoingDataQueuefactory, rabbitMQHostName, START_WAITING_TIME_BEFORE_RETRY
-
-
Constructor Summary
Constructors Constructor Description AbstractTaskGenerator()Default constructor creating anAbstractTaskGeneratorprocessing up toDEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES= 1 messages in parallel.AbstractTaskGenerator(int maxParallelProcessedMsgs)Constructor setting the maximum number of parallel processed messages.
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description voidclose()protected abstract voidgenerateTask(byte[] data)Generates a task from the given data, sends it to the system, takes the timestamp of the moment at which the message has been sent to the system and sends it together with the expected response to the evaluation storage.intgetGeneratorId()protected StringgetNextTaskId()Generates the next unique ID for a task.intgetNumberOfGenerators()voidinit()This method initializes the component.voidreceiveCommand(byte command, byte[] data)This method is called if a command is received and might be interesting for this particular component.voidreceiveGeneratedData(byte[] data)This method is called if data is received from a data generator.voidrun()This method executes the component.protected voidsendTaskToEvalStorage(String taskIdString, long timestamp, byte[] data)This method sends the given data and the given timestamp of the task with the given task id to the evaluation storage.protected voidsendTaskToSystemAdapter(String taskIdString, byte[] data)Sends the given task with the given task id and data to the system.-
Methods inherited from class org.hobbit.core.components.AbstractPlatformConnectorComponent
addContainerObserver, createContainer, getFactoryForIncomingCmdQueues, getFactoryForIncomingDataQueues, getFactoryForOutgoingCmdQueues, getFactoryForOutgoingDataQueues, stopContainer
-
Methods inherited from class org.hobbit.core.components.AbstractCommandReceivingComponent
addCommandHeaderId, createContainer, createContainer, getCmdResponseTimeout, handleCmd, sendToCmdQueue, sendToCmdQueue, sendToCmdQueue, setCmdResponseTimeout
-
Methods inherited from class org.hobbit.core.components.AbstractComponent
createConnection, generateSessionQueueName, getHobbitSessionId
-
-
-
-
Field Detail
-
LOGGER
private static final org.slf4j.Logger LOGGER
-
DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
Default value of themaxParallelProcessedMsgsattribute.- See Also:
- Constant Field Values
-
startTaskGenMutex
private Semaphore startTaskGenMutex
Mutex used to wait for the start signal after the component has been started and initialized.
-
terminateMutex
private Semaphore terminateMutex
Mutex used to wait for the terminate signal.
-
generatorId
private int generatorId
The id of this generator.
-
numberOfGenerators
private int numberOfGenerators
The number of task generators created by the benchmark controller.
-
nextTaskId
private long nextTaskId
The task id that will be assigned to the next task generated by this generator.
-
maxParallelProcessedMsgs
private final int maxParallelProcessedMsgs
The maximum number of incoming messages that are processed in parallel. Additional messages have to wait.
-
sender2System
protected DataSender sender2System
-
sender2EvalStore
protected DataSender sender2EvalStore
-
dataGenReceiver
protected DataReceiver dataGenReceiver
-
consumer
protected com.rabbitmq.client.QueueingConsumer consumer
-
runFlag
protected boolean runFlag
-
-
Constructor Detail
-
AbstractTaskGenerator
public AbstractTaskGenerator()
Default constructor creating anAbstractTaskGeneratorprocessing up toDEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES= 1 messages in parallel.
-
AbstractTaskGenerator
public AbstractTaskGenerator(int maxParallelProcessedMsgs)
Constructor setting the maximum number of parallel processed messages. Note that this parameter has to be larger or equal to 1 or theinit()method will throw an exception. SettingmaxParallelProcessedMsgs=1leads to the usage of aQueueingConsumer.- Parameters:
maxParallelProcessedMsgs- the number of messaegs that are processed in parallel
-
-
Method Detail
-
init
public void init() throws ExceptionDescription copied from interface:ComponentThis method initializes the component.- Specified by:
initin interfaceComponent- Overrides:
initin classAbstractCommandReceivingComponent- Throws:
Exception- if an error occurs during the initialization
-
run
public void run() throws ExceptionDescription copied from interface:ComponentThis method executes the component.
-
receiveGeneratedData
public void receiveGeneratedData(byte[] data)
Description copied from interface:GeneratedDataReceivingComponentThis method is called if data is received from a data generator.- Specified by:
receiveGeneratedDatain interfaceGeneratedDataReceivingComponent- Parameters:
data- the data received from a data generator
-
generateTask
protected abstract void generateTask(byte[] data) throws ExceptionGenerates a task from the given data, sends it to the system, takes the timestamp of the moment at which the message has been sent to the system and sends it together with the expected response to the evaluation storage.- Parameters:
data- incoming data generated by a data generator- Throws:
Exception- if a sever error occurred
-
getNextTaskId
protected String getNextTaskId()
Generates the next unique ID for a task.- Returns:
- the next unique task ID
-
receiveCommand
public void receiveCommand(byte command, byte[] data)Description copied from interface:CommandReceivingComponentThis method is called if a command is received and might be interesting for this particular component.- Specified by:
receiveCommandin interfaceCommandReceivingComponent- Overrides:
receiveCommandin classAbstractPlatformConnectorComponent- Parameters:
command- the byte encoding the commanddata- additional data that was sent together with the command
-
sendTaskToEvalStorage
protected void sendTaskToEvalStorage(String taskIdString, long timestamp, byte[] data) throws IOException
This method sends the given data and the given timestamp of the task with the given task id to the evaluation storage.- Parameters:
taskIdString- the id of the tasktimestamp- the timestamp of the moment in which the task has been sent to the systemdata- the expected response for the task with the given id- Throws:
IOException- if there is an error during the sending
-
sendTaskToSystemAdapter
protected void sendTaskToSystemAdapter(String taskIdString, byte[] data) throws IOException
Sends the given task with the given task id and data to the system.- Parameters:
taskIdString- the id of the taskdata- the data of the task- Throws:
IOException- if there is an error during the sending
-
getGeneratorId
public int getGeneratorId()
-
getNumberOfGenerators
public int getNumberOfGenerators()
-
close
public void close() throws IOException- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable- Overrides:
closein classAbstractCommandReceivingComponent- Throws:
IOException
-
-