package org.thingsboard.rule.engine.debug;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;

@RuleNode(type = ComponentType.ACTION, name = "generator", configClazz = TbMsgGeneratorNodeConfiguration.class, version = 1, hasQueueName = true, nodeDescription = "Periodically generates messages", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", inEnabled = false, uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeGeneratorConfig", icon = "repeat")
/* loaded from: input_file:org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.class */
public class TbMsgGeneratorNode implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgGeneratorNode.class);
    private TbMsgGeneratorNodeConfiguration config;
    private ScriptEngine scriptEngine;
    private long delay;
    private long lastScheduledTs;
    private int currentMsgCount;
    private EntityId originatorId;
    private UUID nextTickId;
    private TbMsg prevMsg;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private String queueName;

    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        this.config = (TbMsgGeneratorNodeConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbMsgGeneratorNodeConfiguration.class);
        this.delay = TimeUnit.SECONDS.toMillis(this.config.getPeriodInSeconds());
        this.currentMsgCount = 0;
        this.queueName = tbContext.getQueueName();
        if (StringUtils.isEmpty(this.config.getOriginatorId())) {
            this.originatorId = tbContext.getSelfId();
        } else {
            this.originatorId = EntityIdFactory.getByTypeAndUuid(this.config.getOriginatorType(), this.config.getOriginatorId());
            tbContext.checkTenantEntity(this.originatorId);
        }
        log.debug("[{}] Initializing generator with config {}", this.originatorId, tbNodeConfiguration);
        updateGeneratorState(tbContext);
    }

    public void onPartitionChangeMsg(TbContext tbContext, PartitionChangeMsg partitionChangeMsg) {
        log.debug("[{}] Handling partition change msg: {}", this.originatorId, partitionChangeMsg);
        updateGeneratorState(tbContext);
    }

    private void updateGeneratorState(TbContext tbContext) {
        log.trace("[{}] Updating generator state, config {}", this.originatorId, this.config);
        if (!tbContext.isLocalEntity(this.originatorId)) {
            if (this.initialized.compareAndSet(true, false)) {
                destroy();
            }
        } else if (this.initialized.compareAndSet(false, true)) {
            this.scriptEngine = tbContext.createScriptEngine(this.config.getScriptLang(), ScriptLanguage.TBEL.equals(this.config.getScriptLang()) ? this.config.getTbelScript() : this.config.getJsScript(), new String[]{"prevMsg", "prevMetadata", "prevMsgType"});
            scheduleTickMsg(tbContext, null);
        }
    }

    public void onMsg(TbContext tbContext, TbMsg tbMsg) {
        log.trace("[{}] onMsg. Expected msg id: {}, msg: {}, config: {}", new Object[]{this.originatorId, this.nextTickId, tbMsg, this.config});
        if (this.initialized.get() && tbMsg.isTypeOf(TbMsgType.GENERATOR_NODE_SELF_MSG) && tbMsg.getId().equals(this.nextTickId)) {
            TbStopWatch create = TbStopWatch.create();
            DonAsynchron.withCallback(generate(tbContext, tbMsg), tbMsg2 -> {
                log.trace("onMsg onSuccess callback, took {}ms, config {}, msg {}", new Object[]{Long.valueOf(create.stopAndGetTotalTimeMillis()), this.config, tbMsg});
                if (this.initialized.get()) {
                    if (this.config.getMsgCount() == 0 || this.currentMsgCount < this.config.getMsgCount()) {
                        tbContext.enqueueForTellNext(tbMsg2, "Success");
                        scheduleTickMsg(tbContext, tbMsg);
                        this.currentMsgCount++;
                    }
                }
            }, th -> {
                log.trace("onMsg onFailure callback, took {}ms, config {}, msg {}", new Object[]{Long.valueOf(create.stopAndGetTotalTimeMillis()), this.config, tbMsg, th});
                if (this.initialized.get()) {
                    if (this.config.getMsgCount() == 0 || this.currentMsgCount < this.config.getMsgCount()) {
                        tbContext.tellFailure(tbMsg, th);
                        scheduleTickMsg(tbContext, tbMsg);
                        this.currentMsgCount++;
                    }
                }
            });
        }
    }

    private void scheduleTickMsg(TbContext tbContext, TbMsg tbMsg) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastScheduledTs == 0) {
            this.lastScheduledTs = currentTimeMillis;
        }
        this.lastScheduledTs += this.delay;
        long max = Math.max(0L, this.lastScheduledTs - currentTimeMillis);
        TbMsg newMsg = tbContext.newMsg(this.queueName, TbMsgType.GENERATOR_NODE_SELF_MSG, tbContext.getSelfId(), getCustomerIdFromMsg(tbMsg), TbMsgMetaData.EMPTY, "");
        this.nextTickId = newMsg.getId();
        tbContext.tellSelf(newMsg, max);
        log.trace("[{}] Scheduled tick msg with delay {}, msg: {}, config: {}", new Object[]{this.originatorId, Long.valueOf(max), newMsg, this.config});
    }

    private ListenableFuture<TbMsg> generate(TbContext tbContext, TbMsg tbMsg) {
        log.trace("generate, config {}", this.config);
        if (this.prevMsg == null) {
            this.prevMsg = tbContext.newMsg(this.queueName, "", this.originatorId, tbMsg.getCustomerId(), TbMsgMetaData.EMPTY, "{}");
        }
        if (!this.initialized.get()) {
            return Futures.immediateFuture(this.prevMsg);
        }
        tbContext.logJsEvalRequest();
        return Futures.transformAsync(this.scriptEngine.executeGenerateAsync(this.prevMsg), tbMsg2 -> {
            log.trace("generate process response, generated {}, config {}", tbMsg2, this.config);
            tbContext.logJsEvalResponse();
            this.prevMsg = tbContext.newMsg(this.queueName, tbMsg2.getType(), this.originatorId, tbMsg.getCustomerId(), tbMsg2.getMetaData(), tbMsg2.getData());
            return Futures.immediateFuture(this.prevMsg);
        }, MoreExecutors.directExecutor());
    }

    private CustomerId getCustomerIdFromMsg(TbMsg tbMsg) {
        if (tbMsg != null) {
            return tbMsg.getCustomerId();
        }
        return null;
    }

    public void destroy() {
        log.debug("[{}] Stopping generator", this.originatorId);
        this.initialized.set(false);
        this.prevMsg = null;
        this.nextTickId = null;
        this.lastScheduledTs = 0L;
        if (this.scriptEngine != null) {
            this.scriptEngine.destroy();
            this.scriptEngine = null;
        }
    }

    public TbPair<Boolean, JsonNode> upgrade(int i, JsonNode jsonNode) throws TbNodeException {
        boolean z = false;
        switch (i) {
            case TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT /* 0 */:
                if (jsonNode.has("queueName")) {
                    z = true;
                    ((ObjectNode) jsonNode).remove("queueName");
                    break;
                }
                break;
        }
        return new TbPair<>(Boolean.valueOf(z), jsonNode);
    }
}
