package kafka.producer;

import java.io.File;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.ErrorMapping$;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageSet$;
import kafka.message.NoCompressionCodec$;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.TraitSetter;

/* compiled from: SyncProducerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001u3A!\u0001\u0002\u0001\u000f\t\u00012+\u001f8d!J|G-^2feR+7\u000f\u001e\u0006\u0003\u0007\u0011\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t%A\u0011\u0011\u0002E\u0007\u0002\u0015)\u00111\u0002D\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001b9\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003=\t1a\u001c:h\u0013\t\t\"B\u0001\u0006K+:LGoU;ji\u0016\u0004\"a\u0005\f\u000e\u0003QQ!!\u0006\u0003\u0002\u0017%tG/Z4sCRLwN\\\u0005\u0003/Q\u0011acS1gW\u0006\u001cVM\u001d<feR+7\u000f\u001e%be:,7o\u001d\u0005\u00063\u0001!\tAG\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003m\u0001\"\u0001\b\u0001\u000e\u0003\tAqA\b\u0001C\u0002\u0013%q$\u0001\u0007nKN\u001c\u0018mZ3CsR,7/F\u0001!!\r\tCEJ\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t)\u0011I\u001d:bsB\u0011\u0011eJ\u0005\u0003Q\t\u0012AAQ=uK\"1!\u0006\u0001Q\u0001\n\u0001\nQ\"\\3tg\u0006<WMQ=uKN\u0004\u0003\"\u0002\u0017\u0001\t\u0003i\u0013aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0015\u00039\u00022a\f\u001b7\u001b\u0005\u0001$BA\u00193\u0003%IW.\\;uC\ndWM\u0003\u00024E\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005U\u0002$\u0001\u0002'jgR\u0004\"a\u000e\u001e\u000e\u0003aR!!\u000f\u0003\u0002\rM,'O^3s\u0013\tY\u0004HA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\"B\u001f\u0001\t\u0003q\u0014a\u0005;fgR\u0014V-Y2iC\ndWmU3sm\u0016\u0014H#A 
\u0011\u0005\u0005\u0002\u0015BA!#\u0005\u0011)f.\u001b;)\u0005q\u001a\u0005C\u0001#G\u001b\u0005)%BA\u0006\u000f\u0013\t9UI\u0001\u0003UKN$\b\"B%\u0001\t\u0003q\u0014a\u0006;fgR,U\u000e\u001d;z!J|G-^2f%\u0016\fX/Z:uQ\tA5\tC\u0003M\u0001\u0011\u0005a(A\fuKN$X*Z:tC\u001e,7+\u001b>f)>|G*\u0019:hK\"\u00121j\u0011\u0005\u0006\u001f\u0002!\tAP\u0001#i\u0016\u001cH/T3tg\u0006<WmU5{KR{w\u000eT1sO\u0016<\u0016\u000e\u001e5BG.TVM]8)\u00059\u001b\u0005\"\u0002*\u0001\t\u0003q\u0014\u0001\n;fgR\u0004&o\u001c3vG\u0016\u001cuN\u001d:fGRd\u0017PU3dK&4Xm\u001d*fgB|gn]3)\u0005E\u001b\u0005\"B+\u0001\t\u0003q\u0014A\u0006;fgR\u0004&o\u001c3vG\u0016\u00148)\u00198US6,w.\u001e;)\u0005Q\u001b\u0005\"\u0002-\u0001\t\u0003q\u0014\u0001\t;fgR\u0004&o\u001c3vG\u0016\u0014V-];fgR<\u0016\u000e\u001e5O_J+7\u000f]8og\u0016D#aV\"\t\u000bm\u0003A\u0011\u0001 \u0002+Q,7\u000f\u001e(pi\u0016sw.^4i%\u0016\u0004H.[2bg\"\u0012!l\u0011")
/* loaded from: input_file:kafka/producer/SyncProducerTest.class */
public class SyncProducerTest extends JUnitSuite implements KafkaServerTestHarness {
    // Payload used for the messages built by the tests; assigned once in the constructor.
    private final byte[] messageBytes;

