package org.thingsboard.server.service.queue.ruleengine;

import com.google.protobuf.ProtocolStringList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.gen.MsgProtos;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;

/* loaded from: input_file:org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.class */
/**
 * Consumer manager for a single rule-engine queue.
 *
 * <p>On top of the behaviour inherited from {@link MainQueueConsumerManager} this class:
 * <ul>
 *   <li>submits every polled pack of {@code ToRuleEngineMsg} messages to the actor context,
 *       using the submit and processing strategies configured on the {@link Queue};</li>
 *   <li>handles queue deletion: optionally re-sends the remaining messages through the
 *       rule-engine producer, then deletes the topics and unsubscribes the consumers;</li>
 *   <li>collects per-queue statistics in a {@link TbRuleEngineConsumerStats} instance.</li>
 * </ul>
 */
public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>, Queue> {
    private static final Logger log = LoggerFactory.getLogger(TbRuleEngineQueueConsumerManager.class);
    public static final String SUCCESSFUL_STATUS = "successful";
    public static final String FAILED_STATUS = "failed";
    private final TbRuleEngineConsumerContext ctx;
    private final TbRuleEngineConsumerStats stats;

    /**
     * Fluent builder for {@link TbRuleEngineQueueConsumerManager}; obtain one via
     * {@link TbRuleEngineQueueConsumerManager#create()}. Values that are never set stay {@code null}
     * and are passed to the constructor as such.
     */
    public static class TbRuleEngineQueueConsumerManagerBuilder {
        private TbRuleEngineConsumerContext ctx;
        private QueueKey queueKey;
        private ExecutorService consumerExecutor;
        private ScheduledExecutorService scheduler;
        private ExecutorService taskExecutor;

        TbRuleEngineQueueConsumerManagerBuilder() {
        }

        /** Sets the rule-engine consumer context and returns this builder. */
        public TbRuleEngineQueueConsumerManagerBuilder ctx(TbRuleEngineConsumerContext ctx) {
            this.ctx = ctx;
            return this;
        }

        /** Sets the key of the queue to manage and returns this builder. */
        public TbRuleEngineQueueConsumerManagerBuilder queueKey(QueueKey queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        /** Sets the consumer executor and returns this builder. */
        public TbRuleEngineQueueConsumerManagerBuilder consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        /** Sets the scheduler and returns this builder. */
        public TbRuleEngineQueueConsumerManagerBuilder scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /** Sets the task executor and returns this builder. */
        public TbRuleEngineQueueConsumerManagerBuilder taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        /** Creates the manager from the values collected so far. */
        public TbRuleEngineQueueConsumerManager build() {
            return new TbRuleEngineQueueConsumerManager(this.ctx, this.queueKey, this.consumerExecutor, this.scheduler, this.taskExecutor);
        }

        @Override
        public String toString() {
            return "TbRuleEngineQueueConsumerManager.TbRuleEngineQueueConsumerManagerBuilder(ctx=" + this.ctx
                    + ", queueKey=" + this.queueKey
                    + ", consumerExecutor=" + this.consumerExecutor
                    + ", scheduler=" + this.scheduler
                    + ", taskExecutor=" + this.taskExecutor + ")";
        }
    }

    /**
     * Creates a manager for the given queue key.
     *
     * <p>The parent is initialised without a queue config, message-pack processor or error handler
     * ({@code null} is passed for each). Consumers are created through the queue factory of
     * {@code ctx}; the partition id handed to the factory is {@code null} when no topic-partition
     * info is supplied and {@code -1} when the info carries no partition.
     *
     * @param ctx              shared rule-engine consumer context
     * @param queueKey         key of the queue this manager is responsible for
     * @param consumerExecutor executor passed to the parent; also runs the deletion work of this class
     * @param scheduler        scheduler passed to the parent
     * @param taskExecutor     task executor passed to the parent
     */
    public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx, QueueKey queueKey, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor) {
        // The explicit casts on the null arguments keep the call unambiguous; the config cast uses
        // Queue because that is the config type this class binds in its extends clause.
        super(queueKey, (Queue) null, (MainQueueConsumerManager.MsgPackProcessor) null, (queue, tpi) -> {
            Integer partitionId = tpi != null ? tpi.getPartition().orElse(-1) : null;
            return ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId);
        }, consumerExecutor, scheduler, taskExecutor, (Consumer) null);
        this.ctx = ctx;
        this.stats = new TbRuleEngineConsumerStats(queueKey, ctx.getStatsFactory());
    }

    /**
     * Enqueues a deletion task for this queue.
     *
     * @param z whether the remaining messages should be drained before the topics are deleted
     */
    public void delete(boolean z) {
        addTask(new TbQueueConsumerManagerTask.DeleteQueueTask(z));
    }

    /**
     * Handles {@link TbQueueConsumerManagerTask.DeleteQueueTask}; every other task type is ignored here.
     *
     * <p>NOTE(review): presumably overrides a hook of {@link MainQueueConsumerManager} - add
     * {@code @Override} once confirmed against the parent class.
     */
    protected void processTask(TbQueueConsumerManagerTask task) {
        if (task instanceof TbQueueConsumerManagerTask.DeleteQueueTask) {
            TbQueueConsumerManagerTask.DeleteQueueTask deleteTask = (TbQueueConsumerManagerTask.DeleteQueueTask) task;
            doDelete(deleteTask.drainQueue());
        }
    }

    /**
     * Stops this manager, waits for the running consumer tasks and then, on the consumer executor,
     * optionally drains the queue before deleting its topics and unsubscribing the consumers.
     */
    private void doDelete(boolean drainQueue) {
        this.stopped = true;
        log.info("[{}] Handling queue deletion", this.queueKey);
        this.consumerWrapper.getConsumers().forEach(task -> task.awaitCompletion());

        List<TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>> queueConsumers = this.consumerWrapper.getConsumers().stream()
                .map(task -> task.getConsumer())
                .collect(Collectors.toList());
        this.consumerExecutor.submit(() -> {
            if (drainQueue) {
                drainQueue(queueConsumers);
            }
            queueConsumers.forEach(consumer -> {
                deleteTopics(consumer);
                unsubscribe(consumer);
            });
        });
    }

    /** Deletes every topic of the consumer; a failure on one topic is logged and does not stop the others. */
    private void deleteTopics(TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer) {
        for (String topic : consumer.getFullTopicNames()) {
            try {
                this.ctx.getQueueAdmin().deleteTopic(topic);
                log.info("Deleted topic {}", topic);
            } catch (Exception e) {
                log.error("Failed to delete topic {}", topic, e);
            }
        }
    }

    /** Unsubscribes the consumer, logging (not propagating) any failure. */
    private void unsubscribe(TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer) {
        try {
            consumer.unsubscribe();
        } catch (Exception e) {
            log.error("[{}] Failed to unsubscribe consumer", this.queueKey, e);
        }
    }

    /**
     * Processes one pack of messages polled from the queue.
     *
     * <p>The pack is submitted according to the queue's submit strategy and awaited for at most
     * {@code queue.getPackProcessingTimeout()} milliseconds. The processing strategy then decides
     * whether to commit the consumer offset (which ends the method) or to resubmit the messages
     * returned in its reprocess map. The loop also ends, without committing, as soon as this
     * manager or the consumer is stopped.
     *
     * <p>The erased bridge overload {@code processMsgs(List, TbQueueConsumer, QueueConfig)} is
     * generated by the compiler for this override and is therefore not declared explicitly.
     *
     * @param msgs     messages of the polled pack
     * @param consumer consumer the pack was polled from; committed once the pack is accepted
     * @param queue    queue configuration providing name, strategies and timeout
     */
    @Override
    protected void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer, Queue queue) throws Exception {
        TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
        TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);
        submitStrategy.init(msgs);
        while (!this.stopped && !consumer.isStopped()) {
            TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
            submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg));

            boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
            TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx);
            if (timeout) {
                printFirstOrAll(packCtx, packCtx.getPendingMap(), "Timeout");
            }
            if (!packCtx.getFailedMap().isEmpty()) {
                printFirstOrAll(packCtx, packCtx.getFailedMap(), "Failed");
            }
            packCtx.printProfilerStats();

            TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
            if (this.ctx.isStatsEnabled()) {
                this.stats.log(result, decision.isCommit());
            }
            packCtx.cleanup();

            if (decision.isCommit()) {
                submitStrategy.stop();
                consumer.commit();
                return;
            }
            submitStrategy.update(decision.getReprocessMap());
        }
    }

    /** Creates the submit strategy configured for the given queue. */
    private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) {
        return this.ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy());
    }

    /** Creates the processing strategy configured for the given queue. */
    private TbRuleEngineProcessingStrategy getProcessingStrategy(Queue queue) {
        return this.ctx.getProcessingStrategyFactory().newInstance(queue.getName(), queue.getProcessingStrategy());
    }

    /**
     * Submits a single message of the pack. Messages with an empty {@code tbMsg} payload are
     * acknowledged immediately; any exception raised while forwarding is reported to the callback
     * as a {@link RuleEngineException}.
     */
    private void submitMessage(TbMsgPackProcessingContext packCtx, UUID id, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg) {
        log.trace("[{}] Creating callback for topic {} message: {}", id, this.config.getName(), msg.getValue());
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
        TenantId tenantId = tenantIdOf(toRuleEngineMsg);

        TbMsgPackCallback callback;
        if (this.ctx.isPrometheusStatsEnabled()) {
            // Timers are only looked up when Prometheus stats are enabled.
            callback = new TbMsgPackCallback(id, tenantId, packCtx,
                    this.stats.getTimer(tenantId, SUCCESSFUL_STATUS), this.stats.getTimer(tenantId, FAILED_STATUS));
        } else {
            callback = new TbMsgPackCallback(id, tenantId, packCtx);
        }

        try {
            if (toRuleEngineMsg.getTbMsg().isEmpty()) {
                callback.onSuccess();
            } else {
                forwardToRuleEngineActor(this.config.getName(), tenantId, toRuleEngineMsg, callback);
            }
        } catch (Exception e) {
            callback.onFailure(new RuleEngineException(e.getMessage(), e));
        }
    }

    /**
     * Deserializes the {@link TbMsg} and tells it to the actor context together with the relation
     * types and failure message carried by the envelope.
     */
    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, TransportProtos.ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
        TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
        ProtocolStringList relationTypes = toRuleEngineMsg.getRelationTypesList();
        // A single relation type uses a singleton set instead of allocating a HashSet.
        this.ctx.getActorContext().tell(new QueueToRuleEngineMsg(
                tenantId,
                tbMsg,
                relationTypes.size() == 1 ? Collections.singleton(relationTypes.get(0)) : new HashSet<>(relationTypes),
                toRuleEngineMsg.getFailureMessage()));
    }

    /**
     * Logs the number of affected messages and then the messages themselves: all of them at TRACE
     * level when trace logging is enabled, otherwise only the first one at INFO level.
     *
     * @param packCtx processing context used to look up the last visited rule node per message
     * @param map     affected messages keyed by their pack-local id
     * @param prefix  word placed in front of "to process" in the log lines (e.g. "Timeout", "Failed")
     */
    private void printFirstOrAll(TbMsgPackProcessingContext packCtx, Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> map, String prefix) {
        boolean printAll = log.isTraceEnabled();
        log.info("[{}] {} to process [{}] messages", this.queueKey, prefix, map.size());
        for (Map.Entry<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> pending : map.entrySet()) {
            TransportProtos.ToRuleEngineMsg toRuleEngineMsg = pending.getValue().getValue();
            TbMsg tbMsg = TbMsg.fromBytes(this.config.getName(), toRuleEngineMsg.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
            RuleNodeInfo lastVisitedRuleNode = packCtx.getLastVisitedRuleNode(pending.getKey());
            TenantId tenantId = tenantIdOf(toRuleEngineMsg);
            if (!printAll) {
                log.info("[{}] {} to process message: {}, Last Rule Node: {}", tenantId, prefix, tbMsg, lastVisitedRuleNode);
                // Without trace logging only the first message is reported.
                return;
            }
            log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", this.queueKey, tenantId, prefix, tbMsg, lastVisitedRuleNode);
        }
    }

    /**
     * Prints the collected statistics, reports them to the statistics service and resets them.
     *
     * @param j value forwarded unchanged as the first argument of {@code reportQueueStats}
     *          (presumably a timestamp - TODO confirm against the statistics service)
     */
    public void printStats(long j) {
        this.stats.printStats();
        this.ctx.getStatisticsService().reportQueueStats(j, this.stats);
        this.stats.reset();
    }

    /**
     * Polls the given consumers until the topic deletion delay of the context has elapsed and
     * re-sends every polled message through the rule-engine producer, to the partition resolved
     * for {@link TenantId#SYS_TENANT_ID}. Each non-empty batch is committed after it has been
     * handled, even if individual messages failed to move. An exception from polling or committing
     * aborts the drain.
     */
    private void drainQueue(List<TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>> consumers) {
        long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.ctx.getTopicDeletionDelayInSec());
        int moved = 0;
        try {
            while (System.currentTimeMillis() <= finishTs) {
                for (TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer : consumers) {
                    List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs = consumer.poll(this.config.getPollInterval());
                    if (msgs.isEmpty()) {
                        continue;
                    }
                    for (TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg : msgs) {
                        if (moveToSystemQueue(consumer, msg)) {
                            moved++;
                        }
                    }
                    consumer.commit();
                }
            }
        } catch (Exception e) {
            log.error("[{}] Failed to drain queue", this.queueKey, e);
            return;
        }
        if (moved > 0) {
            log.info("Moved {} messages from {} to system {}", moved, this.queueKey, this.config.getName());
        }
    }

    /**
     * Re-sends one drained message; the target partition is resolved from the originator entity
     * encoded in the message payload.
     *
     * @return {@code true} if the message was handed to the producer, {@code false} if that failed
     *         (the failure is logged and otherwise ignored so the drain can continue)
     */
    private boolean moveToSystemQueue(TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg) {
        try {
            MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray());
            UUID originatorUuid = new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB());
            this.ctx.getProducerProvider().getRuleEngineMsgProducer().send(
                    this.ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, this.config.getName(), TenantId.SYS_TENANT_ID,
                            EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), originatorUuid)),
                    msg,
                    (TbQueueCallback) null);
            return true;
        } catch (Throwable t) {
            log.warn("Failed to move message to system {}: {}", consumer.getTopic(), msg, t);
            return false;
        }
    }

    /** Rebuilds the tenant id from the MSB/LSB pair carried by the message envelope. */
    private static TenantId tenantIdOf(TransportProtos.ToRuleEngineMsg toRuleEngineMsg) {
        return TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
    }

    /** Returns a new builder for this manager. */
    public static TbRuleEngineQueueConsumerManagerBuilder create() {
        return new TbRuleEngineQueueConsumerManagerBuilder();
    }
}
