package org.apache.ambari.logfeeder.output;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.CompressionUtil;
import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/ambari/logfeeder/output/S3Uploader.class */
public class S3Uploader implements Runnable {
    private static final Logger LOG = Logger.getLogger(S3Uploader.class);
    public static final String POISON_PILL = "POISON-PILL";
    private final S3OutputConfiguration s3OutputConfiguration;
    private final boolean deleteOnEnd;
    private final String logType;
    private final AtomicBoolean stopRunningThread = new AtomicBoolean(false);
    private final BlockingQueue<String> fileContextsToUpload = new LinkedBlockingQueue();

    public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean z, String str) {
        this.s3OutputConfiguration = s3OutputConfiguration;
        this.deleteOnEnd = z;
        this.logType = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startUploaderThread() {
        Thread thread = new Thread(this, "s3-uploader-thread-" + this.logType);
        thread.setDaemon(true);
        thread.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopUploaderThread() {
        this.stopRunningThread.set(true);
        if (this.fileContextsToUpload.offer(POISON_PILL)) {
            return;
        }
        LOG.warn("Could not add poison pill to interrupt uploader thread.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addFileForUpload(String str) {
        if (this.fileContextsToUpload.offer(str)) {
            return;
        }
        LOG.error("Could not add file " + str + " for upload.");
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.stopRunningThread.get()) {
            try {
                String take = this.fileContextsToUpload.take();
                if (POISON_PILL.equals(take)) {
                    LOG.warn("Found poison pill while waiting for files to upload, exiting");
                    return;
                }
                uploadFile(new File(take), this.logType);
            } catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for elements from fileContextsToUpload", e);
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String uploadFile(File file, String str) {
        String s3BucketName = this.s3OutputConfiguration.getS3BucketName();
        String s3AccessKey = this.s3OutputConfiguration.getS3AccessKey();
        String s3SecretKey = this.s3OutputConfiguration.getS3SecretKey();
        String compressionAlgo = this.s3OutputConfiguration.getCompressionAlgo();
        String str2 = file.getName() + "." + compressionAlgo;
        String resolvedPath = new S3LogPathResolver().getResolvedPath(this.s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + str, str2, this.s3OutputConfiguration.getCluster());
        LOG.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", this.s3OutputConfiguration.getS3Path(), str2, resolvedPath));
        File createCompressedFileForUpload = createCompressedFileForUpload(file, compressionAlgo);
        LOG.info("Starting S3 upload " + createCompressedFileForUpload + " -> " + s3BucketName + ", " + resolvedPath);
        uploadFileToS3(s3BucketName, resolvedPath, createCompressedFileForUpload, s3AccessKey, s3SecretKey);
        createCompressedFileForUpload.delete();
        if (this.deleteOnEnd) {
            LOG.info("Deleting input file as required");
            if (!file.delete()) {
                LOG.error("Could not delete file " + file.getAbsolutePath() + " after upload to S3");
            }
        }
        return resolvedPath;
    }

    @VisibleForTesting
    protected void uploadFileToS3(String str, String str2, File file, String str3, String str4) {
        TransferManager transferManager = S3Util.getTransferManager(str3, str4);
        try {
            try {
                transferManager.upload(str, str2, file).waitForUploadResult();
                S3Util.shutdownTransferManager(transferManager);
            } catch (AmazonClientException | InterruptedException e) {
                LOG.error("s3 uploading failed for file :" + file.getAbsolutePath(), e);
                S3Util.shutdownTransferManager(transferManager);
            }
        } catch (Throwable th) {
            S3Util.shutdownTransferManager(transferManager);
            throw th;
        }
    }

    @VisibleForTesting
    protected File createCompressedFileForUpload(File file, String str) {
        return CompressionUtil.compressFile(file, new File(file.getParent(), file.getName() + "_" + new Date().getTime() + "." + str), str);
    }
}
