package org.thingsboard.rule.engine.kafka;

import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;

@RuleNode(type = ComponentType.EXTERNAL, name = "kafka", configClazz = TbKafkaNodeConfiguration.class, nodeDescription = "Publish messages to Kafka server", nodeDetails = "Will send record via Kafka producer to Kafka server. Outbound message will contain response fields (<code>offset</code>, <code>partition</code>, <code>topic</code>) from the Kafka in the Message Metadata. For example <b>partition</b> field can be accessed with <code>metadata.partition</code>.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbExternalNodeKafkaConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTUzOCIgaGVpZ2h0PSIyNTAwIiB2aWV3Qm94PSIwIDAgMjU2IDQxNiIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiBwcmVzZXJ2ZUFzcGVjdFJhdGlvPSJ4TWlkWU1pZCI+PHBhdGggZD0iTTIwMS44MTYgMjMwLjIxNmMtMTYuMTg2IDAtMzAuNjk3IDcuMTcxLTQwLjYzNCAxOC40NjFsLTI1LjQ2My0xOC4wMjZjMi43MDMtNy40NDIgNC4yNTUtMTUuNDMzIDQuMjU1LTIzLjc5NyAwLTguMjE5LTEuNDk4LTE2LjA3Ni00LjExMi0yMy40MDhsMjUuNDA2LTE3LjgzNWM5LjkzNiAxMS4yMzMgMjQuNDA5IDE4LjM2NSA0MC41NDggMTguMzY1IDI5Ljg3NSAwIDU0LjE4NC0yNC4zMDUgNTQuMTg0LTU0LjE4NCAwLTI5Ljg3OS0yNC4zMDktNTQuMTg0LTU0LjE4NC01NC4xODQtMjkuODc1IDAtNTQuMTg0IDI0LjMwNS01NC4xODQgNTQuMTg0IDAgNS4zNDguODA4IDEwLjUwNSAyLjI1OCAxNS4zODlsLTI1LjQyMyAxNy44NDRjLTEwLjYyLTEzLjE3NS0yNS45MTEtMjIuMzc0LTQzLjMzMy0yNS4xODJ2LTMwLjY0YzI0LjU0NC01LjE1NSA0My4wMzctMjYuOTYyIDQzLjAzNy01My4wMTlDMTI0LjE3MSAyNC4zMDUgOTkuODYyIDAgNjkuOTg3IDAgNDAuMTEyIDAgMTUuODAzIDI0LjMwNSAxNS44MDMgNTQuMTg0YzAgMjUuNzA4IDE4LjAxNCA0Ny4yNDYgNDIuMDY3IDUyLjc2OXYzMS4wMzhDMjUuMDQ0IDE0My43NTMgMCAxNzIuNDAxIDAgMjA2Ljg1NGMwIDM0LjYyMSAyNS4yOTIgNjMuMzc0IDU4LjM1NSA2OC45NHYzMi43NzRjLTI0LjI5OSA1LjM0MS00Mi41NTIgMjcuMDExLTQyLjU1MiA1Mi44OTQgMCAyOS44NzkgMjQuMzA5IDU0LjE4NCA1NC4xODQgNTQuMTg0IDI5Ljg3NSAwIDU0LjE4NC0yNC4zMDUgNTQuMTg0LTU0LjE4NCAwLTI1Ljg4My0xOC4yNTMtNDcuNTUzLTQyLjU1Mi01Mi44OTR2LTMyLjc3NWE2OS45NjUgNjkuOTY1IDAgMCAwIDQyLjYtMjQuNzc2bDI1LjYzMyAxOC4xNDNjLTEuNDIzIDQuODQtMi4yMiA5Ljk0Ni0yLjIyIDE1LjI0IDAgMjkuODc5IDI0LjMwOSA1NC4xODQgNTQuMTg0IDU0LjE4NCAyOS44NzUgMCA1NC4xODQtMjQuMzA1IDU0LjE4NC01NC4xODQgMC0yOS44NzktMjQuMzA5LTU0LjE4NC01NC4xODQtNTQuMTg0em0wLTEyNi42OTVjMTQuNDg3IDAgMjYuMjcgMTEuNzg4IDI2LjI3IDI2LjI3MXMtMTEuNzgzIDI2LjI3LTI2LjI3IDI2LjI3LTI2LjI3LTExLjc4Ny0yNi4yNy0yNi4yN2MwLTE0LjQ4MyAxMS43ODMtMjYuMjcxIDI2LjI3LTI2LjI3MXptLTE1OC4xLTQ5LjMzN2MwLTE0LjQ4MyAxMS43ODQtMjYuMjcgMjYuMjcxLTI2LjI3czI2LjI3IDExLjc4NyAyNi4yNyAyNi4yN2MwIDE0LjQ4My0xMS43ODMgMjYuMjctMjYuMjcgMjYuMjdzLTI2LjI3MS0xMS43ODctMjYuMjcxLTI2LjI3em01Mi41NDEgMzA3LjI3OGMwIDE0LjQ4My0xMS43ODMgMjYuMjctMjYuMjcgMjYuMjdzLTI2LjI3MS0xMS43ODctMjYuMjcxLTI2LjI3YzAtMTQuNDgzIDExLjc4NC0yNi4yNyAyNi4yNzEtMjYuMjdzMjYuMjcgMTEuNzg3IDI2LjI3IDI2LjI3em0tMjYuMjcyLTExNy45N2MtMjAuMjA1IDAtMzYuNjQyLTE2LjQzNC0zNi42NDItMzYuNjM4IDAtMjAuMjA1IDE2LjQzNy0zNi42NDIgMzYuNjQyLTM2LjY0MiAyMC4yMDQgMCAzNi42NDEgMTYuNDM3IDM2LjY0MSAzNi42NDIgMCAyMC4yMDQtMTYuNDM3IDM2LjYzOC0zNi42NDEgMzYuNjM4em0xMzEuODMxIDY3LjE3OWMtMTQuNDg3IDAtMjYuMjctMTEuNzg4LTI2LjI3LTI2LjI3MXMxMS43ODMtMjYuMjcgMjYuMjctMjYuMjcgMjYuMjcgMTEuNzg3IDI2LjI3IDI2LjI3YzAgMTQuNDgzLTExLjc4MyAyNi4yNzEtMjYuMjcgMjYuMjcxeiIvPjwvc3ZnPg==")
/* loaded from: input_file:org/thingsboard/rule/engine/kafka/TbKafkaNode.class */
public class TbKafkaNode extends TbAbstractExternalNode {
    private static final String OFFSET = "offset";
    private static final String PARTITION = "partition";
    private static final String TOPIC = "topic";
    private static final String ERROR = "error";
    public static final String TB_MSG_MD_PREFIX = "tb_msg_md_";
    private TbKafkaNodeConfiguration config;
    private boolean addMetadataKeyValuesAsKafkaHeaders;
    private Charset toBytesCharset;
    private Producer<String, String> producer;
    private Throwable initError;
    private static final Logger log = LoggerFactory.getLogger(TbKafkaNode.class);
    private static final Field IO_THREAD_FIELD = ReflectionUtils.findField(KafkaProducer.class, "ioThread");

    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        super.init(tbContext);
        this.config = (TbKafkaNodeConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbKafkaNodeConfiguration.class);
        this.initError = null;
        Properties properties = new Properties();
        properties.put("client.id", "producer-tb-kafka-node-" + tbContext.getSelfId().getId().toString() + "-" + tbContext.getServiceId());
        properties.put("bootstrap.servers", this.config.getBootstrapServers());
        properties.put("value.serializer", this.config.getValueSerializer());
        properties.put("key.serializer", this.config.getKeySerializer());
        properties.put("acks", this.config.getAcks());
        properties.put("retries", Integer.valueOf(this.config.getRetries()));
        properties.put("batch.size", Integer.valueOf(this.config.getBatchSize()));
        properties.put("linger.ms", Integer.valueOf(this.config.getLinger()));
        properties.put("buffer.memory", Integer.valueOf(this.config.getBufferMemory()));
        if (this.config.getOtherProperties() != null) {
            this.config.getOtherProperties().forEach((str, str2) -> {
                if ("ssl.keystore.certificate.chain".equals(str) || "ssl.keystore.key".equals(str) || "ssl.truststore.certificates".equals(str)) {
                    str2 = str2.replace("\\n", "\n");
                }
                properties.put(str, str2);
            });
        }
        this.addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(Boolean.valueOf(this.config.isAddMetadataKeyValuesAsKafkaHeaders()), false);
        this.toBytesCharset = this.config.getKafkaHeadersCharset() != null ? Charset.forName(this.config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
        try {
            this.producer = getKafkaProducer(properties);
            ((Thread) ReflectionUtils.getField(IO_THREAD_FIELD, this.producer)).setUncaughtExceptionHandler((thread, th) -> {
                if (th instanceof ThingsboardKafkaClientError) {
                    this.initError = th;
                    destroy();
                }
            });
        } catch (Exception e) {
            throw new TbNodeException(e);
        }
    }

