package org.apache.flink.runtime.leaderelection;

import akka.actor.ActorRef;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.class */
public class LeaderChangeJobRecoveryTest extends TestLogger {
    private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS);
    private Configuration configuration;
    private int numTMs = 1;
    private int numSlotsPerTM = 1;
    private int parallelism = this.numTMs * this.numSlotsPerTM;
    private LeaderElectionRetrievalTestingCluster cluster = null;
    private JobGraph job = createBlockingJob(this.parallelism);

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest$TestActorGateway.class */
    public static class TestActorGateway implements ActorGateway {
        private static final long serialVersionUID = -736146686160538227L;
        private transient Promise<Boolean> terminalState = new Promise.DefaultPromise();

        public Future<Boolean> hasReachedTerminalState() {
            return this.terminalState.future();
        }

        public Future<Object> ask(Object obj, FiniteDuration finiteDuration) {
            return null;
        }

        public void tell(Object obj) {
            tell(obj, new AkkaActorGateway(ActorRef.noSender(), (UUID) null));
        }

        public void tell(Object obj, ActorGateway actorGateway) {
            if (obj instanceof ExecutionGraphMessages.JobStatusChanged) {
                ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) obj;
                if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
                    this.terminalState.success(true);
                }
            }
        }

        public void forward(Object obj, ActorGateway actorGateway) {
        }

        public Future<Object> retry(Object obj, int i, FiniteDuration finiteDuration, ExecutionContext executionContext) {
            return null;
        }

        public String path() {
            return null;
        }

        public ActorRef actor() {
            return null;
        }

        public UUID leaderSessionID() {
            return null;
        }
    }

    @Before
    public void before() throws TimeoutException, InterruptedException {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        this.configuration = new Configuration();
        this.configuration.setInteger("local.number-jobmanager", 1);
        this.configuration.setInteger("local.number-taskmanager", this.numTMs);
        this.configuration.setInteger("taskmanager.numberOfTaskSlots", this.numSlotsPerTM);
        this.configuration.setString("restart-strategy", "fixeddelay");
        this.configuration.setInteger("restart-strategy.fixed-delay.attempts", 9999);
        this.configuration.setString("restart-strategy.fixed-delay.delay", "100 milli");
        this.cluster = new LeaderElectionRetrievalTestingCluster(this.configuration, true, false);
        this.cluster.start(false);
        this.cluster.waitForActorsToBeAlive();
    }

    @Test
    public void testNotRestartedWhenLosingLeadership() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        this.cluster.grantLeadership(0, randomUUID);
        this.cluster.notifyRetrievalListeners(0, randomUUID);
        this.cluster.waitForTaskManagersToBeRegistered(timeout);
        this.cluster.submitJobDetached(this.job);
        ActorGateway leaderGateway = this.cluster.getLeaderGateway(timeout);
        Await.ready(leaderGateway.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(this.job.getJobID()), timeout), timeout);
        TestingJobManagerMessages.ExecutionGraphFound executionGraphFound = (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(leaderGateway.ask(new TestingJobManagerMessages.RequestExecutionGraph(this.job.getJobID()), timeout), timeout);
        Assert.assertTrue(executionGraphFound instanceof TestingJobManagerMessages.ExecutionGraphFound);
        ExecutionGraph executionGraph = executionGraphFound.executionGraph();
        TestActorGateway testActorGateway = new TestActorGateway();
        executionGraph.registerJobStatusListener(testActorGateway);
        this.cluster.revokeLeadership();
        Assert.assertTrue("The job should have reached a terminal state.", ((Boolean) Await.result(testActorGateway.hasReachedTerminalState(), timeout)).booleanValue());
    }

    public JobGraph createBlockingJob(int i) {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        JobVertex jobVertex = new JobVertex("sender");
        JobVertex jobVertex2 = new JobVertex("receiver");
        jobVertex.setInvokableClass(Tasks.Sender.class);
        jobVertex2.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        ExecutionConfig executionConfig = new ExecutionConfig();
        JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{jobVertex, jobVertex2});
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }
}
