package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.class */
public class EventTimeWindowCheckpointingITCase extends TestLogger {
    /** Parallelism set on the job environment by every test method in this class. */
    private static final int PARALLELISM = 4;
    /** Mini cluster shared by all tests; created in startTestCluster() and stopped in stopTestCluster(). */
    private static ForkableFlinkMiniCluster cluster;

    /** Supplies the temporary directories handed to the file-system and RocksDB state backends. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    /** Backend kind for this test instance, passed in through the constructor. */
    private StateBackendEnum stateBackendEnum;
    /** Backend instance built by initStateBackend() and installed on each test job. */
    private AbstractStateBackend stateBackend;

    /* renamed from: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase$8, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$8.class */
    /**
     * Lookup table that maps {@link StateBackendEnum} ordinals to the dense case labels
     * (1 to 4) consumed by the switch in {@code initStateBackend()}.
     *
     * <p>Each constant is registered in its own try block so that a constant which cannot be
     * resolved when this class is initialized simply keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum = new int[StateBackendEnum.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.MEM.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable: leave its slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.FILE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable: leave its slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.ROCKSDB.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable: leave its slot at 0
            }
            try {
                // plain case label 4; it only coincidentally equals the test parallelism
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.ROCKSDB_FULLY_ASYNC.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable: leave its slot at 0
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$CountValidatingSink.class */
    private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> windowCounts;
        private final int numKeys;
        private final int numWindowsExpected;

        private CountValidatingSink(int i, int i2) {
            this.windowCounts = new HashMap<>();
            this.numKeys = i;
            this.numWindowsExpected = i2;
        }

