package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/common/network/SelectorTest.class */
public class SelectorTest {
    protected static final int BUFFER_SIZE = 4096;
    protected EchoServer server;
    protected Time time;
    protected Selector selector;
    protected ChannelBuilder channelBuilder;
    private Metrics metrics;

    @Before
    public void setup() throws Exception {
        HashMap hashMap = new HashMap();
        this.server = new EchoServer(hashMap);
        this.server.start();
        this.time = new MockTime();
        this.channelBuilder = new PlaintextChannelBuilder();
        this.channelBuilder.configure(hashMap);
        this.metrics = new Metrics();
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", new LinkedHashMap(), this.channelBuilder);
    }

    @After
    public void teardown() throws Exception {
        this.selector.close();
        this.server.close();
        this.metrics.close();
    }

    @Test
    public void testServerDisconnect() throws Exception {
        blockingConnect("0");
        Assert.assertEquals("hello", blockingRequest("0", "hello"));
        this.server.closeConnections();
        while (!this.selector.disconnected().contains("0")) {
            this.selector.poll(1000L);
        }
        blockingConnect("0");
        Assert.assertEquals("hello", blockingRequest("0", "hello"));
    }

    @Test(expected = IllegalStateException.class)
    public void testCantSendWithInProgress() throws Exception {
        blockingConnect("0");
        this.selector.send(createSend("0", "test1"));
        this.selector.send(createSend("0", "test2"));
        this.selector.poll(1000L);
    }

    @Test(expected = IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.send(createSend("0", "test"));
        this.selector.poll(1000L);
    }

    @Test(expected = IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect("0", new InetSocketAddress("asdf.asdf.dsc", this.server.port), BUFFER_SIZE, BUFFER_SIZE);
    }

