package org.apache.nifi.processors.kudu;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.security.auth.login.LoginException;
import org.apache.kudu.Schema;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSet;

@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records to the specified Kudu's table. The schema for the Kudu table is inferred from the schema of the Record Reader. If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SupportsBatching
@RequiresInstanceClassLoading
@WritesAttribute(attribute = PutKudu.RECORD_COUNT_ATTR, description = "Number of records written to Kudu")
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
/* loaded from: input_file:org/apache/nifi/processors/kudu/PutKudu.class */
public class PutKudu extends AbstractKuduProcessor {
    static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure", "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session", "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, it will block any subsequent data from be pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the Kudu Table to put data into").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading records from incoming flow files.").identifiesControllerService(RecordReaderFactory.class).required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder().name("Failure Strategy").displayName("Failure Strategy").description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure").required(true).allowableValues(new AllowableValue[]{FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK}).defaultValue(FAILURE_STRATEGY_ROUTE.getValue()).build();
    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder().name("Skip head line").description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader (e.g. \"Treat First Line as Header\" property of CSVReader)").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor LOWERCASE_FIELD_NAMES = new PropertyDescriptor.Builder().name("Lowercase Field Names").description("Convert column names to lowercase when finding index of Kudu table columns").defaultValue("false").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new PropertyDescriptor.Builder().name("Handle Schema Drift").description("If set to true, when fields with names that are not in the target Kudu table are encountered, the Kudu table will be altered to include new columns for those fields.").defaultValue("false").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder().name("Data RecordPath").displayName("Data RecordPath").description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.").required(false).addValidator(new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder().name("Operation RecordPath").displayName("Operation RecordPath").description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property will be ignored.").required(false).addValidator(new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    protected static final Validator OperationTypeValidator = new Validator() { // from class: org.apache.nifi.processors.kudu.PutKudu.1
        public ValidationResult validate(String str, String str2, ValidationContext validationContext) {
            boolean z;
            if (validationContext.isExpressionLanguageSupported(str) && validationContext.isExpressionLanguagePresent(str2)) {
                return new ValidationResult.Builder().subject(str).input(str2).explanation("Expression Language Present").valid(true).build();
            }
            try {
                OperationType.valueOf(str2.toUpperCase());
                z = true;
            } catch (IllegalArgumentException e) {
                z = false;
            }
            return new ValidationResult.Builder().subject(str).input(str2).valid(z).explanation(z ? null : "Value must be one of: " + ((String) Arrays.stream(OperationType.values()).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", ")))).build();
        }
    };
    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder().name("Insert Operation").displayName("Kudu Operation Type").description("Specify operationType for this processor.\nValid values are: " + ((String) Arrays.stream(OperationType.values()).map((v0) -> {
        return v0.toString();
    }).collect(Collectors.joining(", "))) + ". This Property will be ignored if the <Operation RecordPath> property is set.").defaultValue(OperationType.INSERT.toString()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(OperationTypeValidator).build();
    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder().name("Flush Mode").description("Set the new flush mode for a kudu session.\nAUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\nAUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory operations but it may have to wait when the buffer is full and there's another buffer being flushed.\nMANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.").allowableValues(SessionConfiguration.FlushMode.values()).defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder().name("FlowFiles per Batch").description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size for the number of FlowFiles to process per client connection setup.Gradually increase this number, only if your FlowFiles typically contain a few records.").defaultValue("1").required(true).addValidator(StandardValidators.createLongValidator(1, 100000, true)).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").displayName("Max Records per Batch").description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size. Gradually increase this number to find out the best one for best performances.").defaultValue("100").required(true).addValidator(StandardValidators.createLongValidator(1, 100000, true)).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final PropertyDescriptor IGNORE_NULL = new PropertyDescriptor.Builder().name("Ignore NULL").description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true").defaultValue("false").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    protected static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu").build();
    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if it cannot be sent to Kudu").build();
    public static final String RECORD_COUNT_ATTR = "record.count";
    private volatile int batchSize = 100;
    private volatile int ffbatch = 1;
    private volatile SessionConfiguration.FlushMode flushMode;
    private volatile Function<Record, OperationType> recordPathOperationType;
    private volatile RecordPath dataRecordPath;
    private volatile String failureStrategy;
    private volatile boolean supportsInsertIgnoreOp;

    /* loaded from: input_file:org/apache/nifi/processors/kudu/PutKudu$RecordPathOperationType.class */
    private static class RecordPathOperationType implements Function<Record, OperationType> {
        private final RecordPath recordPath;

        public RecordPathOperationType(RecordPath recordPath) {
            this.recordPath = recordPath;
        }

        @Override // java.util.function.Function
        public OperationType apply(Record record) {
            List list = (List) this.recordPath.evaluate(record).getSelectedFields().distinct().collect(Collectors.toList());
            if (list.isEmpty()) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record but got no results");
            }
            if (list.size() > 1) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record and received multiple distinct results (" + list + ")");
            }
            String valueOf = String.valueOf(((FieldValue) list.get(0)).getValue());
            try {
                return OperationType.valueOf(valueOf.toUpperCase());
            } catch (IllegalArgumentException e) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + valueOf);
            }
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(KUDU_MASTERS);
        arrayList.add(TABLE_NAME);
        arrayList.add(FAILURE_STRATEGY);
        arrayList.add(KERBEROS_USER_SERVICE);
        arrayList.add(KERBEROS_CREDENTIALS_SERVICE);
        arrayList.add(KERBEROS_PRINCIPAL);
        arrayList.add(KERBEROS_PASSWORD);
        arrayList.add(SKIP_HEAD_LINE);
        arrayList.add(LOWERCASE_FIELD_NAMES);
        arrayList.add(HANDLE_SCHEMA_DRIFT);
        arrayList.add(RECORD_READER);
        arrayList.add(DATA_RECORD_PATH);
        arrayList.add(OPERATION_RECORD_PATH);
        arrayList.add(INSERT_OPERATION);
        arrayList.add(FLUSH_MODE);
        arrayList.add(FLOWFILE_BATCH_SIZE);
        arrayList.add(BATCH_SIZE);
        arrayList.add(IGNORE_NULL);
        arrayList.add(KUDU_OPERATION_TIMEOUT_MS);
        arrayList.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
        arrayList.add(WORKER_COUNT);
        arrayList.add(KUDU_SASL_PROTOCOL_NAME);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        return hashSet;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws LoginException {
        this.batchSize = processContext.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger().intValue();
        this.ffbatch = processContext.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger().intValue();
        this.flushMode = SessionConfiguration.FlushMode.valueOf(processContext.getProperty(FLUSH_MODE).getValue().toUpperCase());
        createKerberosUserAndOrKuduClient(processContext);
        this.supportsInsertIgnoreOp = supportsIgnoreOperations();
        String value = processContext.getProperty(OPERATION_RECORD_PATH).getValue();
        if (value == null) {
            this.recordPathOperationType = null;
        } else {
            this.recordPathOperationType = new RecordPathOperationType(RecordPath.compile(value));
        }
        String value2 = processContext.getProperty(DATA_RECORD_PATH).getValue();
        this.dataRecordPath = value2 == null ? null : RecordPath.compile(value2);
        this.failureStrategy = processContext.getProperty(FAILURE_STRATEGY).getValue();
    }

    private boolean isRollbackOnFailure() {
        return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(this.failureStrategy);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        List list = processSession.get(this.ffbatch);
        if (list.isEmpty()) {
            return;
        }
        KerberosUser kerberosUser = getKerberosUser();
        if (kerberosUser == null) {
            executeOnKuduClient(kuduClient -> {
                processFlowFiles(processContext, processSession, list, kuduClient);
            });
        } else {
            new KerberosAction(kerberosUser, () -> {
                executeOnKuduClient(kuduClient2 -> {
                    processFlowFiles(processContext, processSession, list, kuduClient2);
                });
                return null;
            }, getLogger()).execute();
        }
    }

    private void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> list, KuduClient kuduClient) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        ArrayList arrayList = new ArrayList();
        KuduSession createKuduSession = createKuduSession(kuduClient);
        try {
            processRecords(list, hashMap, hashMap2, hashMap3, arrayList, processSession, processContext, kuduClient, createKuduSession);
            if (!isRollbackOnFailure() || (arrayList.isEmpty() && hashMap2.isEmpty())) {
                transferFlowFiles(list, hashMap, hashMap2, hashMap3, arrayList, processSession);
                return;
            }
            logFailures(arrayList, hashMap3);
            processSession.rollback();
            processContext.yield();
        } finally {
            try {
                flushKuduSession(createKuduSession, true, arrayList);
            } catch (KuduException | RuntimeException e) {
                getLogger().error("KuduSession.close() Failed", e);
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v149, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r11v0, types: [java.util.Map, java.util.Map<org.apache.nifi.flowfile.FlowFile, java.lang.Object>] */
    private void processRecords(List<FlowFile> list, Map<FlowFile, Integer> map, Map<FlowFile, Object> map2, Map<Operation, FlowFile> map3, List<RowError> list2, ProcessSession processSession, ProcessContext processContext, KuduClient kuduClient, KuduSession kuduSession) {
        Function<Record, OperationType> function;
        ArrayList<Record> arrayList;
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        int i = 0;
        OperationType operationType = OperationType.INSERT;
        for (FlowFile flowFile : list) {
            try {
                InputStream read = processSession.read(flowFile);
                Throwable th = null;
                try {
                    RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
                    Throwable th2 = null;
                    try {
                        String evaluatedProperty = getEvaluatedProperty(TABLE_NAME, processContext, flowFile);
                        boolean parseBoolean = Boolean.parseBoolean(getEvaluatedProperty(IGNORE_NULL, processContext, flowFile));
                        boolean parseBoolean2 = Boolean.parseBoolean(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, processContext, flowFile));
                        boolean parseBoolean3 = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, processContext, flowFile));
                        if (this.recordPathOperationType == null) {
                            OperationType valueOf = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, processContext, flowFile).toUpperCase());
                            function = record -> {
                                return valueOf;
                            };
                        } else {
                            function = this.recordPathOperationType;
                        }
                        RecordSet createRecordSet = createRecordReader.createRecordSet();
                        KuduTable openTable = kuduClient.openTable(evaluatedProperty);
                        Record next = createRecordSet.next();
                        if (parseBoolean3 && handleSchemaDrift(openTable, kuduClient, flowFile, next, parseBoolean2)) {
                            openTable = kuduClient.openTable(evaluatedProperty);
                        }
                        while (true) {
                            if (next == null) {
                                break;
                            }
                            OperationType apply = function.apply(next);
                            if (this.dataRecordPath == null) {
                                arrayList = Collections.singletonList(next);
                            } else {
                                List list3 = (List) this.dataRecordPath.evaluate(next).getSelectedFields().collect(Collectors.toList());
                                if (list3.isEmpty()) {
                                    throw new ProcessException("RecordPath " + this.dataRecordPath.getPath() + " evaluated against Record yielded no results.");
                                }
                                Iterator it = list3.iterator();
                                while (it.hasNext()) {
                                    RecordFieldType fieldType = ((FieldValue) it.next()).getField().getDataType().getFieldType();
                                    if (fieldType != RecordFieldType.RECORD) {
                                        throw new ProcessException("RecordPath " + this.dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type " + fieldType);
                                    }
                                }
                                arrayList = new ArrayList(list3.size());
                                Iterator it2 = list3.iterator();
                                while (it2.hasNext()) {
                                    arrayList.add((Record) ((FieldValue) it2.next()).getValue());
                                }
                            }
                            for (Record record2 : arrayList) {
                                if (!this.supportsInsertIgnoreOp && operationType != apply && (operationType == OperationType.INSERT_IGNORE || apply == OperationType.INSERT_IGNORE)) {
                                    flushKuduSession(kuduSession, false, list2);
                                    kuduSession.setIgnoreAllDuplicateRows(apply == OperationType.INSERT_IGNORE);
                                }
                                operationType = apply;
                                Operation createKuduOperation = createKuduOperation(apply, record2, record2.getSchema().getFieldNames(), parseBoolean, parseBoolean2, openTable);
                                map3.put(createKuduOperation, flowFile);
                                if (i == this.batchSize && this.flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                                    i = 0;
                                    flushKuduSession(kuduSession, false, list2);
                                }
                                OperationResponse apply2 = kuduSession.apply(createKuduOperation);
                                if (apply2 != null && apply2.hasRowError()) {
                                    map2.put(flowFile, apply2.getRowError());
                                    break;
                                } else {
                                    i++;
                                    map.merge(flowFile, 1, (v0, v1) -> {
                                        return Integer.sum(v0, v1);
                                    });
                                }
                            }
                            next = createRecordSet.next();
                        }
                        if (createRecordReader != null) {
                            if (0 != 0) {
                                try {
                                    createRecordReader.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createRecordReader.close();
                            }
                        }
                        if (read != null) {
                            if (0 != 0) {
                                try {
                                    read.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                read.close();
                            }
                        }
                    } catch (Throwable th5) {
                        if (createRecordReader != null) {
                            if (0 != 0) {
                                try {
                                    createRecordReader.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                createRecordReader.close();
                            }
                        }
                        throw th5;
                    }
                } catch (Throwable th7) {
                    if (read != null) {
                        if (0 != 0) {
                            try {
                                read.close();
                            } catch (Throwable th8) {
                                th.addSuppressed(th8);
                            }
                        } else {
                            read.close();
                        }
                    }
                    throw th7;
                }
            } catch (Exception e) {
                getLogger().error("Failed to push {} to Kudu", new Object[]{flowFile}, e);
                map2.put(flowFile, e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v72, types: [java.util.List] */
    private boolean handleSchemaDrift(KuduTable kuduTable, KuduClient kuduClient, FlowFile flowFile, Record record, boolean z) {
        ArrayList arrayList;
        String lowerCase;
        if (record == null) {
            getLogger().debug("No Record to evaluate schema drift against for {}", new Object[]{flowFile});
            return false;
        }
        String name = kuduTable.getName();
        Schema schema = kuduTable.getSchema();
        if (this.dataRecordPath == null) {
            arrayList = record.getSchema().getFields();
        } else {
            List<FieldValue> list = (List) this.dataRecordPath.evaluate(record).getSelectedFields().collect(Collectors.toList());
            arrayList = new ArrayList();
            for (FieldValue fieldValue : list) {
                RecordField field = fieldValue.getField();
                if (field.getDataType().getFieldType() == RecordFieldType.RECORD) {
                    Object value = fieldValue.getValue();
                    if (value instanceof Record) {
                        arrayList.addAll(((Record) value).getSchema().getFields());
                    }
                } else {
                    arrayList.add(field);
                }
            }
        }
        List<RecordField> list2 = (List) arrayList.stream().filter(recordField -> {
            return !schema.hasColumn(z ? recordField.getFieldName().toLowerCase() : recordField.getFieldName());
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            getLogger().debug("No schema drift detected for {}", new Object[]{flowFile});
            return false;
        }
        getLogger().info("Adding {} columns to table '{}' to handle schema drift", new Object[]{Integer.valueOf(list2.size()), name});
        for (RecordField recordField2 : list2) {
            if (z) {
                try {
                    lowerCase = recordField2.getFieldName().toLowerCase();
                } catch (KuduException e) {
                    if (!e.getStatus().isAlreadyPresent()) {
                        throw new ProcessException(e);
                    }
                    getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{name});
                }
            } else {
                lowerCase = recordField2.getFieldName();
            }
            kuduClient.alterTable(name, getAddNullableColumnStatement(lowerCase, recordField2.getDataType()));
        }
        return true;
    }

    private void transferFlowFiles(List<FlowFile> list, Map<FlowFile, Integer> map, Map<FlowFile, Object> map2, Map<Operation, FlowFile> map3, List<RowError> list2, ProcessSession processSession) {
        Map map4 = (Map) list2.stream().filter(rowError -> {
            return map3.get(rowError.getOperation()) != null;
        }).collect(Collectors.groupingBy(rowError2 -> {
            return (FlowFile) map3.get(rowError2.getOperation());
        }));
        long j = 0;
        for (FlowFile flowFile : list) {
            int intValue = map.getOrDefault(flowFile, 0).intValue();
            j += intValue;
            List list3 = (List) map4.get(flowFile);
            if (list3 != null) {
                list3.forEach(rowError3 -> {
                    getLogger().error("Failed to write due to {}", new Object[]{rowError3.toString()});
                });
                j -= list3.size();
                processSession.transfer(processSession.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(intValue - list3.size())), REL_FAILURE);
            } else {
                FlowFile putAttribute = processSession.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(intValue));
                if (map2.containsKey(putAttribute)) {
                    getLogger().error("Failed to write due to {}", new Object[]{map2.get(putAttribute)});
                    processSession.transfer(putAttribute, REL_FAILURE);
                } else {
                    processSession.transfer(putAttribute, REL_SUCCESS);
                    processSession.getProvenanceReporter().send(putAttribute, "Successfully added FlowFile to Kudu");
                }
            }
        }
        processSession.adjustCounter("Records Inserted", j, false);
    }

    private void logFailures(List<RowError> list, Map<Operation, FlowFile> map) {
        for (Map.Entry entry : ((Map) list.stream().collect(Collectors.groupingBy(rowError -> {
            return (FlowFile) map.get(rowError.getOperation());
        }))).entrySet()) {
            getLogger().error("Could not write {} to Kudu due to: {}", new Object[]{(FlowFile) entry.getKey(), (List) entry.getValue()});
        }
    }

    private String getEvaluatedProperty(PropertyDescriptor propertyDescriptor, ProcessContext processContext, FlowFile flowFile) {
        PropertyValue evaluateAttributeExpressions = processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile);
        if (propertyDescriptor.isRequired() && evaluateAttributeExpressions == null) {
            throw new ProcessException(String.format("Property `%s` is required but evaluated to null", propertyDescriptor.getDisplayName()));
        }
        return evaluateAttributeExpressions.getValue();
    }

    protected KuduSession createKuduSession(KuduClient kuduClient) {
        KuduSession newSession = kuduClient.newSession();
        newSession.setMutationBufferSpace(this.batchSize);
        newSession.setFlushMode(this.flushMode);
        return newSession;
    }

    protected Operation createKuduOperation(OperationType operationType, Record record, List<String> list, boolean z, boolean z2, KuduTable kuduTable) {
        Insert newDeleteIgnore;
        switch (operationType) {
            case INSERT:
                newDeleteIgnore = kuduTable.newInsert();
                break;
            case INSERT_IGNORE:
                if (!this.supportsInsertIgnoreOp) {
                    newDeleteIgnore = kuduTable.newInsert();
                    break;
                } else {
                    newDeleteIgnore = kuduTable.newInsertIgnore();
                    break;
                }
            case UPSERT:
                newDeleteIgnore = kuduTable.newUpsert();
                break;
            case UPDATE:
                newDeleteIgnore = kuduTable.newUpdate();
                break;
            case UPDATE_IGNORE:
                newDeleteIgnore = kuduTable.newUpdateIgnore();
                break;
            case DELETE:
                newDeleteIgnore = kuduTable.newDelete();
                break;
            case DELETE_IGNORE:
                newDeleteIgnore = kuduTable.newDeleteIgnore();
                break;
            default:
                throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
        }
        buildPartialRow(kuduTable.getSchema(), newDeleteIgnore.getRow(), record, list, z, z2);
        return newDeleteIgnore;
    }
}
