package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.p001sparkproject.guava.collect.Sets;
import org.p001sparkproject.guava.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockFetcher.class */
public class RetryingBlockFetcher {
    private static final ExecutorService executorService = Executors.newCachedThreadPool(NettyUtils.createThreadFactory("Block Fetch Retry"));
    private final BlockFetchStarter fetchStarter;
    private final BlockFetchingListener listener;
    private final int maxRetries;
    private final int retryWaitTime;
    private RetryingBlockFetchListener currentListener;
    private final Logger logger = LoggerFactory.getLogger((Class<?>) RetryingBlockFetcher.class);
    private int retryCount = 0;
    private final LinkedHashSet<String> outstandingBlocksIds = Sets.newLinkedHashSet();

    /* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockFetcher$BlockFetchStarter.class */
    public interface BlockFetchStarter {
        void createAndStart(String[] strArr, BlockFetchingListener blockFetchingListener) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockFetcher$RetryingBlockFetchListener.class */
    public class RetryingBlockFetchListener implements BlockFetchingListener {
        private RetryingBlockFetchListener() {
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener
        public void onBlockFetchSuccess(String str, ManagedBuffer managedBuffer) {
            boolean z = false;
            synchronized (RetryingBlockFetcher.this) {
                if (this == RetryingBlockFetcher.this.currentListener && RetryingBlockFetcher.this.outstandingBlocksIds.contains(str)) {
                    RetryingBlockFetcher.this.outstandingBlocksIds.remove(str);
                    z = true;
                }
            }
            if (z) {
                RetryingBlockFetcher.this.listener.onBlockFetchSuccess(str, managedBuffer);
            }
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener
        public void onBlockFetchFailure(String str, Throwable th) {
            boolean z = false;
            synchronized (RetryingBlockFetcher.this) {
                if (this == RetryingBlockFetcher.this.currentListener && RetryingBlockFetcher.this.outstandingBlocksIds.contains(str)) {
                    if (RetryingBlockFetcher.this.shouldRetry(th)) {
                        RetryingBlockFetcher.this.initiateRetry();
                    } else {
                        RetryingBlockFetcher.this.logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)", str, Integer.valueOf(RetryingBlockFetcher.this.retryCount)), th);
                        RetryingBlockFetcher.this.outstandingBlocksIds.remove(str);
                        z = true;
                    }
                }
            }
            if (z) {
                RetryingBlockFetcher.this.listener.onBlockFetchFailure(str, th);
            }
        }
    }

    public RetryingBlockFetcher(TransportConf transportConf, BlockFetchStarter blockFetchStarter, String[] strArr, BlockFetchingListener blockFetchingListener) {
        this.fetchStarter = blockFetchStarter;
        this.listener = blockFetchingListener;
        this.maxRetries = transportConf.maxIORetries();
        this.retryWaitTime = transportConf.ioRetryWaitTimeMs();
        Collections.addAll(this.outstandingBlocksIds, strArr);
        this.currentListener = new RetryingBlockFetchListener();
    }

    public void start() {
        fetchAllOutstanding();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fetchAllOutstanding() {
        String[] strArr;
        int i;
        RetryingBlockFetchListener retryingBlockFetchListener;
        synchronized (this) {
            strArr = (String[]) this.outstandingBlocksIds.toArray(new String[this.outstandingBlocksIds.size()]);
            i = this.retryCount;
            retryingBlockFetchListener = this.currentListener;
        }
        try {
            this.fetchStarter.createAndStart(strArr, retryingBlockFetchListener);
        } catch (Exception e) {
            Logger logger = this.logger;
            Object[] objArr = new Object[2];
            objArr[0] = Integer.valueOf(strArr.length);
            objArr[1] = i > 0 ? "(after " + i + " retries)" : "";
            logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s", objArr), (Throwable) e);
            if (shouldRetry(e)) {
                initiateRetry();
                return;
            }
            for (String str : strArr) {
                this.listener.onBlockFetchFailure(str, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void initiateRetry() {
        this.retryCount++;
        this.currentListener = new RetryingBlockFetchListener();
        this.logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms", Integer.valueOf(this.retryCount), Integer.valueOf(this.maxRetries), Integer.valueOf(this.outstandingBlocksIds.size()), Integer.valueOf(this.retryWaitTime));
        executorService.submit(new Runnable() { // from class: org.apache.spark.network.shuffle.RetryingBlockFetcher.1
            @Override // java.lang.Runnable
            public void run() {
                Uninterruptibles.sleepUninterruptibly(RetryingBlockFetcher.this.retryWaitTime, TimeUnit.MILLISECONDS);
                RetryingBlockFetcher.this.fetchAllOutstanding();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized boolean shouldRetry(Throwable th) {
        return ((th instanceof IOException) || (th.getCause() != null && (th.getCause() instanceof IOException))) && (this.retryCount < this.maxRetries);
    }
}
