/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.kafka;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TbKafkaEncoder;
import org.thingsboard.server.kafka.TbKafkaPartitioner;
import org.thingsboard.server.kafka.TbKafkaSettings;

public class TBKafkaProducerTemplate<T> {
    private static final Logger log = LoggerFactory.getLogger(TBKafkaProducerTemplate.class);
    private final KafkaProducer<String, byte[]> producer;
    private final TbKafkaEncoder<T> encoder;
    private final TbKafkaPartitioner<T> partitioner;
    private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap;
    private final String defaultTopic;
    private final TbKafkaSettings settings;

    private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId) {
        Properties props = settings.toProps();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        if (!StringUtils.isEmpty((Object)clientId)) {
            props.put("client.id", clientId);
        }
        this.settings = settings;
        this.producer = new KafkaProducer(props);
        this.encoder = encoder;
        this.partitioner = partitioner;
        this.defaultTopic = defaultTopic;
    }

    public void init() {
        this.partitionInfoMap = new ConcurrentHashMap<String, List<PartitionInfo>>();
        if (!StringUtils.isEmpty((Object)this.defaultTopic)) {
            try {
                TBKafkaAdmin admin = new TBKafkaAdmin(this.settings);
                admin.waitForTopic(this.defaultTopic, 30L, TimeUnit.SECONDS);
                log.info("[{}] Topic exists.", (Object)this.defaultTopic);
            }
            catch (Exception e) {
                log.info("[{}] Failed to wait for topic: {}", new Object[]{this.defaultTopic, e.getMessage(), e});
                throw new RuntimeException(e);
            }
            this.partitionInfoMap.putIfAbsent(this.defaultTopic, this.producer.partitionsFor(this.defaultTopic));
        }
    }

    public Future<RecordMetadata> send(String key, T value, Callback callback) {
        return this.send(key, value, null, callback);
    }

    public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers, Callback callback) {
        return this.send(key, value, null, headers, callback);
    }

    public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
        if (!StringUtils.isEmpty((Object)this.defaultTopic)) {
            return this.send(this.defaultTopic, key, value, timestamp, headers, callback);
        }
        throw new RuntimeException("Failed to send message! Default topic is not specified!");
    }

    public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
        return this.send(topic, key, value, null, headers, callback);
    }

    public Future<RecordMetadata> send(String topic, String key, T value, Callback callback) {
        return this.send(topic, key, value, null, null, callback);
    }

    public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
        byte[] data = this.encoder.encode(value);
        Integer partition = this.getPartition(topic, key, value, data);
        ProducerRecord record = new ProducerRecord(topic, partition, timestamp, (Object)key, (Object)data, headers);
        return this.producer.send(record, callback);
    }

    private Integer getPartition(String topic, String key, T value, byte[] data) {
        if (this.partitioner == null) {
            return null;
        }
        return this.partitioner.partition(topic, key, value, data, this.partitionInfoMap.computeIfAbsent(topic, arg_0 -> this.producer.partitionsFor(arg_0)));
    }

    public static <T> TBKafkaProducerTemplateBuilder<T> builder() {
        return new TBKafkaProducerTemplateBuilder();
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public TbKafkaSettings getSettings() {
        return this.settings;
    }

    public static class TBKafkaProducerTemplateBuilder<T> {
        private TbKafkaSettings settings;
        private TbKafkaEncoder<T> encoder;
        private TbKafkaPartitioner<T> partitioner;
        private String defaultTopic;
        private String clientId;

        TBKafkaProducerTemplateBuilder() {
        }

        public TBKafkaProducerTemplateBuilder<T> settings(TbKafkaSettings settings) {
            this.settings = settings;
            return this;
        }

        public TBKafkaProducerTemplateBuilder<T> encoder(TbKafkaEncoder<T> encoder) {
            this.encoder = encoder;
            return this;
        }

        public TBKafkaProducerTemplateBuilder<T> partitioner(TbKafkaPartitioner<T> partitioner) {
            this.partitioner = partitioner;
            return this;
        }

        public TBKafkaProducerTemplateBuilder<T> defaultTopic(String defaultTopic) {
            this.defaultTopic = defaultTopic;
            return this;
        }

        public TBKafkaProducerTemplateBuilder<T> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        public TBKafkaProducerTemplate<T> build() {
            return new TBKafkaProducerTemplate(this.settings, this.encoder, this.partitioner, this.defaultTopic, this.clientId);
        }

        public String toString() {
            return "TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder(settings=" + this.settings + ", encoder=" + this.encoder + ", partitioner=" + this.partitioner + ", defaultTopic=" + this.defaultTopic + ", clientId=" + this.clientId + ")";
        }
    }
}

