package org.thingsboard.rule.engine.action;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNodeConfiguration;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;

@RuleNode(type = ComponentType.ACTION, name = "save to custom table", configClazz = TbSaveToCustomCassandraTableNodeConfiguration.class, version = 1, nodeDescription = "Node stores data from incoming Message payload to the Cassandra database into the predefined custom table that should have <b>cs_tb_</b> prefix, to avoid the data insertion to the common TB tables.<br><b>Note:</b> rule node can be used only for Cassandra DB.", nodeDetails = "Administrator should set the custom table name without prefix: <b>cs_tb_</b>. <br>Administrator can configure the mapping between the Message field names and Table columns name.<br><b>Note:</b>If the mapping key is <b>$entity_id</b>, that is identified by the Message Originator, then to the appropriate column name(mapping value) will be write the message originator id.<br><br>If specified message field does not exist or is not a JSON Primitive, the outbound message will be routed via <b>failure</b> chain, otherwise, the message will be routed via <b>success</b> chain.", configDirective = "tbActionNodeCustomTableConfig", icon = "file_upload", ruleChainTypes = {RuleChainType.CORE})
/* loaded from: input_file:org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.class */
/**
 * Rule node that inserts values taken from the incoming message payload into a
 * custom Cassandra table whose name is {@code cs_tb_} + the configured table name.
 *
 * <p>The configured fields mapping maps message field names (keys) to table column
 * names (values). The INSERT statement is prepared once in {@link #init} and bound
 * per message in {@link #onMsg}.
 */
