package alluxio.master.block;

import alluxio.RpcUtils;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.RegisterLeaseNotFoundException;
import alluxio.grpc.BlockHeartbeatPRequest;
import alluxio.grpc.BlockHeartbeatPResponse;
import alluxio.grpc.BlockMasterWorkerServiceGrpc;
import alluxio.grpc.CommitBlockInUfsPRequest;
import alluxio.grpc.CommitBlockInUfsPResponse;
import alluxio.grpc.CommitBlockPRequest;
import alluxio.grpc.CommitBlockPResponse;
import alluxio.grpc.GetRegisterLeasePRequest;
import alluxio.grpc.GetRegisterLeasePResponse;
import alluxio.grpc.GetWorkerIdPRequest;
import alluxio.grpc.GetWorkerIdPResponse;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.grpc.RegisterWorkerPOptions;
import alluxio.grpc.RegisterWorkerPRequest;
import alluxio.grpc.RegisterWorkerPResponse;
import alluxio.metrics.Metric;
import alluxio.proto.meta.Block;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/block/BlockMasterWorkerServiceHandler.class */
public final class BlockMasterWorkerServiceHandler extends BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(BlockMasterWorkerServiceHandler.class);
    private final BlockMaster mBlockMaster;

    public BlockMasterWorkerServiceHandler(BlockMaster blockMaster) {
        Preconditions.checkNotNull(blockMaster, "blockMaster");
        this.mBlockMaster = blockMaster;
    }

    public void blockHeartbeat(BlockHeartbeatPRequest blockHeartbeatPRequest, StreamObserver<BlockHeartbeatPResponse> streamObserver) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Block heartbeat request is {} bytes, {} added blocks and {} removed blocks", new Object[]{Integer.valueOf(blockHeartbeatPRequest.getSerializedSize()), Integer.valueOf(blockHeartbeatPRequest.getAddedBlocksCount()), Integer.valueOf(blockHeartbeatPRequest.getRemovedBlockIdsCount())});
        }
        long workerId = blockHeartbeatPRequest.getWorkerId();
        Map capacityBytesOnTiersMap = blockHeartbeatPRequest.getOptions().getCapacityBytesOnTiersMap();
        Map usedBytesOnTiersMap = blockHeartbeatPRequest.getUsedBytesOnTiersMap();
        List removedBlockIdsList = blockHeartbeatPRequest.getRemovedBlockIdsList();
        Map lostStorageMap = blockHeartbeatPRequest.getLostStorageMap();
        Map<Block.BlockLocation, List<Long>> reconstructBlocksOnLocationMap = reconstructBlocksOnLocationMap(blockHeartbeatPRequest.getAddedBlocksList(), workerId);
        List list = (List) blockHeartbeatPRequest.getOptions().getMetricsList().stream().map(Metric::fromProto).collect(Collectors.toList());
        RpcUtils.call(LOG, () -> {
            return BlockHeartbeatPResponse.newBuilder().setCommand(this.mBlockMaster.workerHeartbeat(workerId, capacityBytesOnTiersMap, usedBytesOnTiersMap, removedBlockIdsList, reconstructBlocksOnLocationMap, lostStorageMap, list)).build();
        }, "blockHeartbeat", "request=%s", streamObserver, new Object[]{blockHeartbeatPRequest});
    }

    public void commitBlock(CommitBlockPRequest commitBlockPRequest, StreamObserver<CommitBlockPResponse> streamObserver) {
        long workerId = commitBlockPRequest.getWorkerId();
        long usedBytesOnTier = commitBlockPRequest.getUsedBytesOnTier();
        String tierAlias = commitBlockPRequest.getTierAlias();
        long blockId = commitBlockPRequest.getBlockId();
        String mediumType = commitBlockPRequest.getMediumType();
        long length = commitBlockPRequest.getLength();
        RpcUtils.call(LOG, () -> {
            this.mBlockMaster.commitBlock(workerId, usedBytesOnTier, tierAlias, mediumType, blockId, length);
            return CommitBlockPResponse.getDefaultInstance();
        }, "commitBlock", "request=%s", streamObserver, new Object[]{commitBlockPRequest});
    }

    public void commitBlockInUfs(CommitBlockInUfsPRequest commitBlockInUfsPRequest, StreamObserver<CommitBlockInUfsPResponse> streamObserver) {
        RpcUtils.call(LOG, () -> {
            this.mBlockMaster.commitBlockInUFS(commitBlockInUfsPRequest.getBlockId(), commitBlockInUfsPRequest.getLength());
            return CommitBlockInUfsPResponse.getDefaultInstance();
        }, "commitBlock", "request=%s", streamObserver, new Object[]{commitBlockInUfsPRequest});
    }

    public void getWorkerId(GetWorkerIdPRequest getWorkerIdPRequest, StreamObserver<GetWorkerIdPResponse> streamObserver) {
        RpcUtils.call(LOG, () -> {
            return GetWorkerIdPResponse.newBuilder().setWorkerId(this.mBlockMaster.getWorkerId(GrpcUtils.fromProto(getWorkerIdPRequest.getWorkerNetAddress()))).build();
        }, "getWorkerId", "request=%s", streamObserver, new Object[]{getWorkerIdPRequest});
    }

    public void requestRegisterLease(GetRegisterLeasePRequest getRegisterLeasePRequest, StreamObserver<GetRegisterLeasePResponse> streamObserver) {
        RpcUtils.call(LOG, () -> {
            return GrpcUtils.toProto(getRegisterLeasePRequest.getWorkerId(), this.mBlockMaster.tryAcquireRegisterLease(getRegisterLeasePRequest));
        }, "getRegisterLease", "request=%s", streamObserver, new Object[]{getRegisterLeasePRequest});
    }

    public void registerWorker(RegisterWorkerPRequest registerWorkerPRequest, StreamObserver<RegisterWorkerPResponse> streamObserver) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Register worker request is {} bytes, containing {} blocks", Integer.valueOf(registerWorkerPRequest.getSerializedSize()), Integer.valueOf(registerWorkerPRequest.getCurrentBlocksCount()));
        }
        long workerId = registerWorkerPRequest.getWorkerId();
        RegisterWorkerPOptions options = registerWorkerPRequest.getOptions();
        RpcUtils.call(LOG, () -> {
            if (Configuration.getBoolean(PropertyKey.MASTER_WORKER_REGISTER_LEASE_ENABLED) && !this.mBlockMaster.hasRegisterLease(workerId)) {
                String format = String.format("Worker %s does not have a lease or the lease has expired. The worker should acquire a new lease and retry to register.", Long.valueOf(workerId));
                LOG.warn(format);
                throw new RegisterLeaseNotFoundException(format);
            }
            LOG.debug("Worker {} proceeding to register...", Long.valueOf(workerId));
            this.mBlockMaster.workerRegister(workerId, registerWorkerPRequest.getStorageTiersList(), registerWorkerPRequest.getTotalBytesOnTiersMap(), registerWorkerPRequest.getUsedBytesOnTiersMap(), reconstructBlocksOnLocationMap(registerWorkerPRequest.getCurrentBlocksList(), workerId), registerWorkerPRequest.getLostStorageMap(), options);
            LOG.info("Worker {} finished registering, releasing its lease.", Long.valueOf(workerId));
            this.mBlockMaster.releaseRegisterLease(workerId);
            return RegisterWorkerPResponse.getDefaultInstance();
        }, "registerWorker", true, "request=%s", streamObserver, new Object[]{Long.valueOf(workerId)});
    }

    public StreamObserver<RegisterWorkerPRequest> registerWorkerStream(StreamObserver<RegisterWorkerPResponse> streamObserver) {
        return new RegisterStreamObserver(this.mBlockMaster, streamObserver);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Map<Block.BlockLocation, List<Long>> reconstructBlocksOnLocationMap(List<LocationBlockIdListEntry> list, long j) {
        return (Map) list.stream().collect(Collectors.toMap(locationBlockIdListEntry -> {
            return Block.BlockLocation.newBuilder().setTier(locationBlockIdListEntry.getKey().getTierAlias()).setMediumType(locationBlockIdListEntry.getKey().getMediumType()).setWorkerId(j).build();
        }, locationBlockIdListEntry2 -> {
            return locationBlockIdListEntry2.getValue().getBlockIdList();
        }, (list2, list3) -> {
            throw new AssertionError(String.format("Duplicate locations found for worker %s with LocationBlockIdListEntry objects %s", Long.valueOf(j), (String) list.stream().map(locationBlockIdListEntry3 -> {
                return locationBlockIdListEntry3.getKey().toString();
            }).collect(Collectors.joining(","))));
        }));
    }
}