        public void open(Configuration configuration) throws Exception {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void close() throws Exception {
            boolean z = true;
            if (this.windowCounts.size() == this.numKeys) {
                Iterator<Integer> it = this.windowCounts.values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (it.next().intValue() < this.numWindowsExpected) {
                        z = false;
                        break;
                    }
                }
            }
            Assert.assertTrue("The source must see all expected windows.", z);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void invoke(Tuple4<Long, Long, Long, IntType> tuple4) throws Exception {
            Integer num = this.windowCounts.get(tuple4.f0);
            if (num != null) {
                this.windowCounts.put(tuple4.f0, Integer.valueOf(num.intValue() + 1));
            } else {
                this.windowCounts.put(tuple4.f0, 1);
            }
            Assert.assertEquals("Window counts don't match for key " + tuple4.f0 + ".", ((Long) tuple4.f0).intValue() + this.windowCounts.get(tuple4.f0).intValue(), ((IntType) tuple4.f3).value);
            boolean z = true;
            if (this.windowCounts.size() == this.numKeys) {
                Iterator<Integer> it = this.windowCounts.values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Integer next = it.next();
                    if (next.intValue() < this.numWindowsExpected) {
                        z = false;
                        break;
                    } else if (next.intValue() > this.numWindowsExpected) {
                        Assert.fail("Window count to high: " + next);
                    }
                }
                if (z) {
                    throw new SuccessException();
                }
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public HashMap<Long, Integer> m552snapshotState(long j, long j2) {
            return this.windowCounts;
        }

        public void restoreState(HashMap<Long, Integer> hashMap) {
            this.windowCounts.putAll(hashMap);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$FailingSource.class */
    private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> implements Checkpointed<Integer>, CheckpointListener {
        private static volatile boolean failedBefore = false;
        private final int numKeys;
        private final int numElementsToEmit;
        private final int failureAfterNumElements;
        private volatile int numElementsEmitted;
        private volatile int numSuccessfulCheckpoints;
        private volatile boolean running;

        private FailingSource(int i, int i2, int i3) {
            this.running = true;
            this.numKeys = i;
            this.numElementsToEmit = i2;
            this.failureAfterNumElements = i3;
        }

        public void open(Configuration configuration) {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, IntType>> sourceContext) throws Exception {
            while (this.running) {
                if (!failedBefore) {
                    Thread.sleep(1L);
                    if (this.numSuccessfulCheckpoints >= 2 && this.numElementsEmitted >= this.failureAfterNumElements) {
                        failedBefore = true;
                        throw new Exception("Artificial Failure");
                    }
                }
                if (this.numElementsEmitted >= this.numElementsToEmit || (!failedBefore && this.numElementsEmitted > this.failureAfterNumElements)) {
                    Thread.sleep(1L);
                } else {
                    synchronized (sourceContext.getCheckpointLock()) {
                        int i = this.numElementsEmitted;
                        this.numElementsEmitted = i + 1;
                        for (long j = 0; j < this.numKeys; j++) {
                            sourceContext.collectWithTimestamp(new Tuple2(Long.valueOf(j), new IntType(i)), i);
                        }
                        sourceContext.emitWatermark(new Watermark(i));
                    }
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) {
            this.numSuccessfulCheckpoints++;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Integer m554snapshotState(long j, long j2) {
            return Integer.valueOf(this.numElementsEmitted);
        }

        public void restoreState(Integer num) {
            this.numElementsEmitted = num.intValue();
        }

        public static void reset() {
            failedBefore = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$IntType.class */
    /**
     * Mutable holder for a single {@code int}, used as the value type of the test records.
     * It exposes a public field and a public no-argument constructor.
     */
    public static class IntType {
        /** The wrapped value; 0 when created through the no-argument constructor. */
        public int value;

        /** Creates a holder with value 0. */
        public IntType() {
            this(0);
        }

        /** Creates a holder with the given value. */
        public IntType(int value) {
            this.value = value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$StateBackendEnum.class */
    /** State backend variants the test suite is parameterized over (see initStateBackend()). */
    private enum StateBackendEnum {
        // MemoryStateBackend
        MEM,
        // FsStateBackend pointing at a temporary folder
        FILE,
        // RocksDBStateBackend with temporary storage and checkpoint folders
        ROCKSDB,
        // RocksDBStateBackend as above, with fully asynchronous snapshots enabled
        ROCKSDB_FULLY_ASYNC
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$ValidatingSink.class */
    private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> windowCounts;
        private final int numKeys;
        private final int numWindowsExpected;

        private ValidatingSink(int i, int i2) {
            this.windowCounts = new HashMap<>();
            this.numKeys = i;
            this.numWindowsExpected = i2;
        }

        public void open(Configuration configuration) throws Exception {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
            if (this.windowCounts.size() == this.numKeys) {
                boolean z = true;
                Iterator<Integer> it = this.windowCounts.values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (it.next().intValue() != this.numWindowsExpected) {
                        z = false;
                        break;
                    }
                }
                if (z) {
                    throw new SuccessException();
                }
            }
        }

        public void close() throws Exception {
            boolean z = true;
            if (this.windowCounts.size() == this.numKeys) {
                Iterator<Integer> it = this.windowCounts.values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (it.next().intValue() < this.numWindowsExpected) {
                        z = false;
                        break;
                    }
                }
            }
            Assert.assertTrue("The sink must see all expected windows.", z);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void invoke(Tuple4<Long, Long, Long, IntType> tuple4) throws Exception {
            int i = 0;
            long longValue = ((Long) tuple4.f1).longValue();
            while (true) {
                long j = longValue;
                if (j >= ((Long) tuple4.f2).longValue()) {
                    break;
                }
                if (j > 0) {
                    i = (int) (i + j);
                }
                longValue = j + 1;
            }
            Assert.assertEquals("Window start: " + tuple4.f1 + " end: " + tuple4.f2, i, ((IntType) tuple4.f3).value);
            Integer num = this.windowCounts.get(tuple4.f0);
            if (num != null) {
                this.windowCounts.put(tuple4.f0, Integer.valueOf(num.intValue() + 1));
            } else {
                this.windowCounts.put(tuple4.f0, 1);
            }
            if (this.windowCounts.size() == this.numKeys) {
                boolean z = true;
                Iterator<Integer> it = this.windowCounts.values().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Integer next = it.next();
                    if (next.intValue() < this.numWindowsExpected) {
                        z = false;
                        break;
                    } else if (next.intValue() > this.numWindowsExpected) {
                        Assert.fail("Window count to high: " + next);
                    }
                }
                if (z) {
                    throw new SuccessException();
                }
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public HashMap<Long, Integer> m556snapshotState(long j, long j2) {
            return this.windowCounts;
        }

        public void restoreState(HashMap<Long, Integer> hashMap) {
            this.windowCounts.putAll(hashMap);
        }
    }

    /**
     * Creates a test instance for one state backend variant.
     *
     * @param stateBackendEnum the backend kind to test; the values come from parameters()
     */
    public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
        this.stateBackendEnum = stateBackendEnum;
    }

    /**
     * Starts the mini cluster shared by all tests of this class.
     *
     * <p>The cluster is configured with 2 task managers of 2 slots each, which provides the
     * 4 slots needed for the test parallelism.
     */
    @BeforeClass
    public static void startTestCluster() {
        final Configuration clusterConfig = new Configuration();
        clusterConfig.setInteger("local.number-taskmanager", 2);
        clusterConfig.setInteger("taskmanager.numberOfTaskSlots", 2);
        // presumably megabytes of managed memory per task manager -- TODO confirm
        clusterConfig.setInteger("taskmanager.memory.size", 48);

        // assign the field before starting so that stopTestCluster() can still stop it
        cluster = new ForkableFlinkMiniCluster(clusterConfig, false);
        cluster.start();
    }

    /** Stops the shared mini cluster, if one was created. */
    @AfterClass
    public static void stopTestCluster() {
        if (cluster == null) {
            // cluster creation never got far enough to assign the field
            return;
        }
        cluster.stop();
    }

    /**
     * Builds the state backend selected by the test parameter before each test.
     *
     * <p>The switch is made directly on the enum instead of going through an ordinal lookup
     * table, and an unknown constant fails fast instead of leaving {@code stateBackend} unset.
     *
     * @throws IOException if a temporary folder or the backend cannot be created
     * @throws IllegalStateException if the configured backend kind is not handled here
     */
    @Before
    public void initStateBackend() throws IOException {
        switch (this.stateBackendEnum) {
            case MEM:
                this.stateBackend = new MemoryStateBackend();
                break;
            case FILE:
                this.stateBackend = new FsStateBackend("file://" + this.tempFolder.newFolder().getAbsolutePath());
                break;
            case ROCKSDB:
                this.stateBackend = createRocksDbBackend(false);
                break;
            case ROCKSDB_FULLY_ASYNC:
                this.stateBackend = createRocksDbBackend(true);
                break;
            default:
                throw new IllegalStateException("Unsupported state backend: " + this.stateBackendEnum);
        }
    }

    /**
     * Creates a RocksDB state backend whose storage path and checkpoint URI are fresh temporary
     * folders, optionally with fully asynchronous snapshots enabled.
     */
    private RocksDBStateBackend createRocksDbBackend(boolean fullyAsync) throws IOException {
        // folders are created in the same order as before: storage path first, checkpoint URI second
        String dbStoragePath = this.tempFolder.newFolder().getAbsolutePath();
        String checkpointUri = this.tempFolder.newFolder().toURI().toString();
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, new MemoryStateBackend());
        backend.setDbStoragePath(dbStoragePath);
        if (fullyAsync) {
            backend.enableFullyAsyncSnapshots();
        }
        return backend;
    }

    /**
     * Runs a keyed event-time job with 100 ms tumbling windows over a source that fails once.
     * The window function sums the element values of each window and the sink checks every sum.
     *
     * <p>The source is created with 100 keys, 3000 sequence numbers and a failure threshold of
     * 1000; the sink expects 30 windows per key.
     */
    @Test
    public void testTumblingTimeWindow() {
        // re-arm the artificial failure for this run
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            env.getConfig().disableSysoutLogging();
            env.setStateBackend(this.stateBackend);

            env.addSource(new FailingSource(100, 3000, 1000))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(100L, TimeUnit.MILLISECONDS))
                    .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            this.open = true;
                        }

                        // No hand-written bridge method: the compiler generates it, and an explicit
                        // apply(Object, Window, Iterable, Collector) clashes with the generated one.
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                            // the function must have been opened before any window fires
                            Assert.assertTrue(this.open);
                            int sum = 0;
                            long key = -1;
                            for (Tuple2<Long, IntType> value : values) {
                                sum += value.f1.value;
                                key = value.f0;
                            }
                            out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                        }
                    })
                    .addSink(new ValidatingSink(100, 30))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed event-time job with 100 ms tumbling windows whose window function keeps a
     * per-key counter in key/value state, over a source that fails once.
     *
     * <p>The counter is seeded with the key on first use and incremented per window, so the sink
     * can check that each emitted counter equals key + number of windows seen.
     */
    @Test
    public void testTumblingTimeWindowWithKVState() {
        // re-arm the artificial failure for this run
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            env.getConfig().disableSysoutLogging();
            env.setStateBackend(this.stateBackend);

            env.addSource(new FailingSource(100, 3000, 1000))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(100L, TimeUnit.MILLISECONDS))
                    .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
                        private boolean open = false;
                        private ValueState<Integer> count;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            this.open = true;
                            this.count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class, 0));
                        }

                        // No hand-written bridge method: the compiler generates it, and an explicit
                        // apply(Object, Window, Iterable, Collector) clashes with the generated one.
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
                            Long key = tuple.<Long>getField(0);
                            // seed the counter with the key so that every key yields different counts
                            if (this.count.value() == 0) {
                                this.count.update(key.intValue());
                            }
                            // the function must have been opened before any window fires
                            Assert.assertTrue(this.open);
                            this.count.update(this.count.value() + 1);
                            out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(this.count.value())));
                        }
                    })
                    .addSink(new CountValidatingSink(100, 30))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed event-time job with sliding windows ({@code timeWindow(1000 ms, 100 ms)}) over a
     * source that fails once. The window function sums the element values of each window and the
     * sink checks every sum.
     */
    @Test
    public void testSlidingTimeWindow() {
        // re-arm the artificial failure for this run
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            env.getConfig().disableSysoutLogging();
            env.setStateBackend(this.stateBackend);

            env.addSource(new FailingSource(100, 3000, 1000))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(1000L, TimeUnit.MILLISECONDS), Time.of(100L, TimeUnit.MILLISECONDS))
                    .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            this.open = true;
                        }

                        // No hand-written bridge method: the compiler generates it, and an explicit
                        // apply(Object, Window, Iterable, Collector) clashes with the generated one.
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                            // the function must have been opened before any window fires
                            Assert.assertTrue(this.open);
                            int sum = 0;
                            long key = -1;
                            for (Tuple2<Long, IntType> value : values) {
                                sum += value.f1.value;
                                key = value.f0;
                            }
                            out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                        }
                    })
                    .addSink(new ValidatingSink(100, 30))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed event-time job with 100 ms tumbling windows that are pre-aggregated by a
     * reduce function (summing the element values), over a source that fails once. The window
     * function forwards each aggregate with the window bounds and the sink checks every sum.
     */
    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        // re-arm the artificial failure for this run
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            env.getConfig().disableSysoutLogging();
            env.setStateBackend(this.stateBackend);

            env.addSource(new FailingSource(100, 3000, 1000))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(100L, TimeUnit.MILLISECONDS))
                    .apply(
                            new ReduceFunction<Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                    // keep the key of the first element and add up the values
                                    return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                }
                            },
                            new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
                                private boolean open = false;

                                @Override
                                public void open(Configuration parameters) {
                                    Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                                    this.open = true;
                                }

                                // No hand-written bridge method: the compiler generates it, and an
                                // explicit apply(Object, Window, Iterable, Collector) clashes with it.
                                @Override
                                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> aggregates, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                                    // the function must have been opened before any window fires
                                    Assert.assertTrue(this.open);
                                    for (Tuple2<Long, IntType> aggregate : aggregates) {
                                        out.collect(new Tuple4<>(aggregate.f0, window.getStart(), window.getEnd(), aggregate.f1));
                                    }
                                }
                            })
                    .addSink(new ValidatingSink(100, 30))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed event-time job with sliding windows ({@code timeWindow(1000 ms, 100 ms)}) that
     * are pre-aggregated by a reduce function (summing the element values), over a source that
     * fails once. The window function forwards each aggregate with the window bounds and the
     * sink checks every sum.
     */
    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        // re-arm the artificial failure for this run
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            env.getConfig().disableSysoutLogging();
            env.setStateBackend(this.stateBackend);

            env.addSource(new FailingSource(100, 3000, 1000))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(1000L, TimeUnit.MILLISECONDS), Time.of(100L, TimeUnit.MILLISECONDS))
                    .apply(
                            new ReduceFunction<Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                    // keep the key of the first element and add up the values
                                    return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                }
                            },
                            new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
                                private boolean open = false;

                                @Override
                                public void open(Configuration parameters) {
                                    Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                                    this.open = true;
                                }

                                // No hand-written bridge method: the compiler generates it, and an
                                // explicit apply(Object, Window, Iterable, Collector) clashes with it.
                                @Override
                                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> aggregates, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                                    // the function must have been opened before any window fires
                                    Assert.assertTrue(this.open);
                                    for (Tuple2<Long, IntType> aggregate : aggregates) {
                                        out.collect(new Tuple4<>(aggregate.f0, window.getStart(), window.getEnd(), aggregate.f1));
                                    }
                                }
                            })
                    .addSink(new ValidatingSink(100, 30))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Supplies the constructor arguments for the parameterized runner: one single-element row
     * per state backend variant, in the order MEM, FILE, ROCKSDB, ROCKSDB_FULLY_ASYNC.
     */
    @Parameterized.Parameters(name = "StateBackend = {0}")
    public static Collection<Object[]> parameters() {
        Object[][] rows = {
            {StateBackendEnum.MEM},
            {StateBackendEnum.FILE},
            {StateBackendEnum.ROCKSDB},
            {StateBackendEnum.ROCKSDB_FULLY_ASYNC},
        };
        return Arrays.asList(rows);
    }
}