public class TbSaveToCustomCassandraTableNode implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbSaveToCustomCassandraTableNode.class);
    private static final String TABLE_PREFIX = "cs_tb_";
    // Mapping key that is bound to the message originator id instead of a payload field.
    // NOTE(review): the @RuleNode nodeDetails text spells this key "$entity_id"; the code only
    // recognises "$entityId" - confirm which spelling is intended before changing either.
    private static final String ENTITY_ID = "$entityId";
    private TbSaveToCustomCassandraTableNodeConfiguration config;
    private GuavaSession session;
    private CassandraCluster cassandraCluster;
    private ConsistencyLevel defaultWriteLevel;
    private PreparedStatement saveStmt;
    private ExecutorService readResultsProcessingExecutor;
    private Map<String, String> fieldsMap;

    /**
     * Reads the node configuration, verifies that the target table exists and prepares
     * the INSERT statement.
     *
     * @param tbContext           rule engine context; provides the Cassandra cluster
     * @param tbNodeConfiguration raw node configuration
     * @throws TbNodeException if no Cassandra cluster is available, the target table does
     *                         not exist, or the fields mapping is missing or empty
     */
    @Override
    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        this.config = (TbSaveToCustomCassandraTableNodeConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbSaveToCustomCassandraTableNodeConfiguration.class);
        this.cassandraCluster = tbContext.getCassandraCluster();
        if (this.cassandraCluster == null) {
            throw new TbNodeException("Unable to connect to Cassandra database", true);
        }
        if (!isTableExists()) {
            throw new TbNodeException("Table '" + TABLE_PREFIX + this.config.getTableName() + "' does not exist in Cassandra cluster.");
        }
        startExecutor();
        this.saveStmt = getSaveStmt();
    }

    /**
     * Saves the message asynchronously and reports success or failure to the context once
     * the write completes.
     *
     * <p>Exceptions raised while parsing the payload or binding values are thrown from this
     * method synchronously (before any write is submitted); they are not caught here.
     *
     * @param tbContext rule engine context
     * @param tbMsg     message whose data is expected to be a JSON object
     */
    @Override
    public void onMsg(TbContext tbContext, TbMsg tbMsg) {
        DonAsynchron.withCallback(
                save(tbMsg, tbContext),
                ignored -> tbContext.tellSuccess(tbMsg),
                error -> tbContext.tellFailure(tbMsg, error),
                tbContext.getDbCallbackExecutor());
    }

    /** Shuts down the result-processing executor and drops the prepared statement. */
    @Override
    public void destroy() {
        stopExecutor();
        this.saveStmt = null;
    }

    private void startExecutor() {
        this.readResultsProcessingExecutor = Executors.newCachedThreadPool();
    }

    private void stopExecutor() {
        if (this.readResultsProcessingExecutor != null) {
            this.readResultsProcessingExecutor.shutdownNow();
        }
    }

    /**
     * Checks the cluster metadata for the prefixed table in the configured keyspace.
     *
     * @return {@code true} if the keyspace is known and contains the table
     */
    private boolean isTableExists() {
        String tableName = TABLE_PREFIX + this.config.getTableName();
        return getSession().getMetadata()
                .getKeyspace(this.cassandraCluster.getKeyspaceName())
                .map(keyspace -> keyspace.getTable(tableName).isPresent())
                .orElse(false);
    }

    private PreparedStatement prepare(String query) {
        return getSession().prepare(query);
    }

    /**
     * Returns the Cassandra session, fetching it (and the default write consistency level)
     * from the cluster on first use.
     */
    private GuavaSession getSession() {
        if (this.session == null) {
            this.session = this.cassandraCluster.getSession();
            this.defaultWriteLevel = this.cassandraCluster.getDefaultWriteConsistencyLevel();
        }
        return this.session;
    }

    /**
     * Loads the fields mapping from the configuration and prepares the INSERT statement
     * for the mapped columns.
     *
     * @throws TbNodeException if the mapping is missing or empty
     */
    private PreparedStatement getSaveStmt() throws TbNodeException {
        this.fieldsMap = this.config.getFieldsMapping();
        if (this.fieldsMap == null || this.fieldsMap.isEmpty()) {
            throw new TbNodeException("Fields(key,value) map is empty!", true);
        }
        return prepareStatement(new ArrayList<>(this.fieldsMap.values()));
    }

    private PreparedStatement prepareStatement(List<String> columns) {
        return prepare(createQuery(columns));
    }

    /**
     * Builds {@code INSERT INTO cs_tb_<table>(c1,c2) VALUES(?, ?)} for the given columns,
     * followed by {@code USING TTL ?} when the configured default TTL is positive.
     *
     * @param columns column names, in the order their values will be bound
     * @return the CQL text
     */
    private String createQuery(List<String> columns) {
        StringBuilder query = new StringBuilder("INSERT INTO ")
                .append(TABLE_PREFIX)
                .append(this.config.getTableName())
                .append("(")
                // Joined by position; comparing names to find the last column closes the
                // list early when an earlier column has the same name as the last one.
                .append(String.join(",", columns))
                .append(")")
                .append(" VALUES(");
        for (int i = 0; i < columns.size(); i++) {
            query.append(i == 0 ? "?" : ", ?");
        }
        query.append(")");
        if (this.config.getDefaultTtl() > 0) {
            query.append(" USING TTL ?");
        }
        return query.toString();
    }

    /**
     * Binds one value per mapping entry (in the map's iteration order), binds the TTL when
     * configured, and submits the write.
     *
     * @param tbMsg     message to persist
     * @param tbContext rule engine context used to submit the write task
     * @return future that completes with {@code null} once the write has finished
     * @throws IllegalStateException if the message data is not a JSON object, or a mapped
     *                               field is neither a JSON primitive nor a JSON object
     * @throws RuntimeException      if a mapped field is absent from the message data
     */
    private ListenableFuture<Void> save(TbMsg tbMsg, TbContext tbContext) {
        JsonElement payload = JsonParser.parseString(tbMsg.getData());
        if (!payload.isJsonObject()) {
            throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + payload);
        }
        JsonObject data = payload.getAsJsonObject();
        BoundStatementBuilder stmtBuilder = getStmtBuilder();
        int index = 0;
        for (Map.Entry<String, String> mapping : this.fieldsMap.entrySet()) {
            bindField(stmtBuilder, index, mapping.getKey(), tbMsg, data);
            index++;
        }
        // The TTL placeholder, when present, follows the column placeholders.
        if (this.config.getDefaultTtl() > 0) {
            stmtBuilder.setInt(index, this.config.getDefaultTtl());
        }
        return getFuture(executeAsyncWrite(tbContext, stmtBuilder.build()), resultSet -> null);
    }

    /**
     * Binds the value for a single mapping key at the given placeholder index.
     *
     * <p>The {@code $entityId} key binds the originator id as a UUID. Otherwise the field is
     * read from the message data: numbers whose text contains {@code "."} are bound as
     * double and all other numbers as long; booleans and strings are bound as such; JSON
     * objects are bound as their string form.
     */
    private void bindField(BoundStatementBuilder stmtBuilder, int index, String fieldName, TbMsg tbMsg, JsonObject data) {
        if (fieldName.equals(ENTITY_ID)) {
            stmtBuilder.setUuid(index, tbMsg.getOriginator().getId());
            return;
        }
        if (!data.has(fieldName)) {
            throw new RuntimeException("Message data doesn't contain key: '" + fieldName + "'!");
        }
        JsonElement value = data.get(fieldName);
        if (value.isJsonPrimitive()) {
            bindPrimitive(stmtBuilder, index, value.getAsJsonPrimitive());
            return;
        }
        if (!value.isJsonObject()) {
            throw new IllegalStateException("Message data key: '" + fieldName + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!");
        }
        stmtBuilder.setString(index, value.getAsJsonObject().toString());
    }

    /** Binds a JSON primitive using the setter that matches its JSON type. */
    private void bindPrimitive(BoundStatementBuilder stmtBuilder, int index, JsonPrimitive primitive) {
        if (primitive.isNumber()) {
            // Only the presence of '.' selects the double setter; e.g. "1e5" takes the long path.
            if (primitive.getAsString().contains(".")) {
                stmtBuilder.setDouble(index, primitive.getAsDouble());
            } else {
                stmtBuilder.setLong(index, primitive.getAsLong());
            }
        } else if (primitive.isBoolean()) {
            stmtBuilder.setBoolean(index, primitive.getAsBoolean());
        } else if (primitive.isString()) {
            stmtBuilder.setString(index, primitive.getAsString());
        } else {
            stmtBuilder.setToNull(index);
        }
    }

    /** Creates a fresh builder over the prepared INSERT statement. */
    BoundStatementBuilder getStmtBuilder() {
        return new BoundStatementBuilder(this.saveStmt.bind());
    }

    private TbResultSetFuture executeAsyncWrite(TbContext tbContext, Statement<?> statement) {
        return executeAsync(tbContext, statement, this.defaultWriteLevel);
    }

    /**
     * Submits the statement as a Cassandra write task, applying {@code consistencyLevel}
     * when the statement does not carry one of its own.
     */
    private TbResultSetFuture executeAsync(TbContext tbContext, Statement<?> statement, ConsistencyLevel consistencyLevel) {
        if (log.isDebugEnabled()) {
            log.debug("Execute cassandra async statement {}", statementToString(statement));
        }
        Statement<?> toExecute = statement;
        if (statement.getConsistencyLevel() == null) {
            // Driver statements are immutable: setConsistencyLevel returns a new instance,
            // so the returned statement (not the original) must be the one submitted.
            toExecute = statement.setConsistencyLevel(consistencyLevel);
        }
        return tbContext.submitCassandraWriteTask(new CassandraStatementTask(tbContext.getTenantId(), getSession(), toExecute));
    }

    private static String statementToString(Statement<?> statement) {
        if (statement instanceof BoundStatement) {
            return ((BoundStatement) statement).getPreparedStatement().getQuery();
        }
        return statement.toString();
    }

    /**
     * Transforms the result-set future with {@code function}, running the transformation on
     * the node's result-processing executor.
     */
    private <T> ListenableFuture<T> getFuture(TbResultSetFuture tbResultSetFuture, final Function<AsyncResultSet, T> function) {
        // Adapt java.util.function.Function to the Guava Function type expected by Futures.transform.
        com.google.common.base.Function<AsyncResultSet, T> adapter = function::apply;
        return Futures.transform(tbResultSetFuture, adapter, this.readResultsProcessingExecutor);
    }

    /**
     * Upgrades a stored configuration. From version 0, adds {@code defaultTtl = 0} when the
     * property is absent; other versions are returned unchanged.
     *
     * @param i        version of the stored configuration
     * @param jsonNode stored configuration; modified in place when an upgrade is applied
     * @return pair of (whether the configuration was changed, the configuration)
     */
    @Override
    public TbPair<Boolean, JsonNode> upgrade(int i, JsonNode jsonNode) throws TbNodeException {
        boolean hasChanges = false;
        if (i == 0 && !jsonNode.has("defaultTtl")) {
            ((ObjectNode) jsonNode).put("defaultTtl", 0);
            hasChanges = true;
        }
        return new TbPair<>(hasChanges, jsonNode);
    }
}
