package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.resource.ImageCacheKey;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.notification.NotificationSchedulerService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.class */
public class DefaultTbCoreConsumerService extends AbstractConsumerService<TransportProtos.ToCoreNotificationMsg> implements TbCoreConsumerService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbCoreConsumerService.class);

    // Poll interval handed to every consumer built in init(); also returned by getNotificationPollDuration().
    // NOTE(review): presumably milliseconds, like the other timing properties - confirm.
    @Value("${queue.core.poll-interval}")
    private long pollInterval;

    // Maximum time, in milliseconds, that processMsgs/processUsageStatsMsg wait for a pack to be acknowledged.
    @Value("${queue.core.pack-processing-timeout}")
    private long packProcessingTimeout;

    // Passed to QueueConfig.of(...) when the main consumer is built in init().
    @Value("${queue.core.consumer-per-partition:true}")
    private boolean consumerPerPartition;

    // Gates both per-notification stats logging (handleNotification) and the scheduled printStats().
    @Value("${queue.core.stats.enabled:false}")
    private boolean statsEnabled;

    // firmwarePackInterval / firmwarePackSize gives the time budget per OTA record in processFirmwareMsgs.
    @Value("${queue.core.ota.pack-interval-ms:60000}")
    private long firmwarePackInterval;

    // Divisor of firmwarePackInterval in processFirmwareMsgs.
    @Value("${queue.core.ota.pack-size:100}")
    private int firmwarePackSize;
    // Collaborators injected through the constructor.
    private final DeviceStateService stateService;
    // Receives usage statistics messages (see handleUsageStats).
    private final TbApiUsageStateService statsService;
    private final TbLocalSubscriptionService localSubscriptionService;
    private final SubscriptionManagerService subscriptionManagerService;
    private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
    // Receives OTA package state messages (see handleOtaPackageUpdates).
    private final OtaPackageStateService firmwareStateService;
    private final GitVersionControlQueueService vcQueueService;
    private final NotificationSchedulerService notificationSchedulerService;
    private final NotificationRuleProcessor notificationRuleProcessor;
    // Source of all queue consumers created by this service.
    private final TbCoreQueueFactory queueFactory;
    private final TbImageService imageService;
    private final RuleEngineCallService ruleEngineCallService;
    private final EdqsService edqsService;
    // Created in the constructor from the injected StatsFactory.
    private final TbCoreConsumerStats stats;
    // Consumer managers; all three are built in init().
    private MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToCoreMsg>, QueueConfig> mainConsumer;
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> usageStatsConsumer;
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
    // Single-thread executor created in init() and shut down in destroy().
    private volatile ListeningExecutorService deviceActivityEventsExecutor;

    /**
     * Wires the service's collaborators.
     *
     * <p>Note that two {@link TbApiUsageStateService} parameters are declared: {@code tbApiUsageStateService2}
     * is forwarded to the superclass, while {@code tbApiUsageStateService} is kept locally as
     * {@code statsService}. The consumer statistics holder is created here from {@code statsFactory}.
     */
    public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorSystemContext, DeviceStateService deviceStateService, TbLocalSubscriptionService tbLocalSubscriptionService, SubscriptionManagerService subscriptionManagerService, TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache tbDeviceProfileCache, TbAssetProfileCache tbAssetProfileCache, TbApiUsageStateService tbApiUsageStateService, TbTenantProfileCache tbTenantProfileCache, TbApiUsageStateService tbApiUsageStateService2, OtaPackageStateService otaPackageStateService, GitVersionControlQueueService gitVersionControlQueueService, PartitionService partitionService, ApplicationEventPublisher applicationEventPublisher, JwtSettingsService jwtSettingsService, NotificationSchedulerService notificationSchedulerService, NotificationRuleProcessor notificationRuleProcessor, TbImageService tbImageService, RuleEngineCallService ruleEngineCallService, CalculatedFieldCache calculatedFieldCache, EdqsService edqsService) {
        super(actorSystemContext, tbTenantProfileCache, tbDeviceProfileCache, tbAssetProfileCache, calculatedFieldCache,
                tbApiUsageStateService2, partitionService, applicationEventPublisher, jwtSettingsService);

        // Queue infrastructure and statistics.
        this.queueFactory = tbCoreQueueFactory;
        this.stats = new TbCoreConsumerStats(statsFactory);
        this.statsService = tbApiUsageStateService;

        // Device state, subscriptions and RPC.
        this.stateService = deviceStateService;
        this.localSubscriptionService = tbLocalSubscriptionService;
        this.subscriptionManagerService = subscriptionManagerService;
        this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;

        // Remaining message targets.
        this.firmwareStateService = otaPackageStateService;
        this.vcQueueService = gitVersionControlQueueService;
        this.notificationSchedulerService = notificationSchedulerService;
        this.notificationRuleProcessor = notificationRuleProcessor;
        this.imageService = tbImageService;
        this.ruleEngineCallService = ruleEngineCallService;
        this.edqsService = edqsService;
    }

    /**
     * Initializes the base service under the {@code "tb-core"} prefix, creates the single-thread
     * device-activity executor and builds the three consumer managers (main core queue, usage
     * statistics, OTA package states).
     *
     * <p>Builders are invoked with explicit type arguments so that the pack-processor method
     * references are checked against the real message types instead of raw builder types.
     */
    @PostConstruct
    public void init() {
        super.init("tb-core");
        this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(
                Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));

        this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToCoreMsg>, QueueConfig>builder()
                .queueKey(new QueueKey(ServiceType.TB_CORE))
                .config(QueueConfig.of(this.consumerPerPartition, this.pollInterval))
                .msgPackProcessor(this::processMsgs)
                .consumerCreator((queueConfig, topicPartitionInfo) -> this.queueFactory.createToCoreMsgConsumer())
                .consumerExecutor(this.consumersExecutor)
                .scheduler(this.scheduler)
                .taskExecutor(this.mgmtExecutor)
                .build();

        // A bound method reference null-checks its receiver when it is evaluated, so no
        // separate Objects.requireNonNull on the factory is needed.
        this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>>builder()
                .name("TB Usage Stats")
                .msgPackProcessor(this::processUsageStatsMsg)
                .pollInterval(this.pollInterval)
                .consumerCreator(this.queueFactory::createToUsageStatsServiceMsgConsumer)
                .consumerExecutor(this.consumersExecutor)
                .threadPrefix("usage-stats")
                .build();

        this.firmwareStatesConsumer = QueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>>builder()
                .name("TB Ota Package States")
                .msgPackProcessor(this::processFirmwareMsgs)
                .pollInterval(this.pollInterval)
                .consumerCreator(this.queueFactory::createToOtaPackageStateServiceMsgConsumer)
                .consumerExecutor(this.consumersExecutor)
                .threadPrefix("firmware")
                .build();
    }

    /**
     * Runs the superclass shutdown first, then stops the device-activity executor if
     * {@link #init()} got far enough to create it.
     */
    @Override
    @PreDestroy
    public void destroy() {
        super.destroy();
        ListeningExecutorService activityExecutor = this.deviceActivityEventsExecutor;
        if (activityExecutor == null) {
            return;
        }
        activityExecutor.shutdownNow();
    }

    /**
     * Starts the consumers owned by the superclass, then the OTA package state consumer
     * (subscribe followed by launch) and finally launches the usage statistics consumer.
     * The usage statistics consumer is subscribed separately, on partition change events.
     */
    @Override
    public void startConsumers() {
        super.startConsumers();

        firmwareStatesConsumer.subscribe();
        firmwareStatesConsumer.launch();

        usageStatsConsumer.launch();
    }

    /**
     * Reacts to a partition change: updates the main consumer with the new core partitions and
     * re-subscribes the usage statistics consumer to the same partitions, re-targeted at the
     * usage statistics consumer's own topic.
     */
    public void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        log.debug("Subscribing to partitions: {}", partitionChangeEvent.getCorePartitions());
        this.mainConsumer.update(partitionChangeEvent.getCorePartitions());

        var usageStatsPartitions = partitionChangeEvent.getCorePartitions().stream()
                .map(tpi -> tpi.newByTopic(this.usageStatsConsumer.getConsumer().getTopic()))
                .collect(Collectors.toSet());
        this.usageStatsConsumer.subscribe(usageStatsPartitions);
    }

    /**
     * Processes one pack of core messages and commits the consumer afterwards.
     *
     * <p>Every message gets a random id and a {@link TbPackCallback} bound to a shared
     * {@link TbPackProcessingContext}. Dispatching runs on {@code consumersExecutor} in the
     * original list order, while this thread waits up to {@code packProcessingTimeout}
     * milliseconds on the pack latch. On timeout the still-running dispatch task is cancelled
     * and the unacknowledged / failed messages are logged. The consumer is committed whether or
     * not the pack completed in time.
     *
     * @param list            messages of the pack
     * @param tbQueueConsumer consumer to commit once the pack is done or timed out
     * @param queueConfig     queue configuration supplied by the consumer manager (not used here)
     * @throws Exception if waiting on the latch is interrupted or the commit fails
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbQueueConsumer, QueueConfig queueConfig) throws Exception {
        // Keep an ordered list next to the pending map so that dispatch order follows the pack order.
        List<IdMsgPair<TransportProtos.ToCoreMsg>> orderedMsgs = list.stream()
                .map(queueMsg -> new IdMsgPair<>(UUID.randomUUID(), queueMsg))
                .toList();
        ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToCoreMsg>> pendingMap = orderedMsgs.stream()
                .collect(Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
        CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
        TbPackProcessingContext<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> ctx =
                new TbPackProcessingContext<>(processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
        // Remembers the message currently being dispatched so a timeout can report it.
        PendingMsgHolder<TransportProtos.ToCoreMsg> pendingMsgHolder = new PendingMsgHolder<>();

        Future<?> packSubmitFuture = this.consumersExecutor.submit(() -> orderedMsgs.forEach(pair -> {
            UUID id = pair.getUuid();
            TbProtoQueueMsg<TransportProtos.ToCoreMsg> queueMsg = pair.getMsg();
            log.trace("[{}] Creating main callback for message: {}", id, queueMsg.getValue());
            TbCallback callback = new TbPackCallback<>(id, ctx);
            try {
                TransportProtos.ToCoreMsg toCoreMsg = queueMsg.getValue();
                pendingMsgHolder.setMsg(toCoreMsg);
                dispatchToCoreMsg(id, toCoreMsg, callback);
            } catch (Throwable th) {
                log.warn("[{}] Failed to process message: {}", id, queueMsg, th);
                callback.onFailure(th);
            }
        }));

        if (!processingTimeoutLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS)) {
            if (!packSubmitFuture.isDone()) {
                packSubmitFuture.cancel(true);
                log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
            }
            if (log.isDebugEnabled()) {
                ctx.getAckMap().forEach((id, queueMsg) -> log.debug("[{}] Timeout to process message: {}", id, queueMsg.getValue()));
            }
            ctx.getFailedMap().forEach((id, queueMsg) -> log.warn("[{}] Failed to process message: {}", id, queueMsg.getValue()));
        }
        tbQueueConsumer.commit();
    }

    /**
     * Routes a single core message to the service matching its populated payload.
     *
     * <p>A message that carries none of the known payloads is handed to {@code throwNotHandled}
     * so that its callback is completed; otherwise the whole pack would sit idle until the
     * pack-processing timeout expires.
     */
    private void dispatchToCoreMsg(UUID id, TransportProtos.ToCoreMsg toCoreMsg, TbCallback callback) {
        if (toCoreMsg.hasToSubscriptionMgrMsg()) {
            log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
            forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
        } else if (toCoreMsg.hasToDeviceActorMsg()) {
            log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
            forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
        } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
            forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
        } else if (toCoreMsg.hasDeviceConnectMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
            forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
        } else if (toCoreMsg.hasDeviceActivityMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
            forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
        } else if (toCoreMsg.hasDeviceDisconnectMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
            forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
        } else if (toCoreMsg.hasDeviceInactivityMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
            forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
        } else if (toCoreMsg.hasDeviceInactivityTimeoutUpdateMsg()) {
            log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityTimeoutUpdateMsg());
            forwardToStateService(toCoreMsg.getDeviceInactivityTimeoutUpdateMsg(), callback);
        } else if (toCoreMsg.hasToDeviceActorNotification()) {
            // Keep the decoded message at its declared type: only the RPC-request branch needs
            // the narrower ToDeviceRpcRequestActorMsg, everything else goes to the actor system.
            var actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
            if (actorMsg != null) {
                if (actorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
                    this.tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) actorMsg);
                } else {
                    log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
                    this.actorContext.tell(actorMsg);
                }
            }
            callback.onSuccess();
        } else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) {
            TransportProtos.NotificationSchedulerServiceMsg schedulerMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
            log.trace("[{}] Forwarding message to notification scheduler service {}", id, schedulerMsg);
            forwardToNotificationSchedulerService(schedulerMsg, callback);
        } else if (toCoreMsg.hasErrorEventMsg()) {
            forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
        } else if (toCoreMsg.hasLifecycleEventMsg()) {
            forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
        } else {
            throwNotHandled(toCoreMsg, callback);
        }
    }

    /** Identifies this consumer service as the core service type. */
    @Override
    protected ServiceType getServiceType() {
        final ServiceType coreType = ServiceType.TB_CORE;
        return coreType;
    }

    /** Notifications are polled with the same interval as the core queue ({@code queue.core.poll-interval}). */
    @Override
    protected long getNotificationPollDuration() {
        return pollInterval;
    }

    /** Notification packs share the core pack-processing timeout ({@code queue.core.pack-processing-timeout}). */
    @Override
    protected long getNotificationPackProcessingTimeout() {
        return packProcessingTimeout;
    }

    /** Returns the number of available processors, but never fewer than four. */
    @Override
    protected int getMgmtThreadPoolSize() {
        final int minimumThreads = 4;
        int processors = Runtime.getRuntime().availableProcessors();
        return processors < minimumThreads ? minimumThreads : processors;
    }

    /** Asks the core queue factory for a fresh core-notifications consumer. */
    @Override
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createNotificationsConsumer() {
        TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> notificationsConsumer =
                queueFactory.createToCoreNotificationsMsgConsumer();
        return notificationsConsumer;
    }

    /**
     * Routes a single core notification to the service matching its populated payload.
     *
     * <p>Branches that delegate to a {@code forwardTo...} helper pass the callback on; the
     * remaining branches process the payload inline and acknowledge it themselves. A notification
     * that carries none of the known payloads is handed to {@code throwNotHandled}, the same
     * helper the sub-dispatchers in this class use, so that the callback is always completed
     * instead of being left pending.
     *
     * <p>When statistics are enabled the notification is recorded after dispatching.
     *
     * @param uuid            id assigned to the notification, used for log correlation
     * @param tbProtoQueueMsg queue message wrapping the notification
     * @param tbCallback      callback to complete once the notification is handled
     */
    @Override
    protected void handleNotification(UUID uuid, TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg> tbProtoQueueMsg, TbCallback tbCallback) {
        TransportProtos.ToCoreNotificationMsg value = tbProtoQueueMsg.getValue();
        if (value.hasToLocalSubscriptionServiceMsg()) {
            log.trace("[{}] Forwarding message to local subscription service {}", uuid, value.getToLocalSubscriptionServiceMsg());
            forwardToLocalSubMgrService(value.getToLocalSubscriptionServiceMsg(), tbCallback);
        } else if (value.hasCoreStartupMsg()) {
            log.trace("[{}] Forwarding message to local subscription service {}", uuid, value.getCoreStartupMsg());
            forwardCoreStartupMsg(value.getCoreStartupMsg(), tbCallback);
        } else if (value.hasFromDeviceRpcResponse()) {
            log.trace("[{}] Forwarding message to RPC service {}", uuid, value.getFromDeviceRpcResponse());
            forwardToCoreRpcService(value.getFromDeviceRpcResponse(), tbCallback);
        } else if (value.hasRestApiCallResponseMsg()) {
            log.trace("[{}] Forwarding message to RuleEngineCallService service {}", uuid, value.getRestApiCallResponseMsg());
            forwardToRuleEngineCallService(value.getRestApiCallResponseMsg(), tbCallback);
        } else if (value.hasComponentLifecycle()) {
            handleComponentLifecycleMsg(uuid, ProtoUtils.fromProto(value.getComponentLifecycle()));
            tbCallback.onSuccess();
        } else if (value.getQueueUpdateMsgsCount() > 0) {
            this.partitionService.updateQueues(value.getQueueUpdateMsgsList());
            tbCallback.onSuccess();
        } else if (value.getQueueDeleteMsgsCount() > 0) {
            this.partitionService.removeQueues(value.getQueueDeleteMsgsList());
            tbCallback.onSuccess();
        } else if (value.hasVcResponseMsg()) {
            this.vcQueueService.processResponse(value.getVcResponseMsg());
            tbCallback.onSuccess();
        } else if (value.hasToSubscriptionMgrMsg()) {
            forwardToSubMgrService(value.getToSubscriptionMgrMsg(), tbCallback);
        } else if (value.hasNotificationRuleProcessorMsg()) {
            this.notificationRuleProcessor.process((NotificationRuleTrigger) JavaSerDesUtil.decode(value.getNotificationRuleProcessorMsg().getTrigger().toByteArray()));
            tbCallback.onSuccess();
        } else if (value.hasResourceCacheInvalidateMsg()) {
            forwardToResourceService(value.getResourceCacheInvalidateMsg(), tbCallback);
        } else if (value.hasToEdqsCoreServiceMsg()) {
            this.edqsService.processSystemMsg((ToCoreEdqsMsg) JacksonUtil.fromBytes(value.getToEdqsCoreServiceMsg().getValue().toByteArray(), ToCoreEdqsMsg.class));
            tbCallback.onSuccess();
        } else {
            // No known payload: complete the callback rather than leaving it pending.
            throwNotHandled(value, tbCallback);
        }
        if (this.statsEnabled) {
            this.stats.log(value);
        }
    }

    /**
     * Processes one pack of usage statistics messages and commits the consumer afterwards.
     *
     * <p>Each message is keyed by a random id and handled on the calling thread with a
     * {@link TbPackCallback} bound to a shared {@link TbPackProcessingContext}. The method then
     * waits up to {@link #getNotificationPackProcessingTimeout()} milliseconds for all callbacks;
     * on timeout the unacknowledged and failed messages are logged. The consumer is committed
     * whether or not the pack completed in time.
     *
     * @param list            messages of the pack
     * @param tbQueueConsumer consumer to commit once the pack is done or timed out
     * @throws Exception if waiting on the latch is interrupted or the commit fails
     */
    private void processUsageStatsMsg(List<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> tbQueueConsumer) throws Exception {
        ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> pendingMap = list.stream()
                .collect(Collectors.toConcurrentMap(queueMsg -> UUID.randomUUID(), Function.identity()));
        CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
        TbPackProcessingContext<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> ctx =
                new TbPackProcessingContext<>(processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());

        pendingMap.forEach((id, queueMsg) -> {
            log.trace("[{}] Creating usage stats callback for message: {}", id, queueMsg.getValue());
            TbCallback callback = new TbPackCallback<>(id, ctx);
            try {
                handleUsageStats(queueMsg, callback);
            } catch (Throwable th) {
                log.warn("[{}] Failed to process usage stats: {}", id, queueMsg, th);
                callback.onFailure(th);
            }
        });

        if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
            ctx.getAckMap().forEach((id, queueMsg) -> log.warn("[{}] Timeout to process usage stats: {}", id, queueMsg.getValue()));
            ctx.getFailedMap().forEach((id, queueMsg) -> log.warn("[{}] Failed to process usage stats: {}", id, queueMsg.getValue()));
        }
        tbQueueConsumer.commit();
    }

    /**
     * Processes a pack of OTA package state messages at a throttled rate, then commits.
     *
     * <p>Each record has a time budget of {@code firmwarePackInterval / firmwarePackSize}
     * milliseconds. The time spent handling a record is deducted from the running budget; when a
     * record was actually handled and budget is left, the thread sleeps for the remainder before
     * the budget is topped up for the next record. A failure on one record is logged and does not
     * stop the pack.
     *
     * <p>If the thread is interrupted, the interrupt status is restored and the method returns
     * without committing, so the caller's loop can observe the interruption.
     *
     * @param list            messages of the pack
     * @param tbQueueConsumer consumer to commit after the pack has been walked
     */
    private void processFirmwareMsgs(List<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> tbQueueConsumer) {
        long timeBudgetPerRecord = this.firmwarePackInterval / this.firmwarePackSize;
        long remainingBudget = timeBudgetPerRecord;
        for (TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg> tbProtoQueueMsg : list) {
            try {
                long startedAt = System.currentTimeMillis();
                boolean handled = handleOtaPackageUpdates(tbProtoQueueMsg);
                long spent = System.currentTimeMillis() - startedAt;
                remainingBudget -= spent;
                if (handled) {
                    if (remainingBudget > 0) {
                        log.debug("Spent time per record is: [{}]!", spent);
                        Thread.sleep(remainingBudget);
                        remainingBudget = 0;
                    }
                    // A negative remainder (overrun) is carried over and shortens the next budget.
                    remainingBudget += timeBudgetPerRecord;
                }
            } catch (InterruptedException e) {
                // Do not swallow the interruption: re-assert the flag for the calling loop.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                log.warn("Failed to process firmware update msg: {}", tbProtoQueueMsg, th);
            }
        }
        tbQueueConsumer.commit();
    }

    /** Hands a usage statistics message, together with its callback, to the API usage state service. */
    private void handleUsageStats(TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg> tbProtoQueueMsg, TbCallback tbCallback) {
        TbApiUsageStateService usageStateService = this.statsService;
        usageStateService.process(tbProtoQueueMsg, tbCallback);
    }

    /**
     * Unwraps the OTA package state message and passes it to the OTA package state service.
     *
     * @return whatever the service's {@code process} call reports for this message
     */
    private boolean handleOtaPackageUpdates(TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg> tbProtoQueueMsg) {
        TransportProtos.ToOtaPackageStateServiceMsg otaStateMsg = tbProtoQueueMsg.getValue();
        return this.firmwareStateService.process(otaStateMsg);
    }

    /**
     * Converts an RPC response proto into a {@link FromDeviceRpcResponse}, passes it to the core
     * RPC service and acknowledges the callback.
     *
     * <p>An error code greater than zero is used as an index into {@code RpcError.values()};
     * any other code yields a response without an error.
     * NOTE(review): code 0 is therefore never mapped to an error - confirm against the sender's encoding.
     */
    private void forwardToCoreRpcService(TransportProtos.FromDeviceRPCResponseProto fromDeviceRPCResponseProto, TbCallback tbCallback) {
        UUID requestId = new UUID(fromDeviceRPCResponseProto.getRequestIdMSB(), fromDeviceRPCResponseProto.getRequestIdLSB());

        RpcError rpcError = null;
        if (fromDeviceRPCResponseProto.getError() > 0) {
            rpcError = RpcError.values()[fromDeviceRPCResponseProto.getError()];
        }

        FromDeviceRpcResponse rpcResponse = new FromDeviceRpcResponse(requestId, fromDeviceRPCResponseProto.getResponse(), rpcError);
        this.tbCoreDeviceRpcService.processRpcResponseFromRuleEngine(rpcResponse);
        tbCallback.onSuccess();
    }

    /**
     * Scheduled task: when statistics are enabled, prints the collected consumer statistics
     * and then resets them. Does nothing otherwise.
     */
    @Scheduled(fixedDelayString = "${queue.core.stats.print-interval-ms}")
    public void printStats() {
        if (!this.statsEnabled) {
            return;
        }
        this.stats.printStats();
        this.stats.reset();
    }

    /**
     * Routes a local subscription service message by its populated payload.
     *
     * <p>Callback, time-series, attribute, alarm and notification updates are passed to the
     * local subscription service together with the callback. Subscription-update payloads
     * ({@code subUpdate}, {@code alarmSubUpdate}, {@code notificationsSubUpdate}) are only
     * acknowledged. Anything else is reported through {@code throwNotHandled}.
     */
    private void forwardToLocalSubMgrService(TransportProtos.LocalSubscriptionServiceMsgProto localSubscriptionServiceMsgProto, TbCallback tbCallback) {
        final TransportProtos.LocalSubscriptionServiceMsgProto proto = localSubscriptionServiceMsgProto;
        if (proto.hasSubEventCallback()) {
            this.localSubscriptionService.onSubEventCallback(proto.getSubEventCallback(), tbCallback);
        } else if (proto.hasTsUpdate()) {
            this.localSubscriptionService.onTimeSeriesUpdate(proto.getTsUpdate(), tbCallback);
        } else if (proto.hasAttrUpdate()) {
            this.localSubscriptionService.onAttributesUpdate(proto.getAttrUpdate(), tbCallback);
        } else if (proto.hasAlarmUpdate()) {
            this.localSubscriptionService.onAlarmUpdate(proto.getAlarmUpdate(), tbCallback);
        } else if (proto.hasNotificationsUpdate()) {
            this.localSubscriptionService.onNotificationUpdate(proto.getNotificationsUpdate(), tbCallback);
        } else if (proto.hasSubUpdate() || proto.hasAlarmSubUpdate() || proto.hasNotificationsSubUpdate()) {
            // Nothing to do for these payloads here; just acknowledge them.
            tbCallback.onSuccess();
        } else {
            throwNotHandled(proto, tbCallback);
        }
    }

    /**
     * Logs the startup of a core service with its partitions, informs the local subscription
     * service and acknowledges the callback.
     */
    private void forwardCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg, TbCallback tbCallback) {
        log.info("[{}] Processing core startup with partitions: {}",
                coreStartupMsg.getServiceId(),
                coreStartupMsg.getPartitionsList());

        localSubscriptionService.onCoreStartupMsg(coreStartupMsg);
        tbCallback.onSuccess();
    }

    /**
     * Evicts cached image ETags for every key listed in the invalidation message, then
     * acknowledges the callback.
     *
     * <p>A key that carries a resource key is resolved as a tenant image key (tenant id taken
     * from the message); otherwise it is resolved as a public image key.
     */
    private void forwardToResourceService(TransportProtos.ResourceCacheInvalidateMsg resourceCacheInvalidateMsg, TbCallback tbCallback) {
        UUID tenantUuid = new UUID(resourceCacheInvalidateMsg.getTenantIdMSB(), resourceCacheInvalidateMsg.getTenantIdLSB());
        TenantId tenantId = TenantId.fromUUID(tenantUuid);

        resourceCacheInvalidateMsg.getKeysList().stream()
                .map(keyProto -> {
                    if (keyProto.hasResourceKey()) {
                        return ImageCacheKey.forImage(tenantId, keyProto.getResourceKey());
                    }
                    return ImageCacheKey.forPublicImage(keyProto.getPublicResourceKey());
                })
                .forEach(this.imageService::evictETags);

        tbCallback.onSuccess();
    }

    private void forwardToSubMgrService(TransportProtos.SubscriptionMgrMsgProto subscriptionMgrMsgProto, TbCallback tbCallback) {
        if (subscriptionMgrMsgProto.hasSubEvent()) {
            TransportProtos.TbEntitySubEventProto subEvent = subscriptionMgrMsgProto.getSubEvent();
            this.subscriptionManagerService.onSubEvent(subEvent.getServiceId(), TbSubscriptionUtils.fromProto(subEvent), tbCallback);
        } else if (subscriptionMgrMsgProto.hasTelemetrySub()) {
            tbCallback.onSuccess();
        } else if (subscriptionMgrMsgProto.hasAlarmSub()) {
            tbCallback.onSuccess();
        } else if (subscriptionMgrMsgProto.hasNotificationsSub()) {
            tbCallback.onSuccess();
        } else if (subscriptionMgrMsgProto.hasNotificationsCountSub()) {
            tbCallback.onSuccess();
        } else if (subscriptionMgrMsgProto.hasSubClose()) {
            tbCallback.onSuccess();
        } else if (subscriptionMgrMsgProto.hasTsUpdate()) {
            TransportProtos.TbTimeSeriesUpdateProto tsUpdate = subscriptionMgrMsgProto.getTsUpdate();
            this.subscriptionManagerService.onTimeSeriesUpdate(toTenantId(tsUpdate.getTenantIdMSB(), tsUpdate.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(tsUpdate.getEntityType(), tsUpdate.getEntityIdMSB(), tsUpdate.getEntityIdLSB()), KvProtoUtil.fromTsKvProtoList(tsUpdate.getDataList()), tbCallback);
        } else if (subscriptionMgrMsgProto.hasAttrUpdate()) {
            TransportProtos.TbAttributeUpdateProto attrUpdate = subscriptionMgrMsgProto.getAttrUpdate();
            this.subscriptionManagerService.onAttributesUpdate(toTenantId(attrUpdate.getTenantIdMSB(), attrUpdate.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(attrUpdate.getEntityType(), attrUpdate.getEntityIdMSB(), attrUpdate.getEntityIdLSB()), attrUpdate.getScope(), KvProtoUtil.toAttributeKvList(attrUpdate.getDataList()), tbCallback);
        } else if (subscriptionMgrMsgProto.hasAttrDelete()) {
            TransportProtos.TbAttributeDeleteProto attrDelete = subscriptionMgrMsgProto.getAttrDelete();
            if (attrDelete.hasNotifyDevice()) {
                this.subscriptionManagerService.onAttributesDelete(toTenantId(attrDelete.getTenantIdMSB(), attrDelete.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(attrDelete.getEntityType(), attrDelete.getEntityIdMSB(), attrDelete.getEntityIdLSB()), attrDelete.getScope(), attrDelete.getKeysList(), attrDelete.getNotifyDevice(), tbCallback);
            } else {
                this.subscriptionManagerService.onAttributesDelete(toTenantId(attrDelete.getTenantIdMSB(), attrDelete.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(attrDelete.getEntityType(), attrDelete.getEntityIdMSB(), attrDelete.getEntityIdLSB()), attrDelete.getScope(), attrDelete.getKeysList(), tbCallback);
            }
        } else if (subscriptionMgrMsgProto.hasTsDelete()) {
            TransportProtos.TbTimeSeriesDeleteProto tsDelete = subscriptionMgrMsgProto.getTsDelete();
            this.subscriptionManagerService.onTimeSeriesDelete(toTenantId(tsDelete.getTenantIdMSB(), tsDelete.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(tsDelete.getEntityType(), tsDelete.getEntityIdMSB(), tsDelete.getEntityIdLSB()), tsDelete.getKeysList(), tbCallback);
        } else if (subscriptionMgrMsgProto.hasAlarmUpdate()) {
            TransportProtos.TbAlarmUpdateProto alarmUpdate = subscriptionMgrMsgProto.getAlarmUpdate();
            this.subscriptionManagerService.onAlarmUpdate(toTenantId(alarmUpdate.getTenantIdMSB(), alarmUpdate.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(alarmUpdate.getEntityType(), alarmUpdate.getEntityIdMSB(), alarmUpdate.getEntityIdLSB()), (AlarmInfo) JacksonUtil.fromString(alarmUpdate.getAlarm(), AlarmInfo.class), tbCallback);
        } else if (subscriptionMgrMsgProto.hasAlarmDelete()) {
            TransportProtos.TbAlarmDeleteProto alarmDelete = subscriptionMgrMsgProto.getAlarmDelete();
            this.subscriptionManagerService.onAlarmDeleted(toTenantId(alarmDelete.getTenantIdMSB(), alarmDelete.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(alarmDelete.getEntityType(), alarmDelete.getEntityIdMSB(), alarmDelete.getEntityIdLSB()), (AlarmInfo) JacksonUtil.fromString(alarmDelete.getAlarm(), AlarmInfo.class), tbCallback);
        } else if (subscriptionMgrMsgProto.hasNotificationUpdate()) {
            TransportProtos.NotificationUpdateProto notificationUpdate = subscriptionMgrMsgProto.getNotificationUpdate();
            this.subscriptionManagerService.onNotificationUpdate(toTenantId(notificationUpdate.getTenantIdMSB(), notificationUpdate.getTenantIdLSB()), new UserId(new UUID(notificationUpdate.getRecipientIdMSB(), notificationUpdate.getRecipientIdLSB())), (NotificationUpdate) JacksonUtil.fromString(notificationUpdate.getUpdate(), NotificationUpdate.class), tbCallback);
        } else if (subscriptionMgrMsgProto.hasNotificationRequestUpdate()) {
            TransportProtos.NotificationRequestUpdateProto notificationRequestUpdate = subscriptionMgrMsgProto.getNotificationRequestUpdate();
            this.localSubscriptionService.onNotificationRequestUpdate(toTenantId(notificationRequestUpdate.getTenantIdMSB(), notificationRequestUpdate.getTenantIdLSB()), (NotificationRequestUpdate) JacksonUtil.fromString(notificationRequestUpdate.getUpdate(), NotificationRequestUpdate.class), tbCallback);
        } else {
            throwNotHandled(subscriptionMgrMsgProto, tbCallback);
        }
        if (this.statsEnabled) {
            this.stats.log(subscriptionMgrMsgProto);
        }
    }

    /**
     * Passes a device state service message on to the state service.
     *
     * <p>If statistics are enabled, the message is recorded in {@code stats}
     * before it is handed over.
     *
     * @param deviceStateServiceMsgProto message to deliver
     * @param tbCallback callback handed to the state service along with the message
     */
    void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsgProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceStateServiceMsgProto);
        }
        stateService.onQueueMsg(deviceStateServiceMsgProto, tbCallback);
    }

    /**
     * Handles a device connect message by submitting the state service call to
     * {@code deviceActivityEventsExecutor}.
     *
     * <p>If statistics are enabled, the message is recorded first. Handlers are
     * registered through {@code DonAsynchron.withCallback}: the success handler
     * calls {@code tbCallback.onSuccess()}; the failure handler logs a warning
     * and passes the throwable unchanged to {@code tbCallback.onFailure}.
     *
     * @param deviceConnectProto message carrying tenant id, device id and last connect time
     * @param tbCallback callback notified by the registered handlers
     */
    void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceConnectProto);
        }
        TenantId tenantId = toTenantId(deviceConnectProto.getTenantIdMSB(), deviceConnectProto.getTenantIdLSB());
        UUID deviceUuid = new UUID(deviceConnectProto.getDeviceIdMSB(), deviceConnectProto.getDeviceIdLSB());
        DeviceId deviceId = new DeviceId(deviceUuid);
        // Typed as Runnable so the executor's Runnable overload of submit is used.
        Runnable connectTask = () -> stateService.onDeviceConnect(tenantId, deviceId, deviceConnectProto.getLastConnectTime());
        DonAsynchron.withCallback(
                deviceActivityEventsExecutor.submit(connectTask),
                ignored -> tbCallback.onSuccess(),
                failure -> {
                    log.warn("[{}] Failed to process device connect message for device [{}]", tenantId.getId(), deviceId.getId(), failure);
                    tbCallback.onFailure(failure);
                });
    }

    /**
     * Handles a device activity message by submitting the state service call to
     * {@code deviceActivityEventsExecutor}.
     *
     * <p>If statistics are enabled, the message is recorded first. Handlers are
     * registered through {@code DonAsynchron.withCallback}: the success handler
     * calls {@code tbCallback.onSuccess()}; the failure handler logs a warning
     * and reports a new {@link RuntimeException} that names the device and
     * carries the original throwable as its cause.
     *
     * @param deviceActivityProto message carrying tenant id, device id and last activity time
     * @param tbCallback callback notified by the registered handlers
     */
    void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceActivityProto);
        }
        TenantId tenantId = toTenantId(deviceActivityProto.getTenantIdMSB(), deviceActivityProto.getTenantIdLSB());
        UUID deviceUuid = new UUID(deviceActivityProto.getDeviceIdMSB(), deviceActivityProto.getDeviceIdLSB());
        DeviceId deviceId = new DeviceId(deviceUuid);
        // Typed as Runnable so the executor's Runnable overload of submit is used.
        Runnable activityTask = () -> stateService.onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime());
        DonAsynchron.withCallback(
                deviceActivityEventsExecutor.submit(activityTask),
                ignored -> tbCallback.onSuccess(),
                failure -> {
                    log.warn("[{}] Failed to process device activity message for device [{}]", tenantId.getId(), deviceId.getId(), failure);
                    String reason = "Failed to update device activity for device [" + deviceId.getId() + "]!";
                    tbCallback.onFailure(new RuntimeException(reason, failure));
                });
    }

    /**
     * Handles a device disconnect message by submitting the state service call
     * to {@code deviceActivityEventsExecutor}.
     *
     * <p>If statistics are enabled, the message is recorded first. Handlers are
     * registered through {@code DonAsynchron.withCallback}: the success handler
     * calls {@code tbCallback.onSuccess()}; the failure handler logs a warning
     * and passes the throwable unchanged to {@code tbCallback.onFailure}.
     *
     * @param deviceDisconnectProto message carrying tenant id, device id and last disconnect time
     * @param tbCallback callback notified by the registered handlers
     */
    void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceDisconnectProto);
        }
        TenantId tenantId = toTenantId(deviceDisconnectProto.getTenantIdMSB(), deviceDisconnectProto.getTenantIdLSB());
        UUID deviceUuid = new UUID(deviceDisconnectProto.getDeviceIdMSB(), deviceDisconnectProto.getDeviceIdLSB());
        DeviceId deviceId = new DeviceId(deviceUuid);
        // Typed as Runnable so the executor's Runnable overload of submit is used.
        Runnable disconnectTask = () -> stateService.onDeviceDisconnect(tenantId, deviceId, deviceDisconnectProto.getLastDisconnectTime());
        DonAsynchron.withCallback(
                deviceActivityEventsExecutor.submit(disconnectTask),
                ignored -> tbCallback.onSuccess(),
                failure -> {
                    log.warn("[{}] Failed to process device disconnect message for device [{}]", tenantId.getId(), deviceId.getId(), failure);
                    tbCallback.onFailure(failure);
                });
    }

    /**
     * Handles a device inactivity message by submitting the state service call
     * to {@code deviceActivityEventsExecutor}.
     *
     * <p>If statistics are enabled, the message is recorded first. Handlers are
     * registered through {@code DonAsynchron.withCallback}: the success handler
     * calls {@code tbCallback.onSuccess()}; the failure handler logs a warning
     * and passes the throwable unchanged to {@code tbCallback.onFailure}.
     *
     * @param deviceInactivityProto message carrying tenant id, device id and last inactivity time
     * @param tbCallback callback notified by the registered handlers
     */
    void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceInactivityProto);
        }
        TenantId tenantId = toTenantId(deviceInactivityProto.getTenantIdMSB(), deviceInactivityProto.getTenantIdLSB());
        UUID deviceUuid = new UUID(deviceInactivityProto.getDeviceIdMSB(), deviceInactivityProto.getDeviceIdLSB());
        DeviceId deviceId = new DeviceId(deviceUuid);
        // Typed as Runnable so the executor's Runnable overload of submit is used.
        Runnable inactivityTask = () -> stateService.onDeviceInactivity(tenantId, deviceId, deviceInactivityProto.getLastInactivityTime());
        DonAsynchron.withCallback(
                deviceActivityEventsExecutor.submit(inactivityTask),
                ignored -> tbCallback.onSuccess(),
                failure -> {
                    log.warn("[{}] Failed to process device inactivity message for device [{}]", tenantId.getId(), deviceId.getId(), failure);
                    tbCallback.onFailure(failure);
                });
    }

    /**
     * Handles a device inactivity timeout update by submitting the state
     * service call to {@code deviceActivityEventsExecutor}.
     *
     * <p>If statistics are enabled, the message is recorded first. Handlers are
     * registered through {@code DonAsynchron.withCallback}: the success handler
     * calls {@code tbCallback.onSuccess()}; the failure handler logs a warning
     * and passes the throwable unchanged to {@code tbCallback.onFailure}.
     *
     * @param deviceInactivityTimeoutUpdateProto message carrying tenant id, device id and the inactivity timeout
     * @param tbCallback callback notified by the registered handlers
     */
    void forwardToStateService(TransportProtos.DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateProto, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(deviceInactivityTimeoutUpdateProto);
        }
        TenantId tenantId = toTenantId(deviceInactivityTimeoutUpdateProto.getTenantIdMSB(), deviceInactivityTimeoutUpdateProto.getTenantIdLSB());
        UUID deviceUuid = new UUID(deviceInactivityTimeoutUpdateProto.getDeviceIdMSB(), deviceInactivityTimeoutUpdateProto.getDeviceIdLSB());
        DeviceId deviceId = new DeviceId(deviceUuid);
        // Typed as Runnable so the executor's Runnable overload of submit is used.
        Runnable timeoutUpdateTask = () -> stateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, deviceInactivityTimeoutUpdateProto.getInactivityTimeout());
        DonAsynchron.withCallback(
                deviceActivityEventsExecutor.submit(timeoutUpdateTask),
                ignored -> tbCallback.onSuccess(),
                failure -> {
                    log.warn("[{}] Failed to process device inactivity timeout update message for device [{}]", tenantId.getId(), deviceId.getId(), failure);
                    tbCallback.onFailure(failure);
                });
    }

    /**
     * Asks the notification scheduler service to schedule the notification
     * request described by the message.
     *
     * <p>On normal completion {@code tbCallback.onSuccess()} is called. Any
     * {@link Exception} raised inside the {@code try} block, including one
     * thrown by {@code onSuccess()} itself, is wrapped in a
     * {@link RuntimeException} and reported through {@code tbCallback.onFailure}.
     *
     * @param notificationSchedulerServiceMsg message carrying tenant id, request id and timestamp
     * @param tbCallback callback notified of the outcome
     */
    private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg, TbCallback tbCallback) {
        try {
            TenantId tenantId = toTenantId(notificationSchedulerServiceMsg.getTenantIdMSB(), notificationSchedulerServiceMsg.getTenantIdLSB());
            UUID requestUuid = new UUID(notificationSchedulerServiceMsg.getRequestIdMSB(), notificationSchedulerServiceMsg.getRequestIdLSB());
            NotificationRequestId requestId = new NotificationRequestId(requestUuid);
            notificationSchedulerService.scheduleNotificationRequest(tenantId, requestId, notificationSchedulerServiceMsg.getTs());
            tbCallback.onSuccess();
        } catch (Exception e) {
            RuntimeException wrapped = new RuntimeException("Failed to schedule notification request", e);
            tbCallback.onFailure(wrapped);
        }
    }

    /**
     * Wraps a transport-to-device-actor message together with its callback and
     * sends the wrapper to the actor context.
     *
     * <p>If statistics are enabled, the message is recorded in {@code stats} first.
     *
     * @param transportToDeviceActorMsg message to deliver
     * @param tbCallback callback bundled into the wrapper
     */
    private void forwardToDeviceActor(TransportProtos.TransportToDeviceActorMsg transportToDeviceActorMsg, TbCallback tbCallback) {
        if (statsEnabled) {
            stats.log(transportToDeviceActorMsg);
        }
        TransportToDeviceActorMsgWrapper wrappedMsg = new TransportToDeviceActorMsgWrapper(transportToDeviceActorMsg, tbCallback);
        actorContext.tell(wrappedMsg);
    }

    /**
     * Builds an {@code ErrorEvent} from the given proto and forwards it to the
     * event-saving overload.
     *
     * <p>Tenant id, entity id, service id, method and error text are copied
     * from the proto; the event timestamp is taken from
     * {@link System#currentTimeMillis()} while the event is built.
     *
     * @param errorEventProto source of the event fields
     * @param tbCallback callback passed on to the event-saving overload
     */
    private void forwardToEventService(TransportProtos.ErrorEventProto errorEventProto, TbCallback tbCallback) {
        TenantId tenantId = toTenantId(errorEventProto.getTenantIdMSB(), errorEventProto.getTenantIdLSB());
        UUID entityUuid = new UUID(errorEventProto.getEntityIdMSB(), errorEventProto.getEntityIdLSB());
        Event errorEvent = (Event) ErrorEvent.builder()
                .tenantId(tenantId)
                .entityId(entityUuid)
                .serviceId(errorEventProto.getServiceId())
                .ts(System.currentTimeMillis())
                .method(errorEventProto.getMethod())
                .error(errorEventProto.getError())
                .build();
        forwardToEventService(errorEvent, tbCallback);
    }

    /**
     * Builds a {@code LifecycleEvent} from the given proto and forwards it to
     * the event-saving overload.
     *
     * <p>Tenant id, entity id, service id, lifecycle event type and success
     * flag are copied from the proto; the event timestamp is taken from
     * {@link System#currentTimeMillis()} while the event is built. The error
     * text is stored only when {@code StringUtils.isNotEmpty} accepts it,
     * otherwise {@code null} is stored.
     *
     * @param lifecycleEventProto source of the event fields
     * @param tbCallback callback passed on to the event-saving overload
     */
    private void forwardToEventService(TransportProtos.LifecycleEventProto lifecycleEventProto, TbCallback tbCallback) {
        TenantId tenantId = toTenantId(lifecycleEventProto.getTenantIdMSB(), lifecycleEventProto.getTenantIdLSB());
        UUID entityUuid = new UUID(lifecycleEventProto.getEntityIdMSB(), lifecycleEventProto.getEntityIdLSB());
        String reportedError = lifecycleEventProto.getError();
        String errorOrNull = StringUtils.isNotEmpty(reportedError) ? reportedError : null;
        Event lifecycleEvent = (Event) LifecycleEvent.builder()
                .tenantId(tenantId)
                .entityId(entityUuid)
                .serviceId(lifecycleEventProto.getServiceId())
                .ts(System.currentTimeMillis())
                .lcEventType(lifecycleEventProto.getLcEventType())
                .success(lifecycleEventProto.getSuccess())
                .error(errorOrNull)
                .build();
        forwardToEventService(lifecycleEvent, tbCallback);
    }

    /**
     * Saves the event asynchronously through the actor context's event service
     * and wires the result to the callback.
     *
     * <p>Handlers are registered through {@code DonAsynchron.withCallback}
     * using the actor context's DB callback executor: the success handler calls
     * {@code tbCallback.onSuccess()} and the failure handler is
     * {@code tbCallback::onFailure}.
     *
     * @param event event to save
     * @param tbCallback callback notified by the registered handlers; a
     *     {@code null} value causes a {@link NullPointerException} when the
     *     method reference is evaluated, after {@code saveAsync} has been called
     */
    private void forwardToEventService(Event event, TbCallback tbCallback) {
        DonAsynchron.withCallback(
                actorContext.getEventService().saveAsync(event),
                ignored -> tbCallback.onSuccess(),
                tbCallback::onFailure,
                actorContext.getDbCallbackExecutor());
    }

    /**
     * Delegates a REST API call response message to the rule engine call service.
     *
     * @param restApiCallResponseMsgProto response message to deliver
     * @param tbCallback callback handed to the rule engine call service along with the message
     */
    void forwardToRuleEngineCallService(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsgProto, TbCallback tbCallback) {
        ruleEngineCallService.onQueueMsg(restApiCallResponseMsgProto, tbCallback);
    }

    /**
     * Reports a message that no handler accepted.
     *
     * <p>Despite its name this method does not throw: it logs a warning with
     * the message and reports a {@link RuntimeException} through
     * {@code tbCallback.onFailure}.
     *
     * @param obj the unhandled message, used only for logging
     * @param tbCallback callback that receives the failure
     */
    private void throwNotHandled(Object obj, TbCallback tbCallback) {
        log.warn("Message not handled: {}", obj);
        RuntimeException notHandled = new RuntimeException("Message not handled!");
        tbCallback.onFailure(notHandled);
    }

    /**
     * Builds a {@code TenantId} from the two 64-bit halves of a UUID.
     *
     * @param j most significant 64 bits of the UUID
     * @param j2 least significant 64 bits of the UUID
     * @return the result of {@code TenantId.fromUUID} for the assembled UUID
     */
    private TenantId toTenantId(long j, long j2) {
        UUID tenantUuid = new UUID(j, j2);
        return TenantId.fromUUID(tenantUuid);
    }

    /**
     * Stops the consumers held by this service, after letting the superclass
     * stop its own.
     *
     * <p>The main consumer is stopped and then awaited; the usage stats and
     * firmware states consumers are only asked to stop.
     *
     * <p>Overrides {@code AbstractConsumerService#stopConsumers}. Decompiler
     * note: the access modifier was changed from {@code protected}.
     */
    @Override
    public void stopConsumers() {
        super.stopConsumers();
        // Main consumer: request stop, then wait for it to finish.
        mainConsumer.stop();
        mainConsumer.awaitStop();
        // Remaining consumers are stopped without waiting.
        usageStatsConsumer.stop();
        firmwareStatesConsumer.stop();
    }
}
