package org.apache.ambari.logfeeder.output;

import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/ambari/logfeeder/output/OutputHDFSFile.class */
/**
 * Output that spools log lines into local files and copies each rolled-over file to HDFS.
 *
 * <p>Lines passed to {@link #write(String, InputFileMarker)} are handed to a {@link LogSpooler}.
 * This class acts as the spooler's {@link RolloverCondition} (time based) and
 * {@link RolloverHandler}: every rolled-over file is queued in {@code localReadyFiles} and picked
 * up by a background daemon thread ("hdfsCopyThread") that copies it to {@code hdfsOutDir}.
 *
 * <p>Threading: the copy thread and the thread calling {@link #close()} share
 * {@code localReadyFiles}, {@code readyMonitor} and {@code fileSystem}.
 */
public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
    private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class);

    /** Rollover interval used when the {@code rollover_sec} property is not configured. */
    private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 300;

    private long rolloverThresholdTimeMillis;

    /** Stays {@code null} when {@link #init(LogFeederProps)} aborts because of missing configuration. */
    private LogSpooler logSpooler;
    private LogFeederProps logFeederProps;

    /** Rolled-over local files waiting to be copied to HDFS. */
    private final ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<>();

    /** Monitor the copy thread waits on while {@code localReadyFiles} is empty. */
    private final Object readyMonitor = new Object();

    private volatile Thread hdfsCopyThread = null;
    private String filenamePrefix = "service-logs-";
    private String hdfsOutDir = null;
    private String hdfsHost = null;
    private String hdfsPort = null;

    /** Written by the copy thread, read by the closing thread; volatile for cross-thread visibility. */
    private volatile FileSystem fileSystem = null;

    /**
     * Reads the HDFS related settings, creates the local spooler and starts the copy thread.
     *
     * <p>If any of {@code hdfs_out_dir}, {@code hdfs_host} or {@code hdfs_port} is empty, an error
     * is logged and the method returns without creating the spooler or the copy thread.
     *
     * @param logFeederProps global properties; its tmp dir is the parent of the local spool directory
     * @throws Exception declared by the signature; propagated from the helpers called here
     */
    public void init(LogFeederProps logFeederProps) throws Exception {
        this.logFeederProps = logFeederProps;
        this.hdfsOutDir = getStringValue("hdfs_out_dir");
        this.hdfsHost = getStringValue("hdfs_host");
        this.hdfsPort = getStringValue("hdfs_port");
        this.rolloverThresholdTimeMillis = getLongValue("rollover_sec", Long.valueOf(DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS)).longValue() * 1000;
        this.filenamePrefix = getStringValue("file_name_prefix", this.filenamePrefix);
        if (StringUtils.isEmpty(this.hdfsOutDir)) {
            LOG.error("HDFS config property <hdfs_out_dir> is not set in config file.");
            return;
        }
        if (StringUtils.isEmpty(this.hdfsHost)) {
            LOG.error("HDFS config property <hdfs_host> is not set in config file.");
            return;
        }
        if (StringUtils.isEmpty(this.hdfsPort)) {
            LOG.error("HDFS config property <hdfs_port> is not set in config file.");
            return;
        }
        this.hdfsOutDir = PlaceholderUtil.replaceVariables(this.hdfsOutDir, buildContextParam());
        LOG.info("hdfs Output dir=" + this.hdfsOutDir);
        this.logSpooler = new LogSpooler(logFeederProps.getTmpDir() + "hdfs/service/", this.filenamePrefix, this, this);
        startHDFSCopyThread();
    }

    /**
     * Rolls over the active spool file, waits for pending files to be copied, stops the copy
     * thread and marks this output as closed.
     */
    public void close() {
        LOG.info("Closing file." + getShortDescription());
        // init() may have returned early on missing configuration, leaving the spooler unset.
        if (this.logSpooler != null) {
            this.logSpooler.rollover();
        }
        stopHDFSCopyThread();
        setClosed(true);
    }

    /**
     * Appends one line to the local spool file and increments the stat metric.
     * {@code null} input is ignored.
     *
     * @param str the line to spool
     * @param inputFileMarker not used by this output
     * @throws Exception declared by the signature; propagated from the spooler
     */
    public synchronized void write(String str, InputFileMarker inputFileMarker) throws Exception {
        if (str != null) {
            this.logSpooler.add(str);
            this.statMetric.value++;
        }
    }

    /** @return a short description containing the HDFS output directory */
    public String getShortDescription() {
        return "output:destination=hdfs,hdfsOutDir=" + this.hdfsOutDir;
    }

    /**
     * Starts the daemon thread that drains {@code localReadyFiles} into HDFS.
     *
     * <p>The thread copies every queued file, then waits on {@code readyMonitor} until more files
     * arrive. It terminates when interrupted or when an unexpected exception occurs.
     */
    private void startHDFSCopyThread() {
        Thread copyThread = new Thread("hdfsCopyThread") {
            @Override
            public void run() {
                while (true) {
                    try {
                        Iterator<File> readyFiles = OutputHDFSFile.this.localReadyFiles.iterator();
                        while (readyFiles.hasNext()) {
                            File localFile = readyFiles.next();
                            OutputHDFSFile.this.fileSystem = LogFeederHDFSUtil.buildFileSystem(OutputHDFSFile.this.hdfsHost, OutputHDFSFile.this.hdfsPort);
                            if (OutputHDFSFile.this.fileSystem != null && localFile.exists()) {
                                String hdfsPath = OutputHDFSFile.this.hdfsOutDir + LogFeederConstants.S3_PATH_SEPARATOR + localFile.getName();
                                String localPath = localFile.getAbsolutePath();
                                if (LogFeederHDFSUtil.copyFromLocal(localPath, hdfsPath, OutputHDFSFile.this.fileSystem, true, true)) {
                                    OutputHDFSFile.LOG.debug("File copy to hdfs hdfspath :" + hdfsPath + " and deleted local file :" + localPath);
                                } else {
                                    OutputHDFSFile.LOG.error("Hdfs file copy  failed for hdfspath :" + hdfsPath + " and localpath :" + localPath);
                                }
                            }
                            // The entry is dropped whether or not the copy succeeded; failed files are not retried.
                            readyFiles.remove();
                        }
                        synchronized (OutputHDFSFile.this.readyMonitor) {
                            // Loop on the condition so a spurious wakeup does not leave the wait early.
                            while (OutputHDFSFile.this.localReadyFiles.isEmpty()) {
                                OutputHDFSFile.this.readyMonitor.wait();
                            }
                        }
                    } catch (InterruptedException e) {
                        // Interruption is the stop signal sent by stopHDFSCopyThread(); exit instead of looping again.
                        OutputHDFSFile.LOG.info("hdfsCopyThread interrupted, stopping.");
                        return;
                    } catch (Exception e) {
                        OutputHDFSFile.LOG.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
                        return;
                    }
                }
            }
        };
        copyThread.setDaemon(true);
        this.hdfsCopyThread = copyThread;
        copyThread.start();
    }

    /**
     * Waits until the copy thread has drained the queue, then interrupts it and closes the
     * file system.
     *
     * <p>The wait ends early if the copy thread is no longer alive (the queue could never drain)
     * or if the calling thread is interrupted; in the latter case the interrupt flag is restored.
     */
    private void stopHDFSCopyThread() {
        Thread copyThread = this.hdfsCopyThread;
        if (copyThread == null) {
            return;
        }
        LOG.info("waiting till copy all local files to hdfs.......");
        while (!this.localReadyFiles.isEmpty()) {
            if (!copyThread.isAlive()) {
                LOG.error("hdfsCopyThread is not running; " + this.localReadyFiles.size() + " local file(s) were not copied to hdfs.");
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOG.error(e.getLocalizedMessage(), e);
                Thread.currentThread().interrupt();
                break;
            }
            LOG.debug("still waiting to copy all local files to hdfs.......");
        }
        LOG.info("calling interrupt method for hdfsCopyThread to stop it.");
        try {
            copyThread.interrupt();
        } catch (SecurityException e) {
            LOG.error(" Current thread : '" + Thread.currentThread().getName() + "' does not have permission to interrupt the Thread: '" + copyThread.getName() + "'");
        }
        LogFeederHDFSUtil.closeFileSystem(this.fileSystem);
    }

    /** Builds the variables used to resolve placeholders in {@code hdfs_out_dir}: the local host name. */
    private HashMap<String, String> buildContextParam() {
        HashMap<String, String> contextParam = new HashMap<>();
        contextParam.put(LogFeederConstants.SOLR_HOST, LogFeederUtil.hostName);
        return contextParam;
    }

    /** Queues a rolled-over file for upload and wakes the copy thread. */
    private void addFileInReadyList(File file) {
        this.localReadyFiles.add(file);
        try {
            synchronized (this.readyMonitor) {
                this.readyMonitor.notifyAll();
            }
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Not supported by the HDFS output.
     *
     * @throws UnsupportedOperationException always
     */
    public void copyFile(File file, InputMarker inputMarker) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
    }

    /** Queues the rolled-over spool file so the copy thread uploads it to HDFS. */
    @Override
    public void handleRollover(File file) {
        addFileInReadyList(file);
    }

    /**
     * Requests a rollover once the active spool file is older than the configured threshold.
     *
     * @param logSpoolerContext provides the creation time of the active spool file
     * @return {@code true} when the file age exceeds {@code rolloverThresholdTimeMillis}
     */
    @Override
    public boolean shouldRollover(LogSpoolerContext logSpoolerContext) {
        long ageMillis = System.currentTimeMillis() - logSpoolerContext.getActiveLogCreationTime().getTime();
        boolean thresholdCrossed = ageMillis > this.rolloverThresholdTimeMillis;
        if (thresholdCrossed) {
            LOG.info("Detecting that time since file creation time " + logSpoolerContext.getActiveLogCreationTime() + " has crossed threshold (msecs) " + this.rolloverThresholdTimeMillis);
        }
        return thresholdCrossed;
    }

    /**
     * Not implemented for this output.
     *
     * @throws IllegalStateException always
     */
    public String getOutputType() {
        throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
    }

    /** @return always {@code 0}; queued local files are not reported as pending */
    public Long getPendingCount() {
        return 0L;
    }

    /** @return the metric name used for bytes written by this output */
    public String getWriteBytesMetricName() {
        return "output.hdfs.write_bytes";
    }

    /** @return the metric name used for the number of log lines written by this output */
    public String getStatMetricName() {
        return "output.hdfs.write_logs";
    }
}
