package org.thingsboard.rule.engine.analytics.incoming;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.analytics.incoming.state.StatePersistPolicy;
import org.thingsboard.rule.engine.analytics.incoming.state.TbIntervalState;
import org.thingsboard.rule.engine.analytics.latest.ParentEntitiesQuery;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNodeConfiguration;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;

/**
 * Rule node that aggregates an incoming data stream into intervals keyed by the
 * message originator and the message timestamp.
 *
 * <p>Besides regular data messages the node reacts to three kinds of messages it
 * schedules for itself:
 * <ul>
 *   <li>{@code TbIntervalTickMsg} - reports intervals as {@code POST_TELEMETRY_REQUEST} messages;</li>
 *   <li>{@code TbPersistTickMsg} - persists interval states (scheduled only for the
 *       {@code PERIODICALLY} state persistence policy);</li>
 *   <li>{@code TbEntitiesTickMsg} - looks up entities for which intervals should exist
 *       (scheduled only when automatic interval creation is enabled).</li>
 * </ul>
 * A tick message is honoured only if its id matches the id of the most recently
 * scheduled tick of the same kind; any other tick of that kind is ignored.
 */
@RuleNode(type = ComponentType.ANALYTICS, name = "aggregate stream", configClazz = TbSimpleAggMsgNodeConfiguration.class, nodeDescription = "Aggregates incoming data stream grouped by originator Entity Id", nodeDetails = "Calculates MIN/MAX/SUM/AVG/COUNT/UNIQUE based on the incoming data stream. Groups incoming data stream based on originator id of the message (i.e. particular device, asset, customer) and <b>\"aggregation interval value\"</b> into Intervals.<br/><br/>Intervals are periodically persisted based on <b>\"interval persistence policy\"</b> and <b>\"interval check value\"</b>.<br/><br/>Intervals are cached in memory based on <b>\"Interval TTL value\"</b>.<br/><br/>State of the Intervals are persisted as timeseries entities based on <b>\"state persistence policy\"</b> and <b>\"state persistence value\"</b>.<br/><br/>In case there is no data for certain entity, it might be useful to generate default values for those entities. To lookup those entities one may select <b>\"Create intervals automatically\"</b> checkbox and configure <b>\"Interval entities\"</b>.<br/><br/>Generates 'POST_TELEMETRY_REQUEST' messages with the results of the aggregation for particular interval.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbAnalyticsNodeAggregateIncomingConfig", icon = "functions")
public class TbSimpleAggMsgNode implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbSimpleAggMsgNode.class);

    /** Type of the self-addressed message that triggers interval reporting. */
    private static final String TB_REPORT_TICK_MSG = "TbIntervalTickMsg";
    /** Type of the self-addressed message that triggers periodic state persistence. */
    private static final String TB_PERSIST_TICK_MSG = "TbPersistTickMsg";
    /** Type of the self-addressed message that triggers the entity lookup. */
    private static final String TB_ENTITIES_TICK_MSG = "TbEntitiesTickMsg";
    // NOTE(review): not referenced anywhere in this class; kept as-is.
    private static final long START_EPOCH = -12219292800000L;

    private final JsonParser gsonParser = new JsonParser();
    private final Gson gson = new Gson();

    private StatePersistPolicy statePersistPolicy;
    private IntervalPersistPolicy intervalPersistPolicy;
    private TbSimpleAggMsgNodeConfiguration config;
    private TbIntervalTable intervals;

    // Ids of the most recently scheduled ticks; a tick with any other id is ignored.
    private UUID nextReportTickId;
    private UUID nextPersistTickId;
    private UUID nextEntitiesTickId;

    // Delays, in milliseconds, used when scheduling the corresponding tick messages.
    private long intervalReportCheckPeriod;
    private long statePersistCheckPeriod;
    private long entitiesCheckPeriod;

    /**
     * Reads the node configuration, creates the interval table and schedules the
     * tick messages required by the configuration.
     *
     * @param tbContext           rule engine context of this node
     * @param tbNodeConfiguration raw node configuration
     * @throws TbNodeException if the initial entity lookup or the scheduling of the
     *                         entities tick fails; failures of the configuration
     *                         conversion are propagated as thrown by the helper
     */
    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        this.config = (TbSimpleAggMsgNodeConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbSimpleAggMsgNodeConfiguration.class);
        this.statePersistPolicy = StatePersistPolicy.valueOf(this.config.getStatePersistencePolicy());
        this.intervalPersistPolicy = IntervalPersistPolicy.valueOf(this.config.getIntervalPersistencePolicy());
        this.intervals = new TbIntervalTable(tbContext, this.config, this.gsonParser);
        this.intervalReportCheckPeriod = TimeUnit.valueOf(this.config.getIntervalCheckTimeUnit()).toMillis(this.config.getIntervalCheckValue());
        this.statePersistCheckPeriod = TimeUnit.valueOf(this.config.getStatePersistenceTimeUnit()).toMillis(this.config.getStatePersistenceValue());

        scheduleReportTickMsg(tbContext);

        // The policy string was already parsed by valueOf above, so the enum can be
        // compared directly instead of re-comparing the configured string.
        if (this.statePersistPolicy == StatePersistPolicy.PERIODICALLY) {
            scheduleStatePersistTickMsg(tbContext);
        }

        if (this.config.isAutoCreateIntervals()) {
            this.entitiesCheckPeriod = this.config.getPeriodTimeUnit().toMillis(this.config.getPeriodValue());
            try {
                // No triggering message exists during initialization, hence null.
                initEntities(tbContext, null);
                scheduleEntitiesTickMsg(tbContext);
            } catch (Exception e) {
                throw new TbNodeException(e);
            }
        }
    }

    /**
     * Dispatches a message by its type: the three tick types go to their dedicated
     * handlers, every other type is treated as a data message.
     *
     * @param tbContext rule engine context of this node
     * @param tbMsg     message to process
     * @throws ExecutionException   propagated from data message processing
     * @throws InterruptedException propagated from data message processing
     * @throws TbNodeException      if handling of the entities tick fails
     */
    public void onMsg(TbContext tbContext, TbMsg tbMsg) throws ExecutionException, InterruptedException, TbNodeException {
        switch (tbMsg.getType()) {
            case TB_REPORT_TICK_MSG:
                onIntervalTickMsg(tbContext, tbMsg);
                break;
            case TB_PERSIST_TICK_MSG:
                onPersistTickMsg(tbContext, tbMsg);
                break;
            case TB_ENTITIES_TICK_MSG:
                try {
                    onEntitiesTickMsg(tbContext, tbMsg);
                } catch (Exception e) {
                    throw new TbNodeException(e);
                }
                break;
            default:
                onDataMsg(tbContext, tbMsg);
                break;
        }
    }

    /**
     * Asks the interval table to clean up its entities after a partition change.
     *
     * @param tbContext          rule engine context of this node
     * @param partitionChangeMsg the received partition change notification
     */
    public void onPartitionChangeMsg(TbContext tbContext, PartitionChangeMsg partitionChangeMsg) {
        log.trace("Cluster change msg received: {}", partitionChangeMsg);
        this.intervals.cleanupEntities(tbContext);
    }

    /**
     * Updates the interval state selected by the message originator and timestamp
     * with the value extracted from the message.
     *
     * <p>If the state has changes to persist and the state persistence policy is
     * {@code ON_EACH_CHANGE}, the state is saved and the message is acknowledged
     * once the save completes (or failed via {@code tellFailure} if it does not).
     * Otherwise the message is acknowledged immediately.
     */
    private void onDataMsg(TbContext tbContext, TbMsg tbMsg) throws ExecutionException, InterruptedException {
        EntityId originator = tbMsg.getOriginator();
        long ts = extractTs(tbMsg);
        JsonElement value = extractValue(tbMsg);
        TbIntervalState state = this.intervals.getByEntityIdAndTs(originator, ts);
        state.update(value);
        log.trace("Data Msg received: {}", tbMsg);

        boolean persistNow = state.hasChangesToPersist() && this.statePersistPolicy == StatePersistPolicy.ON_EACH_CHANGE;
        if (!persistNow) {
            tbContext.getPeContext().ack(tbMsg);
            return;
        }

        log.trace("Persisting state: {}", state);
        DonAsynchron.withCallback(
                this.intervals.saveIntervalState(originator, ts, state),
                saved -> {
                    tbContext.getPeContext().ack(tbMsg);
                    state.clearChangesToPersist();
                    log.trace("Cleared state after persising: {}", state);
                },
                error -> tbContext.tellFailure(tbMsg, error),
                tbContext.getDbCallbackExecutor());
    }

    /**
     * Handles the report tick: schedules the next one, emits a
     * {@code POST_TELEMETRY_REQUEST} message for every state the interval table
     * returns for reporting, then lets the table clean up states using its TTL.
     */
    private void onIntervalTickMsg(TbContext tbContext, TbMsg tbMsg) {
        if (!tbMsg.getId().equals(this.nextReportTickId)) {
            return; // not the most recently scheduled report tick
        }
        scheduleReportTickMsg(tbContext);
        log.trace("Reporting intervals!");
        this.intervals.getStatesToReport(this.intervalPersistPolicy).forEach((entityId, statesByTs) ->
                statesByTs.forEach((ts, state) -> {
                    log.trace("Reporting interval: [{}][{}]", ts, state);
                    // The interval timestamp travels in the "ts" metadata entry.
                    TbMsgMetaData metaData = new TbMsgMetaData();
                    metaData.putValue("ts", Long.toString(ts));
                    String payload = state.toValueJson(this.gson, this.config.getOutputValueKey());
                    TbMsg report = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, payload);
                    tbContext.enqueueForTellNext(report, TbRelationTypes.SUCCESS);
                }));
        this.intervals.cleanupStatesUsingTTL();
    }

    /**
     * Handles the persist tick: schedules the next one, requests a save for every
     * state the interval table returns for persisting, then lets the table clean
     * up states using its TTL. The results of the save requests are not inspected.
     */
    private void onPersistTickMsg(TbContext tbContext, TbMsg tbMsg) {
        if (!tbMsg.getId().equals(this.nextPersistTickId)) {
            return; // not the most recently scheduled persist tick
        }
        scheduleStatePersistTickMsg(tbContext);
        log.trace("[{}] Persisting states!", tbContext.getSelfId());
        this.intervals.getStatesToPersist().forEach((entityId, statesByTs) ->
                statesByTs.forEach((ts, state) -> {
                    log.trace("[{}] Persisting state: [{}][{}]", tbContext.getSelfId(), ts, state);
                    this.intervals.saveIntervalState(entityId, ts, state);
                }));
        this.intervals.cleanupStatesUsingTTL();
    }

    /**
     * Handles the entities tick: schedules the next one and repeats the entity lookup.
     */
    private void onEntitiesTickMsg(TbContext tbContext, TbMsg tbMsg) throws Exception {
        if (!tbMsg.getId().equals(this.nextEntitiesTickId)) {
            return; // not the most recently scheduled entities tick
        }
        scheduleEntitiesTickMsg(tbContext);
        initEntities(tbContext, tbMsg);
    }

    /**
     * Looks up the entities configured through the parent entities query and
     * registers them with the interval table.
     *
     * <p>Depending on the query, either the parent entities themselves are
     * registered, or the child entities of every parent are.
     *
     * @param tbMsg the tick message that triggered the lookup, or {@code null}
     *              when called from {@link #init}
     */
    private void initEntities(TbContext tbContext, TbMsg tbMsg) throws Exception {
        log.trace("[{}] Lookup entities!", tbContext.getSelfId());
        ParentEntitiesQuery query = this.config.getParentEntitiesQuery();
        if (query.useParentEntitiesOnlyForSimpleAggregation()) {
            addIntervals(tbContext, tbMsg, query.getParentEntitiesAsync(tbContext));
            return;
        }
        DonAsynchron.withCallback(
                query.getParentEntitiesAsync(tbContext),
                parents -> {
                    for (EntityId parentId : parents) {
                        addIntervals(tbContext, tbMsg, query.getChildEntitiesAsync(tbContext, parentId));
                    }
                },
                getErrorsConsumer(tbContext, tbMsg),
                tbContext.getDbCallbackExecutor());
    }

    /**
     * Registers the entities produced by the given future with the interval table
     * once the future completes.
     */
    private void addIntervals(TbContext tbContext, TbMsg tbMsg, ListenableFuture<List<EntityId>> listenableFuture) {
        DonAsynchron.withCallback(
                listenableFuture,
                entities -> this.intervals.addEntities(tbContext, tbMsg, entities),
                getErrorsConsumer(tbContext, tbMsg),
                tbContext.getDbCallbackExecutor());
    }

    /**
     * Builds the failure callback for the asynchronous entity lookups.
     *
     * <p>With a triggering message the failure is reported through
     * {@code tellFailure}. Without one (lookup started from {@link #init}) there
     * is no message to fail, so the error is logged instead of being dropped.
     */
    private Consumer<Throwable> getErrorsConsumer(TbContext tbContext, TbMsg tbMsg) {
        return error -> {
            if (tbMsg != null) {
                tbContext.tellFailure(tbMsg, error);
            } else {
                log.warn("[{}] Failed to lookup entities", tbContext.getSelfId(), error);
            }
        };
    }

    /** Schedules the next report tick and remembers its id. */
    private void scheduleReportTickMsg(TbContext tbContext) {
        TbMsg tick = newTickMsg(tbContext, TB_REPORT_TICK_MSG);
        this.nextReportTickId = tick.getId();
        tbContext.tellSelf(tick, this.intervalReportCheckPeriod);
    }

    /** Schedules the next state persist tick and remembers its id. */
    private void scheduleStatePersistTickMsg(TbContext tbContext) {
        TbMsg tick = newTickMsg(tbContext, TB_PERSIST_TICK_MSG);
        this.nextPersistTickId = tick.getId();
        tbContext.tellSelf(tick, this.statePersistCheckPeriod);
    }

    /** Schedules the next entities tick and remembers its id. */
    private void scheduleEntitiesTickMsg(TbContext tbContext) {
        TbMsg tick = newTickMsg(tbContext, TB_ENTITIES_TICK_MSG);
        this.nextEntitiesTickId = tick.getId();
        tbContext.tellSelf(tick, this.entitiesCheckPeriod);
    }

    /**
     * Creates a tick message of the given type that originates from this node and
     * carries empty metadata and an empty payload.
     */
    private TbMsg newTickMsg(TbContext tbContext, String tickType) {
        // NOTE(review): "Main" is presumably the queue name - confirm against TbContext.newMsg.
        return tbContext.newMsg("Main", tickType, tbContext.getSelfId(), new TbMsgMetaData(), "");
    }

    /**
     * Returns the timestamp from the {@code "ts"} metadata entry, or the message's
     * own timestamp when that entry is empty.
     *
     * @throws NumberFormatException if the metadata entry is present but not a valid long
     */
    private long extractTs(TbMsg tbMsg) {
        String metaTs = tbMsg.getMetaData().getValue("ts");
        if (StringUtils.isEmpty(metaTs)) {
            return tbMsg.getTs();
        }
        return Long.parseLong(metaTs);
    }

    /**
     * Parses the message payload and returns the element stored under the
     * configured input value key.
     *
     * @throws IllegalArgumentException if the payload is not a JSON object or does
     *                                  not contain the configured key
     */
    private JsonElement extractValue(TbMsg tbMsg) {
        JsonElement parsed = this.gsonParser.parse(tbMsg.getData());
        if (!parsed.isJsonObject()) {
            throw new IllegalArgumentException("Incoming message is not a json object!");
        }
        JsonObject payload = parsed.getAsJsonObject();
        String inputKey = this.config.getInputValueKey();
        if (!payload.has(inputKey)) {
            throw new IllegalArgumentException("Incoming message does not contain " + this.config.getInputValueKey() + "!");
        }
        return payload.get(this.config.getInputValueKey());
    }

    /** Nothing to release: this method is intentionally empty. */
    public void destroy() {
    }
}
