package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;

/**
 * Consumer service for edge-related queue traffic on a TB Core node.
 *
 * <p>Two streams are handled:
 * <ul>
 *   <li>{@code ToEdgeMsg} packs read by {@link #mainConsumer} from the "Edge" queue and
 *       forwarded to the edge processor selected by the notification's {@link EdgeEventType};</li>
 *   <li>{@code ToEdgeNotificationMsg} notifications delivered through the
 *       {@link AbstractConsumerService} notification consumer and dispatched to the
 *       {@link EdgeRpcService}.</li>
 * </ul>
 */
@TbCoreComponent
@Service
public class DefaultTbEdgeConsumerService extends AbstractConsumerService<TransportProtos.ToEdgeNotificationMsg> implements TbEdgeConsumerService {

    /** Poll interval handed to the main consumer config and used as the notification poll duration. */
    @Value("${queue.edge.pool-interval:25}")
    private int pollInterval;

    /** Maximum time, in milliseconds, to wait for a whole pack to be acknowledged. */
    @Value("${queue.edge.pack-processing-timeout:10000}")
    private int packProcessingTimeout;

    /** Passed to {@link QueueConfig#of} when the main consumer is built. */
    @Value("${queue.edge.consumer-per-partition:false}")
    private boolean consumerPerPartition;

    /** Number of retries allowed per edge notification before the failure is reported. */
    @Value("${queue.edge.pack-processing-retries:3}")
    private int packProcessingRetries;

    /** When {@code false}, no statistics are recorded or printed. */
    @Value("${queue.edge.stats.enabled:false}")
    private boolean statsEnabled;

    private final TbCoreQueueFactory queueFactory;
    private final EdgeContextComponent edgeCtx;
    private final EdgeConsumerStats stats;
    private MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>, QueueConfig> mainConsumer;