    // State behind the KafkaServerTestHarness accessors (instanceConfigs/servers/brokerList/alive).
    private Seq<KafkaConfig> instanceConfigs;
    private Buffer<KafkaServer> servers;
    private String brokerList;
    private boolean[] alive;

    // State behind the ZooKeeperTestHarness accessors.
    private EmbeddedZookeeper zookeeper;
    private int zkPort;
    private ZkUtils zkUtils;
    // Not final: these two are written by the kafka$zk$ZooKeeperTestHarness$_setter_$..._$eq
    // methods, and Java does not allow a final field to be assigned outside a constructor.
    private int zkConnectionTimeout;
    private int zkSessionTimeout;

    // State behind the Logging accessors. Not final for the same reason: loggerName and the
    // log4j controller are written by _setter_ methods, logger by logger$lzycompute().
    private String loggerName;
    private Logger logger;
    private String logIdent;
    private Log4jController$ kafka$utils$Logging$$log4jController;
    // Set to true by logger$lzycompute() once `logger` has been assigned.
    private volatile boolean bitmap$0;

    /** Returns the broker config sequence currently stored on this test instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public Seq<KafkaConfig> instanceConfigs() {
        return instanceConfigs;
    }

    /**
     * Replaces the stored broker config sequence.
     *
     * @param seq value subsequently returned by {@code instanceConfigs()}
     */
    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public void instanceConfigs_$eq(Seq<KafkaConfig> seq) {
        instanceConfigs = seq;
    }

    /** Returns the buffer of Kafka servers currently stored on this test instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public Buffer<KafkaServer> servers() {
        return servers;
    }

    /**
     * Replaces the stored server buffer.
     *
     * @param buffer value subsequently returned by {@code servers()}
     */
    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public void servers_$eq(Buffer<KafkaServer> buffer) {
        servers = buffer;
    }

    /** Returns the broker list string currently stored on this test instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public String brokerList() {
        return brokerList;
    }

    /**
     * Replaces the stored broker list string.
     *
     * @param str value subsequently returned by {@code brokerList()}
     */
    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public void brokerList_$eq(String str) {
        brokerList = str;
    }

    /** Returns the stored {@code alive} flag array (the array itself, not a copy). */
    @Override // kafka.integration.KafkaServerTestHarness
    public boolean[] alive() {
        return alive;
    }

    /**
     * Replaces the stored {@code alive} flag array; the reference is kept as-is, not copied.
     *
     * @param zArr value subsequently returned by {@code alive()}
     */
    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public void alive_$eq(boolean[] zArr) {
        alive = zArr;
    }

    /** Synthetic bridge: runs {@code ZooKeeperTestHarness.Cclass.setUp} on this instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    /** Synthetic bridge: runs {@code ZooKeeperTestHarness.Cclass.tearDown} on this instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    /** Returns whatever {@code KafkaServerTestHarness.Cclass.configs} yields for this instance. */
    @Override // kafka.integration.KafkaServerTestHarness
    public Seq<KafkaConfig> configs() {
        return KafkaServerTestHarness.Cclass.configs(this);
    }

    /**
     * Forwards to {@code KafkaServerTestHarness.Cclass.serverForId}.
     *
     * @param i id handed to the trait implementation (presumably a broker id)
     * @return the optional server produced by the trait implementation
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public Option<KafkaServer> serverForId(int i) {
        return KafkaServerTestHarness.Cclass.serverForId(this, i);
    }

    /** Returns the security protocol chosen by {@code KafkaServerTestHarness.Cclass.securityProtocol}. */
    @Override // kafka.integration.KafkaServerTestHarness
    public SecurityProtocol securityProtocol() {
        return KafkaServerTestHarness.Cclass.securityProtocol(this);
    }

    /** Returns the optional trust store file supplied by {@code KafkaServerTestHarness.Cclass.trustStoreFile}. */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: trustStoreFile */
    public Option<File> mo195trustStoreFile() {
        return KafkaServerTestHarness.Cclass.trustStoreFile(this);
    }

