package com.sohu.jafka.producer;

import com.sohu.jafka.api.ProducerRequest;
import com.sohu.jafka.cluster.Broker;
import com.sohu.jafka.cluster.Partition;
import com.sohu.jafka.common.InvalidConfigException;
import com.sohu.jafka.common.UnavailableProducerException;
import com.sohu.jafka.message.ByteBufferMessageSet;
import com.sohu.jafka.message.Message;
import com.sohu.jafka.producer.async.AsyncProducer;
import com.sohu.jafka.producer.async.AsyncProducerConfig;
import com.sohu.jafka.producer.async.CallbackHandler;
import com.sohu.jafka.producer.async.DefaultEventHandler;
import com.sohu.jafka.producer.async.EventHandler;
import com.sohu.jafka.producer.serializer.Encoder;
import com.sohu.jafka.utils.Utils;
import com.taobao.accs.common.Constants;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes2.dex */
public class ProducerPool<V> implements Closeable {
    private final ConcurrentMap<Integer, AsyncProducer<V>> asyncProducers;
    private final CallbackHandler<V> callbackHandler;
    private final ProducerConfig config;
    private final EventHandler<V> eventHandler;
    private final Logger logger;
    private final Encoder<V> serializer;
    private boolean sync;
    private final ConcurrentMap<Integer, SyncProducer> syncProducers;

    public ProducerPool(ProducerConfig producerConfig, Encoder<V> encoder) {
        this(producerConfig, encoder, new ConcurrentHashMap(), new ConcurrentHashMap(), (EventHandler) Utils.getObject(producerConfig.getEventHandler()), (CallbackHandler) Utils.getObject(producerConfig.getCbkHandler()));
    }

    public ProducerPool(ProducerConfig producerConfig, Encoder<V> encoder, EventHandler<V> eventHandler, CallbackHandler<V> callbackHandler) {
        this(producerConfig, encoder, new ConcurrentHashMap(), new ConcurrentHashMap(), eventHandler, callbackHandler);
    }

    public ProducerPool(ProducerConfig producerConfig, Encoder<V> encoder, ConcurrentMap<Integer, SyncProducer> concurrentMap, ConcurrentMap<Integer, AsyncProducer<V>> concurrentMap2, EventHandler<V> eventHandler, CallbackHandler<V> callbackHandler) {
        this.sync = true;
        this.logger = LoggerFactory.getLogger(ProducerPool.class);
        this.config = producerConfig;
        this.serializer = encoder;
        this.syncProducers = concurrentMap;
        this.asyncProducers = concurrentMap2;
        this.eventHandler = eventHandler == null ? new DefaultEventHandler<>(producerConfig, callbackHandler) : eventHandler;
        this.callbackHandler = callbackHandler;
        if (encoder == null) {
            throw new InvalidConfigException("serializer passed in is null!");
        }
        this.sync = !"async".equalsIgnoreCase(producerConfig.getProducerType());
    }

    private void asyncSend(List<ProducerPoolData<V>> list) {
        for (ProducerPoolData<V> producerPoolData : list) {
            AsyncProducer asyncProducer = this.asyncProducers.get(Integer.valueOf(producerPoolData.partition.brokerId));
            Iterator<V> it = producerPoolData.data.iterator();
            while (it.hasNext()) {
                asyncProducer.send(producerPoolData.topic, it.next(), producerPoolData.partition.partId);
            }
        }
    }

