package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.class */
/**
 * Ingress handler that uploads the data of an {@link AbfsOutputStream} through an
 * {@link AbfsBlobClient}.
 *
 * <p>Blocks are appended individually via {@link #remoteWrite} and recorded in an
 * {@link AzureBlobBlockManager}; {@link #remoteFlush} then commits the recorded block id list.
 * When a request fails with an {@link AbfsRestOperationException} for which
 * {@code shouldIngressHandlerBeSwitched} returns true, the exception produced by
 * {@code getIngressHandlerSwitchException} is thrown instead of the original one.
 */
public class AzureBlobIngressHandler extends AzureIngressHandler {
    // NOTE(review): the logger is registered under AbfsOutputStream, not this class. Left
    // unchanged so existing log configuration keeps matching - confirm this is intentional.
    private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class);

    /** ETag sent with append/flush requests; replaced after every successful flush. */
    private volatile String eTag;
    private final AzureBlobBlockManager blobBlockManager;
    private final AbfsBlobClient blobClient;
    private final AbfsClientHandler clientHandler;

    /**
     * Creates the handler, reusing the supplied block manager when it already is an
     * {@link AzureBlobBlockManager} and creating a new one otherwise.
     *
     * @param abfsOutputStream the stream this handler serves
     * @param blockFactory factory handed to a newly created block manager
     * @param i value handed to a newly created block manager (presumably the buffer size -
     *          confirm against the {@code AzureBlobBlockManager} constructor)
     * @param str initial eTag
     * @param abfsClientHandler source of the blob client
     * @param azureBlockManager existing block manager; reused only if it is blob-specific
     * @throws AzureBlobFileSystemException declared by the original signature
     */
    public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream, DataBlocks.BlockFactory blockFactory, int i, String str, AbfsClientHandler abfsClientHandler, AzureBlockManager azureBlockManager) throws AzureBlobFileSystemException {
        super(abfsOutputStream);
        this.eTag = str;
        this.blobBlockManager = azureBlockManager instanceof AzureBlobBlockManager
                ? (AzureBlobBlockManager) azureBlockManager
                : new AzureBlobBlockManager(abfsOutputStream, blockFactory, i);
        this.clientHandler = abfsClientHandler;
        this.blobClient = abfsClientHandler.getBlobClient();
        LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance {} for path {}",
                abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
    }

    /**
     * Writes a slice of {@code bArr} into the given block.
     *
     * @param abfsBlock block receiving the data
     * @param bArr source buffer
     * @param i offset into {@code bArr}
     * @param i2 number of bytes to write
     * @return whatever {@code abfsBlock.write} returns
     * @throws IOException if the block write fails
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected int bufferData(AbfsBlock abfsBlock, byte[] bArr, int i, int i2) throws IOException {
        LOG.trace("Buffering data of length {} to block at offset {}", i2, i);
        return abfsBlock.write(bArr, i, i2);
    }

    /**
     * Appends one block to the remote store and records it in the block manager on success.
     *
     * @param abfsBlock block being uploaded; supplies the block id and offset
     * @param blockUploadData payload of the block
     * @param appendRequestParameters request parameters; blob parameters (block id and the
     *        current eTag) are set on this object before the call
     * @param tracingContext parent tracing context; a copy is annotated and used for the call
     * @return the completed append operation
     * @throws IOException if the append fails; the handler-switch exception is thrown when
     *         {@code shouldIngressHandlerBeSwitched} accepts the failure
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected AbfsRestOperation remoteWrite(AbfsBlock abfsBlock, DataBlocks.BlockUploadData blockUploadData, AppendRequestParameters appendRequestParameters, TracingContext tracingContext) throws IOException {
        appendRequestParameters.setBlobParams(new BlobAppendRequestParameters(abfsBlock.getBlockId(), getETag()));
        String threadId = String.valueOf(Thread.currentThread().getId());
        TracingContext appendContext = new TracingContext(tracingContext);
        appendContext.setIngressHandler("B T " + threadId);
        appendContext.setPosition(String.valueOf(abfsBlock.getOffset()));
        try {
            LOG.trace("Starting remote write for block with ID {} and offset {}", abfsBlock.getBlockId(), abfsBlock.getOffset());
            AbfsRestOperation op = getClient().append(
                    getAbfsOutputStream().getPath(),
                    blockUploadData.toByteArray(),
                    appendRequestParameters,
                    getAbfsOutputStream().getCachedSasTokenString(),
                    getAbfsOutputStream().getContextEncryptionAdapter(),
                    appendContext);
            // Only a successfully appended block is recorded in the block manager.
            this.blobBlockManager.updateEntry(abfsBlock);
            return op;
        } catch (AbfsRestOperationException e) {
            if (shouldIngressHandlerBeSwitched(e)) {
                // Log the "handler switch" message only when a switch is actually requested.
                LOG.error("Error in remote write requiring handler switch for path {}", getAbfsOutputStream().getPath(), e);
                throw getIngressHandlerSwitchException(e);
            }
            LOG.error("Error in remote write for path {} and offset {}", getAbfsOutputStream().getPath(), abfsBlock.getOffset(), e);
            throw e;
        }
    }

    /**
     * Commits the block ids held by the block manager and stores the eTag returned by the service.
     *
     * @param j offset recorded in the tracing context and in log messages
     * @param z not used by this implementation
     * @param z2 forwarded to the client's {@code flush} call
     * @param str forwarded to the client's {@code flush} call (presumably the lease id -
     *        confirm against {@code AbfsBlobClient.flush})
     * @param tracingContext parent tracing context; a copy is annotated and used for the call
     * @return the completed flush operation, or {@code null} when the stream is an append blob
     *         or there is no block list to commit
     * @throws IOException if the flush fails; the handler-switch exception is thrown when
     *         {@code shouldIngressHandlerBeSwitched} accepts the failure
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected synchronized AbfsRestOperation remoteFlush(long j, boolean z, boolean z2, String str, TracingContext tracingContext) throws IOException {
        if (getAbfsOutputStream().isAppendBlob() || !this.blobBlockManager.hasListToCommit()) {
            return null;
        }
        try {
            String blockListXml = AbfsBlobClient.generateBlockListXml(this.blobBlockManager.getBlockIdList());
            TracingContext flushContext = new TracingContext(tracingContext);
            flushContext.setIngressHandler("B");
            flushContext.setPosition(String.valueOf(j));
            LOG.trace("Flushing data at offset {} for path {}", j, getAbfsOutputStream().getPath());
            AbfsRestOperation op = getClient().flush(
                    blockListXml.getBytes(StandardCharsets.UTF_8),
                    getAbfsOutputStream().getPath(),
                    z2,
                    getAbfsOutputStream().getCachedSasTokenString(),
                    str,
                    getETag(),
                    getAbfsOutputStream().getContextEncryptionAdapter(),
                    flushContext);
            // Later requests must carry the eTag returned by this flush.
            setETag(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG));
            return op;
        } catch (AbfsRestOperationException e) {
            if (shouldIngressHandlerBeSwitched(e)) {
                // Log the "handler switch" message only when a switch is actually requested.
                LOG.error("Error in remote flush requiring handler switch for path {}", getAbfsOutputStream().getPath(), e);
                throw getIngressHandlerSwitchException(e);
            }
            LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), j, e);
            throw e;
        }
    }

    /**
     * Sends one block to the service through the blob client's {@code appendBlock} call.
     *
     * @param str path passed to {@code appendBlock}
     * @param blockUploadData payload of the block
     * @param abfsBlock block being uploaded; used here only for its offset in error logs
     * @param appendRequestParameters request parameters for the append
     * @param tracingContext tracing context for the call
     * @return the completed append operation
     * @throws IOException if the append fails; the handler-switch exception is thrown when
     *         {@code shouldIngressHandlerBeSwitched} accepts the failure
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected AbfsRestOperation remoteAppendBlobWrite(String str, DataBlocks.BlockUploadData blockUploadData, AbfsBlock abfsBlock, AppendRequestParameters appendRequestParameters, TracingContext tracingContext) throws IOException {
        try {
            return this.blobClient.appendBlock(str, appendRequestParameters, blockUploadData.toByteArray(), tracingContext);
        } catch (AbfsRestOperationException e) {
            if (shouldIngressHandlerBeSwitched(e)) {
                // Log the "handler switch" message only when a switch is actually requested.
                LOG.error("Error in remote write requiring handler switch for path {}", getAbfsOutputStream().getPath(), e);
                throw getIngressHandlerSwitchException(e);
            }
            LOG.error("Error in remote write for path {} and offset {}", getAbfsOutputStream().getPath(), abfsBlock.getOffset(), e);
            throw e;
        }
    }

    /**
     * Replaces the eTag used for subsequent requests.
     *
     * @param str new eTag value
     */
    void setETag(String str) {
        this.eTag = str;
    }

    /**
     * Returns the eTag currently used for requests.
     *
     * @return the stored eTag
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    @VisibleForTesting
    public String getETag() {
        return this.eTag;
    }

    /**
     * Uploads the active block of an append blob, if the stream reports data to upload.
     *
     * <p>The stream position and upload statistics are advanced before the request is sent.
     * If the first attempt raises {@link InvalidIngressServiceException}, the stream's handler
     * is switched and the upload is retried once through the new handler. The upload data and
     * the block are closed exactly once, whether the upload succeeds or fails.
     *
     * @throws IOException if the upload (or its retry) fails
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected void writeAppendBlobCurrentBufferToService() throws IOException {
        AbfsBlock activeBlock = this.blobBlockManager.getActiveBlock();
        if (!getAbfsOutputStream().hasActiveBlockDataToUpload()) {
            return;
        }

        final int bytesLength = activeBlock.dataSize();
        // Typed as BlockUploadData because remoteAppendBlobWrite requires it.
        final DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
        if (this.blobBlockManager.hasActiveBlock()) {
            this.blobBlockManager.clearActiveBlock();
        }
        getAbfsOutputStream().getOutputStreamStatistics().writeCurrentBuffer();
        getAbfsOutputStream().getOutputStreamStatistics().bytesToUpload(bytesLength);

        final long offset = getAbfsOutputStream().getPosition();
        getAbfsOutputStream().setPosition(offset + bytesLength);

        // try-with-resources closes the perf record exactly once on every exit path.
        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(this.blobClient.getAbfsPerfTracker(), "writeCurrentBufferToService", AbfsHttpConstants.APPEND_ACTION)) {
            LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath());
            AppendRequestParameters reqParams = new AppendRequestParameters(
                    offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE, true,
                    getAbfsOutputStream().getLeaseId(), getAbfsOutputStream().isExpectHeaderEnabled());

            AbfsRestOperation op;
            try {
                op = remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData, activeBlock, reqParams,
                        new TracingContext(getAbfsOutputStream().getTracingContext()));
            } catch (InvalidIngressServiceException e) {
                LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
                getAbfsOutputStream().switchHandler();
                op = getAbfsOutputStream().getIngressHandler().remoteAppendBlobWrite(
                        getAbfsOutputStream().getPath(), uploadData, activeBlock, reqParams,
                        new TracingContext(getAbfsOutputStream().getTracingContext()));
            } finally {
                // Single cleanup point for the payload and its block.
                IOUtils.closeStreams(uploadData, activeBlock);
            }

            if (op != null) {
                getAbfsOutputStream().getCachedSasToken().update(op.getSasToken());
                getAbfsOutputStream().getOutputStreamStatistics().uploadSuccessful(bytesLength);
                perfInfo.registerResult(op.getResult());
                perfInfo.registerSuccess(true);
            }
        }
    }

    /**
     * Returns the block manager used by this handler.
     *
     * @return the blob block manager
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public AzureBlockManager getBlockManager() {
        return this.blobBlockManager;
    }

    /**
     * Returns the client used for remote calls.
     *
     * @return the blob client obtained from the client handler at construction time
     */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public AbfsBlobClient getClient() {
        return this.blobClient;
    }

    /**
     * Returns the client handler supplied at construction time.
     *
     * @return the client handler
     */
    @VisibleForTesting
    public AbfsClientHandler getClientHandler() {
        return this.clientHandler;
    }
}
