/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.action;

import java.util.EnumSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.action.TbDeviceStateNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimits;

@RuleNode(type=ComponentType.ACTION, name="device state", nodeDescription="Triggers device connectivity events", nodeDetails="If incoming message originator is a device, registers configured event for that device in the Device State Service, which sends appropriate message to the Rule Engine. If metadata <code>ts</code> property is present, it will be used as event timestamp. Otherwise, the message timestamp will be used. If originator entity type is not <code>DEVICE</code> or unexpected error happened during processing, then incoming message is forwarded using <code>Failure</code> chain. If rate of connectivity events for a given originator is too high, then incoming message is forwarded using <code>Rate limited</code> chain. <br>Supported device connectivity events are:<ul><li>Connect event</li><li>Disconnect event</li><li>Activity event</li><li>Inactivity event</li></ul>This node is particularly useful when device isn't using transports to receive data, such as when fetching data from external API or computing new data within the rule chain.", configClazz=TbDeviceStateNodeConfiguration.class, relationTypes={"Success", "Failure", "Rate limited"}, uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeDeviceStateConfig")
public class TbDeviceStateNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbDeviceStateNode.class);
    private static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of(TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT);
    private static final String DEFAULT_RATE_LIMIT_CONFIG = "1:1,30:60,60:3600";
    private ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits;
    private String rateLimitConfig;
    private TbMsgType event;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        TbMsgType event = ((TbDeviceStateNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbDeviceStateNodeConfiguration.class)).getEvent();
        if (event == null) {
            throw new TbNodeException("Event cannot be null!", true);
        }
        if (!SUPPORTED_EVENTS.contains(event)) {
            throw new TbNodeException("Unsupported event: " + event, true);
        }
        this.event = event;
        this.rateLimits = new ConcurrentReferenceHashMap();
        String deviceStateNodeRateLimitConfig = ctx.getDeviceStateNodeRateLimitConfig();
        try {
            this.rateLimitConfig = new TbRateLimits(deviceStateNodeRateLimitConfig).getConfiguration();
        }
        catch (Exception e) {
            log.error("[{}][{}] Invalid rate limit configuration provided: [{}]. Will use default value [{}].", new Object[]{ctx.getTenantId().getId(), ctx.getSelfId().getId(), deviceStateNodeRateLimitConfig, DEFAULT_RATE_LIMIT_CONFIG, e});
            this.rateLimitConfig = DEFAULT_RATE_LIMIT_CONFIG;
        }
    }

    public void onMsg(TbContext ctx, TbMsg msg) {
        EntityType originatorEntityType = msg.getOriginator().getEntityType();
        if (!EntityType.DEVICE.equals((Object)originatorEntityType)) {
            ctx.tellFailure(msg, (Throwable)new IllegalArgumentException("Unsupported originator entity type: [" + originatorEntityType + "]. Only DEVICE entity type is supported."));
            return;
        }
        DeviceId originator = new DeviceId(msg.getOriginator().getId());
        this.rateLimits.compute((Object)originator, (__, rateLimit) -> {
            boolean isNotRateLimited;
            if (rateLimit == null) {
                rateLimit = new TbRateLimits(this.rateLimitConfig);
            }
            if (isNotRateLimited = rateLimit.tryConsume()) {
                this.sendEventAndTell(ctx, originator, msg);
            } else {
                ctx.tellNext(msg, "Rate limited");
            }
            return rateLimit;
        });
    }

    private void sendEventAndTell(TbContext ctx, DeviceId originator, TbMsg msg) {
        TenantId tenantId = ctx.getTenantId();
        long eventTs = msg.getMetaDataTs();
        RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
        TbCallback callback = this.getMsgEnqueuedCallback(ctx, msg);
        switch (this.event) {
            case CONNECT_EVENT: {
                deviceStateManager.onDeviceConnect(tenantId, originator, eventTs, callback);
                break;
            }
            case ACTIVITY_EVENT: {
                deviceStateManager.onDeviceActivity(tenantId, originator, eventTs, callback);
                break;
            }
            case DISCONNECT_EVENT: {
                deviceStateManager.onDeviceDisconnect(tenantId, originator, eventTs, callback);
                break;
            }
            case INACTIVITY_EVENT: {
                deviceStateManager.onDeviceInactivity(tenantId, originator, eventTs, callback);
                break;
            }
            default: {
                ctx.tellFailure(msg, (Throwable)new IllegalStateException("Configured event [" + this.event + "] is not supported!"));
            }
        }
    }

    private TbCallback getMsgEnqueuedCallback(final TbContext ctx, final TbMsg msg) {
        return new TbCallback(){

            public void onSuccess() {
                ctx.tellSuccess(msg);
            }

            public void onFailure(Throwable t) {
                ctx.tellFailure(msg, t);
            }
        };
    }

    public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
        this.rateLimits.entrySet().removeIf(entry -> !ctx.isLocalEntity((EntityId)entry.getKey()));
    }

    public void destroy() {
        if (this.rateLimits != null) {
            this.rateLimits.clear();
            this.rateLimits = null;
        }
        this.rateLimitConfig = null;
        this.event = null;
    }
}

