package org.apache.flink.test.streaming.api;

import java.util.ArrayList;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.MathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase.class */
public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
    private String resultPath1;
    private String resultPath2;
    private String expected1;
    private String expected2;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$NonSerializable.class */
    public static class NonSerializable {
        private final Object obj = new Object();
        private final int value;

        public NonSerializable(int i) {
            this.value = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$NonSerializableTupleSource.class */
    private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
        private final int numElements;

        public NonSerializableTupleSource(int i) {
            this.numElements = i;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, NonSerializable>> sourceContext) throws Exception {
            for (int i = 0; i < this.numElements; i++) {
                sourceContext.collect(new Tuple2(Integer.valueOf(i), new NonSerializable(i)));
            }
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$TupleSource.class */
    private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private final int numElements;
        private final int numKeys;

        public TupleSource(int i, int i2) {
            this.numElements = i;
            this.numKeys = i2;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            for (int i = 0; i < this.numElements; i++) {
                sourceContext.collect(new Tuple2(Integer.valueOf(MathUtils.murmurHash(i) % this.numKeys), Integer.valueOf(i)));
            }
        }

        public void cancel() {
        }
    }

    @Before
    public void before() throws Exception {
        this.resultPath1 = this.tempFolder.newFile().toURI().toString();
        this.resultPath2 = this.tempFolder.newFile().toURI().toString();
        this.expected1 = "";
        this.expected2 = "";
    }

    @After
    public void after() throws Exception {
        compareResultsByLinesInMemory(this.expected1, this.resultPath1);
        compareResultsByLinesInMemory(this.expected2, this.resultPath2);
    }

    @Test
    public void testGroupedFoldOperation() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SplitStream split = executionEnvironment.addSource(new TupleSource(10, 2)).keyBy(new int[]{0}).fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.3
            public Integer fold(Integer num, Tuple2<Integer, Integer> tuple2) throws Exception {
                return Integer.valueOf(num.intValue() + ((Integer) tuple2.f1).intValue());
            }
        }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.2
            int key = -1;

            public Tuple2<Integer, Integer> map(Integer num) throws Exception {
                if (this.key == -1) {
                    this.key = MathUtils.murmurHash(num.intValue()) % 2;
                }
                return new Tuple2<>(Integer.valueOf(this.key), num);
            }
        }).split(new OutputSelector<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.1
            public Iterable<String> select(Tuple2<Integer, Integer> tuple2) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(tuple2.f0 + "");
                return arrayList;
            }
        });
        split.select(new String[]{"0"}).map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.4
            public Integer map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return (Integer) tuple2.f1;
            }
        }).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        split.select(new String[]{"1"}).map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.5
            public Integer map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return (Integer) tuple2.f1;
            }
        }).writeAsText(this.resultPath2, FileSystem.WriteMode.OVERWRITE);
        StringBuilder sb = new StringBuilder();
        StringBuilder sb2 = new StringBuilder();
        int i = 0;
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            if (MathUtils.murmurHash(i3) % 2 == 0) {
                i += i3;
                sb.append(i + "\n");
            } else {
                i2 += i3;
                sb2.append(i2 + "\n");
            }
        }
        this.expected1 = sb.toString();
        this.expected2 = sb2.toString();
        executionEnvironment.execute();
    }

    @Test
    public void testFoldOperationWithNonJavaSerializableType() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new NonSerializableTupleSource(10)).keyBy(new int[]{0}).fold(new NonSerializable(42), new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.7
            public NonSerializable fold(NonSerializable nonSerializable, Tuple2<Integer, NonSerializable> tuple2) throws Exception {
                return new NonSerializable(nonSerializable.value + ((NonSerializable) tuple2.f1).value);
            }
        }).map(new MapFunction<NonSerializable, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.6
            public Integer map(NonSerializable nonSerializable) throws Exception {
                return Integer.valueOf(nonSerializable.value);
            }
        }).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sb.append((42 + i) + "\n");
        }
        this.expected1 = sb.toString();
        executionEnvironment.execute();
    }
}
