package org.thingsboard.rule.engine.deduplication;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNodeConfiguration;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;

/**
 * Rule node that buffers incoming messages per originator and, once the configured
 * deduplication interval has elapsed, emits a reduced result for every completed
 * time window ("pack"):
 * <ul>
 *   <li>{@code FIRST} - the buffered message with the smallest metadata timestamp;</li>
 *   <li>{@code LAST} - the buffered message with the largest metadata timestamp;</li>
 *   <li>{@code ALL} - a single message whose payload is a JSON array of
 *       {@code {"msg": ..., "metadata": ...}} objects, one per buffered message.</li>
 * </ul>
 * Incoming messages are acknowledged as soon as they are buffered. Evaluation is driven
 * by a delayed {@code DEDUPLICATION_TIMEOUT_SELF_MSG} that the node sends to itself.
 */
@RuleNode(type = ComponentType.TRANSFORMATION, name = "deduplication", configClazz = TbMsgDeduplicationNodeConfiguration.class, version = 1, hasQueueName = true, nodeDescription = "Deduplicate messages within the same originator entity for a configurable period based on a specified deduplication strategy.", nodeDetails = "Deduplication strategies: <ul><li><strong>FIRST</strong> - return first message that arrived during deduplication period.</li><li><strong>LAST</strong> - return last message that arrived during deduplication period.</li><li><strong>ALL</strong> - return all messages as a single JSON array message. Where each element represents object with <strong><i>msg</i></strong> and <strong><i>metadata</i></strong> inner properties.</li></ul>", icon = "content_copy", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeMsgDeduplicationConfig")
/* loaded from: input_file:org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.class */
public class TbMsgDeduplicationNode implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgDeduplicationNode.class);

    /** Delay, in seconds, before a failed enqueue of a deduplication result is attempted again. */
    public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10;

    private TbMsgDeduplicationNodeConfiguration config;

    /** Pending messages (and tick-scheduling state) keyed by message originator. */
    private final Map<EntityId, DeduplicationData> deduplicationMap = new HashMap<>();

    /** Deduplication window length in milliseconds (configured in seconds). */
    private long deduplicationInterval;

    /** Queue name taken from the rule-node context during {@link #init}; may be {@code null}. */
    private String queueName;

    /**
     * Reads the node configuration, converts the interval from seconds to milliseconds
     * and remembers the queue name supplied by the context.
     *
     * @param ctx           rule-node context
     * @param configuration raw node configuration
     * @throws TbNodeException if the configuration cannot be converted
     */
    @Override
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class);
        this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
        this.queueName = ctx.getQueueName();
    }

    /**
     * Dispatches a message: the node's own timeout tick triggers evaluation of the
     * originator's buffer; every other message is buffered.
     */
    @Override
    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
        if (msg.isTypeOf(TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG)) {
            processDeduplication(ctx, msg.getOriginator());
        } else {
            processOnRegularMsg(ctx, msg);
        }
    }

    /** Drops all buffered state. */
    @Override
    public void destroy() {
        deduplicationMap.clear();
    }

    /**
     * Upgrades a stored configuration. For version 0 the {@code queueName} property is
     * removed from the configuration JSON (the node takes the queue name from the context).
     *
     * @param fromVersion      version of the stored configuration
     * @param oldConfiguration stored configuration; modified in place
     * @return pair of "was anything changed" and the (possibly modified) configuration
     */
    @Override
    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
        boolean hasChanges = false;
        if (fromVersion == 0 && oldConfiguration.has("queueName")) {
            ((ObjectNode) oldConfiguration).remove("queueName");
            hasChanges = true;
        }
        return new TbPair<>(hasChanges, oldConfiguration);
    }

    /**
     * Buffers a regular message for its originator, acknowledges it and makes sure a
     * timeout tick is scheduled. When the originator's buffer already holds the configured
     * maximum number of pending messages, the message is failed instead.
     */
    private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
        EntityId originator = msg.getOriginator();
        DeduplicationData data = deduplicationMap.computeIfAbsent(originator, id -> new DeduplicationData());
        if (data.size() >= config.getMaxPendingMsgs()) {
            log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", ctx.getSelfId(), originator);
            ctx.tellFailure(msg, new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + originator + "]"));
            return;
        }
        log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", ctx.getSelfId(), originator, msg.getId(), msg.getMetaDataTs());
        data.add(msg);
        ctx.ack(msg);
        scheduleTickMsg(ctx, originator, data);
    }

    /**
     * Evaluates the buffer of one originator: every pack whose window has fully elapsed is
     * removed from the buffer and reduced to one output message according to the configured
     * strategy. Whatever happens, a new tick is scheduled if messages remain buffered.
     */
    private void processDeduplication(TbContext ctx, EntityId deduplicationId) {
        DeduplicationData data = deduplicationMap.get(deduplicationId);
        if (data == null) {
            return;
        }
        // The tick that brought us here has fired, so a new one may be scheduled.
        data.setTickScheduled(false);
        if (data.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        try {
            List<TbMsg> results = new ArrayList<>();
            // Elements are removed through the iterator of this list, and the buffer is
            // checked for emptiness afterwards.
            List<TbMsg> pending = data.getMsgList();
            Optional<TbPair<Long, Long>> pack = findValidPack(pending, now);
            while (pack.isPresent()) {
                boolean mergeAll = DeduplicationStrategy.ALL.equals(config.getStrategy());
                List<TbMsg> packMsgs = drainPack(pending, pack.get().getFirst(), pack.get().getSecond());
                if (mergeAll) {
                    results.add(TbMsg.newMsg(queueName, config.getOutMsgType(), deduplicationId, getMetadata(), getMergedData(packMsgs)));
                } else {
                    TbMsg selected = selectFirstOrLast(packMsgs);
                    if (selected != null) {
                        results.add(TbMsg.newMsg(queueName != null ? queueName : selected.getQueueName(), selected.getType(), selected.getOriginator(), selected.getCustomerId(), selected.getMetaData(), selected.getData()));
                    }
                }
                pack = findValidPack(pending, now);
            }
            results.forEach(result -> enqueueForTellNextWithRetry(ctx, result, 0));
        } finally {
            // Messages whose window has not elapsed yet (or that were left behind by a
            // failure) still need a future tick.
            if (!data.isEmpty()) {
                scheduleTickMsg(ctx, deduplicationId, data);
            }
        }
    }

    /**
     * Removes from {@code pending} and returns, in their original order, all messages whose
     * metadata timestamp lies in {@code [fromTs, toTs)}.
     */
    private List<TbMsg> drainPack(List<TbMsg> pending, long fromTs, long toTs) {
        List<TbMsg> pack = new ArrayList<>();
        for (Iterator<TbMsg> it = pending.iterator(); it.hasNext(); ) {
            TbMsg candidate = it.next();
            long ts = candidate.getMetaDataTs();
            if (ts >= fromTs && ts < toTs) {
                pack.add(candidate);
                it.remove();
            }
        }
        return pack;
    }

    /**
     * Picks the message with the smallest (strategy {@code FIRST}) or largest (any other
     * strategy) metadata timestamp. Comparisons are strict, so among equal timestamps the
     * earliest element of {@code pack} wins.
     *
     * @return the selected message, or {@code null} when {@code pack} is empty
     */
    private TbMsg selectFirstOrLast(List<TbMsg> pack) {
        boolean pickEarliest = DeduplicationStrategy.FIRST.equals(config.getStrategy());
        TbMsg selected = null;
        for (TbMsg candidate : pack) {
            if (selected == null) {
                selected = candidate;
                continue;
            }
            boolean better = pickEarliest
                    ? candidate.getMetaDataTs() < selected.getMetaDataTs()
                    : candidate.getMetaDataTs() > selected.getMetaDataTs();
            if (better) {
                selected = candidate;
            }
        }
        return selected;
    }

    /** Schedules a timeout tick for the originator unless one is already pending. */
    private void scheduleTickMsg(TbContext ctx, EntityId deduplicationId, DeduplicationData data) {
        if (data.isTickScheduled()) {
            return;
        }
        scheduleTickMsg(ctx, deduplicationId);
        data.setTickScheduled(true);
    }

    /**
     * Finds the oldest pack that is ready for processing.
     *
     * @param msgs                   buffered messages
     * @param deduplicationTimeoutMs reference time in epoch milliseconds
     * @return {@code [start, end)} of the window that starts at the smallest metadata
     *         timestamp in {@code msgs} and spans one deduplication interval, or empty when
     *         there are no messages or that window ends after the reference time
     */
    private Optional<TbPair<Long, Long>> findValidPack(List<TbMsg> msgs, long deduplicationTimeoutMs) {
        return msgs.stream()
                .map(TbMsg::getMetaDataTs)
                .min(Comparator.naturalOrder())
                .filter(packStartTs -> packStartTs + deduplicationInterval <= deduplicationTimeoutMs)
                .map(packStartTs -> new TbPair<Long, Long>(packStartTs, packStartTs + deduplicationInterval));
    }

    /**
     * Enqueues a deduplication result for the "Success" relation. The initial delivery
     * ({@code retryAttempt == 0}) is always attempted; on failure the delivery is
     * re-attempted after {@link #TB_MSG_DEDUPLICATION_RETRY_DELAY} seconds, at most
     * {@code maxRetries} times. When retries are exhausted the message is dropped and a
     * warning is logged.
     *
     * @param retryAttempt number of retries already performed for this message
     */
    private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) {
        ctx.enqueueForTellNext(msg, "Success", () -> {
            log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt);
        }, throwable -> {
            log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable);
            if (retryAttempt < config.getMaxRetries()) {
                ctx.schedule(() -> enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1), TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS);
            } else {
                log.warn("[{}][{}] Max retries [{}] exhausted, dropping deduplication result message [{}]", ctx.getSelfId(), msg.getOriginator(), config.getMaxRetries(), msg.getId());
            }
        });
    }

    /**
     * Sends this node a timeout tick for the given originator, delayed by one millisecond
     * more than the deduplication interval.
     */
    private void scheduleTickMsg(TbContext ctx, EntityId deduplicationId) {
        TbMsg tickMsg = ctx.newMsg((String) null, TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG, deduplicationId, TbMsgMetaData.EMPTY, "");
        ctx.tellSelf(tickMsg, deduplicationInterval + 1);
    }

    /**
     * Serializes the given messages into a JSON array string in which each element is an
     * object with the message payload under {@code msg} and its metadata under
     * {@code metadata}.
     */
    private String getMergedData(List<TbMsg> msgs) {
        ArrayNode merged = JacksonUtil.newArrayNode();
        for (TbMsg msg : msgs) {
            ObjectNode element = JacksonUtil.newObjectNode();
            element.set("msg", JacksonUtil.toJsonNode(msg.getData()));
            element.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
            merged.add(element);
        }
        return JacksonUtil.toString(merged);
    }

    /** Creates metadata for a merged ({@code ALL}) result: only {@code ts}, set to the current time. */
    private TbMsgMetaData getMetadata() {
        TbMsgMetaData metaData = new TbMsgMetaData();
        metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
        return metaData;
    }
}
