/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.action;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.rule.engine.action.TbMsgCountNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;

@RuleNode(type=ComponentType.ACTION, name="message count", configClazz=TbMsgCountNodeConfiguration.class, nodeDescription="Count incoming messages", nodeDetails="Count incoming messages for specified interval and produces POST_TELEMETRY_REQUEST msg with messages count", icon="functions", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeMsgCountConfig")
public class TbMsgCountNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgCountNode.class);
    private static final String TB_MSG_COUNT_NODE_MSG = "TbMsgCountNodeMsg";
    private AtomicLong messagesProcessed = new AtomicLong(0L);
    private final Gson gson = new Gson();
    private UUID nextTickId;
    private long delay;
    private String telemetryPrefix;
    private long lastScheduledTs;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        TbMsgCountNodeConfiguration config = (TbMsgCountNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbMsgCountNodeConfiguration.class);
        this.delay = TimeUnit.SECONDS.toMillis(config.getInterval());
        this.telemetryPrefix = config.getTelemetryPrefix();
        this.scheduleTickMsg(ctx, null);
    }

    public void onMsg(TbContext ctx, TbMsg msg) {
        if (msg.getType().equals(TB_MSG_COUNT_NODE_MSG) && msg.getId().equals(this.nextTickId)) {
            JsonObject telemetryJson = new JsonObject();
            telemetryJson.addProperty(this.telemetryPrefix + "_" + ctx.getServiceId(), (Number)this.messagesProcessed.longValue());
            this.messagesProcessed = new AtomicLong(0L);
            TbMsgMetaData metaData = new TbMsgMetaData();
            metaData.putValue("delta", Long.toString(System.currentTimeMillis() - this.lastScheduledTs + this.delay));
            TbMsg tbMsg = TbMsg.newMsg((String)msg.getQueueName(), (String)SessionMsgType.POST_TELEMETRY_REQUEST.name(), (EntityId)ctx.getTenantId(), (CustomerId)msg.getCustomerId(), (TbMsgMetaData)metaData, (String)this.gson.toJson((JsonElement)telemetryJson));
            ctx.enqueueForTellNext(tbMsg, TbRelationTypes.SUCCESS);
            this.scheduleTickMsg(ctx, tbMsg);
        } else {
            this.messagesProcessed.incrementAndGet();
            ctx.ack(msg);
        }
    }

    private void scheduleTickMsg(TbContext ctx, TbMsg msg) {
        long curTs = System.currentTimeMillis();
        if (this.lastScheduledTs == 0L) {
            this.lastScheduledTs = curTs;
        }
        this.lastScheduledTs += this.delay;
        long curDelay = Math.max(0L, this.lastScheduledTs - curTs);
        TbMsg tickMsg = ctx.newMsg(null, TB_MSG_COUNT_NODE_MSG, (EntityId)ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), "");
        this.nextTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, curDelay);
    }
}

