package net.sansa_stack.hadoop.format.commons_csv.csv;

import io.reactivex.rxjava3.core.Flowable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import net.sansa_stack.hadoop.core.Accumulating;
import net.sansa_stack.hadoop.core.RecordReaderGenericBase;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sansa_stack/hadoop/format/commons_csv/csv/RecordReaderCsv.class */
/**
 * Record reader that parses a split of a CSV file into rows using Apache Commons CSV.
 *
 * <p>Each emitted record is the list of cell values of one CSV row. The CSV format is
 * obtained from the Hadoop configuration via {@code FileInputFormatCsv.getCsvFormat},
 * falling back to {@link CSVFormat#EXCEL}. All rows produced by one parser instance must
 * have the same number of cells; otherwise the record flow terminates with an
 * {@link IllegalStateException}.
 */
public class RecordReaderCsv extends RecordReaderGenericBase<List, List, List, List> {
    private static final Logger logger = LoggerFactory.getLogger(RecordReaderCsv.class);

    /** Configuration key passed to the base class as the record min-length key. */
    public static final String RECORD_MINLENGTH_KEY = "mapreduce.input.csv.record.minlength";

    /** Configuration key passed to the base class as the record max-length key. */
    public static final String RECORD_MAXLENGTH_KEY = "mapreduce.input.csv.record.maxlength";

    /** Configuration key passed to the base class as the record probe-count key. */
    public static final String RECORD_PROBECOUNT_KEY = "mapreduce.input.csv.record.probecount";

    /** Configuration key for the raw CSV format (not read directly by this class). */
    public static final String CSV_FORMAT_RAW_KEY = "mapreduce.input.csv.format.raw";

    /** Configuration key for the bound used when building the record-start pattern. */
    public static final String CELL_MAXLENGTH_KEY = "mapreduce.input.csv.cell.maxlength";

    /** Value used when {@link #CELL_MAXLENGTH_KEY} is not configured. */
    public static final long CELL_MAXLENGTH_DEFAULT_VALUE = 300000;

    /** The CSV format as requested through the configuration. */
    protected CSVFormat requestedCsvFormat;

    /** The requested format with header skipping disabled; this is what the parser uses. */
    protected CSVFormat effectiveCsvFormat;

    /** Mutable per-subscription parsing state threaded through the generator in {@link #parse}. */
    public static class State {
        /** Character source the parser reads from. */
        public Reader reader;

        /** Row iterator of {@link #csvParser}; {@code null} until the parser was created. */
        public Iterator<CSVRecord> iterator;

        /** Lazily created parser; {@code null} until the first row is requested. */
        public CSVParser csvParser = null;

        /** Cell count of the first row seen, or {@code -1} if no row was seen yet. */
        public long seenRowLength = -1;

        /** The most recently emitted row; only used for error reporting. */
        public List<String> priorRow = null;

        State(Reader reader) {
            this.reader = reader;
        }
    }

    /**
     * Builds the regular expression used to locate candidate record starts.
     *
     * <p>The pattern matches the first character after a line feed, unless, starting at that
     * character, a run of at most {@code j} characters is followed by a double quote that is
     * itself followed by a line break, a comma or the end of input. The per-character
     * lookbehind excludes positions directly after an isolated double quote.
     * NOTE(review): presumably this rejects line feeds that lie inside a quoted cell; confirm
     * against how the base class uses the pattern.
     *
     * @param j upper bound on the number of characters scanned ahead for a closing quote
     * @return the compiled pattern, with {@link Pattern#DOTALL} enabled
     */
    public static Pattern createStartOfCsvRecordPattern(long j) {
        return Pattern.compile(
                "(?<=\n(?!((?<![^\"]\"[^\"]).){0," + j + "}\"(\r?\n|,|$))).",
                Pattern.DOTALL);
    }

    /**
     * Creates a reader using the default configuration keys and no preset record-start
     * pattern; the pattern is assigned in {@link #initialize}.
     */
    public RecordReaderCsv() {
        this(RECORD_MINLENGTH_KEY, RECORD_MAXLENGTH_KEY, RECORD_PROBECOUNT_KEY, null);
    }

    /**
     * Creates a reader with explicit configuration keys.
     *
     * @param str configuration key forwarded to the base class (first key argument)
     * @param str2 configuration key forwarded to the base class (second key argument)
     * @param str3 configuration key forwarded to the base class (third key argument)
     * @param pattern record-start pattern forwarded to the base class; may be {@code null}
     */
    public RecordReaderCsv(String str, String str2, String str3, Pattern pattern) {
        super(str, str2, str3, pattern, Accumulating.identity());
    }

