/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.kafka;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.kafka.KafkaTbQueueMsgMetadata;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;

/**
 * {@link TbQueueProducer} implementation that publishes {@link TbQueueMsg} instances to Kafka
 * as records with a {@code String} key and a {@code byte[]} value.
 *
 * <p>Before a record is sent to a topic for the first time, the topic is passed to
 * {@link TbQueueAdmin#createTopicIfNotExists(String)}; topics that were already handled are
 * remembered in a concurrent set so the admin call is not repeated for them.
 *
 * <p>Instances are obtained through {@link #builder()}.
 *
 * @param <T> the queue message type accepted by this producer
 */
public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbKafkaProducerTemplate.class);

    /** Producer property key under which the client id is registered. */
    private static final String CLIENT_ID_PROPERTY = "client.id";
    /** Header carrying the client id of the producer that sent the record. */
    private static final String PRODUCER_ID_HEADER = "_producerId";
    /** Header carrying the name of the thread that called {@code send}. */
    private static final String THREAD_NAME_HEADER = "_threadName";
    /** Prefix of the per-frame stack trace headers; the frame index is appended to it. */
    private static final String STACK_TRACE_HEADER_PREFIX = "_stackTrace";
    /**
     * Index of the first stack frame exported as a header. Frames 0 and 1 are skipped;
     * presumably they are {@code Thread.getStackTrace} and the header helper itself.
     */
    private static final int STACK_TRACE_FIRST_FRAME = 2;
    /** Exclusive upper bound on the stack frame index exported as a header. */
    private static final int STACK_TRACE_FRAME_LIMIT = 20;

    private final KafkaProducer<String, byte[]> producer;
    private final String defaultTopic;
    private final TbKafkaSettings settings;
    private final TbQueueAdmin admin;
    /** Topics already passed to {@link TbQueueAdmin#createTopicIfNotExists(String)}. */
    private final Set<String> topics;
    private final String clientId;

    /**
     * Creates the underlying {@link KafkaProducer} from the properties supplied by
     * {@code settings}, registering {@code clientId} as {@code client.id} when it is not empty.
     *
     * @param settings     source of the producer properties; must not be {@code null}
     * @param defaultTopic value returned by {@link #getDefaultTopic()}
     * @param clientId     Kafka client id; must not be {@code null}
     * @param admin        admin used to create topics before the first send to them
     * @throws NullPointerException if {@code settings} or {@code clientId} is {@code null}
     */
    private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
        // Fail with an explicit message instead of an anonymous NPE from the call below.
        Objects.requireNonNull(settings, "Kafka producer settings is null");
        Properties props = settings.toProducerProps();
        this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null");
        if (!StringUtils.isEmpty(clientId)) {
            props.put(CLIENT_ID_PROPERTY, clientId);
        }
        this.settings = settings;
        this.producer = new KafkaProducer<>(props);
        this.defaultTopic = defaultTopic;
        this.admin = admin;
        this.topics = ConcurrentHashMap.newKeySet();
    }

    /**
     * Appends diagnostic headers to {@code headers}: the producer client id and the current
     * thread name, plus, when TRACE logging is enabled, one header per stack frame of the
     * calling thread (frame indexes {@code 2} up to, but excluding, {@code 20}).
     *
     * @param headers mutable list that receives the additional headers
     */
    void addAnalyticHeaders(List<Header> headers) {
        headers.add(new RecordHeader(PRODUCER_ID_HEADER, getClientId().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(THREAD_NAME_HEADER, Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));
        if (!log.isTraceEnabled()) {
            return;
        }
        try {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            int maxLevel = Math.min(stackTrace.length, STACK_TRACE_FRAME_LIMIT);
            for (int i = STACK_TRACE_FIRST_FRAME; i < maxLevel; ++i) {
                headers.add(new RecordHeader(STACK_TRACE_HEADER_PREFIX + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8)));
            }
        } catch (Throwable t) {
            // Deliberately best-effort: stack trace headers are diagnostics only and must
            // never prevent the record from being sent.
            log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t);
        }
    }

    /**
     * Sends {@code msg} using the string form of {@code msg.getKey()} as the record key.
     *
     * @param tpi      target topic (and optionally partition)
     * @param msg      message to publish
     * @param callback notified about the outcome; may be {@code null}
     */
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
        send(tpi, msg.getKey().toString(), msg, callback);
    }

    /**
     * Sends {@code msg} to the topic named by {@code tpi.getFullTopicName()} with an explicit
     * record key.
     *
     * <p>The message headers are copied into the record; when DEBUG logging is enabled the
     * diagnostic headers from {@link #addAnalyticHeaders(List)} are appended as well. The
     * partition from {@code tpi} is used only when {@code tpi.isUseInternalPartition()} is
     * {@code true}; otherwise the record is created without a partition.
     *
     * <p>The asynchronous result is reported to {@code callback}; a failure without a callback
     * is logged at WARN level. An exception thrown synchronously while preparing or submitting
     * the record is reported the same way and then rethrown to the caller.
     *
     * @param tpi      target topic (and optionally partition)
     * @param key      record key
     * @param msg      message to publish
     * @param callback notified about the outcome; may be {@code null}
     */
    public void send(TopicPartitionInfo tpi, String key, T msg, TbQueueCallback callback) {
        try {
            String topic = tpi.getFullTopicName();
            createTopicIfNotExist(topic);
            byte[] data = msg.getData();
            // Collected into an explicit ArrayList because addAnalyticHeaders appends to the
            // list; Collectors.toList() gives no guarantee that its result is mutable.
            List<Header> headers = new ArrayList<>();
            msg.getHeaders().getData().forEach((name, value) -> headers.add(new RecordHeader(name, value)));
            if (log.isDebugEnabled()) {
                addAnalyticHeaders(headers);
            }
            Integer partition = tpi.isUseInternalPartition() ? tpi.getPartition().orElse(null) : null;
            ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, partition, key, data, headers);
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    if (callback != null) {
                        callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
                    }
                } else if (callback != null) {
                    callback.onFailure(exception);
                } else {
                    log.warn("Producer template failure", exception);
                }
            });
        } catch (Exception e) {
            if (callback != null) {
                callback.onFailure(e);
            } else {
                log.warn("Producer template failure (send method wrapper): {}", e.getMessage(), e);
            }
            throw e;
        }
    }

    /**
     * Passes {@code topic} to the admin unless it has already been handled by this producer.
     * The topic is recorded only after the admin call returns, so a failed call is retried
     * on the next send.
     */
    private void createTopicIfNotExist(String topic) {
        if (topics.contains(topic)) {
            return;
        }
        admin.createTopicIfNotExists(topic);
        topics.add(topic);
    }

    /** Closes the underlying Kafka producer. */
    public void stop() {
        // The field is final and set by the constructor; the guard only matters for
        // instances created without running it (e.g. mocks) — kept for safety.
        if (producer != null) {
            producer.close();
        }
    }

    /**
     * Creates a new, empty builder.
     *
     * @param <T> the queue message type of the template to build
     * @return a builder with no values set
     */
    @Generated
    public static <T extends TbQueueMsg> TbKafkaProducerTemplateBuilder<T> builder() {
        return new TbKafkaProducerTemplateBuilder<>();
    }

    /** @return the default topic this template was built with */
    @Generated
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /** @return the settings this template was built with */
    @Generated
    public TbKafkaSettings getSettings() {
        return this.settings;
    }

    /** @return the Kafka client id this template was built with */
    @Generated
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Fluent builder for {@link TbKafkaProducerTemplate}. Every setter stores its argument
     * and returns this builder; {@link #build()} passes the stored values to the template.
     *
     * @param <T> the queue message type of the template to build
     */
    @Generated
    public static class TbKafkaProducerTemplateBuilder<T extends TbQueueMsg> {
        @Generated
        private TbKafkaSettings settings;
        @Generated
        private String defaultTopic;
        @Generated
        private String clientId;
        @Generated
        private TbQueueAdmin admin;

        @Generated
        TbKafkaProducerTemplateBuilder() {
        }

        /** Sets the settings that supply the producer properties. */
        @Generated
        public TbKafkaProducerTemplateBuilder<T> settings(TbKafkaSettings settings) {
            this.settings = settings;
            return this;
        }

        /** Sets the default topic reported by the template. */
        @Generated
        public TbKafkaProducerTemplateBuilder<T> defaultTopic(String defaultTopic) {
            this.defaultTopic = defaultTopic;
            return this;
        }

        /** Sets the Kafka client id; the template rejects {@code null} when built. */
        @Generated
        public TbKafkaProducerTemplateBuilder<T> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        /** Sets the admin used to create topics. */
        @Generated
        public TbKafkaProducerTemplateBuilder<T> admin(TbQueueAdmin admin) {
            this.admin = admin;
            return this;
        }

        /**
         * Builds the template from the values set so far.
         *
         * @return a new template
         * @throws NullPointerException if the settings or the client id were not set
         */
        @Generated
        public TbKafkaProducerTemplate<T> build() {
            return new TbKafkaProducerTemplate<>(this.settings, this.defaultTopic, this.clientId, this.admin);
        }

        @Generated
        public String toString() {
            return "TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder(settings=" + this.settings
                    + ", defaultTopic=" + this.defaultTopic
                    + ", clientId=" + this.clientId
                    + ", admin=" + this.admin + ")";
        }
    }
}