    KafkaProducer<String, String> getKafkaProducer(Properties properties) {
        return new KafkaProducer<>(properties);
    }

    public void onMsg(TbContext tbContext, TbMsg tbMsg) {
        String processPattern = TbNodeUtils.processPattern(this.config.getTopicPattern(), tbMsg);
        String keyPattern = this.config.getKeyPattern();
        TbMsg ackIfNeeded = ackIfNeeded(tbContext, tbMsg);
        try {
            if (this.initError != null) {
                tbContext.tellFailure(ackIfNeeded, new RuntimeException("Failed to initialize Kafka rule node producer: " + this.initError.getMessage()));
            } else {
                tbContext.getExternalCallExecutor().executeAsync(() -> {
                    publish(tbContext, ackIfNeeded, processPattern, (keyPattern == null || keyPattern.isEmpty()) ? null : TbNodeUtils.processPattern(this.config.getKeyPattern(), ackIfNeeded));
                    return null;
                });
            }
        } catch (Exception e) {
            tbContext.tellFailure(ackIfNeeded, e);
        }
    }

    protected void publish(TbContext tbContext, TbMsg tbMsg, String str, String str2) {
        try {
            if (this.addMetadataKeyValuesAsKafkaHeaders) {
                RecordHeaders recordHeaders = new RecordHeaders();
                tbMsg.getMetaData().values().forEach((str3, str4) -> {
                    recordHeaders.add(new RecordHeader("tb_msg_md_" + str3, str4.getBytes(this.toBytesCharset)));
                });
                this.producer.send(new ProducerRecord(str, (Integer) null, (Long) null, str2, tbMsg.getData(), recordHeaders), (recordMetadata, exc) -> {
                    processRecord(tbContext, tbMsg, recordMetadata, exc);
                });
            } else {
                this.producer.send(new ProducerRecord(str, str2, tbMsg.getData()), (recordMetadata2, exc2) -> {
                    processRecord(tbContext, tbMsg, recordMetadata2, exc2);
                });
            }
        } catch (Exception e) {
            log.debug("[{}] Failed to process message: {}", new Object[]{tbContext.getSelfId(), tbMsg, e});
        }
    }

