/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.queue.processing;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.service.queue.processing.AbstractTbRuleEngineSubmitStrategy;
import org.thingsboard.server.service.queue.processing.IdMsgPair;

/**
 * Submit strategy that serializes message processing per entity.
 *
 * <p>Messages are grouped into one FIFO queue per {@link EntityId} (as resolved by
 * {@link #getEntityId(TransportProtos.ToRuleEngineMsg)}). A submit attempt hands only the head
 * of every queue to the consumer; the following message of an entity is handed over from
 * {@link #doOnSuccess(UUID)} once the current head has been reported as successful.
 */
public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy
        extends AbstractTbRuleEngineSubmitStrategy {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SequentialByEntityIdTbRuleEngineSubmitStrategy.class);

    /** Consumer of the most recent {@link #submitAttempt(BiConsumer)}; reused by {@link #doOnSuccess(UUID)}. */
    private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;

    /** Message id -> entity id the message was grouped under. */
    private final ConcurrentMap<UUID, EntityId> msgToEntityIdMap = new ConcurrentHashMap<>();

    /** Entity id -> pending messages of that entity, in the order of {@code orderedMsgList}. */
    private final ConcurrentMap<EntityId, Queue<IdMsgPair<TransportProtos.ToRuleEngineMsg>>> entityIdToListMap =
            new ConcurrentHashMap<>();

    /**
     * @param queueName name passed through to the superclass constructor
     */
    public SequentialByEntityIdTbRuleEngineSubmitStrategy(String queueName) {
        super(queueName);
    }

    /**
     * Delegates to the superclass and then rebuilds the per-entity queues from
     * {@code orderedMsgList}.
     *
     * @param msgs messages forwarded to {@code super.init}
     */
    @Override
    public void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs) {
        super.init(msgs);
        initMaps();
    }

    /**
     * Remembers the consumer for later use and submits the current head message of every
     * per-entity queue to it. Empty queues are skipped.
     *
     * @param msgConsumer callback receiving the message id and the message
     */
    @Override
    public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
        this.msgConsumer = msgConsumer;
        entityIdToListMap.forEach((entityId, queue) -> {
            IdMsgPair<TransportProtos.ToRuleEngineMsg> head = queue.peek();
            if (head != null) {
                msgConsumer.accept(head.uuid, head.msg);
            }
        });
    }

    /**
     * Delegates to the superclass and then rebuilds the per-entity queues from
     * {@code orderedMsgList}.
     *
     * @param reprocessMap map forwarded to {@code super.update}
     */
    @Override
    public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
        super.update(reprocessMap);
        initMaps();
    }

    /**
     * Removes the message from its entity queue and submits the next message of the same
     * entity, if any.
     *
     * <p>Nothing happens when the id is not tracked, when its entity has no queue, or when the
     * id is not the current head of the queue.
     *
     * @param id id of the message that was processed successfully
     */
    @Override
    protected void doOnSuccess(UUID id) {
        EntityId entityId = msgToEntityIdMap.get(id);
        if (entityId == null) {
            return;
        }
        Queue<IdMsgPair<TransportProtos.ToRuleEngineMsg>> queue = entityIdToListMap.get(entityId);
        if (queue == null) {
            return;
        }
        IdMsgPair<TransportProtos.ToRuleEngineMsg> next = null;
        // The head check, removal and look-ahead must be atomic with respect to other
        // success callbacks for the same entity.
        synchronized (queue) {
            IdMsgPair<TransportProtos.ToRuleEngineMsg> expected = queue.peek();
            if (expected != null && expected.uuid.equals(id)) {
                queue.poll();
                next = queue.peek();
            }
        }
        // The consumer is invoked outside the lock on purpose: it is external code.
        if (next != null) {
            msgConsumer.accept(next.uuid, next.msg);
        }
    }

    /**
     * Clears both lookup maps and refills them from {@code orderedMsgList}, preserving the list
     * order inside each per-entity queue.
     *
     * <p>Messages for which {@link #getEntityId(TransportProtos.ToRuleEngineMsg)} returns
     * {@code null} are not tracked at all.
     * NOTE(review): such messages are never submitted by this class - confirm that
     * implementations never return {@code null} or that this is intended.
     */
    private void initMaps() {
        msgToEntityIdMap.clear();
        entityIdToListMap.clear();
        for (IdMsgPair<TransportProtos.ToRuleEngineMsg> pair : this.orderedMsgList) {
            EntityId entityId = getEntityId(pair.msg.getValue());
            if (entityId == null) {
                continue;
            }
            msgToEntityIdMap.put(pair.uuid, entityId);
            entityIdToListMap.computeIfAbsent(entityId, key -> new LinkedList<>()).add(pair);
        }
    }

    /**
     * Resolves the entity a message is grouped under.
     *
     * @param msg rule engine message taken from the queue message value
     * @return the entity id used as grouping key; a {@code null} result excludes the message
     *         from the per-entity queues
     */
    protected abstract EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg);
}

