/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.metadata;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.metadata.CalculateDeltaNodeConfiguration;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.timeseries.TimeseriesService;

@RuleNode(type=ComponentType.ENRICHMENT, name="calculate delta", relationTypes={"Success", "Failure", "Other"}, configClazz=CalculateDeltaNodeConfiguration.class, nodeDescription="Calculates delta and amount of time passed between previous timeseries key reading and current value for this key from the incoming message", nodeDetails="Useful for metering use cases, when you need to calculate consumption based on pulse counter reading.<br><br>Output connections: <code>Success</code>, <code>Other</code> or <code>Failure</code>.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbEnrichmentNodeCalculateDeltaConfig")
public class CalculateDeltaNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(CalculateDeltaNode.class);
    private Map<EntityId, ValueWithTs> cache;
    private CalculateDeltaNodeConfiguration config;
    private TbContext ctx;
    private TimeseriesService timeseriesService;
    private boolean useCache;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = (CalculateDeltaNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, CalculateDeltaNodeConfiguration.class);
        this.ctx = ctx;
        this.timeseriesService = ctx.getTimeseriesService();
        this.useCache = this.config.isUseCache();
        if (this.useCache) {
            this.cache = new ConcurrentHashMap<EntityId, ValueWithTs>();
        }
    }

    public void onMsg(TbContext ctx, TbMsg msg) {
        String inputKey;
        if (!msg.isTypeOf(TbMsgType.POST_TELEMETRY_REQUEST)) {
            ctx.tellNext(msg, "Other");
            return;
        }
        JsonNode json = JacksonUtil.toJsonNode((String)msg.getData());
        if (!json.has(inputKey = this.config.getInputValueKey())) {
            ctx.tellNext(msg, "Other");
            return;
        }
        DonAsynchron.withCallback(this.getLastValue(msg.getOriginator()), previousData -> {
            double currentValue = json.get(inputKey).asDouble();
            long currentTs = msg.getMetaDataTs();
            if (this.useCache) {
                this.cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
            }
            BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
            if (this.config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0.0) {
                ctx.tellFailure(msg, (Throwable)new IllegalArgumentException("Delta value is negative!"));
                return;
            }
            if (this.config.getRound() != null) {
                delta = delta.setScale((int)this.config.getRound(), RoundingMode.HALF_UP);
            }
            ObjectNode result = (ObjectNode)json;
            if (delta.stripTrailingZeros().scale() > 0) {
                result.put(this.config.getOutputValueKey(), delta.doubleValue());
            } else {
                result.put(this.config.getOutputValueKey(), delta.longValueExact());
            }
            if (this.config.isAddPeriodBetweenMsgs()) {
                long period = previousData != null ? currentTs - previousData.ts : 0L;
                result.put(this.config.getPeriodValueKey(), period);
            }
            ctx.tellSuccess(TbMsg.transformMsgData((TbMsg)msg, (String)JacksonUtil.toString((Object)result)));
        }, t -> ctx.tellFailure(msg, t), (Executor)ctx.getDbCallbackExecutor());
    }

    public void destroy() {
        if (this.useCache) {
            this.cache.clear();
        }
    }

    private ListenableFuture<ValueWithTs> fetchLatestValueAsync(EntityId entityId) {
        return Futures.transform((ListenableFuture)this.timeseriesService.findLatest(this.ctx.getTenantId(), entityId, Collections.singletonList(this.config.getInputValueKey())), list -> this.extractValue((TsKvEntry)list.get(0)), (Executor)this.ctx.getDbCallbackExecutor());
    }

    private ValueWithTs fetchLatestValue(EntityId entityId) {
        List tsKvEntries = this.timeseriesService.findLatestSync(this.ctx.getTenantId(), entityId, Collections.singletonList(this.config.getInputValueKey()));
        return this.extractValue((TsKvEntry)tsKvEntries.get(0));
    }

    private ListenableFuture<ValueWithTs> getLastValue(EntityId entityId) {
        if (this.useCache) {
            ValueWithTs latestValue = this.cache.get(entityId);
            if (latestValue == null) {
                latestValue = this.fetchLatestValue(entityId);
            }
            return Futures.immediateFuture((Object)latestValue);
        }
        return this.fetchLatestValueAsync(entityId);
    }

    private ValueWithTs extractValue(TsKvEntry kvEntry) {
        if (kvEntry == null || kvEntry.getValue() == null) {
            return null;
        }
        double result = 0.0;
        long ts = kvEntry.getTs();
        switch (kvEntry.getDataType()) {
            case LONG: {
                result = ((Long)kvEntry.getLongValue().get()).longValue();
                break;
            }
            case DOUBLE: {
                result = (Double)kvEntry.getDoubleValue().get();
                break;
            }
            case STRING: {
                try {
                    result = Double.parseDouble((String)kvEntry.getStrValue().get());
                    break;
                }
                catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Calculation failed. Unable to parse value [" + (String)kvEntry.getStrValue().get() + "] of telemetry [" + kvEntry.getKey() + "] to Double");
                }
            }
            case BOOLEAN: {
                throw new IllegalArgumentException("Calculation failed. Boolean values are not supported!");
            }
            case JSON: {
                throw new IllegalArgumentException("Calculation failed. JSON values are not supported!");
            }
        }
        return new ValueWithTs(ts, result);
    }

    private static class ValueWithTs {
        private final long ts;
        private final double value;

        private ValueWithTs(long ts, double value) {
            this.ts = ts;
            this.value = value;
        }
    }
}

