package org.thingsboard.server.queue.kafka;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;

/* loaded from: input_file:org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.class */
public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaProducerTemplate.class);
    private final KafkaProducer<String, byte[]> producer;
    private final String defaultTopic;
    private final TbKafkaSettings settings;
    private final TbQueueAdmin admin;
    private final Set<TopicPartitionInfo> topics;
    private final String clientId;

    /* loaded from: input_file:org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate$TbKafkaProducerTemplateBuilder.class */
    public static class TbKafkaProducerTemplateBuilder<T extends TbQueueMsg> {
        private TbKafkaSettings settings;
        private String defaultTopic;
        private String clientId;
        private TbQueueAdmin admin;

        TbKafkaProducerTemplateBuilder() {
        }

        public TbKafkaProducerTemplateBuilder<T> settings(TbKafkaSettings tbKafkaSettings) {
            this.settings = tbKafkaSettings;
            return this;
        }

        public TbKafkaProducerTemplateBuilder<T> defaultTopic(String str) {
            this.defaultTopic = str;
            return this;
        }

        public TbKafkaProducerTemplateBuilder<T> clientId(String str) {
            this.clientId = str;
            return this;
        }

        public TbKafkaProducerTemplateBuilder<T> admin(TbQueueAdmin tbQueueAdmin) {
            this.admin = tbQueueAdmin;
            return this;
        }

        public TbKafkaProducerTemplate<T> build() {
            return new TbKafkaProducerTemplate<>(this.settings, this.defaultTopic, this.clientId, this.admin);
        }

        public String toString() {
            return "TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder(settings=" + String.valueOf(this.settings) + ", defaultTopic=" + this.defaultTopic + ", clientId=" + this.clientId + ", admin=" + String.valueOf(this.admin) + ")";
        }
    }

    private TbKafkaProducerTemplate(TbKafkaSettings tbKafkaSettings, String str, String str2, TbQueueAdmin tbQueueAdmin) {
        Properties producerProps = tbKafkaSettings.toProducerProps();
        this.clientId = (String) Objects.requireNonNull(str2, "Kafka producer client.id is null");
        if (!StringUtils.isEmpty(str2)) {
            producerProps.put("client.id", str2);
        }
        this.settings = tbKafkaSettings;
        this.producer = new KafkaProducer<>(producerProps);
        this.defaultTopic = str;
        this.admin = tbQueueAdmin;
        this.topics = ConcurrentHashMap.newKeySet();
    }

    public void init() {
    }

    void addAnalyticHeaders(List<Header> list) {
        list.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8)));
        list.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));
        if (log.isTraceEnabled()) {
            try {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                int min = Math.min(stackTrace.length, 20);
                for (int i = 2; i < min; i++) {
                    list.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8)));
                }
            } catch (Throwable th) {
                log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), th);
            }
        }
    }

    public void send(TopicPartitionInfo topicPartitionInfo, T t, TbQueueCallback tbQueueCallback) {
        try {
            createTopicIfNotExist(topicPartitionInfo);
            String uuid = t.getKey().toString();
            byte[] data = t.getData();
            List<Header> list = (List) t.getHeaders().getData().entrySet().stream().map(entry -> {
                return new RecordHeader((String) entry.getKey(), (byte[]) entry.getValue());
            }).collect(Collectors.toList());
            if (log.isDebugEnabled()) {
                addAnalyticHeaders(list);
            }
            this.producer.send(new ProducerRecord(topicPartitionInfo.getFullTopicName(), (Integer) null, uuid, data, list), (recordMetadata, exc) -> {
                if (exc == null) {
                    if (tbQueueCallback != null) {
                        tbQueueCallback.onSuccess(new KafkaTbQueueMsgMetadata(recordMetadata));
                    }
                } else if (tbQueueCallback != null) {
                    tbQueueCallback.onFailure(exc);
                } else {
                    log.warn("Producer template failure: {}", exc.getMessage(), exc);
                }
            });
        } catch (Exception e) {
            if (tbQueueCallback != null) {
                tbQueueCallback.onFailure(e);
            } else {
                log.warn("Producer template failure (send method wrapper): {}", e.getMessage(), e);
            }
            throw e;
        }
    }

    private void createTopicIfNotExist(TopicPartitionInfo topicPartitionInfo) {
        if (this.topics.contains(topicPartitionInfo)) {
            return;
        }
        this.admin.createTopicIfNotExists(topicPartitionInfo.getFullTopicName());
        this.topics.add(topicPartitionInfo);
    }

    public void stop() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    public static <T extends TbQueueMsg> TbKafkaProducerTemplateBuilder<T> builder() {
        return new TbKafkaProducerTemplateBuilder<>();
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public TbKafkaSettings getSettings() {
        return this.settings;
    }

    public String getClientId() {
        return this.clientId;
    }
}