    private void syncSend(List<ProducerPoolData<V>> list) {
        HashMap hashMap = new HashMap();
        Iterator<ProducerPoolData<V>> it = list.iterator();
        while (true) {
            int i = 0;
            if (!it.hasNext()) {
                break;
            }
            ProducerPoolData<V> next = it.next();
            List list2 = (List) hashMap.get(Integer.valueOf(next.partition.brokerId));
            if (list2 == null) {
                list2 = new ArrayList();
                hashMap.put(Integer.valueOf(next.partition.brokerId), list2);
            }
            Message[] messageArr = new Message[next.data.size()];
            Iterator<V> it2 = next.data.iterator();
            while (it2.hasNext()) {
                messageArr[i] = this.serializer.toMessage(it2.next());
                i++;
            }
            list2.add(new ProducerRequest(next.topic, next.partition.partId, new ByteBufferMessageSet(this.config.getCompressionCodec(), messageArr)));
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            SyncProducer syncProducer = this.syncProducers.get(entry.getKey());
            if (syncProducer == null) {
                throw new UnavailableProducerException("Producer pool has not been initialized correctly. Sync Producer for broker " + entry.getKey() + " does not exist in the pool");
            }
            if (((List) entry.getValue()).size() == 1) {
                ProducerRequest producerRequest = (ProducerRequest) ((List) entry.getValue()).get(0);
                syncProducer.send(producerRequest.topic, producerRequest.partition, producerRequest.messages);
            } else {
                syncProducer.multiSend((List) entry.getValue());
            }
        }
    }

    public void addProducer(Broker broker) {
        Properties properties = new Properties();
        properties.put(Constants.KEY_HOST, broker.host);
        properties.put("port", "" + broker.port);
        properties.putAll(this.config.getProperties());
        if (this.sync) {
            SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(properties));
            this.logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + com.xiaomi.mipush.sdk.Constants.COLON_SEPARATOR + broker.port);
            this.syncProducers.put(Integer.valueOf(broker.id), syncProducer);
            return;
        }
        AsyncProducer<V> asyncProducer = new AsyncProducer<>(new AsyncProducerConfig(properties), new SyncProducer(new SyncProducerConfig(properties)), this.serializer, this.eventHandler, this.config.getEventHandlerProperties(), this.callbackHandler, this.config.getCbkHandlerProperties());
        asyncProducer.start();
        this.logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + com.xiaomi.mipush.sdk.Constants.COLON_SEPARATOR + broker.port);
        this.asyncProducers.put(Integer.valueOf(broker.id), asyncProducer);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.logger.info("Closing all sync producers");
        if (this.sync) {
            Iterator<SyncProducer> it = this.syncProducers.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        } else {
            Iterator<AsyncProducer<V>> it2 = this.asyncProducers.values().iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
        }
    }

    public ProducerPoolData<V> getProducerPoolData(String str, Partition partition, List<V> list) {
        return new ProducerPoolData<>(str, partition, list);
    }

    public void send(ProducerPoolData<V> producerPoolData) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("send message: " + producerPoolData);
        }
        if (!this.sync) {
            AsyncProducer asyncProducer = this.asyncProducers.get(Integer.valueOf(producerPoolData.partition.brokerId));
            Iterator<V> it = producerPoolData.data.iterator();
            while (it.hasNext()) {
                asyncProducer.send(producerPoolData.topic, it.next(), producerPoolData.partition.partId);
            }
            return;
        }
        Message[] messageArr = new Message[producerPoolData.data.size()];
        int i = 0;
        Iterator<V> it2 = producerPoolData.data.iterator();
        while (it2.hasNext()) {
            messageArr[i] = this.serializer.toMessage(it2.next());
            i++;
        }
        ProducerRequest producerRequest = new ProducerRequest(producerPoolData.topic, producerPoolData.partition.partId, new ByteBufferMessageSet(this.config.getCompressionCodec(), messageArr));
        SyncProducer syncProducer = this.syncProducers.get(Integer.valueOf(producerPoolData.partition.brokerId));
        if (syncProducer != null) {
            syncProducer.send(producerRequest.topic, producerRequest.partition, producerRequest.messages);
            return;
        }
        throw new UnavailableProducerException("Producer pool has not been initialized correctly. Sync Producer for broker " + producerPoolData.partition.brokerId + " does not exist in the pool");
    }

    public void send(List<ProducerPoolData<V>> list) {
        if (this.sync) {
            syncSend(list);
        } else {
            asyncSend(list);
        }
    }
}