    public void destroy() {
        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (Exception e) {
                log.error("Failed to close producer during destroy()", e);
            }
        }
    }

    private void processRecord(TbContext tbContext, TbMsg tbMsg, RecordMetadata recordMetadata, Exception exc) {
        if (exc == null) {
            tellSuccess(tbContext, processResponse(tbMsg, recordMetadata));
        } else {
            tellFailure(tbContext, processException(tbMsg, exc), exc);
        }
    }

    private TbMsg processResponse(TbMsg tbMsg, RecordMetadata recordMetadata) {
        TbMsgMetaData copy = tbMsg.getMetaData().copy();
        copy.putValue(OFFSET, String.valueOf(recordMetadata.offset()));
        copy.putValue(PARTITION, String.valueOf(recordMetadata.partition()));
        copy.putValue(TOPIC, recordMetadata.topic());
        return TbMsg.transformMsgMetadata(tbMsg, copy);
    }

    private TbMsg processException(TbMsg tbMsg, Exception exc) {
        TbMsgMetaData copy = tbMsg.getMetaData().copy();
        copy.putValue(ERROR, String.valueOf(exc.getClass()) + ": " + exc.getMessage());
        return TbMsg.transformMsgMetadata(tbMsg, copy);
    }

    static {
        IO_THREAD_FIELD.setAccessible(true);
    }
}
