package org.apache.flume.sink.kite.parser;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.kite.DatasetSinkConstants;
import org.apache.flume.sink.kite.NonRecoverableEventException;
import org.apache.flume.sink.kite.parser.EntityParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:org/apache/flume/sink/kite/parser/AvroParser.class */
public class AvroParser implements EntityParser<GenericRecord> {
    static Configuration conf = new Configuration();
    private static final LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() { // from class: org.apache.flume.sink.kite.parser.AvroParser.1
        public Schema load(String str) {
            Preconditions.checkNotNull(str, "Schema literal cannot be null without a Schema URL");
            return new Schema.Parser().parse(str);
        }
    });
    private static final LoadingCache<String, Schema> schemasFromURL = CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() { // from class: org.apache.flume.sink.kite.parser.AvroParser.2
        public Schema load(String str) throws IOException {
            Schema.Parser parser = new Schema.Parser();
            FSDataInputStream fSDataInputStream = null;
            try {
                fSDataInputStream = str.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/") ? FileSystem.get(URI.create(str), AvroParser.conf).open(new Path(str)) : new URL(str).openStream();
                Schema parse = parser.parse(fSDataInputStream);
                if (fSDataInputStream != null) {
                    fSDataInputStream.close();
                }
                return parse;
            } catch (Throwable th) {
                if (fSDataInputStream != null) {
                    fSDataInputStream.close();
                }
                throw th;
            }
        }
    });
    private final Schema datasetSchema;
    private final LoadingCache<Schema, DatumReader<GenericRecord>> readers;
    private BinaryDecoder decoder;

    /* loaded from: input_file:org/apache/flume/sink/kite/parser/AvroParser$Builder.class */
    public static class Builder implements EntityParser.Builder<GenericRecord> {
        @Override // org.apache.flume.sink.kite.parser.EntityParser.Builder
        public EntityParser<GenericRecord> build(Schema schema, Context context) {
            return new AvroParser(schema);
        }
    }

    private AvroParser(Schema schema) {
        this.readers = CacheBuilder.newBuilder().build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { // from class: org.apache.flume.sink.kite.parser.AvroParser.3
            public DatumReader<GenericRecord> load(Schema schema2) {
                return new GenericDatumReader(schema2, AvroParser.this.datasetSchema);
            }
        });
        this.decoder = null;
        this.datasetSchema = schema;
    }

    @Override // org.apache.flume.sink.kite.parser.EntityParser
    public GenericRecord parse(Event event, GenericRecord genericRecord) throws EventDeliveryException, NonRecoverableEventException {
        this.decoder = DecoderFactory.get().binaryDecoder(event.getBody(), this.decoder);
        try {
            return (GenericRecord) ((DatumReader) this.readers.getUnchecked(schema(event))).read(genericRecord, this.decoder);
        } catch (IOException e) {
            throw new NonRecoverableEventException("Cannot deserialize event", e);
        } catch (RuntimeException e2) {
            throw new NonRecoverableEventException("Cannot deserialize event", e2);
        }
    }

    private static Schema schema(Event event) throws EventDeliveryException, NonRecoverableEventException {
        Map headers = event.getHeaders();
        String str = (String) headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER);
        try {
            if (str != null) {
                return (Schema) schemasFromURL.get(str);
            }
            String str2 = (String) headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER);
            if (str2 == null) {
                throw new NonRecoverableEventException("No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal");
            }
            return (Schema) schemasFromLiteral.get(str2);
        } catch (ExecutionException e) {
            throw new EventDeliveryException("Cannot get schema", e.getCause());
        } catch (UncheckedExecutionException e2) {
            throw new NonRecoverableEventException("Cannot parse schema", e2.getCause());
        }
    }
}
