/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.resource.ImageCacheKey;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.notification.NotificationSchedulerService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.PendingMsgHolder;
import org.thingsboard.server.service.queue.TbCoreConsumerService;
import org.thingsboard.server.service.queue.TbCoreConsumerStats;
import org.thingsboard.server.service.queue.TbPackCallback;
import org.thingsboard.server.service.queue.TbPackProcessingContext;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;

@Service
@TbCoreComponent
public class DefaultTbCoreConsumerService
extends AbstractConsumerService<TransportProtos.ToCoreNotificationMsg>
implements TbCoreConsumerService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTbCoreConsumerService.class);
    @Value(value="${queue.core.poll-interval}")
    private long pollInterval;
    @Value(value="${queue.core.pack-processing-timeout}")
    private long packProcessingTimeout;
    @Value(value="${queue.core.consumer-per-partition:true}")
    private boolean consumerPerPartition;
    @Value(value="${queue.core.stats.enabled:false}")
    private boolean statsEnabled;
    @Value(value="${queue.core.ota.pack-interval-ms:60000}")
    private long firmwarePackInterval;
    @Value(value="${queue.core.ota.pack-size:100}")
    private int firmwarePackSize;
    private final DeviceStateService stateService;
    private final TbApiUsageStateService statsService;
    private final TbLocalSubscriptionService localSubscriptionService;
    private final SubscriptionManagerService subscriptionManagerService;
    private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
    private final OtaPackageStateService firmwareStateService;
    private final GitVersionControlQueueService vcQueueService;
    private final NotificationSchedulerService notificationSchedulerService;
    private final NotificationRuleProcessor notificationRuleProcessor;
    private final TbCoreQueueFactory queueFactory;
    private final TbImageService imageService;
    private final RuleEngineCallService ruleEngineCallService;
    private final EdqsService edqsService;
    private final TbCoreConsumerStats stats;
    private MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToCoreMsg>, QueueConfig> mainConsumer;
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> usageStatsConsumer;
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
    private volatile ListeningExecutorService deviceActivityEventsExecutor;

    public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, SubscriptionManagerService subscriptionManagerService, TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache, TbAssetProfileCache assetProfileCache, TbApiUsageStateService statsService, TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, OtaPackageStateService firmwareStateService, GitVersionControlQueueService vcQueueService, PartitionService partitionService, ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, NotificationSchedulerService notificationSchedulerService, NotificationRuleProcessor notificationRuleProcessor, TbImageService imageService, RuleEngineCallService ruleEngineCallService, CalculatedFieldCache calculatedFieldCache, EdqsService edqsService) {
        super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
        this.stateService = stateService;
        this.localSubscriptionService = localSubscriptionService;
        this.subscriptionManagerService = subscriptionManagerService;
        this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
        this.stats = new TbCoreConsumerStats(statsFactory);
        this.statsService = statsService;
        this.firmwareStateService = firmwareStateService;
        this.vcQueueService = vcQueueService;
        this.notificationSchedulerService = notificationSchedulerService;
        this.notificationRuleProcessor = notificationRuleProcessor;
        this.imageService = imageService;
        this.ruleEngineCallService = ruleEngineCallService;
        this.queueFactory = tbCoreQueueFactory;
        this.edqsService = edqsService;
    }

    @PostConstruct
    public void init() {
        super.init("tb-core");
        this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newSingleThreadExecutor((ThreadFactory)ThingsBoardThreadFactory.forName((String)"tb-core-device-activity-events-executor")));
        this.mainConsumer = MainQueueConsumerManager.builder().queueKey((Object)new QueueKey(ServiceType.TB_CORE)).config(QueueConfig.of((boolean)this.consumerPerPartition, (long)this.pollInterval)).msgPackProcessor(this::processMsgs).consumerCreator((config, tpi) -> this.queueFactory.createToCoreMsgConsumer()).consumerExecutor(this.consumersExecutor).scheduler(this.scheduler).taskExecutor(this.mgmtExecutor).build();
        this.usageStatsConsumer = QueueConsumerManager.builder().name("TB Usage Stats").msgPackProcessor(this::processUsageStatsMsg).pollInterval(this.pollInterval).consumerCreator(() -> ((TbCoreQueueFactory)this.queueFactory).createToUsageStatsServiceMsgConsumer()).consumerExecutor(this.consumersExecutor).threadPrefix("usage-stats").build();
        this.firmwareStatesConsumer = QueueConsumerManager.builder().name("TB Ota Package States").msgPackProcessor(this::processFirmwareMsgs).pollInterval(this.pollInterval).consumerCreator(() -> ((TbCoreQueueFactory)this.queueFactory).createToOtaPackageStateServiceMsgConsumer()).consumerExecutor(this.consumersExecutor).threadPrefix("firmware").build();
    }

    @Override
    @PreDestroy
    public void destroy() {
        super.destroy();
        if (this.deviceActivityEventsExecutor != null) {
            this.deviceActivityEventsExecutor.shutdownNow();
        }
    }

    @Override
    protected void startConsumers() {
        super.startConsumers();
        this.firmwareStatesConsumer.subscribe();
        this.firmwareStatesConsumer.launch();
        this.usageStatsConsumer.launch();
    }

    protected void onTbApplicationEvent(PartitionChangeEvent event) {
        log.debug("Subscribing to partitions: {}", (Object)event.getCorePartitions());
        this.mainConsumer.update(event.getCorePartitions());
        this.usageStatsConsumer.subscribe(event.getCorePartitions().stream().map(tpi -> tpi.withTopic(this.usageStatsConsumer.getConsumer().getTopic())).collect(Collectors.toSet()));
    }

    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> consumer, Object consumerKey, QueueConfig config) throws Exception {
        List<IdMsgPair> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair(UUID.randomUUID(), msg)).toList();
        ConcurrentMap<UUID, TbProtoQueueMsg> pendingMap = orderedMsgList.stream().collect(Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
        CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
        TbPackProcessingContext<TbProtoQueueMsg> ctx = new TbPackProcessingContext<TbProtoQueueMsg>(processingTimeoutLatch, pendingMap, new ConcurrentHashMap());
        PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
        Future<?> packSubmitFuture = this.consumersExecutor.submit(() -> orderedMsgList.forEach(element -> {
            UUID id = element.getUuid();
            TbProtoQueueMsg msg = element.getMsg();
            log.trace("[{}] Creating main callback for message: {}", (Object)id, (Object)msg.getValue());
            TbPackCallback callback = new TbPackCallback(id, ctx);
            try {
                TransportProtos.ToCoreMsg toCoreMsg = (TransportProtos.ToCoreMsg)msg.getValue();
                pendingMsgHolder.setMsg(toCoreMsg);
                if (toCoreMsg.hasToSubscriptionMgrMsg()) {
                    log.trace("[{}] Forwarding message to subscription manager service {}", (Object)id, (Object)toCoreMsg.getToSubscriptionMgrMsg());
                    this.forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
                } else if (toCoreMsg.hasToDeviceActorMsg()) {
                    log.trace("[{}] Forwarding message to device actor {}", (Object)id, (Object)toCoreMsg.getToDeviceActorMsg());
                    this.forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
                } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceStateServiceMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
                } else if (toCoreMsg.hasDeviceConnectMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceConnectMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
                } else if (toCoreMsg.hasDeviceActivityMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceActivityMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
                } else if (toCoreMsg.hasDeviceDisconnectMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceDisconnectMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
                } else if (toCoreMsg.hasDeviceInactivityMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceInactivityMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
                } else if (toCoreMsg.hasDeviceInactivityTimeoutUpdateMsg()) {
                    log.trace("[{}] Forwarding message to device state service {}", (Object)id, (Object)toCoreMsg.getDeviceInactivityTimeoutUpdateMsg());
                    this.forwardToStateService(toCoreMsg.getDeviceInactivityTimeoutUpdateMsg(), callback);
                } else if (toCoreMsg.hasToDeviceActorNotification()) {
                    ToDeviceActorNotificationMsg actorMsg = ProtoUtils.fromProto((TransportProtos.ToDeviceActorNotificationMsgProto)toCoreMsg.getToDeviceActorNotification());
                    if (actorMsg != null) {
                        if (actorMsg.getMsgType().equals((Object)MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
                            this.tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg)actorMsg);
                        } else {
                            log.trace("[{}] Forwarding message to App Actor {}", (Object)id, (Object)actorMsg);
                            this.actorContext.tell((TbActorMsg)actorMsg);
                        }
                    }
                    callback.onSuccess();
                } else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) {
                    TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
                    log.trace("[{}] Forwarding message to notification scheduler service {}", (Object)id, (Object)toCoreMsg.getNotificationSchedulerServiceMsg());
                    this.forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
                } else if (toCoreMsg.hasErrorEventMsg()) {
                    this.forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
                } else if (toCoreMsg.hasLifecycleEventMsg()) {
                    this.forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
                }
            }
            catch (Throwable e) {
                log.warn("[{}] Failed to process message: {}", new Object[]{id, msg, e});
                callback.onFailure(e);
            }
        }));
        if (!processingTimeoutLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS)) {
            if (!packSubmitFuture.isDone()) {
                packSubmitFuture.cancel(true);
                log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
            }
            if (log.isDebugEnabled()) {
                ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, (Object)msg.getValue()));
            }
            ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, (Object)msg.getValue()));
        }
        consumer.commit();
    }

    @Override
    protected ServiceType getServiceType() {
        return ServiceType.TB_CORE;
    }

    @Override
    protected long getNotificationPollDuration() {
        return this.pollInterval;
    }

    @Override
    protected long getNotificationPackProcessingTimeout() {
        return this.packProcessingTimeout;
    }

    @Override
    protected int getMgmtThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors(), 4);
    }

    @Override
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createNotificationsConsumer() {
        return this.queueFactory.createToCoreNotificationsMsgConsumer();
    }

    @Override
    protected void handleNotification(UUID id, TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg> msg, TbCallback callback) {
        TransportProtos.ToCoreNotificationMsg toCoreNotification = (TransportProtos.ToCoreNotificationMsg)msg.getValue();
        if (toCoreNotification.hasToLocalSubscriptionServiceMsg()) {
            log.trace("[{}] Forwarding message to local subscription service {}", (Object)id, (Object)toCoreNotification.getToLocalSubscriptionServiceMsg());
            this.forwardToLocalSubMgrService(toCoreNotification.getToLocalSubscriptionServiceMsg(), callback);
        } else if (toCoreNotification.hasCoreStartupMsg()) {
            log.trace("[{}] Forwarding message to local subscription service {}", (Object)id, (Object)toCoreNotification.getCoreStartupMsg());
            this.forwardCoreStartupMsg(toCoreNotification.getCoreStartupMsg(), callback);
        } else if (toCoreNotification.hasFromDeviceRpcResponse()) {
            log.trace("[{}] Forwarding message to RPC service {}", (Object)id, (Object)toCoreNotification.getFromDeviceRpcResponse());
            this.forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
        } else if (toCoreNotification.hasRestApiCallResponseMsg()) {
            log.trace("[{}] Forwarding message to RuleEngineCallService service {}", (Object)id, (Object)toCoreNotification.getRestApiCallResponseMsg());
            this.forwardToRuleEngineCallService(toCoreNotification.getRestApiCallResponseMsg(), callback);
        } else if (toCoreNotification.hasComponentLifecycle()) {
            this.handleComponentLifecycleMsg(id, ProtoUtils.fromProto((TransportProtos.ComponentLifecycleMsgProto)toCoreNotification.getComponentLifecycle()));
            callback.onSuccess();
        } else if (toCoreNotification.getQueueUpdateMsgsCount() > 0) {
            this.partitionService.updateQueues(toCoreNotification.getQueueUpdateMsgsList());
            callback.onSuccess();
        } else if (toCoreNotification.getQueueDeleteMsgsCount() > 0) {
            this.partitionService.removeQueues(toCoreNotification.getQueueDeleteMsgsList());
            callback.onSuccess();
        } else if (toCoreNotification.hasVcResponseMsg()) {
            this.vcQueueService.processResponse(toCoreNotification.getVcResponseMsg());
            callback.onSuccess();
        } else if (toCoreNotification.hasToSubscriptionMgrMsg()) {
            this.forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
        } else if (toCoreNotification.hasNotificationRuleProcessorMsg()) {
            NotificationRuleTrigger notificationRuleTrigger = (NotificationRuleTrigger)JavaSerDesUtil.decode((byte[])toCoreNotification.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
            this.notificationRuleProcessor.process(notificationRuleTrigger);
            callback.onSuccess();
        } else if (toCoreNotification.hasResourceCacheInvalidateMsg()) {
            this.forwardToResourceService(toCoreNotification.getResourceCacheInvalidateMsg(), callback);
        } else if (toCoreNotification.hasToEdqsCoreServiceMsg()) {
            this.edqsService.processSystemMsg((ToCoreEdqsMsg)JacksonUtil.fromBytes((byte[])toCoreNotification.getToEdqsCoreServiceMsg().getValue().toByteArray(), ToCoreEdqsMsg.class));
            callback.onSuccess();
        }
        if (this.statsEnabled) {
            this.stats.log(toCoreNotification);
        }
    }

    private void processUsageStatsMsg(List<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> consumer) throws Exception {
        ConcurrentMap pendingMap = msgs.stream().collect(Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
        CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
        TbPackProcessingContext ctx = new TbPackProcessingContext(processingTimeoutLatch, pendingMap, new ConcurrentHashMap());
        pendingMap.forEach((id, msg) -> {
            log.trace("[{}] Creating usage stats callback for message: {}", id, (Object)msg.getValue());
            TbPackCallback callback = new TbPackCallback((UUID)id, ctx);
            try {
                this.handleUsageStats((TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>)msg, callback);
            }
            catch (Throwable e) {
                log.warn("[{}] Failed to process usage stats: {}", new Object[]{id, msg, e});
                callback.onFailure(e);
            }
        });
        if (!processingTimeoutLatch.await(this.getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
            ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process usage stats: {}", id, (Object)msg.getValue()));
            ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process usage stats: {}", id, (Object)msg.getValue()));
        }
        consumer.commit();
    }

    private void processFirmwareMsgs(List<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> consumer) {
        long maxProcessingTimeoutPerRecord;
        long timeToSleep = maxProcessingTimeoutPerRecord = this.firmwarePackInterval / (long)this.firmwarePackSize;
        for (TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg> msg : msgs) {
            try {
                long startTime = System.currentTimeMillis();
                boolean isSuccessUpdate = this.handleOtaPackageUpdates(msg);
                long endTime = System.currentTimeMillis();
                long spentTime = endTime - startTime;
                timeToSleep -= spentTime;
                if (!isSuccessUpdate) continue;
                if (timeToSleep > 0L) {
                    log.debug("Spent time per record is: [{}]!", (Object)spentTime);
                    Thread.sleep(timeToSleep);
                    timeToSleep = 0L;
                }
                timeToSleep += maxProcessingTimeoutPerRecord;
            }
            catch (InterruptedException e) {
                return;
            }
            catch (Throwable e) {
                log.warn("Failed to process firmware update msg: {}", msg, (Object)e);
            }
        }
        consumer.commit();
    }

    private void handleUsageStats(TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg> msg, TbCallback callback) {
        this.statsService.process(msg, callback);
    }

    private boolean handleOtaPackageUpdates(TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg> msg) {
        return this.firmwareStateService.process((TransportProtos.ToOtaPackageStateServiceMsg)msg.getValue());
    }

    private void forwardToCoreRpcService(TransportProtos.FromDeviceRPCResponseProto proto, TbCallback callback) {
        RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
        FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), proto.getResponse(), error);
        this.tbCoreDeviceRpcService.processRpcResponseFromRuleEngine(response);
        callback.onSuccess();
    }

    @Scheduled(fixedDelayString="${queue.core.stats.print-interval-ms}")
    public void printStats() {
        if (this.statsEnabled) {
            this.stats.printStats();
            this.stats.reset();
        }
    }

    private void forwardToLocalSubMgrService(TransportProtos.LocalSubscriptionServiceMsgProto msg, TbCallback callback) {
        if (msg.hasSubEventCallback()) {
            this.localSubscriptionService.onSubEventCallback(msg.getSubEventCallback(), callback);
        } else if (msg.hasTsUpdate()) {
            this.localSubscriptionService.onTimeSeriesUpdate(msg.getTsUpdate(), callback);
        } else if (msg.hasAttrUpdate()) {
            this.localSubscriptionService.onAttributesUpdate(msg.getAttrUpdate(), callback);
        } else if (msg.hasAlarmUpdate()) {
            this.localSubscriptionService.onAlarmUpdate(msg.getAlarmUpdate(), callback);
        } else if (msg.hasNotificationsUpdate()) {
            this.localSubscriptionService.onNotificationUpdate(msg.getNotificationsUpdate(), callback);
        } else if (msg.hasSubUpdate() || msg.hasAlarmSubUpdate() || msg.hasNotificationsSubUpdate()) {
            callback.onSuccess();
        } else {
            this.throwNotHandled(msg, callback);
        }
    }

    private void forwardCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg, TbCallback callback) {
        log.info("[{}] Processing core startup with partitions: {}", (Object)coreStartupMsg.getServiceId(), (Object)coreStartupMsg.getPartitionsList());
        this.localSubscriptionService.onCoreStartupMsg(coreStartupMsg);
        callback.onSuccess();
    }

    private void forwardToResourceService(TransportProtos.ResourceCacheInvalidateMsg msg, TbCallback callback) {
        TenantId tenantId = TenantId.fromUUID((UUID)new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
        msg.getKeysList().stream().map(cacheKeyProto -> {
            if (cacheKeyProto.hasResourceKey()) {
                return ImageCacheKey.forImage((TenantId)tenantId, (String)cacheKeyProto.getResourceKey());
            }
            return ImageCacheKey.forPublicImage((String)cacheKeyProto.getPublicResourceKey());
        }).forEach(this.imageService::evictETags);
        callback.onSuccess();
    }

    private void forwardToSubMgrService(TransportProtos.SubscriptionMgrMsgProto msg, TbCallback callback) {
        if (msg.hasSubEvent()) {
            TransportProtos.TbEntitySubEventProto subEvent = msg.getSubEvent();
            this.subscriptionManagerService.onSubEvent(subEvent.getServiceId(), TbSubscriptionUtils.fromProto(subEvent), callback);
        } else if (msg.hasTelemetrySub()) {
            callback.onSuccess();
        } else if (msg.hasAlarmSub()) {
            callback.onSuccess();
        } else if (msg.hasNotificationsSub()) {
            callback.onSuccess();
        } else if (msg.hasNotificationsCountSub()) {
            callback.onSuccess();
        } else if (msg.hasSubClose()) {
            callback.onSuccess();
        } else if (msg.hasTsUpdate()) {
            TransportProtos.TbTimeSeriesUpdateProto proto = msg.getTsUpdate();
            long tenantIdMSB = proto.getTenantIdMSB();
            long tenantIdLSB = proto.getTenantIdLSB();
            this.subscriptionManagerService.onTimeSeriesUpdate(this.toTenantId(tenantIdMSB, tenantIdLSB), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), KvProtoUtil.fromTsKvProtoList((List)proto.getDataList()), callback);
        } else if (msg.hasAttrUpdate()) {
            TransportProtos.TbAttributeUpdateProto proto = msg.getAttrUpdate();
            this.subscriptionManagerService.onAttributesUpdate(this.toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), proto.getScope(), KvProtoUtil.toAttributeKvList((List)proto.getDataList()), callback);
        } else if (msg.hasAttrDelete()) {
            TransportProtos.TbAttributeDeleteProto proto = msg.getAttrDelete();
            this.subscriptionManagerService.onAttributesDelete(this.toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), proto.getScope(), (List<String>)proto.getKeysList(), callback);
        } else if (msg.hasTsDelete()) {
            TransportProtos.TbTimeSeriesDeleteProto proto = msg.getTsDelete();
            this.subscriptionManagerService.onTimeSeriesDelete(this.toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), (List<String>)proto.getKeysList(), callback);
        } else if (msg.hasAlarmUpdate()) {
            TransportProtos.TbAlarmUpdateProto proto = msg.getAlarmUpdate();
            this.subscriptionManagerService.onAlarmUpdate(this.toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), (AlarmInfo)JacksonUtil.fromString((String)proto.getAlarm(), AlarmInfo.class), callback);
        } else if (msg.hasAlarmDelete()) {
            TransportProtos.TbAlarmDeleteProto proto = msg.getAlarmDelete();
            this.subscriptionManagerService.onAlarmDeleted(this.toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), (AlarmInfo)JacksonUtil.fromString((String)proto.getAlarm(), AlarmInfo.class), callback);
        } else if (msg.hasNotificationUpdate()) {
            TransportProtos.NotificationUpdateProto updateProto = msg.getNotificationUpdate();
            TenantId tenantId = this.toTenantId(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB());
            UserId recipientId = new UserId(new UUID(updateProto.getRecipientIdMSB(), updateProto.getRecipientIdLSB()));
            NotificationUpdate update = (NotificationUpdate)JacksonUtil.fromString((String)updateProto.getUpdate(), NotificationUpdate.class);
            this.subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, callback);
        } else if (msg.hasNotificationRequestUpdate()) {
            TransportProtos.NotificationRequestUpdateProto updateProto = msg.getNotificationRequestUpdate();
            TenantId tenantId = this.toTenantId(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB());
            NotificationRequestUpdate update = (NotificationRequestUpdate)JacksonUtil.fromString((String)updateProto.getUpdate(), NotificationRequestUpdate.class);
            this.localSubscriptionService.onNotificationRequestUpdate(tenantId, update, callback);
        } else {
            this.throwNotHandled(msg, callback);
        }
        if (this.statsEnabled) {
            this.stats.log(msg);
        }
    }

    void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceStateServiceMsg);
        }
        this.stateService.onQueueMsg(deviceStateServiceMsg, callback);
    }

    void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceConnectMsg);
        }
        TenantId tenantId = this.toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
        DeviceId deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
        ListenableFuture future = this.deviceActivityEventsExecutor.submit(() -> this.stateService.onDeviceConnect(tenantId, deviceId, deviceConnectMsg.getLastConnectTime()));
        DonAsynchron.withCallback((ListenableFuture)future, __ -> callback.onSuccess(), t -> {
            log.warn("[{}] Failed to process device connect message for device [{}]", new Object[]{tenantId.getId(), deviceId.getId(), t});
            callback.onFailure(t);
        });
    }

    void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceActivityMsg);
        }
        TenantId tenantId = this.toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
        DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
        ListenableFuture future = this.deviceActivityEventsExecutor.submit(() -> this.stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime()));
        DonAsynchron.withCallback((ListenableFuture)future, __ -> callback.onSuccess(), t -> {
            log.warn("[{}] Failed to process device activity message for device [{}]", new Object[]{tenantId.getId(), deviceId.getId(), t});
            callback.onFailure((Throwable)new RuntimeException("Failed to update device activity for device [" + String.valueOf(deviceId.getId()) + "]!", (Throwable)t));
        });
    }

    void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceDisconnectMsg);
        }
        TenantId tenantId = this.toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
        DeviceId deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
        ListenableFuture future = this.deviceActivityEventsExecutor.submit(() -> this.stateService.onDeviceDisconnect(tenantId, deviceId, deviceDisconnectMsg.getLastDisconnectTime()));
        DonAsynchron.withCallback((ListenableFuture)future, __ -> callback.onSuccess(), t -> {
            log.warn("[{}] Failed to process device disconnect message for device [{}]", new Object[]{tenantId.getId(), deviceId.getId(), t});
            callback.onFailure(t);
        });
    }

    void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceInactivityMsg);
        }
        TenantId tenantId = this.toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
        DeviceId deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
        ListenableFuture future = this.deviceActivityEventsExecutor.submit(() -> this.stateService.onDeviceInactivity(tenantId, deviceId, deviceInactivityMsg.getLastInactivityTime()));
        DonAsynchron.withCallback((ListenableFuture)future, __ -> callback.onSuccess(), t -> {
            log.warn("[{}] Failed to process device inactivity message for device [{}]", new Object[]{tenantId.getId(), deviceId.getId(), t});
            callback.onFailure(t);
        });
    }

    void forwardToStateService(TransportProtos.DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(deviceInactivityTimeoutUpdateMsg);
        }
        TenantId tenantId = this.toTenantId(deviceInactivityTimeoutUpdateMsg.getTenantIdMSB(), deviceInactivityTimeoutUpdateMsg.getTenantIdLSB());
        DeviceId deviceId = new DeviceId(new UUID(deviceInactivityTimeoutUpdateMsg.getDeviceIdMSB(), deviceInactivityTimeoutUpdateMsg.getDeviceIdLSB()));
        ListenableFuture future = this.deviceActivityEventsExecutor.submit(() -> this.stateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, deviceInactivityTimeoutUpdateMsg.getInactivityTimeout()));
        DonAsynchron.withCallback((ListenableFuture)future, __ -> callback.onSuccess(), t -> {
            log.warn("[{}] Failed to process device inactivity timeout update message for device [{}]", new Object[]{tenantId.getId(), deviceId.getId(), t});
            callback.onFailure(t);
        });
    }

    private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
        TenantId tenantId = this.toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
        NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));
        try {
            this.notificationSchedulerService.scheduleNotificationRequest(tenantId, notificationRequestId, msg.getTs());
            callback.onSuccess();
        }
        catch (Exception e) {
            callback.onFailure((Throwable)new RuntimeException("Failed to schedule notification request", e));
        }
    }

    private void forwardToDeviceActor(TransportProtos.TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
        if (this.statsEnabled) {
            this.stats.log(toDeviceActorMsg);
        }
        this.actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
    }

    private void forwardToEventService(TransportProtos.ErrorEventProto eventProto, TbCallback callback) {
        ErrorEvent event = ErrorEvent.builder().tenantId(this.toTenantId(eventProto.getTenantIdMSB(), eventProto.getTenantIdLSB())).entityId(new UUID(eventProto.getEntityIdMSB(), eventProto.getEntityIdLSB())).serviceId(eventProto.getServiceId()).ts(System.currentTimeMillis()).method(eventProto.getMethod()).error(eventProto.getError()).build();
        this.forwardToEventService((Event)event, callback);
    }

    private void forwardToEventService(TransportProtos.LifecycleEventProto eventProto, TbCallback callback) {
        LifecycleEvent event = LifecycleEvent.builder().tenantId(this.toTenantId(eventProto.getTenantIdMSB(), eventProto.getTenantIdLSB())).entityId(new UUID(eventProto.getEntityIdMSB(), eventProto.getEntityIdLSB())).serviceId(eventProto.getServiceId()).ts(System.currentTimeMillis()).lcEventType(eventProto.getLcEventType()).success(eventProto.getSuccess()).error(StringUtils.isNotEmpty((CharSequence)eventProto.getError()) ? eventProto.getError() : null).build();
        this.forwardToEventService((Event)event, callback);
    }

    private void forwardToEventService(Event event, TbCallback callback) {
        DonAsynchron.withCallback((ListenableFuture)this.actorContext.getEventService().saveAsync(event), result -> callback.onSuccess(), arg_0 -> ((TbCallback)callback).onFailure(arg_0), (Executor)((Object)this.actorContext.getDbCallbackExecutor()));
    }

    void forwardToRuleEngineCallService(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) {
        this.ruleEngineCallService.onQueueMsg(restApiCallResponseMsg, callback);
    }

    private void throwNotHandled(Object msg, TbCallback callback) {
        log.warn("Message not handled: {}", msg);
        callback.onFailure((Throwable)new RuntimeException("Message not handled!"));
    }

    private TenantId toTenantId(long tenantIdMSB, long tenantIdLSB) {
        return TenantId.fromUUID((UUID)new UUID(tenantIdMSB, tenantIdLSB));
    }

    @Override
    protected void stopConsumers() {
        super.stopConsumers();
        this.mainConsumer.stop();
        this.mainConsumer.awaitStop();
        this.usageStatsConsumer.stop();
        this.firmwareStatesConsumer.stop();
    }
}

