/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.debug;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNodeConfiguration;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;

@RuleNode(type=ComponentType.ACTION, name="generator", configClazz=TbMsgGeneratorNodeConfiguration.class, version=1, hasQueueName=true, nodeDescription="Periodically generates messages", nodeDetails="Generates messages with configurable period. Javascript function used for message generation.", inEnabled=false, uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeGeneratorConfig", icon="repeat")
public class TbMsgGeneratorNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgGeneratorNode.class);
    private TbMsgGeneratorNodeConfiguration config;
    private ScriptEngine scriptEngine;
    private long delay;
    private long lastScheduledTs;
    private int currentMsgCount;
    private EntityId originatorId;
    private UUID nextTickId;
    private TbMsg prevMsg;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private String queueName;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = (TbMsgGeneratorNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbMsgGeneratorNodeConfiguration.class);
        this.delay = TimeUnit.SECONDS.toMillis(this.config.getPeriodInSeconds());
        this.currentMsgCount = 0;
        this.queueName = ctx.getQueueName();
        if (!StringUtils.isEmpty((String)this.config.getOriginatorId())) {
            this.originatorId = EntityIdFactory.getByTypeAndUuid((EntityType)this.config.getOriginatorType(), (String)this.config.getOriginatorId());
            ctx.checkTenantEntity(this.originatorId);
        } else {
            this.originatorId = ctx.getSelfId();
        }
        log.debug("[{}] Initializing generator with config {}", (Object)this.originatorId, (Object)configuration);
        this.updateGeneratorState(ctx);
    }

    public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
        log.debug("[{}] Handling partition change msg: {}", (Object)this.originatorId, (Object)msg);
        this.updateGeneratorState(ctx);
    }

    private void updateGeneratorState(TbContext ctx) {
        log.trace("[{}] Updating generator state, config {}", (Object)this.originatorId, (Object)this.config);
        if (ctx.isLocalEntity(this.originatorId)) {
            if (this.initialized.compareAndSet(false, true)) {
                this.scriptEngine = ctx.createScriptEngine(this.config.getScriptLang(), ScriptLanguage.TBEL.equals((Object)this.config.getScriptLang()) ? this.config.getTbelScript() : this.config.getJsScript(), new String[]{"prevMsg", "prevMetadata", "prevMsgType"});
                this.scheduleTickMsg(ctx, null);
            }
        } else if (this.initialized.compareAndSet(true, false)) {
            this.destroy();
        }
    }

    public void onMsg(TbContext ctx, TbMsg msg) {
        log.trace("[{}] onMsg. Expected msg id: {}, msg: {}, config: {}", new Object[]{this.originatorId, this.nextTickId, msg, this.config});
        if (this.initialized.get() && msg.isTypeOf(TbMsgType.GENERATOR_NODE_SELF_MSG) && msg.getId().equals(this.nextTickId)) {
            TbStopWatch sw = TbStopWatch.create();
            DonAsynchron.withCallback(this.generate(ctx, msg), m -> {
                log.trace("onMsg onSuccess callback, took {}ms, config {}, msg {}", new Object[]{sw.stopAndGetTotalTimeMillis(), this.config, msg});
                if (this.initialized.get() && (this.config.getMsgCount() == 0 || this.currentMsgCount < this.config.getMsgCount())) {
                    ctx.enqueueForTellNext(m, "Success");
                    this.scheduleTickMsg(ctx, msg);
                    ++this.currentMsgCount;
                }
            }, t -> {
                log.trace("onMsg onFailure callback, took {}ms, config {}, msg {}", new Object[]{sw.stopAndGetTotalTimeMillis(), this.config, msg, t});
                if (this.initialized.get() && (this.config.getMsgCount() == 0 || this.currentMsgCount < this.config.getMsgCount())) {
                    ctx.tellFailure(msg, t);
                    this.scheduleTickMsg(ctx, msg);
                    ++this.currentMsgCount;
                }
            });
        }
    }

    private void scheduleTickMsg(TbContext ctx, TbMsg msg) {
        long curTs = System.currentTimeMillis();
        if (this.lastScheduledTs == 0L) {
            this.lastScheduledTs = curTs;
        }
        this.lastScheduledTs += this.delay;
        long curDelay = Math.max(0L, this.lastScheduledTs - curTs);
        TbMsg tickMsg = ctx.newMsg(this.queueName, TbMsgType.GENERATOR_NODE_SELF_MSG, (EntityId)ctx.getSelfId(), this.getCustomerIdFromMsg(msg), TbMsgMetaData.EMPTY, "");
        this.nextTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, curDelay);
        log.trace("[{}] Scheduled tick msg with delay {}, msg: {}, config: {}", new Object[]{this.originatorId, curDelay, tickMsg, this.config});
    }

    private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) {
        log.trace("generate, config {}", (Object)this.config);
        if (this.prevMsg == null) {
            this.prevMsg = ctx.newMsg(this.queueName, "", this.originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, "{}");
        }
        if (this.initialized.get()) {
            ctx.logJsEvalRequest();
            return Futures.transformAsync((ListenableFuture)this.scriptEngine.executeGenerateAsync(this.prevMsg), generated -> {
                log.trace("generate process response, generated {}, config {}", generated, (Object)this.config);
                ctx.logJsEvalResponse();
                this.prevMsg = ctx.newMsg(this.queueName, generated.getType(), this.originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
                return Futures.immediateFuture((Object)this.prevMsg);
            }, (Executor)MoreExecutors.directExecutor());
        }
        return Futures.immediateFuture((Object)this.prevMsg);
    }

    private CustomerId getCustomerIdFromMsg(TbMsg msg) {
        return msg != null ? msg.getCustomerId() : null;
    }

    public void destroy() {
        log.debug("[{}] Stopping generator", (Object)this.originatorId);
        this.initialized.set(false);
        this.prevMsg = null;
        this.nextTickId = null;
        this.lastScheduledTs = 0L;
        if (this.scriptEngine != null) {
            this.scriptEngine.destroy();
            this.scriptEngine = null;
        }
    }

    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
        boolean hasChanges = false;
        switch (fromVersion) {
            case 0: {
                if (!oldConfiguration.has("queueName")) break;
                hasChanges = true;
                ((ObjectNode)oldConfiguration).remove("queueName");
                break;
            }
        }
        return new TbPair((Object)hasChanges, (Object)oldConfiguration);
    }
}