    /**
     * Initializes the base reader, then derives the record-start pattern and the CSV formats
     * from the task configuration.
     *
     * @param inputSplit the split to read
     * @param taskAttemptContext the task context providing the configuration
     * @throws IOException if the base class initialization fails
     */
    @Override // net.sansa_stack.hadoop.core.RecordReaderGenericBase
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        super.initialize(inputSplit, taskAttemptContext);
        Configuration configuration = taskAttemptContext.getConfiguration();
        long cellMaxLength = configuration.getLong(CELL_MAXLENGTH_KEY, CELL_MAXLENGTH_DEFAULT_VALUE);
        this.recordStartPattern = createStartOfCsvRecordPattern(cellMaxLength);
        this.requestedCsvFormat = FileInputFormatCsv.getCsvFormat(configuration, CSVFormat.EXCEL);
        this.effectiveCsvFormat = disableSkipHeaderRecord(this.requestedCsvFormat);
    }

    /**
     * Returns a copy of the given format with header skipping turned off, so the parser itself
     * never drops a row. Header skipping is applied in {@link #createRecordFlow()} instead.
     *
     * @param cSVFormat the format to copy
     * @return the copied format with {@code skipHeaderRecord} set to {@code false}
     */
    protected CSVFormat disableSkipHeaderRecord(CSVFormat cSVFormat) {
        return CSVFormat.Builder.create(cSVFormat).setSkipHeaderRecord(false).build();
    }

    /**
     * Creates a parser over the given reader using the effective CSV format.
     *
     * @param reader the character source
     * @return a new parser
     * @throws IOException if the parser cannot be created
     */
    protected CSVParser newCsvParser(Reader reader) throws IOException {
        return new CSVParser(reader, this.effectiveCsvFormat);
    }

    /**
     * Creates the record flow of the base class and drops its first record when the requested
     * format asks for header skipping and this reader handles the first split.
     *
     * @return the flow of rows for this split
     * @throws IOException if the base class fails to create the flow
     */
    @Override // net.sansa_stack.hadoop.core.RecordReaderGenericBase
    public Flowable<List> createRecordFlow() throws IOException {
        Flowable<List> recordFlow = super.createRecordFlow();
        if (this.requestedCsvFormat.getSkipHeaderRecord() && this.isFirstSplit) {
            recordFlow = recordFlow.skip(1L);
        }
        return recordFlow;
    }

    /**
     * Parses the stream supplied by {@code callable} into a flow of rows.
     *
     * <p>The parser is created lazily on the first request. The flow signals an
     * {@link IllegalStateException} if a row's cell count differs from that of the first row,
     * and forwards any other exception raised while parsing as an error signal.
     *
     * @param callable supplier of the input stream to parse
     * @return a flow emitting one list of cell values per CSV row
     */
    @Override // net.sansa_stack.hadoop.core.RecordReaderGenericBase
    protected Flowable<List> parse(Callable<InputStream> callable) {
        return Flowable.generate(
                // Decode with a fixed charset so results do not depend on the JVM default.
                () -> new State(new InputStreamReader(callable.call(), StandardCharsets.UTF_8)),
                (state, emitter) -> {
                    try {
                        if (state.iterator == null) {
                            state.csvParser = newCsvParser(state.reader);
                            state.iterator = state.csvParser.iterator();
                        }
                        if (!state.iterator.hasNext()) {
                            emitter.onComplete();
                            return;
                        }
                        List<String> row = state.iterator.next().toList();
                        long rowLength = row.size();
                        if (state.seenRowLength == -1) {
                            state.seenRowLength = rowLength;
                        } else if (rowLength != state.seenRowLength) {
                            emitter.onError(new IllegalStateException(String.format(
                                    "Current row length (%d) does not match prior one (%d) - current: %s | prior: %s",
                                    rowLength, state.seenRowLength, row, state.priorRow)));
                            return;
                        }
                        state.priorRow = row;
                        emitter.onNext(row);
                    } catch (Exception e) {
                        emitter.onError(e);
                    }
                },
                state -> closeState(state));
    }

    /**
     * Releases the resources held by a parsing state. The parser may never have been created
     * (no row requested, or parser construction failed); in that case the reader is closed
     * directly so the underlying stream is not leaked.
     */
    private static void closeState(State state) throws IOException {
        if (state.csvParser != null) {
            state.csvParser.close();
        } else if (state.reader != null) {
            state.reader.close();
        }
    }
}
