/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.analytics.incoming;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.util.Pair;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.analytics.incoming.IntervalPersistPolicy;
import org.thingsboard.rule.engine.analytics.incoming.TbIntervalTable;
import org.thingsboard.rule.engine.analytics.incoming.TbSimpleAggMsgNodeConfiguration;
import org.thingsboard.rule.engine.analytics.incoming.state.StatePersistPolicy;
import org.thingsboard.rule.engine.analytics.incoming.state.TbIntervalState;
import org.thingsboard.rule.engine.analytics.latest.ParentEntitiesQuery;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;

@RuleNode(type=ComponentType.ANALYTICS, name="aggregate stream", configClazz=TbSimpleAggMsgNodeConfiguration.class, nodeDescription="Aggregates incoming data stream grouped by originator Entity Id", nodeDetails="Calculates MIN/MAX/SUM/AVG/COUNT/UNIQUE based on the incoming data stream. Groups incoming data stream based on originator id of the message (i.e. particular device, asset, customer) and <b>\"aggregation interval value\"</b> into Intervals.<br/><br/>Intervals are periodically persisted based on <b>\"interval persistence policy\"</b> and <b>\"interval check value\"</b>.<br/><br/>Intervals are cached in memory based on <b>\"Interval TTL value\"</b>.<br/><br/>State of the Intervals are persisted as timeseries entities based on <b>\"state persistence policy\"</b> and <b>\"state persistence value\"</b>.<br/><br/>In case there is no data for certain entity, it might be useful to generate default values for those entities. To lookup those entities one may select <b>\"Create intervals automatically\"</b> checkbox and configure <b>\"Interval entities\"</b>.<br/><br/>Generates 'POST_TELEMETRY_REQUEST' messages with the results of the aggregation for particular interval.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbAnalyticsNodeAggregateIncomingConfig", icon="functions")
public class TbSimpleAggMsgNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbSimpleAggMsgNode.class);
    private static final String TB_REPORT_TICK_MSG = "TbIntervalTickMsg";
    private static final String TB_PERSIST_TICK_MSG = "TbPersistTickMsg";
    private static final String TB_ENTITIES_TICK_MSG = "TbEntitiesTickMsg";
    private static final long START_EPOCH = -12219292800000L;
    private final JsonParser gsonParser = new JsonParser();
    private final Gson gson = new Gson();
    private StatePersistPolicy statePersistPolicy;
    private IntervalPersistPolicy intervalPersistPolicy;
    private TbSimpleAggMsgNodeConfiguration config;
    private TbIntervalTable intervals;
    private UUID nextReportTickId;
    private UUID nextPersistTickId;
    private UUID nextEntitiesTickId;
    private long intervalReportCheckPeriod;
    private long statePersistCheckPeriod;
    private long entitiesCheckPeriod;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = (TbSimpleAggMsgNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbSimpleAggMsgNodeConfiguration.class);
        this.statePersistPolicy = StatePersistPolicy.valueOf(this.config.getStatePersistencePolicy());
        this.intervalPersistPolicy = IntervalPersistPolicy.valueOf(this.config.getIntervalPersistencePolicy());
        this.intervals = new TbIntervalTable(ctx, this.config, this.gsonParser);
        this.intervalReportCheckPeriod = Math.max(TimeUnit.valueOf(this.config.getIntervalCheckTimeUnit()).toMillis(this.config.getIntervalCheckValue()), TimeUnit.MINUTES.toMillis(1L));
        this.statePersistCheckPeriod = Math.max(TimeUnit.valueOf(this.config.getStatePersistenceTimeUnit()).toMillis(this.config.getStatePersistenceValue()), TimeUnit.MINUTES.toMillis(1L));
        this.scheduleReportTickMsg(ctx);
        if (StatePersistPolicy.PERIODICALLY.name().equalsIgnoreCase(this.config.getStatePersistencePolicy())) {
            this.scheduleStatePersistTickMsg(ctx);
        }
        if (this.config.isAutoCreateIntervals()) {
            this.entitiesCheckPeriod = Math.max(this.config.getPeriodTimeUnit().toMillis(this.config.getPeriodValue()), TimeUnit.MINUTES.toMillis(1L));
            try {
                this.initEntities(ctx, null);
            }
            catch (Exception e) {
                throw new TbNodeException(e);
            }
            this.scheduleEntitiesTickMsg(ctx);
        }
    }

    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
        switch (msg.getType()) {
            case "TbIntervalTickMsg": {
                this.onIntervalTickMsg(ctx, msg);
                break;
            }
            case "TbPersistTickMsg": {
                this.onPersistTickMsg(ctx, msg);
                break;
            }
            case "TbEntitiesTickMsg": {
                try {
                    this.onEntitiesTickMsg(ctx, msg);
                    break;
                }
                catch (Exception e) {
                    throw new TbNodeException(e);
                }
            }
            default: {
                this.onDataMsg(ctx, msg);
            }
        }
    }

    public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
        log.trace("Cluster change msg received: {}", (Object)msg);
        this.intervals.cleanupEntities(ctx);
    }

    private void onDataMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
        EntityId entityId = msg.getOriginator();
        long ts = this.extractTs(msg);
        JsonElement value = this.extractValue(msg);
        Pair<Long, TbIntervalState> statePair = this.intervals.getByEntityIdAndTs(entityId, ts);
        TbIntervalState state = (TbIntervalState)statePair.getSecond();
        state.update(value);
        log.trace("Data Msg received: {}", (Object)msg);
        if (state.hasChangesToPersist() && this.statePersistPolicy == StatePersistPolicy.ON_EACH_CHANGE) {
            log.trace("Persisting state: {}", (Object)state);
            DonAsynchron.withCallback(this.intervals.saveIntervalState(entityId, ts, state), v -> {
                ctx.getPeContext().ack(msg);
                state.clearChangesToPersist();
                log.trace("Cleared state after persising: {}", (Object)state);
            }, t -> ctx.tellFailure(msg, t), (Executor)ctx.getDbCallbackExecutor());
        } else {
            ctx.getPeContext().ack(msg);
        }
        if (state.hasChangesToReport() && this.intervalPersistPolicy == IntervalPersistPolicy.ON_EACH_MESSAGE) {
            this.reportInterval(ctx, entityId, (Long)statePair.getFirst(), state);
            state.clearChangesToReport();
        }
    }

    private void onIntervalTickMsg(TbContext ctx, TbMsg msg) {
        if (!msg.getId().equals(this.nextReportTickId)) {
            return;
        }
        this.scheduleReportTickMsg(ctx);
        log.trace("Reporting intervals!");
        this.intervals.getStatesToReport(this.intervalPersistPolicy).forEach((entityId, entityStates) -> entityStates.forEach((ts, interval) -> this.reportInterval(ctx, (EntityId)entityId, (Long)ts, (TbIntervalState)interval)));
        this.intervals.cleanupStatesUsingTTL();
    }

    private void reportInterval(TbContext ctx, EntityId entityId, Long ts, TbIntervalState interval) {
        log.trace("Reporting interval: [{}][{}]", (Object)ts, (Object)interval);
        TbMsgMetaData metaData = new TbMsgMetaData();
        metaData.putValue("ts", Long.toString(ts));
        ctx.enqueueForTellNext(TbMsg.newMsg((String)SessionMsgType.POST_TELEMETRY_REQUEST.name(), (EntityId)entityId, (TbMsgMetaData)metaData, (String)interval.toValueJson(this.gson, this.config.getOutputValueKey())), TbRelationTypes.SUCCESS);
    }

    private void onPersistTickMsg(TbContext ctx, TbMsg msg) {
        if (!msg.getId().equals(this.nextPersistTickId)) {
            return;
        }
        this.scheduleStatePersistTickMsg(ctx);
        log.trace("[{}] Persisting states!", (Object)ctx.getSelfId());
        this.intervals.getStatesToPersist().forEach((entityId, entityStates) -> entityStates.forEach((ts, state) -> {
            log.trace("[{}] Persisting state: [{}][{}]", new Object[]{ctx.getSelfId(), ts, state});
            this.intervals.saveIntervalState((EntityId)entityId, (long)ts, (TbIntervalState)state);
        }));
        this.intervals.cleanupStatesUsingTTL();
    }

    private void onEntitiesTickMsg(TbContext ctx, TbMsg msg) throws Exception {
        if (!msg.getId().equals(this.nextEntitiesTickId)) {
            return;
        }
        this.scheduleEntitiesTickMsg(ctx);
        this.initEntities(ctx, msg);
    }

    private void initEntities(TbContext ctx, TbMsg msg) throws Exception {
        log.trace("[{}] Lookup entities!", (Object)ctx.getSelfId());
        ParentEntitiesQuery query = this.config.getParentEntitiesQuery();
        if (query.useParentEntitiesOnlyForSimpleAggregation()) {
            this.addIntervals(ctx, msg, query.getParentEntitiesAsync(ctx));
        } else {
            DonAsynchron.withCallback(query.getParentEntitiesAsync(ctx), parents -> {
                for (EntityId parentId : parents) {
                    this.addIntervals(ctx, msg, query.getChildEntitiesAsync(ctx, parentId));
                }
            }, this.getErrorsConsumer(ctx, msg), (Executor)ctx.getDbCallbackExecutor());
        }
    }

    private void addIntervals(TbContext ctx, TbMsg msg, ListenableFuture<List<EntityId>> entities) {
        DonAsynchron.withCallback(entities, tmp -> this.intervals.addEntities(ctx, msg, (List<EntityId>)tmp), this.getErrorsConsumer(ctx, msg), (Executor)ctx.getDbCallbackExecutor());
    }

    private Consumer<Throwable> getErrorsConsumer(TbContext ctx, TbMsg msg) {
        return t -> {
            if (msg != null) {
                ctx.tellFailure(msg, t);
            }
        };
    }

    private void scheduleReportTickMsg(TbContext ctx) {
        TbMsg tickMsg = ctx.newMsg("Main", TB_REPORT_TICK_MSG, (EntityId)ctx.getSelfId(), new TbMsgMetaData(), "");
        this.nextReportTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, this.intervalReportCheckPeriod);
    }

    private void scheduleStatePersistTickMsg(TbContext ctx) {
        TbMsg tickMsg = ctx.newMsg("Main", TB_PERSIST_TICK_MSG, (EntityId)ctx.getSelfId(), new TbMsgMetaData(), "");
        this.nextPersistTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, this.statePersistCheckPeriod);
    }

    private void scheduleEntitiesTickMsg(TbContext ctx) {
        TbMsg tickMsg = ctx.newMsg("Main", TB_ENTITIES_TICK_MSG, (EntityId)ctx.getSelfId(), new TbMsgMetaData(), "");
        this.nextEntitiesTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, this.entitiesCheckPeriod);
    }

    private long extractTs(TbMsg msg) {
        String ts = msg.getMetaData().getValue("ts");
        if (!StringUtils.isEmpty((Object)ts)) {
            return Long.parseLong(ts);
        }
        return msg.getTs();
    }

    private JsonElement extractValue(TbMsg msg) {
        JsonElement jsonElement = this.gsonParser.parse(msg.getData());
        if (!jsonElement.isJsonObject()) {
            throw new IllegalArgumentException("Incoming message is not a json object!");
        }
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        if (!jsonObject.has(this.config.getInputValueKey())) {
            throw new IllegalArgumentException("Incoming message does not contain " + this.config.getInputValueKey() + "!");
        }
        return jsonObject.get(this.config.getInputValueKey());
    }

    public void destroy() {
    }
}

