package org.thingsboard.rule.engine.action;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.CodecNotFoundException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.model.type.AuthorityCodec;
import org.thingsboard.server.dao.model.type.ComponentLifecycleStateCodec;
import org.thingsboard.server.dao.model.type.ComponentScopeCodec;
import org.thingsboard.server.dao.model.type.ComponentTypeCodec;
import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec;
import org.thingsboard.server.dao.model.type.EntityTypeCodec;
import org.thingsboard.server.dao.model.type.JsonCodec;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;

@RuleNode(type = ComponentType.ACTION, name = "save to custom table", configClazz = TbSaveToCustomCassandraTableNodeConfiguration.class, nodeDescription = "Node stores data from incoming Message payload to the Cassandra database into the predefined custom table that should have <b>cs_tb_</b> prefix, to avoid the data insertion to the common TB tables.<br><b>Note:</b> rule node can be used only for Cassandra DB.", nodeDetails = "Administrator should set the custom table name without prefix: <b>cs_tb_</b>. <br>Administrator can configure the mapping between the Message field names and Table columns name.<br><b>Note:</b>If the mapping key is <b>$entity_id</b>, that is identified by the Message Originator, then to the appropriate column name(mapping value) will be write the message originator id.<br><br>If specified message field does not exist or is not a JSON Primitive, the outbound message will be routed via <b>failure</b> chain, otherwise, the message will be routed via <b>success</b> chain.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeCustomTableConfig", icon = "file_upload")
/* loaded from: input_file:org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.class */
public class TbSaveToCustomCassandraTableNode implements TbNode {
    private static final String TABLE_PREFIX = "cs_tb_";
    private static final String ENTITY_ID = "$entityId";
    private TbSaveToCustomCassandraTableNodeConfiguration config;
    private Session session;
    private CassandraCluster cassandraCluster;
    private ConsistencyLevel defaultWriteLevel;
    private PreparedStatement saveStmt;
    private ExecutorService readResultsProcessingExecutor;
    private Map<String, String> fieldsMap;
    private static final Logger log = LoggerFactory.getLogger(TbSaveToCustomCassandraTableNode.class);
    private static final JsonParser parser = new JsonParser();

    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        this.config = (TbSaveToCustomCassandraTableNodeConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbSaveToCustomCassandraTableNodeConfiguration.class);
        this.cassandraCluster = tbContext.getCassandraCluster();
        if (this.cassandraCluster == null) {
            throw new RuntimeException("Unable to connect to Cassandra database");
        }
        startExecutor();
        this.saveStmt = getSaveStmt();
    }

    public void onMsg(TbContext tbContext, TbMsg tbMsg) throws ExecutionException, InterruptedException, TbNodeException {
        DonAsynchron.withCallback(save(tbMsg, tbContext), r6 -> {
            tbContext.tellNext(tbMsg, TbRelationTypes.SUCCESS);
        }, th -> {
            tbContext.tellFailure(tbMsg, th);
        }, tbContext.getDbCallbackExecutor());
    }

    public void destroy() {
        stopExecutor();
        this.saveStmt = null;
    }

    private void startExecutor() {
        this.readResultsProcessingExecutor = Executors.newCachedThreadPool();
    }

    private void stopExecutor() {
        if (this.readResultsProcessingExecutor != null) {
            this.readResultsProcessingExecutor.shutdownNow();
        }
    }

    private PreparedStatement prepare(String str) {
        return getSession().prepare(str);
    }

    private Session getSession() {
        if (this.session == null) {
            this.session = this.cassandraCluster.getSession();
            this.defaultWriteLevel = this.cassandraCluster.getDefaultWriteConsistencyLevel();
            CodecRegistry codecRegistry = this.session.getCluster().getConfiguration().getCodecRegistry();
            registerCodecIfNotFound(codecRegistry, new JsonCodec());
            registerCodecIfNotFound(codecRegistry, new DeviceCredentialsTypeCodec());
            registerCodecIfNotFound(codecRegistry, new AuthorityCodec());
            registerCodecIfNotFound(codecRegistry, new ComponentLifecycleStateCodec());
            registerCodecIfNotFound(codecRegistry, new ComponentTypeCodec());
            registerCodecIfNotFound(codecRegistry, new ComponentScopeCodec());
            registerCodecIfNotFound(codecRegistry, new EntityTypeCodec());
        }
        return this.session;
    }

    private void registerCodecIfNotFound(CodecRegistry codecRegistry, TypeCodec<?> typeCodec) {
        try {
            codecRegistry.codecFor(typeCodec.getCqlType(), typeCodec.getJavaType());
        } catch (CodecNotFoundException e) {
            codecRegistry.register(typeCodec);
        }
    }

    private PreparedStatement getSaveStmt() {
        this.fieldsMap = this.config.getFieldsMapping();
        if (this.fieldsMap.isEmpty()) {
            throw new RuntimeException("Fields(key,value) map is empty!");
        }
        return prepareStatement(new ArrayList(this.fieldsMap.values()));
    }

    private PreparedStatement prepareStatement(List<String> list) {
        return prepare(createQuery(list));
    }

    private String createQuery(List<String> list) {
        int size = list.size();
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO ").append(TABLE_PREFIX).append(this.config.getTableName()).append("(");
        for (String str : list) {
            sb.append(str);
            if (list.get(size - 1).equals(str)) {
                sb.append(")");
            } else {
                sb.append(",");
            }
        }
        sb.append(" VALUES(");
        for (int i = 0; i < size; i++) {
            if (i == size - 1) {
                sb.append("?)");
            } else {
                sb.append("?, ");
            }
        }
        return sb.toString();
    }

    private ListenableFuture<Void> save(TbMsg tbMsg, TbContext tbContext) {
        JsonElement parse = parser.parse(tbMsg.getData());
        if (!parse.isJsonObject()) {
            throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + parse);
        }
        JsonObject asJsonObject = parse.getAsJsonObject();
        BoundStatement bind = this.saveStmt.bind();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.fieldsMap.forEach((str, str2) -> {
            if (str.equals(ENTITY_ID)) {
                bind.setUUID(atomicInteger.get(), tbMsg.getOriginator().getId());
            } else {
                if (!asJsonObject.has(str)) {
                    throw new RuntimeException("Message data doesn't contain key: '" + str + "'!");
                }
                if (!asJsonObject.get(str).isJsonPrimitive()) {
                    throw new IllegalStateException("Message data key: '" + str + "' with value: '" + str2 + "' is not a JSON Primitive!");
                }
                JsonPrimitive asJsonPrimitive = asJsonObject.get(str).getAsJsonPrimitive();
                if (asJsonPrimitive.isNumber()) {
                    bind.setLong(atomicInteger.get(), asJsonObject.get(str).getAsLong());
                } else if (asJsonPrimitive.isBoolean()) {
                    bind.setBool(atomicInteger.get(), asJsonObject.get(str).getAsBoolean());
                } else if (asJsonPrimitive.isString()) {
                    bind.setString(atomicInteger.get(), asJsonObject.get(str).getAsString());
                } else {
                    bind.setToNull(atomicInteger.get());
                }
            }
            atomicInteger.getAndIncrement();
        });
        return getFuture(executeAsyncWrite(tbContext, bind), resultSet -> {
            return null;
        });
    }

    private ResultSetFuture executeAsyncWrite(TbContext tbContext, Statement statement) {
        return executeAsync(tbContext, statement, this.defaultWriteLevel);
    }

    private ResultSetFuture executeAsync(TbContext tbContext, Statement statement, ConsistencyLevel consistencyLevel) {
        if (log.isDebugEnabled()) {
            log.debug("Execute cassandra async statement {}", statementToString(statement));
        }
        if (statement.getConsistencyLevel() == null) {
            statement.setConsistencyLevel(consistencyLevel);
        }
        return tbContext.submitCassandraTask(new CassandraStatementTask(tbContext.getTenantId(), getSession(), statement));
    }

    private static String statementToString(Statement statement) {
        return statement instanceof BoundStatement ? ((BoundStatement) statement).preparedStatement().getQueryString() : statement.toString();
    }

    private <T> ListenableFuture<T> getFuture(ResultSetFuture resultSetFuture, final Function<ResultSet, T> function) {
        return Futures.transform(resultSetFuture, new com.google.common.base.Function<ResultSet, T>() { // from class: org.thingsboard.rule.engine.action.TbSaveToCustomCassandraTableNode.1
            @Nullable
            public T apply(@Nullable ResultSet resultSet) {
                return (T) function.apply(resultSet);
            }
        }, this.readResultsProcessingExecutor);
    }
}
