package org.opensearch.repositories.s3.async;

import com.jcraft.jzlib.JZlib;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.CompletableFutureUtils;

/* loaded from: input_file:org/opensearch/repositories/s3/async/AsyncTransferManager.class */
public final class AsyncTransferManager {
    /** Class-wide logger. */
    private static final Logger log = LogManager.getLogger(AsyncTransferManager.class);
    /** Executor used for uploads whose write priority is neither HIGH nor URGENT. */
    private final ExecutorService executorService;
    /** Executor used for uploads with {@code WritePriority.HIGH}. */
    private final ExecutorService priorityExecutorService;
    /** Executor used for uploads with {@code WritePriority.URGENT}. */
    private final ExecutorService urgentExecutorService;
    /** Lower bound applied by {@link #calculateOptimalPartSize} to the computed part size. */
    private final long minimumPartSize;
    /** {@code minimumPartSize} plus 10%; forwarded to the AsyncPartsHandler helpers that set up part streams. */
    private final long maxRetryablePartSize;
    /** Source of the semaphores acquired before an upload is started. */
    private final TransferSemaphoresHolder transferSemaphoresHolder;
    // Not referenced by name in this file; calculateOptimalPartSize divides by the literal 10000.0d,
    // presumably the same limit inlined by the compiler -- TODO confirm.
    private static final long MAX_UPLOAD_PARTS = 10000;

    public AsyncTransferManager(long j, ExecutorService executorService, ExecutorService executorService2, ExecutorService executorService3, TransferSemaphoresHolder transferSemaphoresHolder) {
        this.executorService = executorService;
        this.priorityExecutorService = executorService2;
        this.minimumPartSize = j;
        this.maxRetryablePartSize = (long) (j + (0.1d * j));
        this.urgentExecutorService = executorService3;
        this.transferSemaphoresHolder = transferSemaphoresHolder;
    }