    /**
     * Creates the service.
     *
     * @param tbCoreQueueFactory   factory used to create the edge queue consumers
     * @param actorSystemContext   forwarded to the base class; its remaining collaborators are passed as {@code null}
     * @param statsFactory         backing factory for {@link EdgeConsumerStats}
     * @param edgeContextComponent access point for the edge RPC service, edge service and processors
     */
    public DefaultTbEdgeConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorSystemContext, StatsFactory statsFactory, EdgeContextComponent edgeContextComponent) {
        super(actorSystemContext, null, null, null, null, null, null, null, null);
        this.edgeCtx = edgeContextComponent;
        this.stats = new EdgeConsumerStats(statsFactory);
        this.queueFactory = tbCoreQueueFactory;
    }

    /** Initialises the base service under the "tb-edge" prefix and builds the main "Edge" queue consumer. */
    @PostConstruct
    public void init() {
        super.init("tb-edge");
        // The explicit type witness keeps the builder chain typed; without it the result cannot be
        // assigned to the parameterised field and this::processMsgs does not match the processor type.
        this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>, QueueConfig>builder()
                .queueKey(new QueueKey(ServiceType.TB_CORE).withQueueName("Edge"))
                .config(QueueConfig.of(this.consumerPerPartition, this.pollInterval))
                .msgPackProcessor(this::processMsgs)
                .consumerCreator((queueConfig, topicPartitionInfo) -> this.queueFactory.createEdgeMsgConsumer())
                .consumerExecutor(this.consumersExecutor)
                .scheduler(this.scheduler)
                .taskExecutor(this.mgmtExecutor)
                .build();
    }

    /** Delegates to the base class; redeclared here so the {@link PreDestroy} hook is attached to this bean. */
    @Override
    @PreDestroy
    public void destroy() {
        super.destroy();
    }

    /** Starts the consumers managed by the base class. */
    @Override
    public void startConsumers() {
        super.startConsumers();
    }

    /**
     * Re-subscribes the main consumer to the edge partitions carried by the event.
     *
     * @param partitionChangeEvent the partition change to apply
     */
    public void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        var edgePartitions = partitionChangeEvent.getEdgePartitions();
        this.log.debug("Subscribing to partitions: {}", edgePartitions);
        this.mainConsumer.update(edgePartitions);
    }

    /**
     * Processes one pack of {@code ToEdgeMsg} messages and commits the consumer afterwards.
     *
     * <p>Messages are handled in order on {@code consumersExecutor}. The calling thread waits up to
     * {@link #packProcessingTimeout} milliseconds on the pack latch; on timeout the worker task is
     * cancelled if it is still running and the failed messages are logged. The commit happens in
     * every case.
     *
     * @param list            messages of the pack
     * @param tbQueueConsumer consumer to commit once the pack is done
     * @param queueConfig     queue configuration (not used by this processor)
     * @throws InterruptedException if the calling thread is interrupted while waiting for the pack
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> tbQueueConsumer, QueueConfig queueConfig) throws InterruptedException {
        List<IdMsgPair<TransportProtos.ToEdgeMsg>> orderedMsgs = list.stream()
                .map(queueMsg -> new IdMsgPair<>(UUID.randomUUID(), queueMsg))
                .toList();
        ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> pendingMap = orderedMsgs.stream()
                .collect(Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
        CountDownLatch processingLatch = new CountDownLatch(1);
        TbPackProcessingContext<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> ctx =
                new TbPackProcessingContext<>(processingLatch, pendingMap, new ConcurrentHashMap<>());
        // Remembers the message most recently picked up so a timeout can report it.
        PendingMsgHolder<TransportProtos.ToEdgeMsg> pendingMsgHolder = new PendingMsgHolder<>();

        Future<?> packTask = this.consumersExecutor.submit(() -> orderedMsgs.forEach(pair -> {
            UUID id = pair.getUuid();
            TbProtoQueueMsg<TransportProtos.ToEdgeMsg> queueMsg = pair.getMsg();
            TbCallback callback = new TbPackCallback<>(id, ctx);
            try {
                TransportProtos.ToEdgeMsg toEdgeMsg = queueMsg.getValue();
                pendingMsgHolder.setMsg(toEdgeMsg);
                if (toEdgeMsg.hasEdgeNotificationMsg()) {
                    pushNotificationToEdge(toEdgeMsg.getEdgeNotificationMsg(), 0, this.packProcessingRetries, callback);
                } else {
                    // Nothing to deliver: acknowledge right away. The pack is committed regardless,
                    // so leaving the callback open would only delay the commit until the timeout.
                    callback.onSuccess();
                }
                if (this.statsEnabled) {
                    this.stats.log(toEdgeMsg);
                }
            } catch (Throwable th) {
                this.log.warn("[{}] Failed to process message: {}", id, queueMsg, th);
                callback.onFailure(th);
            }
        }));

        if (!processingLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS)) {
            if (!packTask.isDone()) {
                packTask.cancel(true);
                this.log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
            }
            ctx.getFailedMap().forEach((id, failedMsg) ->
                    this.log.warn("[{}] Failed to process message: {}", id, failedMsg.getValue()));
        }
        tbQueueConsumer.commit();
    }

    @Override
    protected ServiceType getServiceType() {
        return ServiceType.TB_CORE;
    }

    @Override
    protected long getNotificationPollDuration() {
        return this.pollInterval;
    }

    @Override
    protected long getNotificationPackProcessingTimeout() {
        return this.packProcessingTimeout;
    }

    /** @return the number of available processors, but never fewer than four */
    @Override
    protected int getMgmtThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors(), 4);
    }

    @Override
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createNotificationsConsumer() {
        return this.queueFactory.createToEdgeNotificationsMsgConsumer();
    }

    /**
     * Dispatches a single edge notification to the {@link EdgeRpcService}.
     *
     * <p>If no RPC service is available the message is acknowledged and skipped. Session messages
     * (high priority, event update, sync request, sync response) are forwarded to
     * {@code onToEdgeSessionMsg}; component lifecycle messages trigger {@code deleteEdge} or
     * {@code updateEdge} for DELETED / UPDATED events. Any exception raised while dispatching is
     * logged and reported through {@code tbCallback.onFailure}.
     *
     * @param uuid            identifier assigned to the message by the caller (not used here)
     * @param tbProtoQueueMsg queue message wrapping the notification
     * @param tbCallback      completion callback for this message
     */
    @Override
    protected void handleNotification(UUID uuid, TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg> tbProtoQueueMsg, TbCallback tbCallback) {
        TransportProtos.ToEdgeNotificationMsg value = tbProtoQueueMsg.getValue();
        try {
            EdgeRpcService edgeRpcService = this.edgeCtx.getEdgeRpcService();
            if (edgeRpcService == null) {
                this.log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", value);
                tbCallback.onSuccess();
                return;
            }
            if (value.hasEdgeHighPriority()) {
                EdgeHighPriorityMsg highPriorityMsg = ProtoUtils.fromProto(value.getEdgeHighPriority());
                edgeRpcService.onToEdgeSessionMsg(highPriorityMsg.getTenantId(), highPriorityMsg);
                tbCallback.onSuccess();
            } else if (value.hasEdgeEventUpdate()) {
                EdgeEventUpdateMsg eventUpdateMsg = ProtoUtils.fromProto(value.getEdgeEventUpdate());
                edgeRpcService.onToEdgeSessionMsg(eventUpdateMsg.getTenantId(), eventUpdateMsg);
                tbCallback.onSuccess();
            } else if (value.hasToEdgeSyncRequest()) {
                ToEdgeSyncRequest syncRequest = ProtoUtils.fromProto(value.getToEdgeSyncRequest());
                edgeRpcService.onToEdgeSessionMsg(syncRequest.getTenantId(), syncRequest);
                tbCallback.onSuccess();
            } else if (value.hasFromEdgeSyncResponse()) {
                FromEdgeSyncResponse syncResponse = ProtoUtils.fromProto(value.getFromEdgeSyncResponse());
                edgeRpcService.onToEdgeSessionMsg(syncResponse.getTenantId(), syncResponse);
                tbCallback.onSuccess();
            } else if (value.hasComponentLifecycle()) {
                ComponentLifecycleMsg lifecycleMsg = ProtoUtils.fromProto(value.getComponentLifecycle());
                TenantId tenantId = lifecycleMsg.getTenantId();
                EdgeId edgeId = new EdgeId(lifecycleMsg.getEntityId().getId());
                if (ComponentLifecycleEvent.DELETED.equals(lifecycleMsg.getEvent())) {
                    edgeRpcService.deleteEdge(tenantId, edgeId);
                } else if (ComponentLifecycleEvent.UPDATED.equals(lifecycleMsg.getEvent())) {
                    edgeRpcService.updateEdge(tenantId, this.edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId));
                }
                tbCallback.onSuccess();
            } else {
                // No recognised payload: acknowledge so the callback is always completed.
                this.log.debug("Unsupported edge notification message, ignoring msg: {}", value);
                tbCallback.onSuccess();
            }
        } catch (Exception e) {
            this.log.error("Error processing edge notification message {}", value, e);
            tbCallback.onFailure(e);
        }
        if (this.statsEnabled) {
            this.stats.log(value);
        }
    }

    /**
     * Hands an edge notification to the processor registered for its {@link EdgeEventType} and
     * completes {@code tbCallback} from the returned future.
     *
     * <p>Both a failed future and an exception thrown while starting the processing are retried
     * until {@code retryCount} reaches {@code maxRetries}; after that the failure is reported.
     *
     * @param edgeNotificationMsgProto notification to deliver
     * @param retryCount               number of retries already performed (0 for the first attempt)
     * @param maxRetries               maximum number of retries
     * @param tbCallback               completion callback for the message
     */
    private void pushNotificationToEdge(final TransportProtos.EdgeNotificationMsgProto edgeNotificationMsgProto, final int retryCount, final int maxRetries, final TbCallback tbCallback) {
        final TenantId tenantId = TenantId.fromUUID(new UUID(edgeNotificationMsgProto.getTenantIdMSB(), edgeNotificationMsgProto.getTenantIdLSB()));
        this.log.debug("[{}] Pushing notification to edge {}", tenantId, edgeNotificationMsgProto);
        try {
            EdgeEventType eventType = EdgeEventType.valueOf(edgeNotificationMsgProto.getType());
            Futures.addCallback(this.edgeCtx.getProcessor(eventType).processEntityNotification(tenantId, edgeNotificationMsgProto), new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    tbCallback.onSuccess();
                }

                @Override
                public void onFailure(@NotNull Throwable th) {
                    retryOrFail(tenantId, edgeNotificationMsgProto, retryCount, maxRetries, tbCallback, th, "failure");
                }
            }, MoreExecutors.directExecutor());
        } catch (Exception e) {
            retryOrFail(tenantId, edgeNotificationMsgProto, retryCount, maxRetries, tbCallback, e, "exception");
        }
    }

    /** Retries the push while retries remain, otherwise reports the failure through the callback. */
    private void retryOrFail(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsgProto, int retryCount, int maxRetries, TbCallback tbCallback, Throwable cause, String causeKind) {
        if (retryCount >= maxRetries) {
            callBackFailure(tenantId, edgeNotificationMsgProto, tbCallback, cause);
            return;
        }
        int nextRetry = retryCount + 1;
        this.log.warn("[{}] Retry {} for message due to {}: {}", tenantId, nextRetry, causeKind, cause.getMessage());
        pushNotificationToEdge(edgeNotificationMsgProto, nextRetry, maxRetries, tbCallback);
    }

    /** Logs the final delivery failure and completes the callback with it. */
    private void callBackFailure(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsgProto, TbCallback tbCallback, Throwable th) {
        this.log.error("[{}] Can't push to edge updates, edgeNotificationMsg [{}]", tenantId, edgeNotificationMsgProto, th);
        tbCallback.onFailure(th);
    }

    /** Prints and then resets the consumer statistics; does nothing when statistics are disabled. */
    @Scheduled(fixedDelayString = "${queue.edge.stats.print-interval-ms}")
    public void printStats() {
        if (this.statsEnabled) {
            this.stats.printStats();
            this.stats.reset();
        }
    }

    /** Stops the base-class consumers, then stops the main consumer and waits for it to finish. */
    @Override
    public void stopConsumers() {
        super.stopConsumers();
        this.mainConsumer.stop();
        this.mainConsumer.awaitStop();
    }
}
