/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.queue.processing;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.service.queue.processing.AbstractTbRuleEngineSubmitStrategy;
import org.thingsboard.server.service.queue.processing.IdMsgPair;

/**
 * Submit strategy that hands messages to the consumer in consecutive batches
 * ("packs") of at most {@code batchSize} messages.
 *
 * <p>A pack is a slice of the inherited {@code orderedMsgList}. The next pack is
 * submitted only after every message of the current pack has been reported via
 * {@link #doOnSuccess(UUID)}.
 *
 * <p>Synchronization as visible in this class: {@code pendingPack} is only
 * touched while holding its own monitor, {@code packIdx} is an
 * {@link AtomicInteger}, and {@code msgConsumer} is {@code volatile}.
 */
public class BatchTbRuleEngineSubmitStrategy
extends AbstractTbRuleEngineSubmitStrategy {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchTbRuleEngineSubmitStrategy.class);

    /** Maximum number of messages submitted in one pack. */
    private final int batchSize;

    /** Zero-based index of the pack currently in flight; reset by {@link #update(ConcurrentMap)}. */
    private final AtomicInteger packIdx = new AtomicInteger(0);

    /** Messages of the current pack that have not been reported as successful yet. Guarded by its own monitor. */
    private final Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> pendingPack = new LinkedHashMap<>();

    /** Consumer supplied by the most recent {@link #submitAttempt(BiConsumer)} call. */
    private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;

    /**
     * Creates a strategy for the given queue.
     *
     * @param queueName name passed to the parent class; used here in log output
     * @param batchSize maximum number of messages per pack
     *                  (NOTE(review): a value &lt;= 0 yields empty packs, so nothing is ever
     *                  submitted - confirm callers always pass a positive value)
     */
    public BatchTbRuleEngineSubmitStrategy(String queueName, int batchSize) {
        super(queueName);
        this.batchSize = batchSize;
    }

    /**
     * Remembers the consumer and submits the pack selected by the current pack index.
     *
     * @param msgConsumer callback that receives every message id and message of a pack
     */
    @Override
    public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
        this.msgConsumer = msgConsumer;
        submitNext();
    }

    /**
     * Delegates to the parent implementation and restarts batching from the first pack.
     *
     * @param reprocessMap forwarded unchanged to {@code super.update}
     *                     (presumably the messages to be processed again - verify against the parent class)
     */
    @Override
    public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
        super.update(reprocessMap);
        packIdx.set(0);
    }

    /**
     * Removes the message from the pending pack and, if that removal emptied the
     * pack, advances to and submits the next pack.
     *
     * <p>Ids that are not part of the pending pack are ignored, so an unknown or
     * repeated id never triggers a new submission.
     *
     * @param id id of the message reported as successful
     */
    @Override
    protected void doOnSuccess(UUID id) {
        boolean endOfPendingPack;
        synchronized (pendingPack) {
            TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg = pendingPack.remove(id);
            endOfPendingPack = msg != null && pendingPack.isEmpty();
        }
        // Submit outside the lock so the consumer callback is never invoked while holding it.
        if (endOfPendingPack) {
            packIdx.incrementAndGet();
            submitNext();
        }
    }

    /**
     * Rebuilds the pending pack from the slice of {@code orderedMsgList} selected
     * by the current pack index and passes every entry of it to the consumer.
     */
    private void submitNext() {
        int listSize = this.orderedMsgList.size();
        // Both bounds are clamped to the list size, so a pack index past the end yields an empty pack.
        int startIdx = Math.min(packIdx.get() * batchSize, listSize);
        int endIdx = Math.min(startIdx + batchSize, listSize);
        Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tmpPack;
        synchronized (pendingPack) {
            pendingPack.clear();
            for (int i = startIdx; i < endIdx; ++i) {
                IdMsgPair pair = (IdMsgPair)this.orderedMsgList.get(i);
                pendingPack.put(pair.uuid, pair.msg);
            }
            // Snapshot taken under the lock; everything below works on this private copy.
            tmpPack = new LinkedHashMap<>(pendingPack);
        }
        // Use the snapshot size: pendingPack is a plain LinkedHashMap and must not be read
        // outside its monitor, where doOnSuccess may be modifying it concurrently.
        int submitSize = tmpPack.size();
        if (log.isDebugEnabled() && submitSize > 0) {
            log.debug("[{}] submitting [{}] messages to rule engine", this.queueName, submitSize);
        }
        BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer = this.msgConsumer;
        tmpPack.forEach(consumer);
    }
}