    @Test
    public void testConnectionRefused() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        this.selector.connect("0", new InetSocketAddress("localhost", serverSocket.getLocalPort()), BUFFER_SIZE, BUFFER_SIZE);
        while (this.selector.disconnected().contains("0")) {
            this.selector.poll(1000L);
        }
        serverSocket.close();
    }

    @Test
    public void testNormalOperation() throws Exception {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < 5; i++) {
            connect(Integer.toString(i), inetSocketAddress);
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        int i2 = 0;
        for (int i3 = 0; i3 < 5; i3++) {
            String num = Integer.toString(i3);
            this.selector.send(createSend(num, num + "-0"));
        }
        while (i2 < 5 * 500) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0L, this.selector.disconnected().size());
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                String[] split = asString(networkReceive).split("-");
                Assert.assertEquals("Should be in the form 'conn-counter'", 2L, split.length);
                Assert.assertEquals("Check the source", networkReceive.source(), split[0]);
                Assert.assertEquals("Check that the receive has kindly been rewound", 0L, networkReceive.payload().position());
                if (hashMap2.containsKey(networkReceive.source())) {
                    Assert.assertEquals("Check the request counter", ((Integer) hashMap2.get(networkReceive.source())).intValue(), Integer.parseInt(split[1]));
                    hashMap2.put(networkReceive.source(), Integer.valueOf(((Integer) hashMap2.get(networkReceive.source())).intValue() + 1));
                } else {
                    Assert.assertEquals("Check the request counter", 0L, Integer.parseInt(split[1]));
                    hashMap2.put(networkReceive.source(), 1);
                }
                i2++;
            }
            Iterator it = this.selector.completedSends().iterator();
            while (it.hasNext()) {
                String destination = ((Send) it.next()).destination();
                if (hashMap.containsKey(destination)) {
                    hashMap.put(destination, Integer.valueOf(((Integer) hashMap.get(destination)).intValue() + 1));
                } else {
                    hashMap.put(destination, 1);
                }
                if (((Integer) hashMap.get(destination)).intValue() < 500) {
                    this.selector.send(createSend(destination, destination + "-" + hashMap.get(destination)));
                }
            }
        }
    }

    @Test
    public void testSendLargeRequest() throws Exception {
        blockingConnect("0");
        String randomString = TestUtils.randomString(40960);
        Assert.assertEquals(randomString, blockingRequest("0", randomString));
    }

    @Test
    public void testLargeMessageSequence() throws Exception {
        connect("0", new InetSocketAddress("localhost", this.server.port));
        sendAndReceive("0", TestUtils.randomString(524288), 0, 50);
    }

    @Test
    public void testEmptyRequest() throws Exception {
        blockingConnect("0");
        Assert.assertEquals("", blockingRequest("0", ""));
    }

    @Test(expected = IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        blockingConnect("0");
        blockingConnect("0");
    }

    @Test
    public void testMute() throws Exception {
        blockingConnect("0");
        blockingConnect("1");
        this.selector.send(createSend("0", "hello"));
        this.selector.send(createSend("1", "hi"));
        this.selector.mute("1");
        while (this.selector.completedReceives().isEmpty()) {
            this.selector.poll(5L);
        }
        Assert.assertEquals("We should have only one response", 1L, this.selector.completedReceives().size());
        Assert.assertEquals("The response should not be from the muted node", "0", ((NetworkReceive) this.selector.completedReceives().get(0)).source());
        this.selector.unmute("1");
        do {
            this.selector.poll(5L);
        } while (this.selector.completedReceives().isEmpty());
        Assert.assertEquals("We should have only one response", 1L, this.selector.completedReceives().size());
        Assert.assertEquals("The response should be from the previously muted node", "1", ((NetworkReceive) this.selector.completedReceives().get(0)).source());
    }

    @Test
    public void testCloseOldestConnection() throws Exception {
        blockingConnect("0");
        this.time.sleep(6000L);
        this.selector.poll(0L);
        Assert.assertTrue("The idle connection should have been closed", this.selector.disconnected().contains("0"));
    }

    private String blockingRequest(String str, String str2) throws IOException {
        this.selector.send(createSend(str, str2));
        this.selector.poll(1000L);
        while (true) {
            this.selector.poll(1000L);
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                if (networkReceive.source() == str) {
                    return asString(networkReceive);
                }
            }
        }
    }

    protected void connect(String str, InetSocketAddress inetSocketAddress) throws IOException {
        this.selector.connect(str, inetSocketAddress, BUFFER_SIZE, BUFFER_SIZE);
    }

    private void blockingConnect(String str) throws IOException {
        blockingConnect(str, new InetSocketAddress("localhost", this.server.port));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void blockingConnect(String str, InetSocketAddress inetSocketAddress) throws IOException {
        this.selector.connect(str, inetSocketAddress, BUFFER_SIZE, BUFFER_SIZE);
        while (!this.selector.connected().contains(str)) {
            this.selector.poll(10000L);
        }
        while (!this.selector.isChannelReady(str)) {
            this.selector.poll(10000L);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NetworkSend createSend(String str, String str2) {
        return new NetworkSend(str, new ByteBuffer[]{ByteBuffer.wrap(str2.getBytes())});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String asString(NetworkReceive networkReceive) {
        return new String(Utils.toArray(networkReceive.payload()));
    }

    private void sendAndReceive(String str, String str2, int i, int i2) throws Exception {
        int i3 = i;
        this.selector.send(createSend(str, str2 + "-" + i));
        int i4 = i + 1;
        while (i3 < i2) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0L, this.selector.disconnected().size());
            Iterator it = this.selector.completedReceives().iterator();
            while (it.hasNext()) {
                Assert.assertEquals(str2 + "-" + i3, asString((NetworkReceive) it.next()));
                i3++;
            }
            int i5 = 0;
            while (i5 < this.selector.completedSends().size() && i4 < i2) {
                this.selector.send(createSend(str, str2 + "-" + i4));
                i5++;
                i4++;
            }
        }
    }
}
