package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YarnApplicationMasterRunner.class */
public class YarnApplicationMasterRunner {
    /** Logger of this runner; it is also handed to the bootstrap helpers and actors started below. */
    protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
    /** Registration timeout (5 minutes) that is passed into the generated TaskManager configuration. */
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    /** The environment variables of this process, read once when the class is initialized. */
    private static final Map<String, String> ENV = System.getenv();
    /** Exit code reported when the application master fails during initialization. */
    private static final int INIT_ERROR_EXIT_CODE = 31;
    /**
     * Exit code associated with the process reapers that watch the JobManager and the YARN
     * resource manager actor (presumably used to terminate the JVM when a watched actor dies;
     * confirm against ProcessReaper).
     */
    private static final int ACTOR_DIED_EXIT_CODE = 32;

    /**
     * JVM entry point of the YARN application master process.
     *
     * <p>Logs the environment information, registers the signal handler, executes a fresh
     * runner instance and terminates the JVM with the exit code that the runner returned.
     *
     * @param strArr the command line arguments, forwarded unchanged to {@link #run(String[])}
     */
    public static void main(String[] strArr) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", strArr);
        SignalHandler.register(LOG);

        final YarnApplicationMasterRunner runner = new YarnApplicationMasterRunner();
        final int exitCode = runner.run(strArr);
        System.exit(exitCode);
    }

    /**
     * Switches to the identity of the submitting YARN client user and runs the application
     * master under that identity.
     *
     * <p>The user name is read from the environment variable named by
     * {@code YarnConfigKeys.ENV_CLIENT_USERNAME}. All tokens of the current user are copied to
     * the remote user before {@link #runApplicationMaster()} is invoked through {@code doAs}.
     *
     * @param strArr the command line arguments; they are not evaluated by this method
     * @return the value returned by {@link #runApplicationMaster()}, or
     *         {@code INIT_ERROR_EXIT_CODE} if anything fails before or while invoking it
     */
    protected int run(String[] strArr) {
        try {
            LOG.debug("All environment variables: {}", ENV);

            final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
            // require() formats its message with String.format, so the placeholder has to be
            // %s; an SLF4J-style {} would be printed literally and hide the variable name.
            require(yarnClientUsername != null,
                    "YARN client user name environment variable %s not set",
                    YarnConfigKeys.ENV_CLIENT_USERNAME);

            // Only the lookup of the current user is wrapped, so that the error message below
            // is not attached to unrelated failures of the later steps.
            final UserGroupInformation currentUser;
            try {
                currentUser = UserGroupInformation.getCurrentUser();
            } catch (Throwable t) {
                throw new Exception("Cannot access UserGroupInformation information for current user", t);
            }

            LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
                    currentUser.getShortUserName(), yarnClientUsername);

            final UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(yarnClientUsername);
            // hand over all tokens of the current user to the user we are going to run as
            for (Token<?> token : currentUser.getTokens()) {
                remoteUser.addToken(token);
            }

            return remoteUser.doAs(new PrivilegedAction<Integer>() {
                @Override
                public Integer run() {
                    return runApplicationMaster();
                }
            });
        } catch (Throwable t) {
            // make sure that everything, whatever ends up here, is logged
            LOG.error("YARN Application Master initialization failed", t);
            return INIT_ERROR_EXIT_CODE;
        }
    }

    /**
     * Starts the actor system, the JobManager actors, the optional web monitor, the YARN
     * resource manager actor and the process reapers, then blocks until the actor system
     * terminates.
     *
     * <p>If any step fails, the actor system and the web monitor are shut down again (as far
     * as they were already started) before the error exit code is returned.
     *
     * @return {@code 0} after the actor system terminated, or {@code INIT_ERROR_EXIT_CODE}
     *         if a {@link Throwable} was raised on the way
     */
    protected int runApplicationMaster() {
        // Both are assigned as soon as they exist so that the failure path can clean them up.
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;

        try {
            // ------- (1) read the environment and load the configuration -------

            final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
            require(currDir != null, "Current working directory variable (%s) not set",
                    ApplicationConstants.Environment.PWD.key());

            final String appMasterHostname = ENV.get(ApplicationConstants.Environment.NM_HOST.key());
            require(appMasterHostname != null, "ApplicationMaster hostname variable %s not set",
                    ApplicationConstants.Environment.NM_HOST.key());
            LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);

            final Map<String, String> dynamicProperties =
                    FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
            LOG.debug("YARN dynamic properties: {}", dynamicProperties);

            final Configuration config = createConfiguration(currDir, dynamicProperties);
            final YarnConfiguration yarnConfig = new YarnConfiguration();

            // Each value is parsed on its own so that a NumberFormatException raised by any
            // later step is not misreported as an invalid environment variable.
            final int taskManagerContainerMemory = parseIntFromEnv(YarnConfigKeys.ENV_TM_MEMORY);
            final int numInitialTaskManagers = parseIntFromEnv(YarnConfigKeys.ENV_TM_COUNT);
            final int slotsPerTaskManager = parseIntFromEnv(YarnConfigKeys.ENV_SLOTS);

            final ContaineredTaskManagerParameters taskManagerParameters =
                    ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);

            LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots());
            LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB",
                    taskManagerParameters.taskManagerTotalMemoryMB(),
                    taskManagerParameters.taskManagerHeapSizeMB(),
                    taskManagerParameters.taskManagerDirectMemoryLimitMB());

            // ------- (2) start the actor system -------

            final String amPortRange = config.getString("yarn.application-master.port", "0");
            actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);

            final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
            final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();
            LOG.info("Actor system bound to hostname {}.", akkaHostname);

            // ------- (3) prepare what the TaskManager containers need -------

            final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
                    config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
            LOG.debug("TaskManager configuration: {}", taskManagerConfig);

            final ContainerLaunchContext taskManagerContext = createTaskManagerContext(
                    config, yarnConfig, ENV, taskManagerParameters, taskManagerConfig,
                    currDir, getTaskManagerClass(), LOG);

            // ------- (4) start the actors and components -------

            LOG.debug("Starting JobManager actor");
            final ActorRef jobManager = (ActorRef) JobManager.startJobManagerActors(
                    config,
                    actorSystem,
                    new Some<String>(JobManager.JOB_MANAGER_NAME()),
                    Option.<String>empty(),
                    getJobManagerClass(),
                    getArchivistClass())._1();

            LOG.debug("Starting Web Frontend");
            webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
            final String webMonitorURL = webMonitor == null
                    ? null
                    : "http://" + appMasterHostname + ":" + webMonitor.getServerPort();

            LOG.debug("Starting YARN Flink Resource Manager");
            final Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
                    getResourceManagerClass(),
                    config,
                    yarnConfig,
                    LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
                    appMasterHostname,
                    webMonitorURL,
                    taskManagerParameters,
                    taskManagerContext,
                    numInitialTaskManagers,
                    LOG);
            final ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);

            LOG.debug("Starting process reapers for JobManager and YARN Application Master");
            actorSystem.actorOf(
                    Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
                    "YARN_Resource_Master_Process_Reaper");
            actorSystem.actorOf(
                    Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
                    "JobManager_Process_Reaper");

            LOG.info("YARN Application Master started");

            // ------- (5) wait until everything is done -------

            actorSystem.awaitTermination();

            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable t) {
                    LOG.error("Failed to stop the web frontend", t);
                }
            }
            return 0;
        } catch (Throwable t) {
            // make sure that everything, whatever ends up here, is logged
            LOG.error("YARN Application Master initialization failed", t);

            if (actorSystem != null) {
                try {
                    actorSystem.shutdown();
                } catch (Throwable shutdownFailure) {
                    LOG.error("Error shutting down actor system", shutdownFailure);
                }
            }

            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable stopFailure) {
                    // log the failure of the stop call itself, not the original error again
                    LOG.warn("Failed to stop the web frontend", stopFailure);
                }
            }

            return INIT_ERROR_EXIT_CODE;
        }
    }

    /**
     * Reads the environment variable with the given name and parses it as an integer.
     *
     * @param key the name of the environment variable
     * @return the parsed value
     * @throws RuntimeException if the variable is not set or is not a valid integer; the
     *         {@link NumberFormatException} is kept as the cause
     */
    private static int parseIntFromEnv(String key) {
        try {
            return Integer.parseInt(ENV.get(key));
        } catch (NumberFormatException e) {
            throw new RuntimeException("Invalid value for " + key + " : " + e.getMessage(), e);
        }
    }

    /**
     * Returns the resource manager class that {@link #runApplicationMaster()} passes to
     * {@code YarnFlinkResourceManager.createActorProps}. Being protected and non-final, it can
     * be overridden by subclasses to supply a different class.
     *
     * @return {@code YarnFlinkResourceManager.class}
     */
    protected Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
        return YarnFlinkResourceManager.class;
    }

    /**
     * Returns the JobManager class that {@link #runApplicationMaster()} passes to
     * {@code JobManager.startJobManagerActors}. Being protected and non-final, it can be
     * overridden by subclasses to supply a different class.
     *
     * @return {@code YarnJobManager.class}
     */
    protected Class<? extends JobManager> getJobManagerClass() {
        return YarnJobManager.class;
    }

    /**
     * Returns the archivist class that {@link #runApplicationMaster()} passes to
     * {@code JobManager.startJobManagerActors}. Being protected and non-final, it can be
     * overridden by subclasses to supply a different class.
     *
     * @return {@code MemoryArchivist.class}
     */
    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }

    /**
     * Returns the TaskManager class that {@link #runApplicationMaster()} passes to
     * {@code createTaskManagerContext}, where it is used to build the TaskManager launch
     * command. Being protected and non-final, it can be overridden by subclasses to supply a
     * different class.
     *
     * @return {@code YarnTaskManager.class}
     */
    protected Class<? extends TaskManager> getTaskManagerClass() {
        return YarnTaskManager.class;
    }

    /**
     * Checks a precondition and fails with a formatted message if it does not hold.
     *
     * @param z the condition that is expected to be {@code true}
     * @param str the message template, in {@link String#format(String, Object...)} syntax
     * @param objArr the arguments for the message template
     * @throws RuntimeException if the condition is {@code false}
     */
    private static void require(boolean z, String str, Object... objArr) {
        if (z) {
            return;
        }
        final String message = String.format(str, objArr);
        throw new RuntimeException(message);
    }

    /**
     * Builds the Flink configuration used by the application master.
     *
     * <p>The configuration is loaded from the given directory, then the given dynamic
     * properties are written on top of it. After that the ZooKeeper namespace from the
     * environment (if set and non-empty) is applied, a non-negative web port is reset to
     * {@code 0}, and the deprecated YARN-specific keys and prefixes are substituted.
     *
     * @param str the directory from which the configuration is loaded; it is also stored under
     *            {@code flink.base.dir.path}
     * @param map the dynamic properties, each written as a string entry into the configuration
     * @return the resulting configuration
     */
    private static Configuration createConfiguration(String str, Map<String, String> map) {
        LOG.info("Loading config from directory " + str);

        final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(str);
        flinkConfig.setString("flink.base.dir.path", str);

        // the dynamic properties are applied after loading, so they win over loaded values
        for (Map.Entry<String, String> dynamicProperty : map.entrySet()) {
            final String key = dynamicProperty.getKey();
            final String value = dynamicProperty.getValue();
            flinkConfig.setString(key, value);
        }

        final String zooKeeperNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
        final boolean namespaceMissing = zooKeeperNamespace == null || zooKeeperNamespace.isEmpty();
        if (!namespaceMissing) {
            flinkConfig.setString("recovery.zookeeper.path.namespace", zooKeeperNamespace);
        }

        // any non-negative web port is replaced by 0
        final int configuredWebPort = flinkConfig.getInteger("jobmanager.web.port", 0);
        if (configuredWebPort >= 0) {
            flinkConfig.setInteger("jobmanager.web.port", 0);
        }

        // single keys first, then the prefixes, in the same order as before
        BootstrapTools.substituteDeprecatedConfigKey(
                flinkConfig, "yarn.heap-cutoff-ratio", "containerized.heap-cutoff-ratio");
        BootstrapTools.substituteDeprecatedConfigKey(
                flinkConfig, "yarn.heap-cutoff-min", "containerized.heap-cutoff-min");
        BootstrapTools.substituteDeprecatedConfigPrefix(
                flinkConfig, "yarn.application-master.env.", "containerized.master.env.");
        BootstrapTools.substituteDeprecatedConfigPrefix(
                flinkConfig, "yarn.taskmanager.env.", "containerized.taskmanager.env.");

        return flinkConfig;
    }

    /**
     * Creates the launch context that describes how TaskManager containers are started.
     *
     * <p>The context contains the local resources (Flink jar, the generated TaskManager
     * configuration file and all shipped files), the start command, the environment variables
     * and, if they can be obtained, the security tokens of the current user.
     *
     * @param configuration the Flink configuration used to build the TaskManager start command
     * @param yarnConfiguration the YARN configuration, used to obtain the file system and to
     *                          set up the YARN class path
     * @param map the environment variables from which the jar path, application id, client
     *            home directory, ship files, client user name and class path are read
     * @param containeredTaskManagerParameters the TaskManager parameters; they contribute to
     *                                         the start command and the container environment
     * @param configuration2 the TaskManager configuration that is written to a file and
     *                       registered as {@code flink-conf.yaml}
     * @param str the working directory in which the configuration file is written and in which
     *            the logback/log4j configuration files are looked up
     * @param cls the class handed to the start command generation as the TaskManager class
     * @param logger the logger to which all messages of this method are written
     * @return the prepared container launch context
     * @throws Exception if a required environment variable is missing, if YARN's default file
     *                   system cannot be accessed, or if preparing one of the resources fails
     */
    public static ContainerLaunchContext createTaskManagerContext(Configuration configuration, YarnConfiguration yarnConfiguration, Map<String, String> map, ContaineredTaskManagerParameters containeredTaskManagerParameters, Configuration configuration2, String str, Class<?> cls, Logger logger) throws Exception {
        logger.info("Setting up resources for TaskManagers");

        final String remoteFlinkJarPath = requiredEnvVariable(map, YarnConfigKeys.FLINK_JAR_PATH);
        final String appId = requiredEnvVariable(map, YarnConfigKeys.ENV_APP_ID);
        final String clientHomeDir = requiredEnvVariable(map, YarnConfigKeys.ENV_CLIENT_HOME_DIR);
        final String shipListString = requiredEnvVariable(map, YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
        final String yarnClientUsername = requiredEnvVariable(map, YarnConfigKeys.ENV_CLIENT_USERNAME);
        final String classPathString = requiredEnvVariable(map, YarnConfigKeys.ENV_FLINK_CLASSPATH);

        // Only the file system lookup gets the "default file system" message; I/O failures of
        // the later steps propagate with their own message instead of a misleading one.
        final FileSystem yarnFileSystem;
        try {
            yarnFileSystem = FileSystem.get(yarnConfiguration);
        } catch (IOException e) {
            throw new Exception("Could not access YARN's default file system", e);
        }

        // register the Flink jar
        final LocalResource flinkJar = Records.newRecord(LocalResource.class);
        Utils.registerLocalResource(yarnFileSystem, new Path(remoteFlinkJarPath), flinkJar);

        // write the TaskManager configuration to a uniquely named file and register it
        final LocalResource flinkConf = Records.newRecord(LocalResource.class);
        final File taskManagerConfigFile = new File(str, UUID.randomUUID() + "-taskmanager-conf.yaml");
        // use the supplied logger here as well, consistent with the rest of this method
        logger.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
        BootstrapTools.writeConfiguration(configuration2, taskManagerConfigFile);
        Utils.setupLocalResource(yarnFileSystem, appId,
                new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
        logger.info("Prepared local resource for modified yaml: {}", flinkConf);

        final Map<String, LocalResource> taskManagerLocalResources = new HashMap<String, LocalResource>();
        taskManagerLocalResources.put("flink.jar", flinkJar);
        taskManagerLocalResources.put("flink-conf.yaml", flinkConf);

        // register every shipped file under its file name; empty list entries are skipped
        for (String pathStr : shipListString.split(",")) {
            if (!pathStr.isEmpty()) {
                final LocalResource shippedResource = Records.newRecord(LocalResource.class);
                final Path shippedPath = new Path(pathStr);
                Utils.registerLocalResource(yarnFileSystem, shippedPath, shippedResource);
                taskManagerLocalResources.put(shippedPath.getName(), shippedResource);
            }
        }

        logger.info("Creating container launch context for TaskManagers");

        final boolean hasLogback = new File(str, FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME).exists();
        final boolean hasLog4j = new File(str, FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME).exists();
        final String launchCommand = BootstrapTools.getTaskManagerShellCommand(
                configuration, containeredTaskManagerParameters, ".", "<LOG_DIR>", hasLogback, hasLog4j, cls);
        logger.info("Starting TaskManagers with command: " + launchCommand);

        final ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList(launchCommand));
        ctx.setLocalResources(taskManagerLocalResources);

        // The order matters: the class path and user name entries are written after the
        // user-defined TaskManager environment and therefore override equally named entries.
        final Map<String, String> containerEnv = new HashMap<String, String>();
        containerEnv.putAll(containeredTaskManagerParameters.taskManagerEnv());
        containerEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathString);
        Utils.setupYarnClassPath(yarnConfiguration, containerEnv);
        containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);
        ctx.setEnvironment(containerEnv);

        // Passing on the tokens is best effort: a failure is logged and the context is
        // returned without tokens.
        try {
            final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
            final DataOutputBuffer tokenBuffer = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(tokenBuffer);
            ctx.setTokens(ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength()));
        } catch (Throwable t) {
            logger.error("Getting current user info failed when trying to launch the container", t);
        }

        return ctx;
    }

    /**
     * Looks up an environment variable that must be present.
     *
     * @param env the environment variables
     * @param key the name of the variable
     * @return the value of the variable, never {@code null}
     * @throws RuntimeException if the variable is not set
     */
    private static String requiredEnvVariable(Map<String, String> env, String key) {
        final String value = env.get(key);
        require(value != null, "Environment variable %s not set", key);
        return value;
    }
}
