/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.queue.ruleengine;

import com.google.protobuf.ProtocolStringList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.gen.MsgProtos;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext;

public class TbRuleEngineQueueConsumerManager
extends MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>, Queue> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbRuleEngineQueueConsumerManager.class);
    public static final String SUCCESSFUL_STATUS = "successful";
    public static final String FAILED_STATUS = "failed";
    private final TbRuleEngineConsumerContext ctx;
    private final TbRuleEngineConsumerStats stats;

    public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx, QueueKey queueKey, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor) {
        super((Object)queueKey, null, null, (queueConfig, tpi) -> {
            Integer partitionId = tpi != null ? tpi.getPartition().orElse(-1) : null;
            return ctx.getQueueFactory().createToRuleEngineMsgConsumer(queueConfig, partitionId);
        }, consumerExecutor, scheduler, taskExecutor, null);
        this.ctx = ctx;
        this.stats = new TbRuleEngineConsumerStats(queueKey, ctx.getStatsFactory());
    }

    public void delete(boolean drainQueue) {
        this.addTask((TbQueueConsumerManagerTask)new TbQueueConsumerManagerTask.DeleteQueueTask(drainQueue));
    }

    protected void processTask(TbQueueConsumerManagerTask task) {
        if (task instanceof TbQueueConsumerManagerTask.DeleteQueueTask) {
            TbQueueConsumerManagerTask.DeleteQueueTask deleteQueueTask = (TbQueueConsumerManagerTask.DeleteQueueTask)task;
            this.doDelete(deleteQueueTask.drainQueue());
        }
    }

    private void doDelete(boolean drainQueue) {
        this.stopped = true;
        log.info("[{}] Handling queue deletion", this.queueKey);
        this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
        List queueConsumers = this.consumerWrapper.getConsumers().stream().map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList());
        this.consumerExecutor.submit(() -> {
            if (drainQueue) {
                this.drainQueue(queueConsumers);
            }
            queueConsumers.forEach(consumer -> {
                for (String topic : consumer.getFullTopicNames()) {
                    try {
                        this.ctx.getQueueAdmin().deleteTopic(topic);
                        log.info("Deleted topic {}", (Object)topic);
                    }
                    catch (Exception e) {
                        log.error("Failed to delete topic {}", (Object)topic, (Object)e);
                    }
                }
                try {
                    consumer.unsubscribe();
                }
                catch (Exception e) {
                    log.error("[{}] Failed to unsubscribe consumer", this.queueKey, (Object)e);
                }
            });
        });
    }

    protected void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer, Object consumerKey, Queue queue) throws Exception {
        TbRuleEngineSubmitStrategy submitStrategy = this.getSubmitStrategy(queue);
        TbRuleEngineProcessingStrategy ackStrategy = this.getProcessingStrategy(queue);
        submitStrategy.init(msgs);
        while (!this.stopped && !consumer.isStopped()) {
            TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
            submitStrategy.submitAttempt((id, msg) -> this.submitMessage(packCtx, (UUID)id, (TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>)msg));
            boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
            TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx);
            if (timeout) {
                this.printFirstOrAll(packCtx, packCtx.getPendingMap(), "Timeout");
            }
            if (!packCtx.getFailedMap().isEmpty()) {
                this.printFirstOrAll(packCtx, packCtx.getFailedMap(), "Failed");
            }
            packCtx.printProfilerStats();
            TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
            if (this.ctx.isStatsEnabled()) {
                this.stats.log(result, decision.isCommit());
            }
            packCtx.cleanup();
            if (decision.isCommit()) {
                submitStrategy.stop();
                consumer.commit();
                break;
            }
            submitStrategy.update(decision.getReprocessMap());
        }
    }

    private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) {
        return this.ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy());
    }

    private TbRuleEngineProcessingStrategy getProcessingStrategy(Queue queue) {
        return this.ctx.getProcessingStrategyFactory().newInstance(queue.getName(), queue.getProcessingStrategy());
    }

    private void submitMessage(TbMsgPackProcessingContext packCtx, UUID id, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg) {
        log.trace("[{}] Creating callback for topic {} message: {}", new Object[]{id, ((Queue)this.config).getName(), msg.getValue()});
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = (TransportProtos.ToRuleEngineMsg)msg.getValue();
        TenantId tenantId = TenantId.fromUUID((UUID)new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
        TbMsgPackCallback callback = this.ctx.isPrometheusStatsEnabled() ? new TbMsgPackCallback(id, tenantId, packCtx, this.stats.getTimer(tenantId, SUCCESSFUL_STATUS), this.stats.getTimer(tenantId, FAILED_STATUS)) : new TbMsgPackCallback(id, tenantId, packCtx);
        try {
            if (!toRuleEngineMsg.getTbMsg().isEmpty() || toRuleEngineMsg.hasTbMsgProto()) {
                this.forwardToRuleEngineActor(((Queue)this.config).getName(), tenantId, toRuleEngineMsg, callback);
            } else {
                callback.onSuccess();
            }
        }
        catch (Exception e) {
            callback.onFailure(new RuleEngineException(e.getMessage(), (Throwable)e));
        }
    }

    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, TransportProtos.ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
        TbMsg tbMsg = ProtoUtils.fromTbMsgProto((String)queueName, (TransportProtos.ToRuleEngineMsg)toRuleEngineMsg, (TbMsgCallback)callback);
        ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
        Set<Object> relationTypes = relationTypesList.size() == 1 ? Collections.singleton((String)relationTypesList.get(0)) : new HashSet(relationTypesList);
        QueueToRuleEngineMsg msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
        this.ctx.getActorContext().tell((TbActorMsg)msg);
    }

    private void printFirstOrAll(TbMsgPackProcessingContext ctx, Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> map, String prefix) {
        boolean printAll = log.isTraceEnabled();
        log.info("[{}] {} to process [{}] messages", new Object[]{this.queueKey, prefix, map.size()});
        for (Map.Entry<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> pending : map.entrySet()) {
            TransportProtos.ToRuleEngineMsg tmp = (TransportProtos.ToRuleEngineMsg)pending.getValue().getValue();
            TbMsg tmpMsg = ProtoUtils.fromTbMsgProto((String)((Queue)this.config).getName(), (TransportProtos.ToRuleEngineMsg)tmp, (TbMsgCallback)TbMsgCallback.EMPTY);
            RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
            if (printAll) {
                log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", new Object[]{this.queueKey, TenantId.fromUUID((UUID)new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo});
                continue;
            }
            log.info("[{}] {} to process message: {}, Last Rule Node: {}", new Object[]{TenantId.fromUUID((UUID)new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo});
            break;
        }
    }

    public void printStats(long ts) {
        this.stats.printStats();
        this.ctx.getStatisticsService().reportQueueStats(ts, this.stats);
        this.stats.reset();
    }

    private void drainQueue(List<TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>> consumers) {
        long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.ctx.getTopicDeletionDelayInSec());
        try {
            int n = 0;
            while (System.currentTimeMillis() <= finishTs) {
                for (TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer : consumers) {
                    List msgs = consumer.poll((long)((Queue)this.config).getPollInterval());
                    if (msgs.isEmpty()) continue;
                    for (TbProtoQueueMsg msg : msgs) {
                        try {
                            MsgProtos.TbMsgProto tbMsgProto = ProtoUtils.getTbMsgProto((TransportProtos.ToRuleEngineMsg)((TransportProtos.ToRuleEngineMsg)msg.getValue()));
                            EntityId originator = EntityIdFactory.getByTypeAndUuid((String)tbMsgProto.getEntityType(), (UUID)new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
                            TopicPartitionInfo tpi = this.ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, ((Queue)this.config).getName(), TenantId.SYS_TENANT_ID, originator);
                            this.ctx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, (TbQueueMsg)msg, null);
                            ++n;
                        }
                        catch (Throwable e) {
                            log.warn("Failed to move message to system {}: {}", new Object[]{consumer.getTopic(), msg, e});
                        }
                    }
                    consumer.commit();
                }
            }
            if (n > 0) {
                log.info("Moved {} messages from {} to system {}", new Object[]{n, this.queueKey, ((Queue)this.config).getName()});
            }
        }
        catch (Exception e) {
            log.error("[{}] Failed to drain queue", this.queueKey, (Object)e);
        }
    }

    @Generated
    public static TbRuleEngineQueueConsumerManagerBuilder create() {
        return new TbRuleEngineQueueConsumerManagerBuilder();
    }

    @Generated
    public static class TbRuleEngineQueueConsumerManagerBuilder {
        @Generated
        private TbRuleEngineConsumerContext ctx;
        @Generated
        private QueueKey queueKey;
        @Generated
        private ExecutorService consumerExecutor;
        @Generated
        private ScheduledExecutorService scheduler;
        @Generated
        private ExecutorService taskExecutor;

        @Generated
        TbRuleEngineQueueConsumerManagerBuilder() {
        }

        @Generated
        public TbRuleEngineQueueConsumerManagerBuilder ctx(TbRuleEngineConsumerContext ctx) {
            this.ctx = ctx;
            return this;
        }

        @Generated
        public TbRuleEngineQueueConsumerManagerBuilder queueKey(QueueKey queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        @Generated
        public TbRuleEngineQueueConsumerManagerBuilder consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        @Generated
        public TbRuleEngineQueueConsumerManagerBuilder scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        @Generated
        public TbRuleEngineQueueConsumerManagerBuilder taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        @Generated
        public TbRuleEngineQueueConsumerManager build() {
            return new TbRuleEngineQueueConsumerManager(this.ctx, this.queueKey, this.consumerExecutor, this.scheduler, this.taskExecutor);
        }

        @Generated
        public String toString() {
            return "TbRuleEngineQueueConsumerManager.TbRuleEngineQueueConsumerManagerBuilder(ctx=" + String.valueOf(this.ctx) + ", queueKey=" + String.valueOf(this.queueKey) + ", consumerExecutor=" + String.valueOf(this.consumerExecutor) + ", scheduler=" + String.valueOf(this.scheduler) + ", taskExecutor=" + String.valueOf(this.taskExecutor) + ")";
        }
    }
}

