package net.sansa_stack.hadoop.format.jena.trig;

import io.reactivex.rxjava3.core.Flowable;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.core.Accumulating;
import net.sansa_stack.hadoop.core.pattern.CustomPattern;
import net.sansa_stack.hadoop.core.pattern.CustomPatternJava;
import net.sansa_stack.hadoop.format.jena.base.RecordReaderGenericRdfAccumulatingBase;
import net.sansa_stack.hadoop.format.jena.base.RecordReaderRdfConf;
import org.aksw.jenax.arq.dataset.api.DatasetOneNg;
import org.aksw.jenax.arq.dataset.impl.DatasetOneNgImpl;
import org.aksw.jenax.sparql.query.rx.RDFDataMgrRx;
import org.apache.jena.graph.Node;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.riot.Lang;
import org.apache.jena.sparql.core.Quad;

/* loaded from: input_file:net/sansa_stack/hadoop/format/jena/trig/RecordReaderRdfTrigDataset.class */
public class RecordReaderRdfTrigDataset extends RecordReaderGenericRdfAccumulatingBase<Quad, Node, DatasetOneNg, DatasetOneNg> {
    public static final String RECORD_MINLENGTH_KEY = "mapreduce.input.trig.dataset.record.minlength";
    public static final String RECORD_MAXLENGTH_KEY = "mapreduce.input.trig.dataset.record.maxlength";
    public static final String RECORD_PROBECOUNT_KEY = "mapreduce.input.trig.dataset.record.probecount";
    public static final String ELEMENT_PROBECOUNT_KEY = "mapreduce.input.trig.dataset.element.probecount";
    public static final String PREFIXES_MAXLENGTH_KEY = "mapreduce.input.trig.dataset.prefixes.maxlength";
    protected static final CustomPattern trigFwdPatternGraphFollowedByCurlyBrace = CustomPatternJava.compile("@?base|@?prefix|(graph\\s*)?(<[^>]*>|_?:[^-\\s]+)\\s*\\{", 10);
    protected static final CustomPattern trigFwdPatternNew = CustomPatternJava.compile("@?base|@?prefix|(graph\\s*)?(<[^>]*>|_?:[^-\\s]+)", 10);
    protected static final CustomPattern trigFwdPattern = trigFwdPatternNew;

    /* loaded from: input_file:net/sansa_stack/hadoop/format/jena/trig/RecordReaderRdfTrigDataset$AccumulatingDataset.class */
    public static class AccumulatingDataset implements Accumulating<Quad, Node, DatasetOneNg, DatasetOneNg> {
        @Override // net.sansa_stack.hadoop.core.Accumulating
        public Node classify(Quad quad) {
            return quad.getGraph();
        }

        @Override // net.sansa_stack.hadoop.core.Accumulating
        public DatasetOneNg createAccumulator(Node node) {
            DatasetOneNg create = DatasetOneNgImpl.create(node == null ? Quad.defaultGraphNodeGenerated.getURI() : node.getURI());
            create.begin(ReadWrite.WRITE);
            return create;
        }

        @Override // net.sansa_stack.hadoop.core.Accumulating
        public void accumulate(DatasetOneNg datasetOneNg, Quad quad) {
            datasetOneNg.asDatasetGraph().add(quad);
        }

        @Override // net.sansa_stack.hadoop.core.Accumulating
        public DatasetOneNg accumulatedValue(DatasetOneNg datasetOneNg) {
            datasetOneNg.commit();
            return datasetOneNg;
        }
    }

    public RecordReaderRdfTrigDataset() {
        super(new RecordReaderRdfConf(RECORD_MINLENGTH_KEY, RECORD_MAXLENGTH_KEY, RECORD_PROBECOUNT_KEY, trigFwdPattern, PREFIXES_MAXLENGTH_KEY, Lang.TRIG), new AccumulatingDataset());
    }

    @Override // net.sansa_stack.hadoop.core.RecordReaderGenericBase
    protected Stream<Quad> parse(InputStream inputStream, boolean z) {
        return setupParser(inputStream, z).streamQuads();
    }

    protected Flowable<Quad> parse(Callable<InputStream> callable) {
        return RDFDataMgrRx.createFlowableQuads(callable, this.lang, this.baseIri);
    }
}
