Package org.hobbit.core.rabbit
Class RabbitRpcClient
- java.lang.Object
-
- org.hobbit.core.rabbit.RabbitRpcClient
-
- All Implemented Interfaces:
Closeable,AutoCloseable
public class RabbitRpcClient extends Object implements Closeable
This class implements a thread safe client that can process several RPC calls in parallel.- Author:
- Michael Röder (roeder@informatik.uni-leipzig.de)
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static classRabbitRpcClient.RabbitRpcClientConsumerInternal implementation of a Consumer that receives messages on the reply queue and callsprocessResponseForRequest(String, byte[])of itsRabbitRpcClient.RabbitRpcClientConsumer.client.protected static classRabbitRpcClient.RabbitRpcRequestSimple extension of theAbstractFutureclass that waits for the response which is set by theRabbitRpcClient.RabbitRpcRequest.setResponse(byte[] response).
-
Field Summary
Fields Modifier and Type Field Description private Map<String,RabbitRpcClient.RabbitRpcRequest>currentRequestsMapping of correlation Ids to theirRabbitRpcClient.RabbitRpcRequestinstances.private static longDEFAULT_MAX_WAITING_TIMEThe default maximum amount of time in millisecond the client is waiting for a response = 600000Lms.private static org.slf4j.LoggerLOGGERprivate longmaxWaitingTimeThe maximum amount of time in millisecond the client is waiting for a response.private SemaphorerequestMapMutexMutex for managing access to thecurrentRequestsobject.private RabbitQueuerequestQueueQueue used for the request.private RabbitQueueresponseQueueQueue used for the responses.
-
Constructor Summary
Constructors Modifier Constructor Description protectedRabbitRpcClient()Constructor.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description voidclose()static RabbitRpcClientcreate(com.rabbitmq.client.Connection connection, String requestQueueName)Creates a StorageServiceClient using the given RabbitMQConnection.longgetMaxWaitingTime()protected voidinit(com.rabbitmq.client.Connection connection, String requestQueueName)Initializes the client by declaring a request queue using the given connection and queue name as well as a second queue and a consumer for retrieving responses.protected voidprocessResponseForRequest(String corrId, byte[] body)Processes the response with the given correlation Id and byte array by searching for a matching request and setting the response if it could be found.byte[]request(byte[] data)Sends the request, i.e., the given data, and blocks until the response is received.voidsetMaxWaitingTime(long maxWaitingTime)Sets the maximum amount of time the client is waiting for a response.
-
-
-
Field Detail
-
LOGGER
private static final org.slf4j.Logger LOGGER
-
DEFAULT_MAX_WAITING_TIME
private static final long DEFAULT_MAX_WAITING_TIME
The default maximum amount of time in millisecond the client is waiting for a response = 600000Lms.- See Also:
- Constant Field Values
-
requestQueue
private RabbitQueue requestQueue
Queue used for the request.
-
responseQueue
private RabbitQueue responseQueue
Queue used for the responses.
-
requestMapMutex
private Semaphore requestMapMutex
Mutex for managing access to thecurrentRequestsobject.
-
currentRequests
private Map<String,RabbitRpcClient.RabbitRpcRequest> currentRequests
Mapping of correlation Ids to theirRabbitRpcClient.RabbitRpcRequestinstances.
-
maxWaitingTime
private long maxWaitingTime
The maximum amount of time in millisecond the client is waiting for a response. The default value is defined byDEFAULT_MAX_WAITING_TIME.
-
-
Method Detail
-
create
public static RabbitRpcClient create(com.rabbitmq.client.Connection connection, String requestQueueName) throws IOException
Creates a StorageServiceClient using the given RabbitMQConnection.- Parameters:
connection- RabbitMQ connection used for the communicationrequestQueueName- name of the queue to which the requests should be sent- Returns:
- a StorageServiceClient instance
- Throws:
IOException- if a problem occurs during the creation of the queues or the consumer.
-
init
protected void init(com.rabbitmq.client.Connection connection, String requestQueueName) throws IOExceptionInitializes the client by declaring a request queue using the given connection and queue name as well as a second queue and a consumer for retrieving responses.- Parameters:
connection- the RabbitMQ connection that is used for creating queuesrequestQueueName- the name of the queue- Throws:
IOException- if a communication problem during the creation of the channel, the queue or the internal consumer occurs
-
request
public byte[] request(byte[] data)
Sends the request, i.e., the given data, and blocks until the response is received.- Parameters:
data- the data of the request- Returns:
- the response or null if an error occurs.
-
processResponseForRequest
protected void processResponseForRequest(String corrId, byte[] body)
Processes the response with the given correlation Id and byte array by searching for a matching request and setting the response if it could be found. If there is no request with the same correlation Id, nothing is done.- Parameters:
corrId- correlation Id of the responsebody- data of the response
-
getMaxWaitingTime
public long getMaxWaitingTime()
-
setMaxWaitingTime
public void setMaxWaitingTime(long maxWaitingTime)
Sets the maximum amount of time the client is waiting for a response.- Parameters:
maxWaitingTime- the maximum waiting time in milliseconds
-
close
public void close() throws IOException- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable- Throws:
IOException
-
-