/*
 * Decompiled with CFR 0.152.
 */
package net.sansa_stack.hadoop.format.univocity.csv.csv;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Streams;
import com.univocity.parsers.common.AbstractParser;
import io.reactivex.rxjava3.core.Flowable;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.core.Accumulating;
import net.sansa_stack.hadoop.core.RecordReaderGenericBase;
import net.sansa_stack.hadoop.core.pattern.CustomPatternCsv;
import net.sansa_stack.hadoop.format.jena.base.RecordReaderConf;
import net.sansa_stack.hadoop.format.univocity.csv.csv.FileInputFormatCsvUnivocity;
import org.aksw.commons.model.csvw.domain.api.Dialect;
import org.aksw.commons.model.csvw.domain.api.DialectMutable;
import org.aksw.commons.model.csvw.domain.impl.DialectMutableImpl;
import org.aksw.commons.model.csvw.univocity.UnivocityCsvwConf;
import org.aksw.commons.model.csvw.univocity.UnivocityParserFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Record reader that yields CSV rows as {@code String[]} arrays, using parsers obtained from a
 * {@link UnivocityParserFactory}.
 *
 * <p>Header rows are not handled by the parser itself: {@link #initialize} sets the parser's
 * header row count to zero and remembers the requested count, and {@link #createRecordFlow()}
 * skips that many rows on the first split.
 */
public class RecordReaderCsvUnivocity
        extends RecordReaderGenericBase<String[], String[], String[], String[]> {
    /** Configuration key passed to {@link RecordReaderConf} as the record min-length key. */
    public static final String RECORD_MINLENGTH_KEY = "mapreduce.input.csv.record.minlength";
    /** Configuration key passed to {@link RecordReaderConf} as the record max-length key. */
    public static final String RECORD_MAXLENGTH_KEY = "mapreduce.input.csv.record.maxlength";
    /** Configuration key passed to {@link RecordReaderConf} as the probe-count key. */
    public static final String RECORD_PROBECOUNT_KEY = "mapreduce.input.csv.record.probecount";
    /** Configuration key for the raw CSV format; not read within this class. */
    public static final String CSV_FORMAT_RAW_KEY = "mapreduce.input.csv.format.raw";
    /** Configuration key for the cell max-length value handed to {@link CustomPatternCsv}. */
    public static final String CELL_MAXLENGTH_KEY = "mapreduce.input.csv.cell.maxlength";
    /** Value used when {@link #CELL_MAXLENGTH_KEY} is not configured. */
    public static final int CELL_MAXLENGTH_DEFAULT_VALUE = 500000;
    /** Configuration key for the cell max-lines value handed to {@link CustomPatternCsv}. */
    public static final String CELL_MAXLINES_KEY = "mapreduce.input.csv.cell.maxlines";
    /** Value used when {@link #CELL_MAXLINES_KEY} is not configured. */
    public static final int CELL_MAXLINES_DEFAULT_VALUE = 1000;

    /** Private copy of the dialect found in the job configuration; set by {@link #initialize}. */
    protected Dialect requestedDialect;
    /** Factory for parsers and input stream readers; set by {@link #initialize}. */
    protected UnivocityParserFactory parserFactory;
    /** Number of leading rows to drop from the first split (the header rows). */
    protected long postProcessRowSkipCount;

    /** Creates a reader configured with the CSV-specific record length / probe count keys. */
    public RecordReaderCsvUnivocity() {
        this(new RecordReaderConf(null, RECORD_MINLENGTH_KEY, RECORD_MAXLENGTH_KEY, RECORD_PROBECOUNT_KEY, null));
    }

    /**
     * Creates a reader with the given configuration and an identity accumulator.
     *
     * @param conf the record reader configuration forwarded to the base class
     */
    public RecordReaderCsvUnivocity(RecordReaderConf conf) {
        super(conf, Accumulating.identity());
    }

    /**
     * Initializes the base reader, then derives the record start pattern, the header skip count
     * and the parser factory from the job configuration.
     *
     * @param inputSplit the split to read
     * @param context the task context supplying the Hadoop configuration
     * @throws IOException if the base class initialization fails
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
        super.initialize(inputSplit, context);
        Configuration conf = context.getConfiguration();
        UnivocityCsvwConf parserConf = FileInputFormatCsvUnivocity.getUnivocityConfig(conf);

        // Snapshot the dialect before the header row count is overwritten below.
        DialectMutableImpl dialectCopy = new DialectMutableImpl();
        parserConf.getDialect().copyInto((DialectMutable) dialectCopy, false);
        this.requestedDialect = dialectCopy;

        int cellMaxLength = conf.getInt(CELL_MAXLENGTH_KEY, CELL_MAXLENGTH_DEFAULT_VALUE);
        int cellMaxLines = conf.getInt(CELL_MAXLINES_KEY, CELL_MAXLINES_DEFAULT_VALUE);
        this.recordStartPattern = CustomPatternCsv.create(this.requestedDialect, cellMaxLines, cellMaxLength);

        // An unset header row count is treated as one header row. The parser is told there are
        // no header rows; the rows are skipped in createRecordFlow() on the first split instead.
        this.postProcessRowSkipCount = Optional.ofNullable(parserConf.getDialect().getHeaderRowCount()).orElse(1L);
        parserConf.getDialect().setHeaderRowCount(Long.valueOf(0L));
        this.parserFactory = UnivocityParserFactory.createDefault(Boolean.FALSE).configure(parserConf);
    }

    /**
     * Returns the base record flow, minus the header rows when this is the first split.
     *
     * @return the stream of rows for this split
     * @throws IOException if the base class fails to create the flow
     */
    @Override
    protected Stream<String[]> createRecordFlow() throws IOException {
        Stream<String[]> records = super.createRecordFlow();
        boolean skipHeader = this.isFirstSplit && this.postProcessRowSkipCount > 0L;
        return skipHeader ? records.skip(this.postProcessRowSkipCount) : records;
    }

    /**
     * Parses the given input stream lazily into a stream of rows. Closing the returned stream
     * stops the parser and closes the underlying reader.
     *
     * @param in the input to parse
     * @param isProbe not used by this implementation
     * @return a stream of rows that must be closed by the caller
     * @throws IllegalStateException (on consumption) if rows differ in length
     */
    @Override
    protected Stream<String[]> parse(InputStream in, boolean isProbe) {
        State state = new State(
                () -> this.parserFactory.newParser(),
                this.parserFactory.newInputStreamReader(in));
        return Streams.stream(state).onClose(state::close);
    }

    /**
     * Parses the input obtained from the supplier into a {@link Flowable} of rows. Each
     * subscription gets its own state (input stream, reader and parser). Failures while reading
     * a row, including a row length mismatch, are signalled through {@code onError}.
     *
     * @param inputStreamSupplier supplies the input stream when the flow's state is created
     * @return a flowable emitting one item per parsed row
     */
    protected Flowable<String[]> parse(Callable<InputStream> inputStreamSupplier) {
        return Flowable.generate(
                () -> new State(
                        () -> this.parserFactory.newParser(),
                        this.parserFactory.newInputStreamReader(inputStreamSupplier.call())),
                (state, emitter) -> {
                    try {
                        String[] row = state.readRow();
                        if (row == null) {
                            emitter.onComplete();
                        } else {
                            emitter.onNext(row);
                        }
                    } catch (Exception x) {
                        emitter.onError(x);
                    }
                },
                // Same cleanup as the stream variant: stop the parser, then close the reader.
                state -> state.close());
    }

    /**
     * Per-parse state: the reader, the lazily created parser and the bookkeeping needed to
     * verify that all rows have the same number of cells. Usable as an iterator (stream variant)
     * or via {@link #readRow()} (flowable variant).
     */
    private static class State
            extends AbstractIterator<String[]>
            implements AutoCloseable {
        /** Marker for "no row seen yet". */
        private static final int NO_ROW_SEEN = -1;

        private final Reader reader;
        private final Callable<AbstractParser<?>> parserFactory;
        /** Created and started on the first read. */
        private AbstractParser<?> parser = null;
        private int seenRowLength = NO_ROW_SEEN;
        private String[] priorRow = null;

        State(Callable<AbstractParser<?>> parserFactory, Reader reader) {
            this.parserFactory = parserFactory;
            this.reader = reader;
        }

        /**
         * Reads the next row, creating and starting the parser on first use.
         *
         * @return the next row, or {@code null} when the parser has no more rows
         * @throws IllegalStateException if the row's length differs from the first row's length
         * @throws Exception if the parser factory fails
         */
        String[] readRow() throws Exception {
            if (this.parser == null) {
                this.parser = this.parserFactory.call();
                this.parser.beginParsing(this.reader);
            }
            String[] row = this.parser.parseNext();
            if (row != null) {
                checkRowLength(row);
                this.priorRow = row;
            }
            return row;
        }

        /** Records the first row's length and rejects any later row whose length differs. */
        private void checkRowLength(String[] row) {
            if (this.seenRowLength == NO_ROW_SEEN) {
                this.seenRowLength = row.length;
            } else if (row.length != this.seenRowLength) {
                // priorRow is non-null here: it is stored together with the first seen length.
                // Arrays.asList is used for printing because it tolerates null cells.
                String msg = String.format("Current row length (%d) does not match prior one (%d) - current: %s | prior: %s", row.length, this.seenRowLength, Arrays.asList(row), Arrays.asList(this.priorRow));
                throw new IllegalStateException(msg);
            }
        }

        /** Iterator adapter around {@link #readRow()}; checked exceptions are wrapped. */
        @Override
        protected String[] computeNext() {
            String[] row;
            try {
                row = readRow();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return row != null ? row : endOfData();
        }

        /**
         * Stops the parser (if one was started) and closes the reader. The reader is closed
         * even if stopping the parser throws.
         */
        @Override
        public void close() {
            try {
                if (this.parser != null) {
                    this.parser.stopParsing();
                }
            } finally {
                if (this.reader != null) {
                    try {
                        this.reader.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }
}

