package org.apache.nifi.processors.beats;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.event.BeatsMetadata;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload to the content of a FlowFile.This processor replaces the now deprecated ListenLumberjack")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@WritesAttributes({@WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."), @WritesAttribute(attribute = BeatsMetadata.SEQNUMBER_KEY, description = "The sequence number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
/* loaded from: input_file:org/apache/nifi/processors/beats/ListenBeats.class */
public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent> {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL_CONTEXT_SERVICE").displayName("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.").required(false).identifiesControllerService(RestrictedSSLContextService.class).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("Client Auth").displayName("Client Auth").description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.").required(false).allowableValues(ClientAuth.values()).defaultValue(ClientAuth.REQUIRED.name()).build();
    private volatile BeatsEncoder beatsEncoder;

    /* loaded from: input_file:org/apache/nifi/processors/beats/ListenBeats$beatsAttributes.class */
    public enum beatsAttributes implements FlowFileAttributeKey {
        SENDER("beats.sender"),
        PORT("beats.port"),
        SEQNUMBER(BeatsMetadata.SEQNUMBER_KEY);

        private final String key;

        beatsAttributes(String str) {
            this.key = str;
        }

        public String key() {
            return this.key;
        }
    }

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        SSLContextService asControllerService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (asControllerService != null && !asControllerService.isTrustStoreConfigured()) {
            arrayList.add(new ValidationResult.Builder().explanation("The context service must have a truststore  configured for the beats forwarder client to work correctly").valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
        }
        String value = validationContext.getProperty(CLIENT_AUTH).getValue();
        if (asControllerService != null && StringUtils.isBlank(value)) {
            arrayList.add(new ValidationResult.Builder().explanation("Client Auth must be provided when using TLS/SSL").valid(false).subject("Client Auth").build());
        }
        return arrayList;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        super.onScheduled(processContext);
        this.beatsEncoder = new BeatsEncoder();
    }

    protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<BeatsEvent> blockingQueue) throws IOException {
        BeatsEventFactory beatsEventFactory = new BeatsEventFactory();
        BeatsSocketChannelHandlerFactory beatsSocketChannelHandlerFactory = new BeatsSocketChannelHandlerFactory();
        int intValue = processContext.getProperty(MAX_CONNECTIONS).asInteger().intValue();
        int intValue2 = processContext.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        Charset forName = Charset.forName(processContext.getProperty(CHARSET).getValue());
        ByteBufferPool byteBufferPool = new ByteBufferPool(intValue, intValue2);
        SSLContext sSLContext = null;
        ClientAuth clientAuth = null;
        SSLContextService asControllerService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (asControllerService != null) {
            String value = processContext.getProperty(CLIENT_AUTH).getValue();
            sSLContext = asControllerService.createContext();
            clientAuth = ClientAuth.valueOf(value);
        }
        return new SocketChannelDispatcher(beatsEventFactory, beatsSocketChannelHandlerFactory, byteBufferPool, blockingQueue, getLogger(), intValue, sSLContext, clientAuth, forName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getBatchKey(BeatsEvent beatsEvent) {
        return beatsEvent.getSender();
    }

    protected void respond(BeatsEvent beatsEvent, BeatsResponse beatsResponse) {
        BeatsChannelResponse beatsChannelResponse = new BeatsChannelResponse(this.beatsEncoder, beatsResponse);
        ChannelResponder responder = beatsEvent.getResponder();
        responder.addResponse(beatsChannelResponse);
        try {
            responder.respond();
        } catch (IOException e) {
            getLogger().error("Error sending response for transaction {} due to {}", new Object[]{Integer.valueOf(beatsEvent.getSeqNumber()), e.getMessage()}, e);
        }
    }

    protected void postProcess(ProcessContext processContext, ProcessSession processSession, List<BeatsEvent> list) {
        processSession.commitAsync(() -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                BeatsEvent beatsEvent = (BeatsEvent) it.next();
                respond(beatsEvent, BeatsResponse.ok(beatsEvent.getSeqNumber()));
            }
        });
    }

    protected String getTransitUri(AbstractListenEventBatchingProcessor<BeatsEvent>.FlowFileEventBatch flowFileEventBatch) {
        String sender = ((BeatsEvent) flowFileEventBatch.getEvents().get(0)).getSender();
        return "beats://" + ((!sender.startsWith("/") || sender.length() <= 1) ? sender : sender.substring(1)) + ":" + this.port;
    }

    protected Map<String, String> getAttributes(AbstractListenEventBatchingProcessor<BeatsEvent>.FlowFileEventBatch flowFileEventBatch) {
        List events = flowFileEventBatch.getEvents();
        String sender = ((BeatsEvent) events.get(0)).getSender();
        HashMap hashMap = new HashMap(events.size() == 1 ? 5 : 4);
        hashMap.put(beatsAttributes.SENDER.key(), sender);
        hashMap.put(beatsAttributes.PORT.key(), String.valueOf(this.port));
        hashMap.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (events.size() == 1) {
            hashMap.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(((BeatsEvent) events.get(0)).getSeqNumber()));
        }
        return hashMap;
    }
}
