package org.hobbit.core.rabbit;

import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.hobbit.core.data.FileReceiveState;
import org.hobbit.core.data.RabbitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/rabbit/SimpleFileReceiver.class */
/**
 * Receives files that arrive as a sequence of messages on a {@link RabbitQueue} and writes them into
 * an output directory.
 *
 * <p>
 * As read by {@link MessageProcessing#run()}, every message consists of the file name (decoded with
 * {@link RabbitMQUtils#readString(ByteBuffer)}), an {@code int} message id and the remaining bytes as
 * file content. A message with no content bytes marks the end of the file. Messages are processed by
 * worker threads of {@link #executor}; chunks that arrive out of order are buffered in the
 * {@link FileReceiveState} of the file until their predecessor has been written.
 * </p>
 *
 * <p>
 * Typical usage: one thread blocks in {@link #receiveData(String)} while another thread calls
 * {@link #terminate()} (graceful) or {@link #forceTermination()} (immediate) to end the reception.
 * </p>
 */
public class SimpleFileReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileReceiver.class);

    /** Default time (in the unit expected by {@code QueueingConsumer.nextDelivery(long)}) to wait for a message. */
    protected static final long DEFAULT_TIMEOUT = 1000;

    /** Queue from which the file messages are consumed. */
    protected RabbitQueue queue;
    /** Consumer registered on {@link #queue}. */
    protected QueueingConsumer consumer;
    /** State of every file seen so far, keyed by file name. Guarded by its own monitor. */
    protected Map<String, FileReceiveState> fileStates = new HashMap<>();
    /**
     * Termination flag. It is set by {@link #terminate()} / {@link #forceTermination()} and polled by
     * the loop in {@link #receiveData(String)}; {@code volatile} so that a write from another thread
     * becomes visible to the receiving thread.
     */
    protected volatile boolean terminated = false;
    /** Number of errors encountered so far. Guarded by the monitor of this receiver. */
    protected int errorCount = 0;
    /** Executor running the {@link MessageProcessing} tasks. */
    protected ExecutorService executor = Executors.newCachedThreadPool();
    /** Timeout handed to {@code QueueingConsumer.nextDelivery(long)} for every poll. */
    protected long waitingForMsgTimeout = DEFAULT_TIMEOUT;

    /**
     * Task that handles a single received message: it looks up (or creates) the state of the addressed
     * file and either writes the payload or buffers it until the preceding chunks have arrived.
     */
    protected static class MessageProcessing implements Runnable {
        private SimpleFileReceiver receiver;
        private String outputDir;
        private byte[] data;

        /**
         * @param simpleFileReceiver
         *            the receiver owning the file states and the error counter
         * @param str
         *            the output directory; the file name is appended to it without a separator
         * @param bArr
         *            the raw body of the received message
         */
        public MessageProcessing(SimpleFileReceiver simpleFileReceiver, String str, byte[] bArr) {
            this.outputDir = str;
            this.data = bArr;
            this.receiver = simpleFileReceiver;
        }

        /**
         * Parses the message, makes sure a {@link FileReceiveState} exists for the file and processes
         * or buffers the payload depending on its message id.
         */
        @Override
        public void run() {
            ByteBuffer buffer = ByteBuffer.wrap(this.data);
            String fileName = RabbitMQUtils.readString(buffer);
            FileReceiveState state;
            // The map lock is held only for lookup/creation so that different files can be written in
            // parallel.
            synchronized (this.receiver.fileStates) {
                state = this.receiver.fileStates.get(fileName);
                if (state == null) {
                    try {
                        // NOTE(review): the file name comes straight from the message and is
                        // concatenated without validation, so a name containing "../" could point
                        // outside of the output directory - confirm that all senders are trusted.
                        state = new FileReceiveState(fileName,
                                new BufferedOutputStream(new FileOutputStream(this.outputDir + fileName)));
                        this.receiver.fileStates.put(fileName, state);
                    } catch (FileNotFoundException e) {
                        SimpleFileReceiver.LOGGER.error(
                                "Couldn't create file \"" + fileName + "\". Message will be ignored.", e);
                        this.receiver.increaseErrorCount();
                        return;
                    }
                }
            }
            synchronized (state) {
                int messageId = buffer.getInt();
                // Everything behind the id is file content.
                byte[] content = ArrayUtils.subarray(buffer.array(), buffer.position(), buffer.array().length);
                if (messageId == state.nextMessageId) {
                    processMessageData(content, state);
                } else {
                    // Out of order: keep the chunk until its predecessors have been written.
                    state.messageBuffer.put(Integer.valueOf(messageId), content);
                }
            }
        }

        /**
         * Writes the given chunk to the file and afterwards drains all directly following chunks that
         * are already waiting in the message buffer. An empty chunk closes the file. The caller is
         * expected to hold the monitor of the given state (as {@link #run()} does).
         *
         * @param bArr
         *            the content of the chunk with the id {@code fileReceiveState.nextMessageId}
         * @param fileReceiveState
         *            the state of the file the chunk belongs to
         */
        protected void processMessageData(byte[] bArr, FileReceiveState fileReceiveState) {
            byte[] chunk = bArr;
            // Iterative instead of recursive so that a long run of buffered chunks cannot exhaust the
            // stack.
            while (chunk != null) {
                if (chunk.length == 0) {
                    // The empty chunk is the end marker of the file.
                    IOUtils.closeQuietly(fileReceiveState.outputStream);
                    fileReceiveState.outputStream = null;
                    SimpleFileReceiver.LOGGER.debug("Received last message for file \"{}\".", fileReceiveState.name);
                    if (fileReceiveState.messageBuffer.size() > 0) {
                        SimpleFileReceiver.LOGGER.error(
                                "Closed the file \"{}\" while there are still {} messages in its data buffer",
                                fileReceiveState.name, Integer.valueOf(fileReceiveState.messageBuffer.size()));
                    }
                    return;
                }
                if (fileReceiveState.outputStream == null) {
                    // Data arrived after the end marker already closed the stream; writing would
                    // only raise a NullPointerException inside the worker thread.
                    SimpleFileReceiver.LOGGER.error(
                            "Received data for file \"{}\" after it has been closed. Message will be ignored.",
                            fileReceiveState.name);
                    this.receiver.increaseErrorCount();
                    return;
                }
                try {
                    fileReceiveState.outputStream.write(chunk);
                } catch (IOException e) {
                    SimpleFileReceiver.LOGGER.error("Couldn't write message data to file.", e);
                    this.receiver.increaseErrorCount();
                }
                fileReceiveState.nextMessageId++;
                // remove() yields null if the successor has not arrived yet, which ends the loop.
                chunk = fileReceiveState.messageBuffer.remove(Integer.valueOf(fileReceiveState.nextMessageId));
            }
        }
    }

    /**
     * Creates a receiver for the queue with the given name.
     *
     * @param rabbitQueueFactory
     *            the factory used to create the queue
     * @param str
     *            the name of the queue
     * @return the newly created receiver
     * @throws IOException
     *             if the queue or the consumer cannot be created
     */
    public static SimpleFileReceiver create(RabbitQueueFactory rabbitQueueFactory, String str) throws IOException {
        return create(rabbitQueueFactory.createDefaultRabbitQueue(str));
    }

    /**
     * Creates a receiver that consumes the given queue with automatic acknowledgements.
     *
     * @param rabbitQueue
     *            the queue to consume
     * @return the newly created receiver
     * @throws IOException
     *             if the consumer cannot be registered
     */
    public static SimpleFileReceiver create(RabbitQueue rabbitQueue) throws IOException {
        QueueingConsumer queueingConsumer = new QueueingConsumer(rabbitQueue.channel);
        rabbitQueue.channel.basicConsume(rabbitQueue.name, true, queueingConsumer);
        // NOTE(review): the prefetch limit is set after the consumer has been registered and the
        // consumer uses auto-ack; presumably the limit has no effect in this setup - confirm against
        // the RabbitMQ client documentation before relying on it.
        rabbitQueue.channel.basicQos(20);
        return new SimpleFileReceiver(rabbitQueue, queueingConsumer);
    }

    /**
     * @param rabbitQueue
     *            the queue from which messages are received
     * @param queueingConsumer
     *            the consumer that is already registered on the queue
     */
    protected SimpleFileReceiver(RabbitQueue rabbitQueue, QueueingConsumer queueingConsumer) {
        this.queue = rabbitQueue;
        this.consumer = queueingConsumer;
    }

    /**
     * Receives files and writes them into the given directory. The method blocks until the receiver
     * has been asked to terminate, the last poll returned no message and the queue reports no
     * remaining messages. The receiver is closed before the method returns, whether it ends normally
     * or with an exception.
     *
     * @param str
     *            the output directory; it is created if it does not exist
     * @return the names of all files for which at least one message has been received
     * @throws IOException
     *             if the output directory cannot be created or the queue cannot be queried
     * @throws ShutdownSignalException
     *             if the connection is shut down while waiting for a message
     * @throws ConsumerCancelledException
     *             if the consumer is cancelled while waiting for a message
     * @throws InterruptedException
     *             if the thread is interrupted while waiting for a message
     */
    public String[] receiveData(String str) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        String outputDir = str.endsWith(File.separator) ? str : str + File.separator;
        File directory = new File(outputDir);
        if (!directory.exists() && !directory.mkdirs()) {
            throw new IOException("Couldn't create \"" + directory.getAbsolutePath() + "\".");
        }
        // The try block has to enclose the whole loop: closing inside the loop would shut down the
        // executor and the queue after the very first poll.
        try {
            QueueingConsumer.Delivery delivery = null;
            // Go on while termination has not been requested, the last poll returned a message or the
            // queue still reports messages.
            while (!this.terminated || delivery != null || this.queue.channel.messageCount(this.queue.name) > 0) {
                delivery = this.consumer.nextDelivery(this.waitingForMsgTimeout);
                if (delivery != null) {
                    this.executor.execute(new MessageProcessing(this, outputDir, delivery.getBody()));
                }
            }
        } finally {
            close();
        }
        // Collected after close() has waited for the workers, so files created by the last messages
        // are included.
        synchronized (this.fileStates) {
            return this.fileStates.keySet().toArray(new String[0]);
        }
    }

    /**
     * Asks the receiver to stop as soon as no more messages are available.
     */
    public void terminate() {
        this.terminated = true;
    }

    /**
     * Asks the receiver to stop and closes it immediately.
     */
    public void forceTermination() {
        this.terminated = true;
        close();
    }

    /**
     * Increases the error counter by one.
     */
    protected synchronized void increaseErrorCount() {
        this.errorCount++;
    }

    /**
     * @return the number of errors counted so far
     */
    public synchronized int getErrorCount() {
        return this.errorCount;
    }

    /**
     * @param j
     *            the timeout handed to {@code QueueingConsumer.nextDelivery(long)} for every poll
     */
    public void setWaitingForMsgTimeout(long j) {
        this.waitingForMsgTimeout = j;
    }

    /**
     * Shuts down the executor (waiting up to 10 seconds for running tasks), closes the queue and
     * closes every file for which no end message has been received, counting each of them as an
     * error. Calling the method again does not count the same file twice.
     */
    protected void close() {
        boolean interrupted = false;
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while waiting for executor to terminate.");
            interrupted = true;
        }
        IOUtils.closeQuietly(this.queue);
        // Workers may still be running if the executor did not terminate in time, so the same locks
        // as in MessageProcessing.run() are taken (map first, then state).
        synchronized (this.fileStates) {
            for (Map.Entry<String, FileReceiveState> entry : this.fileStates.entrySet()) {
                FileReceiveState state = entry.getValue();
                synchronized (state) {
                    if (state.outputStream != null) {
                        LOGGER.warn("Closing file \"{}\" for which no end message has been received.", entry.getKey());
                        IOUtils.closeQuietly(state.outputStream);
                        // Mark the file as closed so that a second close() does not count it again.
                        state.outputStream = null;
                        increaseErrorCount();
                    }
                }
            }
        }
        if (interrupted) {
            // Restore the interrupt status only after the clean-up so it cannot disturb the closing
            // of the queue and the files.
            Thread.currentThread().interrupt();
        }
    }
}
