package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.class */
public class AzureDfsToBlobIngressFallbackHandler extends AzureDFSIngressHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class);
    private final AzureBlobBlockManager blobBlockManager;
    private final String eTag;
    private final Lock lock;

    public AzureDfsToBlobIngressFallbackHandler(AbfsOutputStream abfsOutputStream, DataBlocks.BlockFactory blockFactory, int i, String str, AbfsClientHandler abfsClientHandler) throws AzureBlobFileSystemException {
        super(abfsOutputStream, abfsClientHandler);
        this.lock = new ReentrantLock();
        this.eTag = str;
        this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream, blockFactory, i);
        LOG.trace("Created a new BlobFallbackIngress Handler for AbfsOutputStream instance {} for path {}", abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
    }

    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public int bufferData(AbfsBlock abfsBlock, byte[] bArr, int i, int i2) throws IOException {
        LOG.trace("Buffering data of length {} to block at offset {}", Integer.valueOf(i2), Integer.valueOf(i));
        return super.bufferData(abfsBlock, bArr, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public AbfsRestOperation remoteWrite(AbfsBlock abfsBlock, DataBlocks.BlockUploadData blockUploadData, AppendRequestParameters appendRequestParameters, TracingContext tracingContext) throws IOException {
        TracingContext tracingContext2 = new TracingContext(tracingContext);
        tracingContext2.setIngressHandler("FB T " + String.valueOf(Thread.currentThread().getId()));
        tracingContext2.setPosition(String.valueOf(abfsBlock.getOffset()));
        try {
            AbfsRestOperation remoteWrite = super.remoteWrite(abfsBlock, blockUploadData, appendRequestParameters, tracingContext2);
            this.blobBlockManager.updateEntry(abfsBlock);
            return remoteWrite;
        } catch (AbfsRestOperationException e) {
            if (shouldIngressHandlerBeSwitched(e)) {
                LOG.error("Error in remote write requiring handler switch for path {}", getAbfsOutputStream().getPath(), e);
                throw getIngressHandlerSwitchException(e);
            }
            LOG.error("Error in remote write for path {} and offset {}", new Object[]{getAbfsOutputStream().getPath(), abfsBlock.getOffset(), e});
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public synchronized AbfsRestOperation remoteFlush(long j, boolean z, boolean z2, String str, TracingContext tracingContext) throws IOException {
        if (!this.blobBlockManager.hasListToCommit()) {
            return null;
        }
        try {
            TracingContext tracingContext2 = new TracingContext(tracingContext);
            tracingContext2.setIngressHandler("FB");
            tracingContext2.setPosition(String.valueOf(j));
            return super.remoteFlush(j, z, z2, str, tracingContext2);
        } catch (AbfsRestOperationException e) {
            if (shouldIngressHandlerBeSwitched(e)) {
                LOG.error("Error in remote flush requiring handler switch for path {}", getAbfsOutputStream().getPath(), e);
                throw getIngressHandlerSwitchException(e);
            }
            LOG.error("Error in remote flush for path {} and offset {}", new Object[]{getAbfsOutputStream().getPath(), Long.valueOf(j), e});
            throw e;
        }
    }

    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    public AzureBlockManager getBlockManager() {
        return this.blobBlockManager;
    }

    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    @VisibleForTesting
    public String getETag() {
        this.lock.lock();
        try {
            return this.eTag;
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler, org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler
    protected void writeAppendBlobCurrentBufferToService() throws IOException {
        AbfsRestOperation remoteAppendBlobWrite;
        AbfsBlock activeBlock = this.blobBlockManager.getActiveBlock();
        if (getAbfsOutputStream().hasActiveBlockDataToUpload()) {
            int dataSize = activeBlock.dataSize();
            Closeable startUpload = activeBlock.startUpload();
            if (this.blobBlockManager.hasActiveBlock()) {
                this.blobBlockManager.clearActiveBlock();
            }
            getAbfsOutputStream().getOutputStreamStatistics().writeCurrentBuffer();
            getAbfsOutputStream().getOutputStreamStatistics().bytesToUpload(dataSize);
            long position = getAbfsOutputStream().getPosition();
            getAbfsOutputStream().setPosition(position + dataSize);
            AbfsPerfInfo abfsPerfInfo = new AbfsPerfInfo(getClient().getAbfsPerfTracker(), "writeCurrentBufferToService", AbfsHttpConstants.APPEND_ACTION);
            try {
                LOG.trace("Writing current buffer to service at offset {} and path {}", Long.valueOf(position), getAbfsOutputStream().getPath());
                AppendRequestParameters appendRequestParameters = new AppendRequestParameters(position, 0, dataSize, AppendRequestParameters.Mode.APPEND_MODE, true, getAbfsOutputStream().getLeaseId(), getAbfsOutputStream().isExpectHeaderEnabled());
                try {
                    try {
                        remoteAppendBlobWrite = remoteAppendBlobWrite(getAbfsOutputStream().getPath(), startUpload, activeBlock, appendRequestParameters, new TracingContext(getAbfsOutputStream().getTracingContext()));
                        IOUtils.closeStreams(new Closeable[]{startUpload, activeBlock});
                    } catch (InvalidIngressServiceException e) {
                        LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
                        getAbfsOutputStream().switchHandler();
                        remoteAppendBlobWrite = getAbfsOutputStream().getIngressHandler().remoteAppendBlobWrite(getAbfsOutputStream().getPath(), startUpload, activeBlock, appendRequestParameters, new TracingContext(getAbfsOutputStream().getTracingContext()));
                        IOUtils.closeStreams(new Closeable[]{startUpload, activeBlock});
                    }
                    if (remoteAppendBlobWrite != null) {
                        getAbfsOutputStream().getCachedSasToken().update(remoteAppendBlobWrite.getSasToken());
                        getAbfsOutputStream().getOutputStreamStatistics().uploadSuccessful(dataSize);
                        abfsPerfInfo.registerResult(remoteAppendBlobWrite.getResult());
                        abfsPerfInfo.registerSuccess(true);
                    }
                    abfsPerfInfo.close();
                } catch (Throwable th) {
                    IOUtils.closeStreams(new Closeable[]{startUpload, activeBlock});
                    throw th;
                }
            } catch (Throwable th2) {
                try {
                    abfsPerfInfo.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
                throw th2;
            }
        }
    }
}
