package org.apache.flume.api;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import junit.framework.Assert;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.ThriftTestingSource;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flume/api/TestThriftRpcClient.class */
public class TestThriftRpcClient {
    private static final String SEQ = "sequence";
    private final Properties props = new Properties();
    private ThriftRpcClient client;
    private ThriftTestingSource src;
    private int port;

    @Before
    public void setUp() throws Exception {
        this.props.setProperty("hosts", "h1");
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            this.port = serverSocket.getLocalPort();
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
            }
            this.props.setProperty("client.type", "thrift");
            this.props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(this.port));
            this.props.setProperty("batch-size", "10");
            this.props.setProperty("request-timeout", "2000");
            this.props.setProperty("protocol", "compact");
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }

    @After
    public void tearDown() throws Exception {
        this.src.stop();
    }

    private static void insertEvents(RpcClient rpcClient, int i) throws Exception {
        for (int i2 = 0; i2 < i; i2++) {
            HashMap hashMap = new HashMap();
            hashMap.put(SEQ, String.valueOf(i2));
            rpcClient.append(EventBuilder.withBody(String.valueOf(i2).getBytes(), hashMap));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void insertAsBatch(RpcClient rpcClient, int i, int i2) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i3 = i; i3 <= i2; i3++) {
            HashMap hashMap = new HashMap();
            hashMap.put(SEQ, String.valueOf(i3));
            arrayList.add(EventBuilder.withBody(String.valueOf(i3).getBytes(), hashMap));
        }
        rpcClient.appendBatch(arrayList);
    }

    @Test
    public void testOK() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), this.port, "compact");
        this.client = RpcClientFactory.getInstance(this.props);
        insertEvents(this.client, 10);
        insertAsBatch(this.client, 10, 25);
        insertAsBatch(this.client, 26, 37);
        int i = 0;
        Assert.assertEquals(38, this.src.flumeEvents.size());
        Iterator<Event> it = this.src.flumeEvents.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            Assert.assertEquals(new String(it.next().getBody()), String.valueOf(i2));
        }
        Assert.assertEquals(10, this.src.individualCount);
        Assert.assertEquals(4, this.src.batchCount);
        Assert.assertEquals(2, this.src.incompleteBatches);
    }

    @Test
    public void testSlow() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.SLOW.name(), this.port, "compact");
        this.client = RpcClientFactory.getInstance(this.props);
        insertEvents(this.client, 2);
        insertAsBatch(this.client, 2, 25);
        insertAsBatch(this.client, 26, 37);
        int i = 0;
        Assert.assertEquals(38, this.src.flumeEvents.size());
        Iterator<Event> it = this.src.flumeEvents.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            Assert.assertEquals(new String(it.next().getBody()), String.valueOf(i2));
        }
        Assert.assertEquals(2, this.src.individualCount);
        Assert.assertEquals(5, this.src.batchCount);
        Assert.assertEquals(2, this.src.incompleteBatches);
    }

    @Test(expected = EventDeliveryException.class)
    public void testFail() throws Exception {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.FAIL.name(), this.port, "compact");
        this.client = RpcClientFactory.getInstance(this.props);
        insertEvents(this.client, 2);
        Assert.fail("Expected EventDeliveryException to be thrown.");
    }

    @Test
    public void testError() throws Throwable {
        try {
            this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR.name(), this.port, "compact");
            this.client = RpcClientFactory.getThriftInstance("0.0.0.0", Integer.valueOf(this.port));
            insertEvents(this.client, 2);
        } catch (EventDeliveryException e) {
            Assert.assertEquals("Failed to send event. ", e.getMessage());
        }
    }

    @Test(expected = TimeoutException.class)
    public void testTimeout() throws Throwable {
        try {
            this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT.name(), this.port, "compact");
            this.client = RpcClientFactory.getThriftInstance(this.props);
            insertEvents(this.client, 2);
        } catch (EventDeliveryException e) {
            throw e.getCause();
        }
    }

    @Test
    public void testMultipleThreads() throws Throwable {
        this.src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), this.port, "compact");
        this.client = RpcClientFactory.getThriftInstance("0.0.0.0", Integer.valueOf(this.port), 10);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(100);
        ArrayList arrayList = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            arrayList.add(newFixedThreadPool.submit(new Runnable() { // from class: org.apache.flume.api.TestThriftRpcClient.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        TestThriftRpcClient.insertAsBatch(TestThriftRpcClient.this.client, 0, 9);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            ((Future) arrayList.get(i2)).get();
        }
        ArrayList arrayList2 = new ArrayList();
        Iterator<Event> it = this.src.flumeEvents.iterator();
        while (it.hasNext()) {
            arrayList2.add(new String(it.next().getBody()));
        }
        int i3 = 0;
        Collections.sort(arrayList2);
        int i4 = 0;
        while (i4 < arrayList2.size()) {
            for (int i5 = 0; i5 < 100; i5++) {
                int i6 = i4;
                i4++;
                Assert.assertEquals(String.valueOf(i3), (String) arrayList2.get(i6));
            }
            i3++;
        }
    }
}
