public abstract class AbstractStreamingTaskGenerator extends AbstractPlatformConnectorComponent implements StreamingGeneratedDataReceivingComponent
| Modifier and Type | Class and Description |
|---|---|
protected class |
AbstractStreamingTaskGenerator.GeneratedDataHandler
A simple internal handler class that calls
generateTask(InputStream) with the
given InputStream. |
| Modifier and Type | Field and Description |
|---|---|
protected DataReceiver |
dataReceiver
Receiver for receiving data from the data generators.
|
private static int |
DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
Default value of the
maxParallelProcessedMsgs attribute. |
protected int |
generatorId
The id of this generator.
|
private static org.slf4j.Logger |
LOGGER |
private int |
maxParallelProcessedMsgs
The maximum number of incoming messages that are processed in parallel.
|
private long |
nextTaskId
The task id that will be assigned to the next task generated by this
generator.
|
protected int |
numberOfGenerators
The number of task generators created by the benchmark controller.
|
protected DataSender |
sender2EvalStore
Sender for transferring data to the evaluation storage.
|
protected DataSender |
sender2System
Sender for sending tasks to the benchmarked system.
|
private Semaphore |
startTaskGenMutex
Mutex used to wait for the start signal after the component has been started
and initialized.
|
private Semaphore |
terminateMutex
Mutex used to wait for the terminate signal.
|
cmdChannel, cmdQueueFactory, DEFAULT_CMD_RESPONSE_TIMEOUT, defaultContainerTypeconnectionFactory, incomingDataQueueFactory, NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ, outgoingDataQueuefactory, rabbitMQHostName, START_WAITING_TIME_BEFORE_RETRY| Constructor and Description |
|---|
AbstractStreamingTaskGenerator()
Default constructor creating an
AbstractStreamingTaskGenerator
processing up to DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES=
10 messages in parallel. |
AbstractStreamingTaskGenerator(int maxParallelProcessedMsgs)
Constructor setting the maximum number of parallel processed messages.
|
| Modifier and Type | Method and Description |
|---|---|
void |
close() |
protected abstract void |
generateTask(InputStream dataStream)
Generates a task from the given data, sends it to the system, takes the
timestamp of the moment at which the message has been sent to the system and
sends it together with the expected response to the evaluation storage.
|
int |
getGeneratorId() |
protected String |
getNextTaskId()
Generates the next unique ID for a task.
|
int |
getNumberOfGenerators() |
void |
init()
This method initializes the component.
|
void |
receiveCommand(byte command,
byte[] data)
This method is called if a command is received and might be interesting
for this particular component.
|
void |
receiveGeneratedData(InputStream dataStream)
This method is called if a new data stream is received from a data
generator.
|
void |
run()
This method executes the component.
|
protected void |
sendTaskToEvalStorage(String taskIdString,
long timestamp,
InputStream stream)
This method sends the data read from the given
InputStream and the
given timestamp of the task with the given task id to the evaluation storage. |
protected void |
sendTaskToSystemAdapter(String taskIdString,
InputStream stream)
Sends the given task with the given task id and data to the system.
|
addContainerObserver, createContainer, getFactoryForIncomingCmdQueues, getFactoryForIncomingDataQueues, getFactoryForOutgoingCmdQueues, getFactoryForOutgoingDataQueues, stopContaineraddCommandHeaderId, createContainer, createContainer, handleCmd, sendToCmdQueue, sendToCmdQueue, sendToCmdQueuecreateConnection, generateSessionQueueName, getHobbitSessionIdprivate static final org.slf4j.Logger LOGGER
private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES
maxParallelProcessedMsgs attribute.private Semaphore startTaskGenMutex
private Semaphore terminateMutex
protected int generatorId
protected int numberOfGenerators
private long nextTaskId
private final int maxParallelProcessedMsgs
protected DataSender sender2System
protected DataSender sender2EvalStore
protected DataReceiver dataReceiver
public AbstractStreamingTaskGenerator()
AbstractStreamingTaskGenerator
processing up to DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES=
10 messages in parallel.public AbstractStreamingTaskGenerator(int maxParallelProcessedMsgs)
init()
method will throw an exception. Setting
maxParallelProcessedMsgs=1 leads to the usage of a
QueueingConsumer.maxParallelProcessedMsgs - the number of messaegs that are processed in parallelpublic void init()
throws Exception
Componentinit in interface Componentinit in class AbstractCommandReceivingComponentException - if an error occurs during the initializationpublic void run()
throws Exception
Componentpublic void receiveGeneratedData(InputStream dataStream)
StreamingGeneratedDataReceivingComponentreceiveGeneratedData in interface StreamingGeneratedDataReceivingComponentdataStream - the stream from which the data received from a data generator
is readprotected abstract void generateTask(InputStream dataStream) throws Exception
dataStream - stream containing incoming data generated by a data generatorException - if a sever error occurredprotected String getNextTaskId()
public void receiveCommand(byte command,
byte[] data)
CommandReceivingComponentreceiveCommand in interface CommandReceivingComponentreceiveCommand in class AbstractPlatformConnectorComponentcommand - the byte encoding the commanddata - additional data that was sent together with the commandprotected void sendTaskToEvalStorage(String taskIdString, long timestamp, InputStream stream) throws IOException
InputStream and the
given timestamp of the task with the given task id to the evaluation storage.taskIdString - the id of the tasktimestamp - the timestamp of the moment in which the task has been sent to the
systemstream - stream from which the expected response for the task with the
given id will be readIOException - if there is an error during the sendingprotected void sendTaskToSystemAdapter(String taskIdString, InputStream stream) throws IOException
taskIdString - the id of the taskdata - the data of the taskIOException - if there is an error during the sendingpublic int getGeneratorId()
public int getNumberOfGenerators()
public void close()
throws IOException
close in interface Closeableclose in interface AutoCloseableclose in class AbstractCommandReceivingComponentIOExceptionCopyright © 2017–2018. All rights reserved.