/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.deduplication;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.deduplication.DeduplicationData;
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;

@RuleNode(type=ComponentType.TRANSFORMATION, name="deduplication", configClazz=TbMsgDeduplicationNodeConfiguration.class, version=1, hasQueueName=true, nodeDescription="Deduplicate messages within the same originator entity for a configurable period based on a specified deduplication strategy.", nodeDetails="Deduplication strategies: <ul><li><strong>FIRST</strong> - return first message that arrived during deduplication period.</li><li><strong>LAST</strong> - return last message that arrived during deduplication period.</li><li><strong>ALL</strong> - return all messages as a single JSON array message. Where each element represents object with <strong><i>msg</i></strong> and <strong><i>metadata</i></strong> inner properties.</li></ul>", icon="content_copy", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeMsgDeduplicationConfig")
public class TbMsgDeduplicationNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgDeduplicationNode.class);
    public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10;
    private TbMsgDeduplicationNodeConfiguration config;
    private final Map<EntityId, DeduplicationData> deduplicationMap = new HashMap<EntityId, DeduplicationData>();
    private long deduplicationInterval;
    private String queueName;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = (TbMsgDeduplicationNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbMsgDeduplicationNodeConfiguration.class);
        this.deduplicationInterval = TimeUnit.SECONDS.toMillis(this.config.getInterval());
        this.queueName = ctx.getQueueName();
    }

    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
        if (msg.isTypeOf(TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG)) {
            this.processDeduplication(ctx, msg.getOriginator());
        } else {
            this.processOnRegularMsg(ctx, msg);
        }
    }

    public void destroy() {
        this.deduplicationMap.clear();
    }

    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
        boolean hasChanges = false;
        switch (fromVersion) {
            case 0: {
                if (!oldConfiguration.has("queueName")) break;
                hasChanges = true;
                ((ObjectNode)oldConfiguration).remove("queueName");
                break;
            }
        }
        return new TbPair((Object)hasChanges, (Object)oldConfiguration);
    }

    private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
        EntityId id = msg.getOriginator();
        DeduplicationData deduplicationMsgs = this.deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData());
        if (deduplicationMsgs.size() < this.config.getMaxPendingMsgs()) {
            log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", new Object[]{ctx.getSelfId(), id, msg.getId(), msg.getMetaDataTs()});
            deduplicationMsgs.add(msg);
            ctx.ack(msg);
            this.scheduleTickMsg(ctx, id, deduplicationMsgs);
        } else {
            log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", (Object)ctx.getSelfId(), (Object)id);
            ctx.tellFailure(msg, (Throwable)new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + id + "]"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processDeduplication(TbContext ctx, EntityId deduplicationId) {
        DeduplicationData data = this.deduplicationMap.get(deduplicationId);
        if (data == null) {
            return;
        }
        data.setTickScheduled(false);
        if (data.isEmpty()) {
            return;
        }
        long deduplicationTimeoutMs = System.currentTimeMillis();
        try {
            ArrayList<TbMsg> deduplicationResults = new ArrayList<TbMsg>();
            List<TbMsg> msgList = data.getMsgList();
            Optional<TbPair<Long, Long>> packBoundsOpt = this.findValidPack(msgList, deduplicationTimeoutMs);
            while (packBoundsOpt.isPresent()) {
                TbPair<Long, Long> packBounds = packBoundsOpt.get();
                if (DeduplicationStrategy.ALL.equals((Object)this.config.getStrategy())) {
                    ArrayList<TbMsg> pack = new ArrayList<TbMsg>();
                    Iterator<TbMsg> iterator = msgList.iterator();
                    while (iterator.hasNext()) {
                        TbMsg msg = iterator.next();
                        long msgTs = msg.getMetaDataTs();
                        if (msgTs < (Long)packBounds.getFirst() || msgTs >= (Long)packBounds.getSecond()) continue;
                        pack.add(msg);
                        iterator.remove();
                    }
                    deduplicationResults.add(TbMsg.newMsg((String)this.queueName, (String)this.config.getOutMsgType(), (EntityId)deduplicationId, (TbMsgMetaData)this.getMetadata(), (String)this.getMergedData(pack)));
                } else {
                    TbMsg resultMsg = null;
                    boolean searchMin = DeduplicationStrategy.FIRST.equals((Object)this.config.getStrategy());
                    Iterator<TbMsg> iterator = msgList.iterator();
                    while (iterator.hasNext()) {
                        TbMsg msg = iterator.next();
                        long msgTs = msg.getMetaDataTs();
                        if (msgTs < (Long)packBounds.getFirst() || msgTs >= (Long)packBounds.getSecond()) continue;
                        iterator.remove();
                        if (resultMsg != null && (!searchMin || msg.getMetaDataTs() >= resultMsg.getMetaDataTs()) && (searchMin || msg.getMetaDataTs() <= resultMsg.getMetaDataTs())) continue;
                        resultMsg = msg;
                    }
                    if (resultMsg != null) {
                        deduplicationResults.add(TbMsg.newMsg((String)(this.queueName != null ? this.queueName : resultMsg.getQueueName()), (String)resultMsg.getType(), (EntityId)resultMsg.getOriginator(), (CustomerId)resultMsg.getCustomerId(), (TbMsgMetaData)resultMsg.getMetaData(), (String)resultMsg.getData()));
                    }
                }
                packBoundsOpt = this.findValidPack(msgList, deduplicationTimeoutMs);
            }
            deduplicationResults.forEach(outMsg -> this.enqueueForTellNextWithRetry(ctx, (TbMsg)outMsg, 0));
        }
        finally {
            if (!data.isEmpty()) {
                this.scheduleTickMsg(ctx, deduplicationId, data);
            }
        }
    }

    private void scheduleTickMsg(TbContext ctx, EntityId deduplicationId, DeduplicationData data) {
        if (!data.isTickScheduled()) {
            this.scheduleTickMsg(ctx, deduplicationId);
            data.setTickScheduled(true);
        }
    }

    private Optional<TbPair<Long, Long>> findValidPack(List<TbMsg> msgs, long deduplicationTimeoutMs) {
        Optional<TbMsg> min = msgs.stream().min(Comparator.comparing(TbMsg::getMetaDataTs));
        return min.map(minTsMsg -> {
            long packStartTs = minTsMsg.getMetaDataTs();
            long packEndTs = packStartTs + this.deduplicationInterval;
            if (packEndTs <= deduplicationTimeoutMs) {
                return new TbPair((Object)packStartTs, (Object)packEndTs);
            }
            return null;
        });
    }

    private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) {
        if (this.config.getMaxRetries() > retryAttempt) {
            ctx.enqueueForTellNext(msg, "Success", () -> log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", new Object[]{ctx.getSelfId(), msg.getOriginator(), retryAttempt}), throwable -> {
                log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", new Object[]{ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable});
                ctx.schedule(() -> this.enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1), 10L, TimeUnit.SECONDS);
            });
        }
    }

    private void scheduleTickMsg(TbContext ctx, EntityId deduplicationId) {
        ctx.tellSelf(ctx.newMsg(null, TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG, deduplicationId, TbMsgMetaData.EMPTY, ""), this.deduplicationInterval + 1L);
    }

    private String getMergedData(List<TbMsg> msgs) {
        ArrayNode mergedData = JacksonUtil.newArrayNode();
        msgs.forEach(msg -> {
            ObjectNode msgNode = JacksonUtil.newObjectNode();
            msgNode.set("msg", JacksonUtil.toJsonNode((String)msg.getData()));
            msgNode.set("metadata", JacksonUtil.valueToTree((Object)msg.getMetaData().getData()));
            mergedData.add((JsonNode)msgNode);
        });
        return JacksonUtil.toString((Object)mergedData);
    }

    private TbMsgMetaData getMetadata() {
        TbMsgMetaData metaData = new TbMsgMetaData();
        metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
        return metaData;
    }
}