    /**
     * Uploads the content described by {@code streamContext}, as a single put-object request
     * when the context has exactly one part and as a multipart upload otherwise.
     *
     * <p>This method does not throw: anything thrown while starting the upload is caught and
     * used to complete the returned future exceptionally.
     *
     * @param s3AsyncClient        client used to issue the S3 requests
     * @param uploadRequest        bucket, key, priority and other options of the upload
     * @param streamContext        supplier of the part streams
     * @param statsMetricPublisher publisher attached to the outgoing requests
     * @return future that completes when the upload has finished or failed
     */
    public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, StatsMetricPublisher statsMetricPublisher) {
        final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        try {
            if (streamContext.getNumberOfParts() != 1) {
                log.debug(() -> "Starting the upload as multipart upload request");
                uploadInParts(s3AsyncClient, uploadRequest, streamContext, resultFuture, statsMetricPublisher);
                return resultFuture;
            }

            log.debug(() -> "Starting the upload as a single upload part request");
            final Semaphore permit = AsyncPartsHandler.maybeAcquireSemaphore(
                this.transferSemaphoresHolder,
                this.transferSemaphoresHolder.createRequestContext(),
                uploadRequest.getWritePriority(),
                uploadRequest.getKey()
            );
            try {
                uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, resultFuture, statsMetricPublisher, permit);
            } catch (Exception e) {
                // The upload never started, so nothing downstream will hand the permit back.
                if (permit != null) {
                    permit.release();
                }
                throw e;
            }
        } catch (Throwable failure) {
            resultFuture.completeExceptionally(failure);
        }
        return resultFuture;
    }

    /**
     * Initiates a multipart upload and, once S3 has returned an upload id, hands over to
     * {@link #doUploadInParts}.
     *
     * <p>The calling thread blocks on the create-multipart-upload future. If the thread is
     * interrupted while waiting, its interrupt status is restored before the failure is
     * reported, so callers further up the stack can still observe the interruption.
     *
     * @param s3AsyncClient        client used to issue the S3 requests
     * @param uploadRequest        bucket, key, metadata and integrity-check options
     * @param streamContext        supplier of the part streams
     * @param completableFuture    future to complete with the outcome of the whole upload
     * @param statsMetricPublisher publisher whose multipart collector is attached to the request
     */
    private void uploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> completableFuture, StatsMetricPublisher statsMetricPublisher) {
        final CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder()
            .bucket(uploadRequest.getBucket())
            .key(uploadRequest.getKey())
            .overrideConfiguration(builder -> builder.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));
        if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
            requestBuilder.metadata(uploadRequest.getMetadata());
        }
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            requestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
        }

        final CompletableFuture<CreateMultipartUploadResponse> createFuture = SocketAccess.doPrivileged(
            () -> s3AsyncClient.createMultipartUpload(requestBuilder.build())
        );
        // Failing or cancelling the caller-visible future also fails the in-flight create request.
        CompletableFutureUtils.forwardExceptionTo(completableFuture, createFuture);

        try {
            final CreateMultipartUploadResponse response = createFuture.get();
            final String uploadId = response.uploadId();
            log.debug(() -> "Initiated new multipart upload, uploadId: " + response.uploadId());
            doUploadInParts(s3AsyncClient, uploadRequest, streamContext, completableFuture, uploadId, statsMetricPublisher);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: put the flag back before reporting the failure.
            Thread.currentThread().interrupt();
            handleException(completableFuture, () -> "Failed to initiate multipart upload", e);
        } catch (Exception e) {
            handleException(completableFuture, () -> "Failed to initiate multipart upload", e);
        }
    }

    /**
     * Starts the upload of every part and wires the follow-up stages that run once all parts
     * are done: invoke the upload finalizer, optionally verify the combined CRC32 of the parts
     * against the expected file checksum, send the complete-multipart-upload request, and
     * finally complete {@code completableFuture} with the outcome.
     *
     * <p>If starting the part uploads or building the chain throws, the already-uploaded parts
     * are cleaned up and {@code completableFuture} is completed with that exception; an error
     * thrown by the clean-up itself still propagates to the caller.
     *
     * @param s3AsyncClient        client used to issue the S3 requests
     * @param uploadRequest        bucket, key, finalizer and integrity-check options
     * @param streamContext        supplier of the part streams; its part count sizes the result arrays
     * @param completableFuture    future to complete with the outcome of the whole upload
     * @param str                  upload id of the multipart upload
     * @param statsMetricPublisher publisher attached to the outgoing requests
     */
    private void doUploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> completableFuture, String str, StatsMetricPublisher statsMetricPublisher) {
        // One slot per part; both arrays are handed to AsyncPartsHandler.uploadParts.
        final AtomicReferenceArray<CompletedPart> completedParts = new AtomicReferenceArray<>(streamContext.getNumberOfParts());
        final AtomicReferenceArray<CheckedContainer> checkedContainers = new AtomicReferenceArray<>(streamContext.getNumberOfParts());
        try {
            final CompletableFuture[] partUploads = (CompletableFuture[]) AsyncPartsHandler.uploadParts(
                s3AsyncClient,
                this.executorService,
                this.priorityExecutorService,
                this.urgentExecutorService,
                uploadRequest,
                streamContext,
                str,
                completedParts,
                checkedContainers,
                statsMetricPublisher,
                uploadRequest.isUploadRetryEnabled(),
                this.transferSemaphoresHolder,
                this.maxRetryablePartSize
            ).toArray(CompletableFuture[]::new);

            CompletableFutureUtils.allOfExceptionForwarded(partUploads).thenApply(allPartsDone -> {
                try {
                    uploadRequest.getUploadFinalizer().accept(true);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return allPartsDone;
            }).thenApply(ignored -> {
                if (uploadRequest.doRemoteDataIntegrityCheck()) {
                    mergeAndVerifyChecksum(checkedContainers, uploadRequest.getKey(), uploadRequest.getExpectedChecksum().longValue());
                }
                return null;
            }).thenCompose(
                ignored -> completeMultipartUpload(s3AsyncClient, uploadRequest, str, completedParts, statsMetricPublisher)
            ).handle(
                // Kept fully typed so the stage below receives a Throwable rather than an Object.
                handleExceptionOrResponse(s3AsyncClient, uploadRequest, completableFuture, str)
            ).exceptionally(failure -> {
                handleException(completableFuture, () -> "Unexpected exception occurred", failure);
                return null;
            });
        } catch (Exception e) {
            try {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, str);
            } finally {
                // Report the original failure whether or not the clean-up itself succeeded.
                completableFuture.completeExceptionally(e);
            }
        }
    }

    /**
     * Combines the per-part CRC32 values, in part order, into a single checksum and compares
     * it with the expected value.
     *
     * @param atomicReferenceArray per-part containers exposing a Base64 checksum and a content length
     * @param str                  key of the object, reported when the checksums differ
     * @param j                    expected checksum of the whole file
     * @throws RuntimeException wrapping a {@code CorruptFileException} when the combined
     *                          checksum differs from {@code j}
     */
    private void mergeAndVerifyChecksum(AtomicReferenceArray<CheckedContainer> atomicReferenceArray, String str, long j) {
        long combinedCrc = fromBase64String(atomicReferenceArray.get(0).getChecksum());
        int partIndex = 1;
        while (partIndex < atomicReferenceArray.length()) {
            final CheckedContainer part = atomicReferenceArray.get(partIndex);
            final long partCrc = fromBase64String(part.getChecksum());
            // Fold this part's CRC into the running value; the combine step needs the part length.
            combinedCrc = JZlib.crc32_combine(combinedCrc, partCrc, part.getContentLength());
            partIndex++;
        }
        if (combinedCrc == j) {
            return;
        }
        throw new RuntimeException(new CorruptFileException("File level checksums didn't match combined part checksums", str));
    }

    /**
     * Builds the terminal handler of the multipart chain. On success it completes
     * {@code completableFuture} normally; on failure it cleans up the uploaded parts and
     * completes the future exceptionally via {@link #handleException}.
     *
     * @param s3AsyncClient     client used for the clean-up request
     * @param uploadRequest     request whose parts are cleaned up on failure
     * @param completableFuture future to complete with the outcome
     * @param str               upload id of the multipart upload
     * @return handler suitable for {@code CompletableFuture.handle}; it always returns {@code null}
     */
    private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, CompletableFuture<Void> completableFuture, String str) {
        return (response, failure) -> {
            if (failure != null) {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, str);
                handleException(completableFuture, () -> "Failed to send multipart upload requests.", failure);
            } else {
                completableFuture.complete(null);
            }
            return null;
        };
    }

    /**
     * Sends the complete-multipart-upload request listing every part, in index order.
     *
     * @param s3AsyncClient        client used to issue the request
     * @param uploadRequest        source of the bucket and key
     * @param str                  upload id of the multipart upload
     * @param atomicReferenceArray completed parts, one slot per part
     * @param statsMetricPublisher publisher whose multipart collector is attached to the request
     * @return future of the S3 response
     */
    private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, String str, AtomicReferenceArray<CompletedPart> atomicReferenceArray, StatsMetricPublisher statsMetricPublisher) {
        log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", str));

        final int partCount = atomicReferenceArray.length();
        final CompleteMultipartUploadRequest.Builder requestBuilder = CompleteMultipartUploadRequest.builder()
            .bucket(uploadRequest.getBucket())
            .key(uploadRequest.getKey())
            .uploadId(str)
            .overrideConfiguration(builder -> builder.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

        // Snapshot the completed parts into a plain array, preserving slot order.
        final CompletedPart[] parts = new CompletedPart[partCount];
        for (int i = 0; i < partCount; i++) {
            parts[i] = atomicReferenceArray.get(i);
        }
        final CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder().parts(parts).build();
        final CompleteMultipartUploadRequest request = requestBuilder.multipartUpload(completedUpload).build();

        return SocketAccess.doPrivileged(() -> s3AsyncClient.completeMultipartUpload(request));
    }

    /**
     * Encodes a CRC32 value held in a {@code Long} as Base64 text.
     *
     * <p>Only bytes 4..7 of the 8-byte array produced by {@code ByteUtils.toByteArrayBE} are
     * encoded (presumably the low 32 bits in big-endian order, going by the helper's name).
     *
     * @param l checksum value; must not be {@code null}
     * @return Base64 encoding of the four selected bytes
     */
    private static String base64StringFromLong(Long l) {
        final byte[] eightBytes = ByteUtils.toByteArrayBE(l.longValue());
        final byte[] crc32Bytes = Arrays.copyOfRange(eightBytes, 4, 8);
        return Base64.getEncoder().encodeToString(crc32Bytes);
    }

    /**
     * Decodes a Base64-encoded CRC32 checksum into an unsigned 32-bit value held in a
     * {@code long}. The four decoded bytes are read most-significant byte first.
     *
     * @param str Base64 text that must decode to exactly four bytes
     * @return the checksum, in the range {@code 0..0xFFFFFFFF}
     * @throws IllegalArgumentException if {@code str} is not valid Base64 or does not decode
     *                                  to exactly four bytes
     */
    private static long fromBase64String(String str) {
        // Keep the decoded bytes in a local so the fold below reads the same array that was validated.
        final byte[] decoded = Base64.getDecoder().decode(str);
        if (decoded.length != 4) {
            throw new IllegalArgumentException("Invalid Base64 encoded CRC32 checksum");
        }
        long value = 0;
        for (final byte b : decoded) {
            // Mask to avoid sign extension of bytes >= 0x80.
            value = (value << 8) | (b & 0xFF);
        }
        return value;
    }

    /**
     * Completes {@code completableFuture} exceptionally. A {@link CompletionException} is
     * unwrapped to its cause first. An {@link Error} is passed through unchanged; anything
     * else is wrapped in an {@code SdkClientException} carrying the supplied message.
     *
     * @param completableFuture future to fail
     * @param supplier          message for the wrapping exception; only invoked when wrapping
     * @param th                failure to report
     */
    private static void handleException(CompletableFuture<Void> completableFuture, Supplier<String> supplier, Throwable th) {
        final Throwable rootCause;
        if (th instanceof CompletionException) {
            rootCause = th.getCause();
        } else {
            rootCause = th;
        }

        if (!(rootCause instanceof Error)) {
            completableFuture.completeExceptionally(SdkClientException.create(supplier.get(), rootCause));
            return;
        }
        completableFuture.completeExceptionally(rootCause);
    }

    /**
     * Chooses the part size for an upload of {@code j} bytes.
     *
     * <ul>
     *   <li>Content smaller than 5 MB is uploaded as one part of its own size.</li>
     *   <li>When {@code z} is set and the priority is HIGH or URGENT, parts are exactly 5 MB.</li>
     *   <li>Otherwise the size is {@code ceil(j / 10000)}, but never below the configured
     *       minimum part size.</li>
     * </ul>
     *
     * @param j             total content length in bytes
     * @param writePriority priority of the upload
     * @param z             flag enabling the fixed 5 MB size for HIGH/URGENT uploads
     *                      (presumably "upload retry enabled" -- confirm against callers)
     * @return part size in bytes
     */
    public long calculateOptimalPartSize(long j, WritePriority writePriority, boolean z) {
        if (j < ByteSizeUnit.MB.toBytes(5L)) {
            return j;
        }

        final boolean highOrUrgent = writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT;
        if (z && highOrUrgent) {
            return new ByteSizeValue(5L, ByteSizeUnit.MB).getBytes();
        }

        final double evenSplit = Math.ceil(j / 10000.0d);
        return (long) Math.max(evenSplit, this.minimumPartSize);
    }

    /**
     * Uploads the whole content with a single put-object request.
     *
     * <p>Outcome handling, in order:
     * <ol>
     *   <li>When the request finishes, the semaphore is released and the stream closed.</li>
     *   <li>On success the upload finalizer runs and {@code completableFuture} completes normally.</li>
     *   <li>On a 400 {@code BadDigest} response a {@code CorruptFileException} is raised; any
     *       other failure completes {@code completableFuture} exceptionally.</li>
     *   <li>If step 2 or 3 threw (finalizer I/O error or corrupt file), the uploaded object is
     *       deleted and {@code completableFuture} is completed exceptionally.</li>
     * </ol>
     * If the request cannot even be started, the resources are released and a failed future
     * is propagated to {@code completableFuture}.
     *
     * @param s3AsyncClient        client used to issue the S3 requests
     * @param uploadRequest        bucket, key, priority, metadata and integrity-check options
     * @param streamContext        supplier of the single content stream (part 0)
     * @param completableFuture    future to complete with the outcome of the upload
     * @param statsMetricPublisher publisher whose put-object publisher is attached to the request
     * @param semaphore            permit to release when the upload ends; may be {@code null}
     */
    private void uploadInOneChunk(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> completableFuture, StatsMetricPublisher statsMetricPublisher, Semaphore semaphore) {
        final PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
            .bucket(uploadRequest.getBucket())
            .key(uploadRequest.getKey())
            .contentLength(Long.valueOf(uploadRequest.getContentLength()))
            .overrideConfiguration(builder -> builder.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));
        if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
            requestBuilder.metadata(uploadRequest.getMetadata());
        }
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            requestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
            requestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
        }
        final PutObjectRequest putObjectRequest = requestBuilder.build();

        // Pick the executor that reads the stream according to the write priority.
        final ExecutorService streamReadExecutor;
        if (uploadRequest.getWritePriority() == WritePriority.URGENT) {
            streamReadExecutor = this.urgentExecutorService;
        } else if (uploadRequest.getWritePriority() == WritePriority.HIGH) {
            streamReadExecutor = this.priorityExecutorService;
        } else {
            streamReadExecutor = this.executorService;
        }

        final CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(() -> {
            InputStream inputStream = null;
            try {
                final InputStreamContainer container = streamContext.provideStream(0);
                inputStream = AsyncPartsHandler.maybeRetryInputStream(
                    container.getInputStream(),
                    uploadRequest.getWritePriority(),
                    uploadRequest.isUploadRetryEnabled(),
                    uploadRequest.getContentLength(),
                    this.maxRetryablePartSize
                );
                // Lambdas may only capture effectively final locals, so hand them a final copy.
                final InputStream uploadStream = inputStream;
                final AsyncRequestBody body = AsyncRequestBody.fromInputStream(
                    uploadStream,
                    Long.valueOf(container.getContentLength()),
                    streamReadExecutor
                );
                return s3AsyncClient.putObject(putObjectRequest, body).handle((response, failure) -> {
                    releaseResourcesSafely(semaphore, uploadStream, uploadRequest.getKey());
                    if (failure == null) {
                        try {
                            uploadRequest.getUploadFinalizer().accept(true);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                        completableFuture.complete(null);
                        return null;
                    }
                    final Throwable s3Cause = ExceptionsHelper.unwrap(failure, S3Exception.class);
                    if (s3Cause != null) {
                        final S3Exception s3Exception = (S3Exception) s3Cause;
                        if (s3Exception.statusCode() == 400 && "BadDigest".equals(s3Exception.awsErrorDetails().errorCode())) {
                            throw new RuntimeException(new CorruptFileException(s3Exception, uploadRequest.getKey()));
                        }
                    }
                    completableFuture.completeExceptionally(failure);
                    return null;
                }).handle((ignored, postProcessingFailure) -> {
                    // Only reached with a failure when the stage above threw.
                    if (postProcessingFailure != null) {
                        deleteUploadedObject(s3AsyncClient, uploadRequest);
                        completableFuture.completeExceptionally(postProcessingFailure);
                    }
                    return null;
                });
            } catch (Exception e) {
                releaseResourcesSafely(semaphore, inputStream, uploadRequest.getKey());
                return CompletableFuture.failedFuture(e);
            }
        });

        // Failing or cancelling the caller-visible future also fails the in-flight request, and vice versa.
        CompletableFutureUtils.forwardExceptionTo(completableFuture, putObjectFuture);
        CompletableFutureUtils.forwardResultTo(putObjectFuture, completableFuture);
    }

    /**
     * Releases the permit and closes the stream of a single-chunk upload. Either argument may
     * be {@code null}, in which case it is skipped. A failure to close the stream is logged,
     * not propagated.
     *
     * @param semaphore   permit to release, or {@code null}
     * @param inputStream stream to close, or {@code null}
     * @param str         key of the object being uploaded, used in the log message
     */
    private void releaseResourcesSafely(Semaphore semaphore, InputStream inputStream, String str) {
        if (semaphore != null) {
            semaphore.release();
        }
        if (inputStream == null) {
            return;
        }
        try {
            inputStream.close();
        } catch (IOException closeFailure) {
            log.error(() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", str), closeFailure);
        }
    }

    /**
     * Issues a delete request for the object named by {@code uploadRequest}. The call does not
     * wait for the outcome; a failed delete is logged and otherwise ignored.
     *
     * @param s3AsyncClient client used to issue the delete request
     * @param uploadRequest source of the bucket and key of the object to delete
     */
    private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest) {
        final DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
            .bucket(uploadRequest.getBucket())
            .key(uploadRequest.getKey())
            .build();
        // Chain directly on the typed future so the failure arrives as a Throwable.
        SocketAccess.doPrivileged(() -> s3AsyncClient.deleteObject(deleteObjectRequest)).exceptionally(failure -> {
            log.error(() -> new ParameterizedMessage("Failed to delete uploaded object of key {}", uploadRequest.getKey()), failure);
            return null;
        });
    }
}