    /** JUnit {@code @Before} hook: runs {@code KafkaServerTestHarness.Cclass.setUp} on this instance. */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
    }

    /** JUnit {@code @After} hook: runs {@code KafkaServerTestHarness.Cclass.tearDown} on this instance. */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    /**
     * Forwards to {@code KafkaServerTestHarness.Cclass.killRandomBroker}.
     *
     * @return the int produced by the trait implementation (presumably the index of the killed broker)
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public int killRandomBroker() {
        return KafkaServerTestHarness.Cclass.killRandomBroker(this);
    }

    /** Forwards to {@code KafkaServerTestHarness.Cclass.restartDeadBrokers}. */
    @Override // kafka.integration.KafkaServerTestHarness
    public void restartDeadBrokers() {
        KafkaServerTestHarness.Cclass.restartDeadBrokers(this);
    }

    /** Returns the embedded ZooKeeper instance currently stored on this test instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return zookeeper;
    }

    /**
     * Replaces the stored embedded ZooKeeper instance.
     *
     * @param embeddedZookeeper value subsequently returned by {@code zookeeper()}
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        zookeeper = embeddedZookeeper;
    }

    /** Returns the stored ZooKeeper port. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkPort() {
        return zkPort;
    }

    /**
     * Replaces the stored ZooKeeper port.
     *
     * @param i value subsequently returned by {@code zkPort()}
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkPort_$eq(int i) {
        zkPort = i;
    }

    /** Returns the {@code ZkUtils} handle currently stored on this test instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkUtils zkUtils() {
        return zkUtils;
    }

    /**
     * Replaces the stored {@code ZkUtils} handle.
     *
     * @param zkUtils value subsequently returned by {@code zkUtils()}
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkUtils_$eq(ZkUtils zkUtils) {
        // The parameter shadows the field, so the explicit `this.` is required here.
        this.zkUtils = zkUtils;
    }

    /** Returns the stored ZooKeeper connection timeout value. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return zkConnectionTimeout;
    }

    /** Returns the stored ZooKeeper session timeout value. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return zkSessionTimeout;
    }

    /**
     * Trait-generated setter that stores the ZooKeeper connection timeout.
     *
     * @param i value subsequently returned by {@code zkConnectionTimeout()}
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        zkConnectionTimeout = i;
    }

    /**
     * Trait-generated setter that stores the ZooKeeper session timeout.
     *
     * @param i value subsequently returned by {@code zkSessionTimeout()}
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        zkSessionTimeout = i;
    }

    /** Returns the string produced by {@code ZooKeeperTestHarness.Cclass.zkConnect} for this instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return ZooKeeperTestHarness.Cclass.zkConnect(this);
    }

    /** Returns the string produced by {@code ZooKeeperTestHarness.Cclass.confFile} for this instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public String confFile() {
        return ZooKeeperTestHarness.Cclass.confFile(this);
    }

    /** Returns the stored logger name. */
    public String loggerName() {
        return loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = Logging.class.logger(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    /**
     * Returns the logger, going through {@code logger$lzycompute()} while
     * {@code bitmap$0} is still false.
     */
    public Logger logger() {
        if (!this.bitmap$0) {
            return logger$lzycompute();
        }
        return this.logger;
    }

    /** Returns the stored {@code logIdent} string. */
    public String logIdent() {
        return logIdent;
    }

    /**
     * Replaces the stored {@code logIdent} string.
     *
     * @param str value subsequently returned by {@code logIdent()}
     */
    public void logIdent_$eq(String str) {
        logIdent = str;
    }

    /** Returns the stored {@code Log4jController$} reference. */
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return kafka$utils$Logging$$log4jController;
    }

    /**
     * Trait-generated setter that stores the logger name.
     *
     * @param str value subsequently returned by {@code loggerName()}
     */
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String str) {
        loggerName = str;
    }

    /**
     * Trait-generated setter that stores the {@code Log4jController$} reference.
     *
     * @param log4jController$ value subsequently returned by {@code kafka$utils$Logging$$log4jController()}
     */
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ log4jController$) {
        kafka$utils$Logging$$log4jController = log4jController$;
    }

    /** Forwards the string thunk to {@code Logging.class.trace}; the thunk is not evaluated in this method. */
    public void trace(Function0<String> function0) {
        Logging.class.trace(this, function0);
    }

    /* renamed from: trace, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.trace} and returns that call's result. */
    public Object m366trace(Function0<Throwable> function0) {
        return Logging.class.trace(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.trace}. */
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.trace(this, function0, function02);
    }

    /** Forwards the action thunk to {@code Logging.class.swallowTrace}; the thunk is not invoked in this method. */
    public void swallowTrace(Function0<BoxedUnit> function0) {
        Logging.class.swallowTrace(this, function0);
    }

    /** Forwards the string thunk to {@code Logging.class.debug}; the thunk is not evaluated in this method. */
    public void debug(Function0<String> function0) {
        Logging.class.debug(this, function0);
    }

    /* renamed from: debug, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.debug} and returns that call's result. */
    public Object m367debug(Function0<Throwable> function0) {
        return Logging.class.debug(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.debug}. */
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.debug(this, function0, function02);
    }

    /** Forwards the action thunk to {@code Logging.class.swallowDebug}; the thunk is not invoked in this method. */
    public void swallowDebug(Function0<BoxedUnit> function0) {
        Logging.class.swallowDebug(this, function0);
    }

    /** Forwards the string thunk to {@code Logging.class.info}; the thunk is not evaluated in this method. */
    public void info(Function0<String> function0) {
        Logging.class.info(this, function0);
    }

    /* renamed from: info, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.info} and returns that call's result. */
    public Object m368info(Function0<Throwable> function0) {
        return Logging.class.info(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.info}. */
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.info(this, function0, function02);
    }

    /** Forwards the action thunk to {@code Logging.class.swallowInfo}; the thunk is not invoked in this method. */
    public void swallowInfo(Function0<BoxedUnit> function0) {
        Logging.class.swallowInfo(this, function0);
    }

    /** Forwards the string thunk to {@code Logging.class.warn}; the thunk is not evaluated in this method. */
    public void warn(Function0<String> function0) {
        Logging.class.warn(this, function0);
    }

    /* renamed from: warn, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.warn} and returns that call's result. */
    public Object m369warn(Function0<Throwable> function0) {
        return Logging.class.warn(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.warn}. */
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.warn(this, function0, function02);
    }

    /** Forwards the action thunk to {@code Logging.class.swallowWarn}; the thunk is not invoked in this method. */
    public void swallowWarn(Function0<BoxedUnit> function0) {
        Logging.class.swallowWarn(this, function0);
    }

    /** Forwards the action thunk to {@code Logging.class.swallow}; the thunk is not invoked in this method. */
    public void swallow(Function0<BoxedUnit> function0) {
        Logging.class.swallow(this, function0);
    }

    /** Forwards the string thunk to {@code Logging.class.error}; the thunk is not evaluated in this method. */
    public void error(Function0<String> function0) {
        Logging.class.error(this, function0);
    }

    /* renamed from: error, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.error} and returns that call's result. */
    public Object m370error(Function0<Throwable> function0) {
        return Logging.class.error(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.error}. */
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.error(this, function0, function02);
    }

    /** Forwards the action thunk to {@code Logging.class.swallowError}; the thunk is not invoked in this method. */
    public void swallowError(Function0<BoxedUnit> function0) {
        Logging.class.swallowError(this, function0);
    }

    /** Forwards the string thunk to {@code Logging.class.fatal}; the thunk is not evaluated in this method. */
    public void fatal(Function0<String> function0) {
        Logging.class.fatal(this, function0);
    }

    /* renamed from: fatal, reason: collision with other method in class */
    /** Forwards the throwable thunk to {@code Logging.class.fatal} and returns that call's result. */
    public Object m371fatal(Function0<Throwable> function0) {
        return Logging.class.fatal(this, function0);
    }

    /** Forwards the string thunk and the throwable thunk to {@code Logging.class.fatal}. */
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.fatal(this, function0, function02);
    }

    /** Returns the shared message payload array (the array itself, not a copy). */
    private byte[] messageBytes() {
        return messageBytes;
    }

    /**
     * Builds the broker configuration list for this suite: a one-element list holding the
     * {@code KafkaConfig} made from the head of
     * {@code TestUtils.createBrokerConfigs(1, zkConnect(), false, ...defaults)}.
     *
     * @return an immutable Scala list containing that single config
     */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs, reason: merged with bridge method [inline-methods] */
    public List<KafkaConfig> mo220generateConfigs() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        // Arguments 4..10 are the Scala default-argument getters, in declaration order.
        Properties brokerProps = (Properties) testUtils.createBrokerConfigs(
                1,
                zkConnect(),
                false,
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10()).head();
        KafkaConfig[] singleConfig = {KafkaConfig$.MODULE$.fromProps(brokerProps)};
        return List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(singleConfig));
    }

    /**
     * Sends three single-message produce requests (topic "test", partition 0, acks 1) to the
     * first broker and checks that each returns a non-null response. The first two sends must
     * also complete in under 500 ms each; the third is not timed.
     */
    @Test
    public void testReachableServer() {
        KafkaServer kafkaServer = (KafkaServer) servers().head();
        SocketServer socketServer = kafkaServer.socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(producerProps));

        long firstStart = SystemTime$.MODULE$.milliseconds();
        assertSendSucceeds(syncProducer);
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - firstStart < 500);

        long secondStart = SystemTime$.MODULE$.milliseconds();
        assertSendSucceeds(syncProducer);
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - secondStart < 500);

        assertSendSucceeds(syncProducer);
    }

    /**
     * Sends one message built from {@code messageBytes()} to topic "test", partition 0, with
     * acks 1, and asserts that the response is non-null.
     *
     * @param syncProducer producer used for the send
     * @throws AssertionError if the response is null, or if the send throws an
     *         {@link Exception}; in the latter case the exception is attached as the cause
     */
    private void assertSendSucceeds(SyncProducer syncProducer) {
        try {
            Assert.assertNotNull(syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7())));
        } catch (Exception e) {
            // Same failure type and message as Assert.fail(...), but keeps the original
            // exception (and its stack trace) as the cause instead of only its message.
            throw new AssertionError("Unexpected failure sending message to broker. " + e.getMessage(), e);
        }
    }

    /**
     * Sends a produce request with an empty data map (correlation id 0, acks 1, default
     * client id and ack timeout) and checks that the response is non-null, reports no error
     * and has an empty status map.
     */
    @Test
    public void testEmptyProduceRequest() {
        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        ProducerRequest emptyRequest = new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 1, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$));
        ProducerResponse response = producer.send(emptyRequest);

        Assert.assertTrue(response != null);
        // Equivalent to "no error AND zero statuses"; status() is only consulted when hasError() is false.
        Assert.assertTrue(!(response.hasError() || response.status().size() != 0));
    }

    /**
     * Checks the broker's handling of message size limits on topic "test", partition 0.
     *
     * <ol>
     *   <li>A message whose payload is {@code messageMaxBytes + 1} bytes must be answered with
     *       {@code MessageSizeTooLargeCode} and offset -1.</li>
     *   <li>A message whose payload is
     *       {@code messageMaxBytes - MessageOverhead - LogOverhead - 1} bytes must be answered
     *       with {@code NoError} and offset 0.</li>
     * </ol>
     */
    @Test
    public void testMessageSizeTooLarge() {
        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(producerProps));
        TestUtils$.MODULE$.createTopic(zkUtils(), "test", 1, 1, servers(), TestUtils$.MODULE$.createTopic$default$6());

        int maxMessageBytes = Predef$.MODULE$.Integer2int(((KafkaConfig) configs().apply(0)).messageMaxBytes());
        TopicAndPartition testPartition = new TopicAndPartition("test", 0);

        // 1. Payload alone already exceeds the broker limit.
        Message oversized = new Message(new byte[maxMessageBytes + 1]);
        ProducerResponse send = syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{oversized})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals(1L, send.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$1(this)));
        Assert.assertEquals(ErrorMapping$.MODULE$.MessageSizeTooLargeCode(), ((ProducerResponseStatus) send.status().apply(testPartition)).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) send.status().apply(testPartition)).offset());

        // 2. Payload sized so that payload + per-message overhead + log overhead stays below the limit.
        int safeSize = maxMessageBytes - Message$.MODULE$.MessageOverhead() - MessageSet$.MODULE$.LogOverhead() - 1;
        Message fitting = new Message(new byte[safeSize]);
        ProducerResponse send2 = syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{fitting})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        // Previously this re-checked the FIRST response (count == 1), so the second response's
        // error count was never verified. Check the second response instead.
        // NOTE(review): the predicate class is compiled elsewhere; it is presumed to match
        // statuses whose error is not NoError, like ...$1 above - confirm.
        Assert.assertEquals(0L, send2.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$2(this)));
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) send2.status().apply(testPartition)).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(testPartition)).offset());
    }

    /**
     * With {@code request.required.acks=0}, sends two produce requests to topic "test",
     * partition 0, each carrying one message whose payload is {@code messageMaxBytes + 1}
     * bytes. The first send must not throw; an {@link IOException} from the second send is
     * tolerated. The test makes no assertion beyond that.
     */
    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        producerProps.put("request.required.acks", "0");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        AdminUtils$.MODULE$.createTopic(zkUtils(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), "test", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        Message firstOversized = new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig) configs().apply(0)).messageMaxBytes()) + 1]);
        ProducerRequest firstRequest = TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{firstOversized})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
        producer.send(firstRequest);

        try {
            Message secondOversized = new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig) configs().apply(0)).messageMaxBytes()) + 1]);
            ProducerRequest secondRequest = TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{secondOversized})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
            producer.send(secondRequest);
        } catch (IOException ignored) {
            // Deliberately ignored. NOTE(review): presumably the broker drops the connection
            // after the first oversized request, so this send may fail - confirm.
        }
    }

    /**
     * Sends the same three-topic request ("topic1", "topic2", "topic3", partition 0, acks 1)
     * twice and checks the per-partition statuses.
     *
     * <p>First send, before any topic is created: the response must echo the correlation id
     * and carry three statuses, each of which is passed to the
     * {@code ...testProduceCorrectlyReceivesResponse$1} callback (defined elsewhere).
     *
     * <p>Second send, after "topic1" and "topic3" are created and have leaders: those two
     * must report {@code NoError} at offset 0, while "topic2" must report
     * {@code UnknownTopicOrPartitionCode} at offset -1.
     */
    @Test
    public void testProduceCorrectlyReceivesResponse() {
        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        ByteBufferMessageSet messages = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequestWithAcks(Predef$.MODULE$.wrapRefArray(new String[]{"topic1", "topic2", "topic3"}), Predef$.MODULE$.wrapIntArray(new int[]{0}), messages, 1, TestUtils$.MODULE$.produceRequestWithAcks$default$5(), TestUtils$.MODULE$.produceRequestWithAcks$default$6(), TestUtils$.MODULE$.produceRequestWithAcks$default$7());

        // Round 1: no topic exists yet.
        ProducerResponse firstResponse = producer.send(request);
        Assert.assertNotNull(firstResponse);
        Assert.assertEquals(request.correlationId(), firstResponse.correlationId());
        Assert.assertEquals(3L, firstResponse.status().size());
        firstResponse.status().values().foreach(new SyncProducerTest$$anonfun$testProduceCorrectlyReceivesResponse$1(this));

        // Create topic1 and topic3 only; topic2 is intentionally left missing.
        for (String topic : new String[]{"topic1", "topic3"}) {
            AdminUtils$.MODULE$.createTopic(zkUtils(), topic, 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        }

        // Round 2: same request object, re-sent.
        ProducerResponse secondResponse = producer.send(request);
        Assert.assertNotNull(secondResponse);
        Assert.assertEquals(request.correlationId(), secondResponse.correlationId());
        Assert.assertEquals(3L, secondResponse.status().size());

        TopicAndPartition topic1 = new TopicAndPartition("topic1", 0);
        TopicAndPartition topic2 = new TopicAndPartition("topic2", 0);
        TopicAndPartition topic3 = new TopicAndPartition("topic3", 0);
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) secondResponse.status().apply(topic1)).error());
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) secondResponse.status().apply(topic3)).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) secondResponse.status().apply(topic1)).offset());
        Assert.assertEquals(0L, ((ProducerResponseStatus) secondResponse.status().apply(topic3)).offset());
        Assert.assertEquals(ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), ((ProducerResponseStatus) secondResponse.status().apply(topic2)).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) secondResponse.status().apply(topic2)).offset());
    }

    /**
     * Shuts down the broker's request handler pool, sends one produce request ("topic1",
     * partition 0, acks 1) and expects the send to fail with a
     * {@link SocketTimeoutException} no sooner than 500 ms after it was started.
     *
     * <p>Any other throwable from the send fails the test with
     * "Unexpected exception when expecting timeout: ..."; a send that returns normally fails
     * it with "Should have received timeout exception since request handling is stopped."
     */
    @Test
    public void testProducerCanTimeout() {
        KafkaServer kafkaServer = (KafkaServer) servers().head();
        SocketServer socketServer = kafkaServer.socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(producerProps));
        ProducerRequest produceRequest = TestUtils$.MODULE$.produceRequest("topic1", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());

        // With the handler pool stopped the request is never answered.
        kafkaServer.requestHandlerPool().shutdown();

        long start = SystemTime$.MODULE$.milliseconds();
        boolean timedOut = false;
        try {
            syncProducer.send(produceRequest);
        } catch (SocketTimeoutException expected) {
            timedOut = true;
        } catch (Throwable th) {
            Assert.fail("Unexpected exception when expecting timeout: " + th);
        }
        // Checked outside the try: when the "no timeout" failure was raised inside it, the
        // catch (Throwable) above intercepted that AssertionError and re-reported it as an
        // "Unexpected exception", hiding the real reason.
        Assert.assertTrue("Should have received timeout exception since request handling is stopped.", timedOut);
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - start >= 500L);
    }

    /**
     * Sends a produce request with an empty data map and acks 0 over the PLAINTEXT port and
     * checks that {@code send} returns {@code null}.
     */
    @Test
    public void testProduceRequestWithNoResponse() {
        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(SecurityProtocol.PLAINTEXT));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        ProducerRequest ackZeroRequest = new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 0, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$));
        ProducerResponse response = producer.send(ackZeroRequest);

        Assert.assertTrue(response == null);
    }

    /**
     * Creates topic "minisrtest" (1 partition, replication factor 1) with
     * {@code min.insync.replicas=2}, produces one message with acks -1 and expects the
     * partition status to carry {@code NotEnoughReplicasCode}.
     */
    @Test
    public void testNotEnoughReplicas() {
        final String topic = "minisrtest";

        SocketServer socketServer = ((KafkaServer) servers().head()).socketServer();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(socketServer.boundPort$default$1()));
        producerProps.put("request.required.acks", "-1");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // Requires two in-sync replicas while only one replica is assigned.
        Properties topicProps = new Properties();
        topicProps.put("min.insync.replicas", "2");
        AdminUtils$.MODULE$.createTopic(zkUtils(), topic, 1, 1, topicProps);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        ByteBufferMessageSet messages = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())}));
        ProducerResponse response = producer.send(TestUtils$.MODULE$.produceRequest(topic, 0, messages, -1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        ProducerResponseStatus partitionStatus = (ProducerResponseStatus) response.status().apply(new TopicAndPartition(topic, 0));

        Assert.assertEquals(ErrorMapping$.MODULE$.NotEnoughReplicasCode(), partitionStatus.error());
    }

    /**
     * Runs the trait initialisers for Logging, ZooKeeperTestHarness and
     * KafkaServerTestHarness on this instance, in that order, then allocates the
     * two-byte message payload used by the tests.
     */
    public SyncProducerTest() {
        Logging.class.$init$(this);
        ZooKeeperTestHarness.Cclass.$init$(this);
        KafkaServerTestHarness.Cclass.$init$(this);
        // Two zero-filled bytes; the content is irrelevant, the tests only need a payload.
        this.messageBytes = new byte[2];
    }
}
