package org.apache.flink.storm.api;

import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.utils.Utils;
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/storm/api/FlinkSubmitter.class */
public class FlinkSubmitter {
    public static final Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);

    /* loaded from: input_file:org/apache/flink/storm/api/FlinkSubmitter$FlinkProgressListener.class */
    public interface FlinkProgressListener {
    }

    public static void submitTopology(String str, Map<?, ?> map, FlinkTopology flinkTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(str, map, flinkTopology);
    }

    public static void submitTopology(String str, Map map, FlinkTopology flinkTopology) throws AlreadyAliveException, InvalidTopologyException {
        if (!Utils.isValidConf(map)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        Configuration loadConfiguration = GlobalConfiguration.loadConfiguration();
        if (!map.containsKey("nimbus.host")) {
            map.put("nimbus.host", loadConfiguration.getString("jobmanager.rpc.address", "localhost"));
        }
        if (!map.containsKey("nimbus.thrift.port")) {
            map.put("nimbus.thrift.port", new Integer(loadConfiguration.getInteger("jobmanager.rpc.port", 6123)));
        }
        String jSONString = JSONValue.toJSONString(map);
        FlinkClient configuredClient = FlinkClient.getConfiguredClient(map);
        try {
            if (configuredClient.getTopologyJobId(str) != null) {
                throw new RuntimeException("Topology with name `" + str + "` already exists on cluster");
            }
            String property = System.getProperty("storm.jar");
            if (property == null) {
                try {
                    Iterator it = ExecutionEnvironment.getExecutionEnvironment().getJars().iterator();
                    while (it.hasNext()) {
                        property = new File(((URL) it.next()).toURI()).getAbsolutePath();
                    }
                } catch (ClassCastException e) {
                } catch (URISyntaxException e2) {
                }
            }
            logger.info("Submitting topology " + str + " in distributed mode with conf " + jSONString);
            configuredClient.submitTopologyWithOpts(str, property, flinkTopology);
            logger.info("Finished submitting topology: " + str);
        } catch (AlreadyAliveException e3) {
            logger.warn("Topology already alive exception", e3);
            throw e3;
        } catch (InvalidTopologyException e4) {
            logger.warn("Topology submission exception: " + e4.get_msg());
            throw e4;
        }
    }

    public static void submitTopologyWithProgressBar(String str, Map<?, ?> map, FlinkTopology flinkTopology) throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(str, map, flinkTopology);
    }

    public static String submitJar(Map map, String str) {
        return submitJar(str);
    }

    public static String submitJar(String str) {
        if (str == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload");
        }
        return str;
    }
}
