/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.action;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.CodecNotFoundException;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.action.TbSaveToCustomCassandraTableNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.model.type.AuthorityCodec;
import org.thingsboard.server.dao.model.type.ComponentLifecycleStateCodec;
import org.thingsboard.server.dao.model.type.ComponentScopeCodec;
import org.thingsboard.server.dao.model.type.ComponentTypeCodec;
import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec;
import org.thingsboard.server.dao.model.type.EntityTypeCodec;
import org.thingsboard.server.dao.model.type.JsonCodec;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;

/**
 * Rule node that inserts values taken from the incoming message's JSON payload
 * into a Cassandra table whose name is {@code cs_tb_} followed by the configured
 * table name.
 *
 * <p>The configured fields mapping maps a message field name (key) to a table
 * column name (value). The special key {@code $entityId} binds the message
 * originator id instead of a payload field.
 */
@RuleNode(type=ComponentType.ACTION, name="save to custom table", configClazz=TbSaveToCustomCassandraTableNodeConfiguration.class, nodeDescription="Node stores data from incoming Message payload to the Cassandra database into the predefined custom table that should have <b>cs_tb_</b> prefix, to avoid the data insertion to the common TB tables.<br><b>Note:</b> rule node can be used only for Cassandra DB.", nodeDetails="Administrator should set the custom table name without prefix: <b>cs_tb_</b>. <br>Administrator can configure the mapping between the Message field names and Table columns name.<br><b>Note:</b>If the mapping key is <b>$entity_id</b>, that is identified by the Message Originator, then to the appropriate column name(mapping value) will be write the message originator id.<br><br>If specified message field does not exist or is not a JSON Primitive, the outbound message will be routed via <b>failure</b> chain, otherwise, the message will be routed via <b>success</b> chain.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeCustomTableConfig", icon="file_upload")
public class TbSaveToCustomCassandraTableNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbSaveToCustomCassandraTableNode.class);
    /** Prefix prepended to the configured table name when the INSERT is built. */
    private static final String TABLE_PREFIX = "cs_tb_";
    private static final JsonParser parser = new JsonParser();
    // NOTE(review): the nodeDetails text in the @RuleNode annotation refers to "$entity_id",
    // but only this exact key is recognised when binding values - confirm the intended spelling.
    private static final String ENTITY_ID = "$entityId";
    private TbSaveToCustomCassandraTableNodeConfiguration config;
    private Session session;
    private CassandraCluster cassandraCluster;
    private ConsistencyLevel defaultWriteLevel;
    private PreparedStatement saveStmt;
    private ExecutorService readResultsProcessingExecutor;
    /** Message field name -> table column name; iteration order defines the bind positions. */
    private Map<String, String> fieldsMap;

    /**
     * Reads the node configuration, obtains the Cassandra cluster from the context,
     * starts the result-processing executor and prepares the INSERT statement.
     *
     * @param ctx           rule engine context supplying the Cassandra cluster
     * @param configuration raw node configuration
     * @throws TbNodeException  declared by the node contract
     * @throws RuntimeException if the context has no Cassandra cluster or the fields mapping is empty
     */
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = (TbSaveToCustomCassandraTableNodeConfiguration)TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class);
        this.cassandraCluster = ctx.getCassandraCluster();
        if (this.cassandraCluster == null) {
            throw new RuntimeException("Unable to connect to Cassandra database");
        }
        this.startExecutor();
        this.saveStmt = this.getSaveStmt();
    }

    /**
     * Saves the message and reports success or failure to the context once the
     * asynchronous write completes.
     *
     * @param ctx rule engine context
     * @param msg message whose payload is stored
     */
    public void onMsg(TbContext ctx, TbMsg msg) {
        DonAsynchron.withCallback(this.save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), (Executor)ctx.getDbCallbackExecutor());
    }

    /** Shuts down the result-processing executor and drops the prepared statement. */
    public void destroy() {
        this.stopExecutor();
        this.saveStmt = null;
    }

    private void startExecutor() {
        this.readResultsProcessingExecutor = Executors.newCachedThreadPool();
    }

    private void stopExecutor() {
        if (this.readResultsProcessingExecutor != null) {
            this.readResultsProcessingExecutor.shutdownNow();
        }
    }

    private PreparedStatement prepare(String query) {
        return this.getSession().prepare(query);
    }

    /**
     * Returns the Cassandra session, initialising it on first use: the default write
     * consistency level is cached and the ThingsBoard codecs are registered if missing.
     */
    private Session getSession() {
        if (this.session == null) {
            this.session = this.cassandraCluster.getSession();
            this.defaultWriteLevel = this.cassandraCluster.getDefaultWriteConsistencyLevel();
            CodecRegistry registry = this.session.getCluster().getConfiguration().getCodecRegistry();
            this.registerCodecIfNotFound(registry, new JsonCodec());
            this.registerCodecIfNotFound(registry, new DeviceCredentialsTypeCodec());
            this.registerCodecIfNotFound(registry, new AuthorityCodec());
            this.registerCodecIfNotFound(registry, new ComponentLifecycleStateCodec());
            this.registerCodecIfNotFound(registry, new ComponentTypeCodec());
            this.registerCodecIfNotFound(registry, new ComponentScopeCodec());
            this.registerCodecIfNotFound(registry, new EntityTypeCodec());
        }
        return this.session;
    }

    /** Registers {@code codec} only when the registry has no codec for its CQL/Java type pair. */
    private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
        try {
            registry.codecFor(codec.getCqlType(), codec.getJavaType());
        }
        catch (CodecNotFoundException e) {
            // Lookup failure is the "not registered yet" signal, so register it now.
            registry.register(codec);
        }
    }

    /**
     * Loads the fields mapping from the configuration and prepares the INSERT for its columns.
     *
     * @throws RuntimeException if the mapping is empty
     */
    private PreparedStatement getSaveStmt() {
        this.fieldsMap = this.config.getFieldsMapping();
        if (this.fieldsMap.isEmpty()) {
            throw new RuntimeException("Fields(key,value) map is empty!");
        }
        return this.prepareStatement(new ArrayList<String>(this.fieldsMap.values()));
    }

    private PreparedStatement prepareStatement(List<String> fieldsList) {
        return this.prepare(this.createQuery(fieldsList));
    }

    /**
     * Builds {@code INSERT INTO cs_tb_<table>(c1,c2,...) VALUES(?, ?, ...)} with one
     * placeholder per column.
     *
     * <p>Separators are placed by position rather than by comparing column names, so a
     * repeated column name can no longer close the column list early.
     *
     * @param fieldsList column names, in bind order
     * @return the CQL insert statement text
     */
    private String createQuery(List<String> fieldsList) {
        StringBuilder query = new StringBuilder();
        query.append("INSERT INTO ").append(TABLE_PREFIX).append(this.config.getTableName()).append("(");
        query.append(String.join(",", fieldsList));
        query.append(") VALUES(");
        for (int i = 0; i < fieldsList.size(); ++i) {
            if (i > 0) {
                query.append(", ");
            }
            query.append("?");
        }
        query.append(")");
        return query.toString();
    }

    /**
     * Binds one value per mapping entry to the prepared INSERT and submits it asynchronously.
     *
     * @param msg message whose JSON payload and originator supply the values
     * @param ctx rule engine context used to submit the statement
     * @return future completing with {@code null} once the write has finished
     * @throws IllegalStateException if the payload is not a JSON object or a mapped field is not a JSON primitive
     * @throws RuntimeException      if a mapped field is missing from the payload
     */
    private ListenableFuture<Void> save(TbMsg msg, TbContext ctx) {
        JsonElement data = parser.parse(msg.getData());
        if (!data.isJsonObject()) {
            throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data);
        }
        JsonObject dataAsObject = data.getAsJsonObject();
        BoundStatement stmt = this.saveStmt.bind();
        // Same iteration order as fieldsMap.values() used to build the query, so index == column position.
        int index = 0;
        for (Map.Entry<String, String> mapping : this.fieldsMap.entrySet()) {
            this.bindField(stmt, index++, mapping.getKey(), msg, dataAsObject);
        }
        return this.getFuture(this.executeAsyncWrite(ctx, stmt), rs -> null);
    }

    /** Binds the value for mapping key {@code key} at position {@code index} of {@code stmt}. */
    private void bindField(BoundStatement stmt, int index, String key, TbMsg msg, JsonObject dataAsObject) {
        if (key.equals(ENTITY_ID)) {
            stmt.setUUID(index, msg.getOriginator().getId());
            return;
        }
        if (!dataAsObject.has(key)) {
            throw new RuntimeException("Message data doesn't contain key: '" + key + "'!");
        }
        JsonElement element = dataAsObject.get(key);
        if (!element.isJsonPrimitive()) {
            // Report the offending JSON value itself (previously the mapped column name was printed here).
            throw new IllegalStateException("Message data key: '" + key + "' with value: '" + element + "' is not a JSON Primitive!");
        }
        JsonPrimitive primitive = element.getAsJsonPrimitive();
        if (primitive.isNumber()) {
            if (TbSaveToCustomCassandraTableNode.isIntegral(primitive)) {
                stmt.setLong(index, primitive.getAsLong());
            } else {
                // A fractional value would be silently truncated by getAsLong(); bind it as a double instead.
                stmt.setDouble(index, primitive.getAsDouble());
            }
        } else if (primitive.isBoolean()) {
            stmt.setBool(index, primitive.getAsBoolean());
        } else if (primitive.isString()) {
            stmt.setString(index, primitive.getAsString());
        } else {
            // Defensive fallback kept from the original implementation.
            stmt.setToNull(index);
        }
    }

    /** Returns {@code true} when the numeric primitive has no fractional part (e.g. {@code 3}, {@code 3.0}, {@code 1e3}). */
    private static boolean isIntegral(JsonPrimitive primitive) {
        java.math.BigDecimal number = primitive.getAsBigDecimal();
        return number.signum() == 0 || number.stripTrailingZeros().scale() <= 0;
    }

    private ResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) {
        return this.executeAsync(ctx, statement, this.defaultWriteLevel);
    }

    /**
     * Submits {@code statement} through the context as a Cassandra task, applying
     * {@code level} only when the statement has no consistency level of its own.
     */
    private ResultSetFuture executeAsync(TbContext ctx, Statement statement, ConsistencyLevel level) {
        if (log.isDebugEnabled()) {
            log.debug("Execute cassandra async statement {}", TbSaveToCustomCassandraTableNode.statementToString(statement));
        }
        if (statement.getConsistencyLevel() == null) {
            statement.setConsistencyLevel(level);
        }
        return ctx.submitCassandraTask(new CassandraStatementTask(ctx.getTenantId(), this.getSession(), statement));
    }

    /** Returns the query string for bound statements, otherwise the statement's {@code toString()}. */
    private static String statementToString(Statement statement) {
        if (statement instanceof BoundStatement) {
            return ((BoundStatement)statement).preparedStatement().getQueryString();
        }
        return statement.toString();
    }

    /**
     * Adapts a driver result future into a future of {@code T} by applying
     * {@code transformer} on the result-processing executor.
     */
    private <T> ListenableFuture<T> getFuture(ResultSetFuture future, final java.util.function.Function<ResultSet, T> transformer) {
        return Futures.transform(future, new Function<ResultSet, T>(){

            @Nullable
            public T apply(@Nullable ResultSet input) {
                return transformer.apply(input);
            }
        }, this.readResultsProcessingExecutor);
    }
}

