package org.thingsboard.server.service.subscription;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DeduplicationUtil;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldEntityMessageProcessor;
import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.class */
public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);

    // Collaborators supplied through the constructor.
    private final AttributesService attrService;
    private final TimeseriesService tsService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final PartitionService partitionService;
    private final TbClusterService clusterService;
    private final SubscriptionManagerService subscriptionManagerService;
    private final WebSocketService webSocketService;
    private final RateLimitService rateLimitService;

    /** Fixed pool of "ts-sub-callback" threads; created in {@link #initExecutor()}. */
    private ExecutorService tsCallBackExecutor;

    /** Scheduler that periodically runs the stale-session cleanup; created in {@link #initExecutor()}. */
    private ScheduledExecutorService staleSessionCleanupExecutor;

    /** Rate limit applied to WS subscriptions per tenant; empty string when the property is not set. */
    @Value("${server.ws.rate_limits.subscriptions_per_tenant:}")
    private String subscriptionsPerTenantRateLimit;

    /** Rate limit applied to WS subscriptions per user; empty string when the property is not set. */
    @Value("${server.ws.rate_limits.subscriptions_per_user:}")
    private String subscriptionsPerUserRateLimit;

    /** Id of this service instance, read from {@code serviceInfoProvider} in {@link #initExecutor()}. */
    private String serviceId;

    /** Executor on which subscription update processors are invoked; created in {@link #initExecutor()}. */
    private ExecutorService subscriptionUpdateExecutor;

    /** Session id -> (subscription id -> subscription). */
    private final ConcurrentMap<String, ConcurrentMap<Integer, TbSubscription<?>>> subscriptionsBySessionId = new ConcurrentHashMap<>();

    /** Entity UUID -> local subscriptions registered for that entity. */
    private final ConcurrentMap<UUID, TbEntityLocalSubsInfo> subscriptionsByEntityId = new ConcurrentHashMap<>();

    /** Entity UUID -> latest known attribute/time-series update timestamps. */
    private final ConcurrentMap<UUID, TbEntityUpdatesInfo> entityUpdates = new ConcurrentHashMap<>();

    /** Per-tenant locks held in a soft-reference map; see {@link #getSubsLock(TenantId)}. */
    private final ConcurrentReferenceHashMap<TenantId, Lock> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT);

    /**
     * Synthetic switch-map holder recovered by the decompiler (originally the
     * anonymous class {@code DefaultTbLocalSubscriptionService$1}).
     *
     * <p>The array is indexed by {@code TbSubscriptionType.ordinal()} and holds
     * a dense case label: 1 for {@code TIMESERIES}, 2 for {@code ATTRIBUTES}.
     * Every other enum constant keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$thingsboard$server$service$subscription$TbSubscriptionType = new int[TbSubscriptionType.values().length];

        static {
            try {
                $SwitchMap$org$thingsboard$server$service$subscription$TbSubscriptionType[TbSubscriptionType.TIMESERIES.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the enum constant is absent at runtime, so its slot stays 0.
            }
            try {
                $SwitchMap$org$thingsboard$server$service$subscription$TbSubscriptionType[TbSubscriptionType.ATTRIBUTES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the enum constant is absent at runtime, so its slot stays 0.
            }
        }
    }

    /**
     * Stores the injected collaborators. The subscription manager and the
     * WebSocket service are injected with {@code @Lazy}.
     */
    public DefaultTbLocalSubscriptionService(AttributesService attributesService,
                                             TimeseriesService timeseriesService,
                                             TbServiceInfoProvider tbServiceInfoProvider,
                                             PartitionService partitionService,
                                             TbClusterService tbClusterService,
                                             @Lazy SubscriptionManagerService subscriptionManagerService,
                                             @Lazy WebSocketService webSocketService,
                                             RateLimitService rateLimitService) {
        // Lazily injected services.
        this.webSocketService = webSocketService;
        this.subscriptionManagerService = subscriptionManagerService;

        // Cluster / discovery services.
        this.clusterService = tbClusterService;
        this.partitionService = partitionService;
        this.serviceInfoProvider = tbServiceInfoProvider;

        // Data access and rate limiting.
        this.rateLimitService = rateLimitService;
        this.tsService = timeseriesService;
        this.attrService = attributesService;
    }

    /**
     * Creates the executors used by this service, resolves the local service
     * id and schedules the stale-session cleanup with a 60-second initial
     * delay and a 60-second delay between runs.
     */
    @PostConstruct
    public void initExecutor() {
        final long cleanupDelaySeconds = 60L;

        this.subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
        this.tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback"));
        this.serviceId = this.serviceInfoProvider.getServiceId();

        ScheduledExecutorService cleanupScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("stale-session-cleanup");
        this.staleSessionCleanupExecutor = cleanupScheduler;
        cleanupScheduler.scheduleWithFixedDelay(() -> cleanupStaleSessions(), cleanupDelaySeconds, cleanupDelaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Immediately shuts down every executor that was created, in the order:
     * subscription updates, time-series callbacks, stale-session cleanup.
     */
    @PreDestroy
    public void shutdownExecutor() {
        ExecutorService[] executors = {
                this.subscriptionUpdateExecutor,
                this.tsCallBackExecutor,
                this.staleSessionCleanupExecutor
        };
        for (ExecutorService executor : executors) {
            // An executor is null when initExecutor() never reached its creation.
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Re-pushes every local subscription to the subscription manager when the
     * topology of the TB_CORE queues changes.
     *
     * <p>Subscriptions whose push fails with {@link TenantNotFoundException}
     * are collected per tenant and removed afterwards under that tenant's
     * lock. Any other failure is only logged.
     *
     * @param clusterTopologyChangeEvent the topology change notification
     */
    @Override
    @EventListener({ClusterTopologyChangeEvent.class})
    public void onApplicationEvent(ClusterTopologyChangeEvent clusterTopologyChangeEvent) {
        boolean coreTopologyChanged = clusterTopologyChangeEvent.getQueueKeys().stream()
                .anyMatch(queueKey -> ServiceType.TB_CORE.equals(queueKey.getType()));
        if (!coreTopologyChanged) {
            return;
        }

        Map<TenantId, Set<UUID>> staleSubscriptions = new HashMap<>();
        this.subscriptionsByEntityId.forEach((entityUuid, subsInfo) -> {
            try {
                pushSubEventToManagerService(subsInfo.getTenantId(), subsInfo.getEntityId(), subsInfo.toEvent(ComponentLifecycleEvent.UPDATED));
            } catch (TenantNotFoundException e) {
                // Must be caught before the generic handler, otherwise this branch is unreachable.
                staleSubscriptions.computeIfAbsent(subsInfo.getTenantId(), key -> new HashSet<>()).add(entityUuid);
                log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", entityUuid, subsInfo.getTenantId());
            } catch (Exception e) {
                log.error("Failed to push subscription {} to manager service", subsInfo, e);
            }
        });

        staleSubscriptions.forEach((tenantId, entityUuids) -> {
            Lock subsLock = getSubsLock(tenantId);
            subsLock.lock();
            try {
                entityUuids.forEach(entityUuid -> {
                    this.subscriptionsByEntityId.remove(entityUuid);
                    this.entityUpdates.remove(entityUuid);
                });
            } finally {
                subsLock.unlock();
            }
        });
    }

    /**
     * Asynchronously re-sends local subscriptions to a core service that has
     * just started. Only subscriptions whose entity resolves to a remote
     * partition listed in the startup message are pushed.
     *
     * @param coreStartupMsg startup notification carrying the partitions and the id of the started service
     */
    @Override
    public void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg) {
        this.subscriptionUpdateExecutor.submit(() -> {
            Set<Integer> startedPartitions = new HashSet<>(coreStartupMsg.getPartitionsList());
            AtomicInteger pushed = new AtomicInteger();
            this.subscriptionsByEntityId.values().forEach(subsInfo -> {
                TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, subsInfo.getTenantId(), subsInfo.getEntityId());
                // Integer.MAX_VALUE stands in for "no partition" so it never matches a real one.
                boolean ownedByStartedService = !tpi.isMyPartition()
                        && startedPartitions.contains(tpi.getPartition().orElse(Integer.MAX_VALUE));
                if (ownedByStartedService) {
                    pushToQueue(subsInfo.getEntityId(), subsInfo.toEvent(ComponentLifecycleEvent.UPDATED), tpi);
                    pushed.incrementAndGet();
                }
            });
            log.info("[{}] Pushed {} subscriptions to [{}]", this.serviceId, pushed.get(), coreStartupMsg.getServiceId());
        });
    }

    /**
     * Returns the lock associated with the given tenant, creating a new
     * {@link ReentrantLock} when the map holds none for it.
     *
     * @param tenantId tenant whose lock is requested
     * @return the lock currently mapped to {@code tenantId}
     */
    Lock getSubsLock(TenantId tenantId) {
        return this.locks.computeIfAbsent(tenantId, key -> new ReentrantLock());
    }

    /**
     * Registers a new subscription for a WebSocket session.
     *
     * <p>The per-tenant and, when a security context is present, per-user
     * rate limits are checked first; a violation is reported through
     * {@code handleRateLimitError} and nothing is registered. Otherwise the
     * subscription is stored under the tenant lock and the resulting event,
     * if any, is pushed after the lock has been released.
     *
     * @param tbSubscription      the subscription to register
     * @param webSocketSessionRef the session that requested it
     */
    @Override
    public void addSubscription(TbSubscription<?> tbSubscription, WebSocketSessionRef webSocketSessionRef) {
        TenantId tenantId = tbSubscription.getTenantId();
        EntityId entityId = tbSubscription.getEntityId();
        if (!this.rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, tenantId, this.subscriptionsPerTenantRateLimit)) {
            handleRateLimitError(tbSubscription, webSocketSessionRef, "Exceeded rate limit for WS subscriptions per tenant");
            return;
        }
        if (webSocketSessionRef.getSecurityCtx() != null && !this.rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, webSocketSessionRef.getSecurityCtx().getId(), this.subscriptionsPerUserRateLimit)) {
            handleRateLimitError(tbSubscription, webSocketSessionRef, "Exceeded rate limit for WS subscriptions per user");
            return;
        }
        log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, tbSubscription);

        SubscriptionModificationResult result;
        Lock subsLock = getSubsLock(tenantId);
        subsLock.lock();
        try {
            this.subscriptionsBySessionId
                    .computeIfAbsent(tbSubscription.getSessionId(), sessionId -> new ConcurrentHashMap<>())
                    .put(tbSubscription.getSubscriptionId(), tbSubscription);
            result = modifySubscription(tenantId, entityId, tbSubscription, true);
        } finally {
            // Exactly one unlock on every path; the previous unlock-in-try plus
            // unlock-in-catch shape could release the lock twice.
            subsLock.unlock();
        }
        if (result.hasEvent()) {
            pushSubscriptionEvent(result);
        }
    }

    /**
     * Decodes a subscription event callback received as a protobuf message
     * and delegates to the UUID-based handler. A tenant id whose MSB and LSB
     * are both zero is mapped to {@code TenantId.SYS_TENANT_ID}.
     *
     * @param tbEntitySubEventCallbackProto the encoded callback
     * @param tbCallback                    completion callback passed on to the handler
     */
    @Override
    public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto tbEntitySubEventCallbackProto, TbCallback tbCallback) {
        long tenantMsb = tbEntitySubEventCallbackProto.getTenantIdMSB();
        long tenantLsb = tbEntitySubEventCallbackProto.getTenantIdLSB();
        TenantId tenantId;
        if (tenantMsb == 0 && tenantLsb == 0) {
            tenantId = TenantId.SYS_TENANT_ID;
        } else {
            tenantId = TenantId.fromUUID(new UUID(tenantMsb, tenantLsb));
        }
        UUID entityUuid = new UUID(tbEntitySubEventCallbackProto.getEntityIdMSB(), tbEntitySubEventCallbackProto.getEntityIdLSB());
        TbEntityUpdatesInfo updatesInfo = new TbEntityUpdatesInfo(tbEntitySubEventCallbackProto.getAttributesUpdateTs(), tbEntitySubEventCallbackProto.getTimeSeriesUpdateTs());
        onSubEventCallback(tenantId, entityUuid, tbEntitySubEventCallbackProto.getSeqNumber(), updatesInfo, tbCallback);
    }

    /**
     * Handles a subscription event callback addressed by {@link EntityId} by
     * delegating to the UUID-based handler.
     */
    @Override
    public void onSubEventCallback(TenantId tenantId, EntityId entityId, int i, TbEntityUpdatesInfo tbEntityUpdatesInfo, TbCallback tbCallback) {
        UUID entityUuid = entityId.getId();
        onSubEventCallback(tenantId, entityUuid, i, tbEntityUpdatesInfo, tbCallback);
    }

    /**
     * Records the reported update timestamps for the entity, clears the
     * pending subscriptions for the given sequence number and checks each of
     * them for missed updates.
     *
     * <p>Only the lookup and clearing of pending subscriptions happens under
     * the tenant lock. The missed-update checks and the completion callback
     * run after the lock is released, as {@code addSubscription} does for
     * its own missed-update check.
     *
     * @param tenantId            tenant owning the entity; selects the lock
     * @param uuid                entity UUID
     * @param i                   sequence number passed to {@code clearPendingSubscriptions}
     * @param tbEntityUpdatesInfo latest update timestamps reported for the entity
     * @param tbCallback          invoked with {@code onSuccess()} once processing is done
     */
    private void onSubEventCallback(TenantId tenantId, UUID uuid, int i, TbEntityUpdatesInfo tbEntityUpdatesInfo, TbCallback tbCallback) {
        log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, uuid, i, tbEntityUpdatesInfo);
        this.entityUpdates.put(uuid, tbEntityUpdatesInfo);
        Set<TbSubscription<?>> pendingSubscriptions = null;
        Lock subsLock = getSubsLock(tenantId);
        subsLock.lock();
        try {
            TbEntityLocalSubsInfo subsInfo = this.subscriptionsByEntityId.get(uuid);
            if (subsInfo != null) {
                pendingSubscriptions = subsInfo.clearPendingSubscriptions(i);
            }
        } finally {
            subsLock.unlock();
        }
        // Outside the lock: these calls reach into other services and the caller's callback.
        if (pendingSubscriptions != null) {
            pendingSubscriptions.forEach(this::checkMissedUpdates);
        }
        tbCallback.onSuccess();
    }

    /**
     * Removes a single subscription of a session.
     *
     * <p>The bookkeeping is updated under the tenant lock. The resulting
     * event, if any, is pushed after the lock has been released, which
     * matches how {@code addSubscription} pushes its event.
     *
     * @param tenantId tenant owning the session; selects the lock
     * @param str      session id
     * @param i        subscription id within the session
     */
    @Override
    public void cancelSubscription(TenantId tenantId, String str, int i) {
        log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, str, i);
        SubscriptionModificationResult result = null;
        Lock subsLock = getSubsLock(tenantId);
        subsLock.lock();
        try {
            ConcurrentMap<Integer, TbSubscription<?>> sessionSubscriptions = this.subscriptionsBySessionId.get(str);
            if (sessionSubscriptions != null) {
                TbSubscription<?> removed = sessionSubscriptions.remove(i);
                if (removed != null) {
                    if (sessionSubscriptions.isEmpty()) {
                        this.subscriptionsBySessionId.remove(str);
                    }
                    result = modifySubscription(removed.getTenantId(), removed.getEntityId(), removed, false);
                } else {
                    log.debug("[{}][{}][{}] Subscription not found!", tenantId, str, i);
                }
            } else {
                log.debug("[{}][{}] No session subscriptions found!", tenantId, str);
            }
        } finally {
            subsLock.unlock();
        }
        if (result != null && result.hasEvent()) {
            pushSubscriptionEvent(result);
        }
    }

    /**
     * Removes every subscription of the given session. The subscriptions are
     * grouped by entity; for each entity they are removed in one step and the
     * resulting event, if any, is pushed. All of this happens under the
     * tenant lock.
     *
     * @param tenantId tenant owning the session; selects the lock
     * @param str      session id
     */
    @Override
    public void cancelAllSessionSubscriptions(TenantId tenantId, String str) {
        log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, str);
        Lock subsLock = getSubsLock(tenantId);
        subsLock.lock();
        try {
            ConcurrentMap<Integer, TbSubscription<?>> sessionSubscriptions = this.subscriptionsBySessionId.remove(str);
            if (sessionSubscriptions == null) {
                log.debug("[{}][{}] No session subscriptions found!", tenantId, str);
                return;
            }
            Map<EntityId, List<TbSubscription<?>>> subscriptionsByEntity = sessionSubscriptions.values().stream()
                    .collect(Collectors.groupingBy(TbSubscription::getEntityId));
            for (Map.Entry<EntityId, List<TbSubscription<?>>> entry : subscriptionsByEntity.entrySet()) {
                EntityId entityId = entry.getKey();
                TbEntitySubEvent event = removeAllSubscriptions(tenantId, entityId, entry.getValue());
                if (event != null) {
                    pushSubscriptionsEvent(tenantId, entityId, event);
                }
            }
        } finally {
            subsLock.unlock();
        }
    }

    /**
     * Decodes a time-series update received as a protobuf message and
     * delegates to the UUID-based handler.
     */
    @Override
    public void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tbSubUpdateProto, TbCallback tbCallback) {
        UUID entityUuid = new UUID(tbSubUpdateProto.getEntityIdMSB(), tbSubUpdateProto.getEntityIdLSB());
        List<TsKvEntry> entries = TbSubscriptionUtils.fromProto(tbSubUpdateProto);
        onTimeSeriesUpdate(entityUuid, entries, tbCallback);
    }

    /**
     * Handles a time-series update addressed by {@link EntityId} by
     * delegating to the UUID-based handler.
     */
    @Override
    public void onTimeSeriesUpdate(EntityId entityId, List<TsKvEntry> list, TbCallback tbCallback) {
        UUID entityUuid = entityId.getId();
        onTimeSeriesUpdate(entityUuid, list, tbCallback);
    }

    /**
     * Stamps the entity's time-series update time and dispatches the new
     * entries to every TIMESERIES subscription of the entity.
     *
     * <p>Entry selection per subscription:
     * <ul>
     *   <li>all keys, not latest-only: the incoming list is forwarded as is;</li>
     *   <li>all keys, latest-only: an entry passes when its key has no
     *       recorded timestamp or its timestamp is not older than the
     *       recorded one;</li>
     *   <li>selected keys: the key must have a recorded state, and in
     *       latest-only mode the entry must not be older than it.</li>
     * </ul>
     * When at least one entry was selected (or the list is forwarded as is),
     * the key states are refreshed from the update's latest values and the
     * subscription's update processor is invoked on the update executor.
     *
     * @param uuid       entity UUID
     * @param list       incoming time-series entries
     * @param tbCallback completion callback handed to {@code processSubscriptionData}
     */
    private void onTimeSeriesUpdate(UUID uuid, List<TsKvEntry> list, TbCallback tbCallback) {
        getEntityUpdatesInfo(uuid).timeSeriesUpdateTs = System.currentTimeMillis();

        Predicate<TbSubscription<?>> isTimeSeries = candidate -> TbSubscriptionType.TIMESERIES.equals(candidate.getType());

        Consumer<TbSubscription<?>> timeSeriesHandler = candidate -> {
            TbTimeSeriesSubscription tsSub = (TbTimeSeriesSubscription) candidate;
            Map<String, Long> keyStates = tsSub.getKeyStates();
            boolean allKeys = tsSub.isAllKeys();

            List<TsKvEntry> selected;
            if (allKeys && !tsSub.isLatestValues()) {
                selected = list;
            } else {
                selected = null;
                for (TsKvEntry entry : list) {
                    Long knownTs = keyStates.get(entry.getKey());
                    boolean accepted;
                    if (allKeys) {
                        // Latest-only over all keys: unknown keys always pass.
                        accepted = knownTs == null || entry.getTs() >= knownTs;
                    } else {
                        // Selected keys: only keys that already have a state entry.
                        accepted = knownTs != null && (!tsSub.isLatestValues() || entry.getTs() >= knownTs);
                    }
                    if (accepted) {
                        if (selected == null) {
                            selected = new ArrayList<>();
                        }
                        selected.add(entry);
                    }
                }
            }
            if (selected == null) {
                return;
            }

            TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(tsSub.getSubscriptionId(), selected);
            update.getLatestValues().forEach((key, ts) -> {
                tsSub.getKeyStates().put(key, ts);
            });
            this.subscriptionUpdateExecutor.submit(() -> {
                tsSub.getUpdateProcessor().accept(tsSub, update);
            });
        };

        processSubscriptionData(uuid, isTimeSeries, timeSeriesHandler, tbCallback);
    }

    /**
     * Decodes an attributes update received as a protobuf message and
     * delegates to the UUID-based handler.
     */
    @Override
    public void onAttributesUpdate(TransportProtos.TbSubUpdateProto tbSubUpdateProto, TbCallback tbCallback) {
        UUID entityUuid = new UUID(tbSubUpdateProto.getEntityIdMSB(), tbSubUpdateProto.getEntityIdLSB());
        String scope = tbSubUpdateProto.getScope();
        List<TsKvEntry> entries = TbSubscriptionUtils.fromProto(tbSubUpdateProto);
        onAttributesUpdate(entityUuid, scope, entries, tbCallback);
    }

    /**
     * Handles an attributes update addressed by {@link EntityId} by
     * delegating to the UUID-based handler.
     */
    @Override
    public void onAttributesUpdate(EntityId entityId, String str, List<TsKvEntry> list, TbCallback tbCallback) {
        UUID entityUuid = entityId.getId();
        onAttributesUpdate(entityUuid, str, list, tbCallback);
    }

    /**
     * Stamps the entity's attributes update time and dispatches the new
     * entries to every ATTRIBUTES subscription of the entity whose scope
     * matches.
     *
     * <p>A subscription matches when its scope is {@code null}, is
     * {@code ANY_SCOPE}, or has the same name as {@code str}. An all-keys
     * subscription receives the incoming list as is; otherwise only entries
     * whose key is present in the subscription's key states are forwarded.
     * When something is forwarded, the key states are refreshed from the
     * update's latest values and the update processor is invoked on the
     * update executor.
     *
     * @param uuid       entity UUID
     * @param str        name of the attribute scope the update belongs to
     * @param list       incoming attribute entries
     * @param tbCallback completion callback handed to {@code processSubscriptionData}
     */
    private void onAttributesUpdate(UUID uuid, String str, List<TsKvEntry> list, TbCallback tbCallback) {
        getEntityUpdatesInfo(uuid).attributesUpdateTs = System.currentTimeMillis();

        Predicate<TbSubscription<?>> isAttributes = candidate -> TbSubscriptionType.ATTRIBUTES.equals(candidate.getType());

        Consumer<TbSubscription<?>> attributesHandler = candidate -> {
            TbAttributeSubscription attrSub = (TbAttributeSubscription) candidate;
            boolean scopeMatches = attrSub.getScope() == null
                    || TbAttributeSubscriptionScope.ANY_SCOPE.equals(attrSub.getScope())
                    || attrSub.getScope().name().equals(str);
            if (!scopeMatches) {
                return;
            }

            List<TsKvEntry> selected;
            if (attrSub.isAllKeys()) {
                selected = list;
            } else {
                selected = null;
                for (TsKvEntry entry : list) {
                    if (!attrSub.getKeyStates().containsKey(entry.getKey())) {
                        continue;
                    }
                    if (selected == null) {
                        selected = new ArrayList<>();
                    }
                    selected.add(entry);
                }
            }
            if (selected == null) {
                return;
            }

            TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(attrSub.getSubscriptionId(), selected);
            update.getLatestValues().forEach((key, ts) -> {
                attrSub.getKeyStates().put(key, ts);
            });
            this.subscriptionUpdateExecutor.submit(() -> {
                attrSub.getUpdateProcessor().accept(attrSub, update);
            });
        };

        processSubscriptionData(uuid, isAttributes, attributesHandler, tbCallback);
    }

    /**
     * Decodes an alarm update received as a protobuf message and delegates
     * to the UUID-based handler.
     */
    @Override
    public void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto tbAlarmSubUpdateProto, TbCallback tbCallback) {
        UUID entityUuid = new UUID(tbAlarmSubUpdateProto.getEntityIdMSB(), tbAlarmSubUpdateProto.getEntityIdLSB());
        AlarmSubscriptionUpdate update = TbSubscriptionUtils.fromProto(tbAlarmSubUpdateProto);
        onAlarmUpdate(entityUuid, update, tbCallback);
    }

    /**
     * Wraps the alarm and the flag {@code z} into an
     * {@link AlarmSubscriptionUpdate} and delegates to the UUID-based handler.
     */
    @Override
    public void onAlarmUpdate(EntityId entityId, AlarmInfo alarmInfo, boolean z, TbCallback tbCallback) {
        UUID entityUuid = entityId.getId();
        AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(alarmInfo, z);
        onAlarmUpdate(entityUuid, update, tbCallback);
    }

    /**
     * Delivers an alarm update to every ALARMS subscription of the entity.
     *
     * <p>The update is passed as the data payload of the generic
     * {@code processSubscriptionData} overload. It must not be cast to
     * {@link Predicate}: it is a payload, and such a cast fails at runtime.
     *
     * @param uuid                    entity UUID
     * @param alarmSubscriptionUpdate update payload handed to the subscriptions' update processors
     * @param tbCallback              completion callback handed to {@code processSubscriptionData}
     */
    private void onAlarmUpdate(UUID uuid, AlarmSubscriptionUpdate alarmSubscriptionUpdate, TbCallback tbCallback) {
        Predicate<TbSubscription<?>> isAlarmSubscription = candidate -> TbSubscriptionType.ALARMS.equals(candidate.getType());
        processSubscriptionData(uuid, isAlarmSubscription, alarmSubscriptionUpdate, tbCallback);
    }

    /**
     * Decodes a notifications update received as a protobuf message and
     * delegates to the UUID-based handler.
     */
    @Override
    public void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto notificationsSubUpdateProto, TbCallback tbCallback) {
        UUID entityUuid = new UUID(notificationsSubUpdateProto.getEntityIdMSB(), notificationsSubUpdateProto.getEntityIdLSB());
        NotificationsSubscriptionUpdate update = TbSubscriptionUtils.fromProto(notificationsSubUpdateProto);
        onNotificationUpdate(entityUuid, update, tbCallback);
    }

    /**
     * Handles a notifications update addressed by {@link EntityId} by
     * delegating to the UUID-based handler.
     */
    @Override
    public void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate notificationsSubscriptionUpdate, TbCallback tbCallback) {
        UUID entityUuid = entityId.getId();
        onNotificationUpdate(entityUuid, notificationsSubscriptionUpdate, tbCallback);
    }

    /**
     * Delivers a notifications update to every NOTIFICATIONS and
     * NOTIFICATIONS_COUNT subscription of the entity.
     *
     * <p>The update is passed as the data payload of the generic
     * {@code processSubscriptionData} overload. It must not be cast to
     * {@link Predicate}: it is a payload, and such a cast fails at runtime.
     *
     * @param uuid                            entity UUID
     * @param notificationsSubscriptionUpdate update payload handed to the subscriptions' update processors
     * @param tbCallback                      completion callback handed to {@code processSubscriptionData}
     */
    private void onNotificationUpdate(UUID uuid, NotificationsSubscriptionUpdate notificationsSubscriptionUpdate, TbCallback tbCallback) {
        Predicate<TbSubscription<?>> isNotificationSubscription = candidate ->
                TbSubscriptionType.NOTIFICATIONS.equals(candidate.getType())
                        || TbSubscriptionType.NOTIFICATIONS_COUNT.equals(candidate.getType());
        processSubscriptionData(uuid, isNotificationSubscription, notificationsSubscriptionUpdate, tbCallback);
    }

    /**
     * Broadcasts a notification request update to every subscription held by
     * USER entities of the given tenant whose subscription info reports
     * {@code isNf()}. Each delivery is submitted to the update executor; the
     * callback is completed once all deliveries have been submitted.
     *
     * @param tenantId                  tenant the update belongs to
     * @param notificationRequestUpdate the update to wrap and broadcast
     * @param tbCallback                invoked with {@code onSuccess()} at the end
     */
    @Override
    public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate notificationRequestUpdate, TbCallback tbCallback) {
        log.trace("[{}] Received notification request update: {}", tenantId, notificationRequestUpdate);
        NotificationsSubscriptionUpdate update = new NotificationsSubscriptionUpdate(notificationRequestUpdate);
        this.subscriptionsByEntityId.values().forEach(subsInfo -> {
            boolean targeted = subsInfo.isNf()
                    && tenantId.equals(subsInfo.getTenantId())
                    && EntityType.USER.equals(subsInfo.getEntityId().getEntityType());
            if (!targeted) {
                return;
            }
            subsInfo.getSubs().forEach(subscription -> {
                this.subscriptionUpdateExecutor.submit(() -> {
                    subscription.getUpdateProcessor().accept(subscription, update);
                });
            });
        });
        tbCallback.onSuccess();
    }

    /**
     * Hands the payload {@code t} to the update processor of every
     * subscription of the entity that passes {@code predicate}. Each delivery
     * is submitted to the update executor. The callback is completed whether
     * or not the entity has any subscriptions.
     *
     * @param uuid       entity UUID
     * @param predicate  selects the subscriptions that should receive the payload
     * @param t          payload passed to the update processors
     * @param tbCallback invoked with {@code onSuccess()} at the end
     */
    private <T> void processSubscriptionData(UUID uuid, Predicate<TbSubscription<?>> predicate, T t, TbCallback tbCallback) {
        log.trace("[{}] Received subscription data: {}", uuid, t);
        TbEntityLocalSubsInfo subsInfo = this.subscriptionsByEntityId.get(uuid);
        if (subsInfo != null) {
            subsInfo.getSubs().forEach(subscription -> {
                if (!predicate.test(subscription)) {
                    return;
                }
                this.subscriptionUpdateExecutor.submit(() -> {
                    subscription.getUpdateProcessor().accept(subscription, t);
                });
            });
        }
        tbCallback.onSuccess();
    }

    /**
     * Applies {@code consumer} on the calling thread to every subscription of
     * the entity that passes {@code predicate}. The callback is completed
     * whether or not the entity has any subscriptions.
     *
     * @param uuid       entity UUID
     * @param predicate  selects the subscriptions to process
     * @param consumer   action applied to each selected subscription
     * @param tbCallback invoked with {@code onSuccess()} at the end
     */
    private void processSubscriptionData(UUID uuid, Predicate<TbSubscription<?>> predicate, Consumer<TbSubscription<?>> consumer, TbCallback tbCallback) {
        TbEntityLocalSubsInfo subsInfo = this.subscriptionsByEntityId.get(uuid);
        if (subsInfo != null) {
            subsInfo.getSubs().forEach(subscription -> {
                if (!predicate.test(subscription)) {
                    return;
                }
                consumer.accept(subscription);
            });
        }
        tbCallback.onSuccess();
    }

    /**
     * Adds the subscription to, or removes it from, the entity's local
     * subscription info and reports what changed.
     *
     * <p>The entity's info is created on demand. When it is empty after the
     * change, the entity's bookkeeping is dropped. Otherwise, on add, the
     * subscription is registered as pending. Failures are logged and an
     * incomplete result is still returned.
     *
     * @param tenantId       tenant owning the entity
     * @param entityId       entity the subscription targets
     * @param tbSubscription subscription being added or removed
     * @param z              {@code true} to add, {@code false} to remove
     * @return a result carrying the subscription, the event produced (may be
     *         {@code null}) and the subscription returned by
     *         {@code registerPendingSubscription} (may be {@code null})
     */
    private SubscriptionModificationResult modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription<?> tbSubscription, boolean z) {
        TbSubscription<?> missedUpdatesCandidate = null;
        TbEntitySubEvent event = null;
        try {
            UUID entityUuid = entityId.getId();
            TbEntityLocalSubsInfo subsInfo = this.subscriptionsByEntityId.computeIfAbsent(entityUuid, key -> new TbEntityLocalSubsInfo(tenantId, entityId));
            if (z) {
                event = subsInfo.add(tbSubscription);
            } else {
                event = subsInfo.remove(tbSubscription);
            }
            if (subsInfo.isEmpty()) {
                this.subscriptionsByEntityId.remove(entityUuid);
                this.entityUpdates.remove(entityUuid);
            } else if (z) {
                missedUpdatesCandidate = subsInfo.registerPendingSubscription(tbSubscription, event);
            }
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to {} subscription {} due to ", tenantId, entityId, z ? "add" : "remove", tbSubscription, e);
        }
        return new SubscriptionModificationResult(tenantId, entityId, tbSubscription, missedUpdatesCandidate, event);
    }

    /**
     * Removes the given subscriptions from the entity's local subscription
     * info in one step and drops the entity's bookkeeping when nothing is
     * left.
     *
     * <p>When the entity has no local subscription info, which is possible
     * because other code paths remove entries from the map, the method
     * returns {@code null}. It no longer relies on a caught
     * {@code NullPointerException} for that case.
     *
     * @param tenantId tenant owning the entity; used for logging
     * @param entityId entity whose subscriptions are removed
     * @param list     subscriptions to remove
     * @return the event produced by the removal, or {@code null} when there
     *         is nothing to report or the removal failed
     */
    private TbEntitySubEvent removeAllSubscriptions(TenantId tenantId, EntityId entityId, List<TbSubscription<?>> list) {
        TbEntitySubEvent event = null;
        try {
            UUID entityUuid = entityId.getId();
            TbEntityLocalSubsInfo subsInfo = this.subscriptionsByEntityId.get(entityUuid);
            if (subsInfo == null) {
                log.debug("[{}][{}] No local subscriptions registered for entity, nothing to remove.", tenantId, entityId);
                return null;
            }
            event = subsInfo.removeAll(list);
            if (subsInfo.isEmpty()) {
                this.subscriptionsByEntityId.remove(entityUuid);
                this.entityUpdates.remove(entityUuid);
            }
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to remove all subscriptions {} due to ", tenantId, entityId, list, e);
        }
        return event;
    }

    /**
     * Pushes a subscription event for the entity to the subscription manager.
     * Failures are logged and never propagated.
     *
     * @param tenantId         tenant owning the entity
     * @param entityId         entity the event refers to
     * @param tbEntitySubEvent event to push
     */
    private void pushSubscriptionsEvent(TenantId tenantId, EntityId entityId, TbEntitySubEvent tbEntitySubEvent) {
        try {
            log.trace("[{}][{}] Event: {}", tenantId, entityId, tbEntitySubEvent);
            pushSubEventToManagerService(tenantId, entityId, tbEntitySubEvent);
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to push subscription event {} due to ", tenantId, entityId, tbEntitySubEvent, e);
        }
    }

    /**
     * Pushes the event carried by a modification result to the subscription
     * manager and then, when the result names a missed-updates candidate,
     * checks that subscription for missed updates. Failures are logged and
     * never propagated.
     *
     * @param subscriptionModificationResult outcome of a subscription add or remove
     */
    private void pushSubscriptionEvent(SubscriptionModificationResult subscriptionModificationResult) {
        try {
            TbEntitySubEvent event = subscriptionModificationResult.getEvent();
            TenantId tenantId = subscriptionModificationResult.getTenantId();
            EntityId entityId = subscriptionModificationResult.getEntityId();
            int subscriptionId = subscriptionModificationResult.getSubscription().getSubscriptionId();
            log.trace("[{}][{}][{}] Event: {}", tenantId, entityId, subscriptionId, event);
            pushSubEventToManagerService(subscriptionModificationResult.getTenantId(), subscriptionModificationResult.getEntityId(), event);
            TbSubscription<?> candidate = subscriptionModificationResult.getMissedUpdatesCandidate();
            if (candidate == null) {
                return;
            }
            checkMissedUpdates(candidate);
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to push subscription event {} due to ", subscriptionModificationResult.getTenantId(), subscriptionModificationResult.getEntityId(), subscriptionModificationResult.getEvent(), e);
        }
    }

    /**
     * Delivers a subscription event to whichever core partition the partition service
     * resolves for the given tenant and entity.
     * <p>
     * When the resolved partition is reported as local ({@code isMyPartition()}), the event
     * is passed directly to the subscription manager service; otherwise it is pushed to the queue.
     *
     * @param tenantId         tenant used to resolve the partition
     * @param entityId         entity used to resolve the partition
     * @param tbEntitySubEvent event to deliver
     */
    private void pushSubEventToManagerService(TenantId tenantId, EntityId entityId, TbEntitySubEvent tbEntitySubEvent) {
        TopicPartitionInfo targetPartition = this.partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
        if (!targetPartition.isMyPartition()) {
            pushToQueue(entityId, tbEntitySubEvent, targetPartition);
            return;
        }
        this.subscriptionManagerService.onSubEvent(this.serviceId, tbEntitySubEvent, TbCallback.EMPTY);
    }

    /**
     * Sends a subscription event to the given topic partition through the cluster service.
     * The event is converted with {@code TbSubscriptionUtils.toSubEventProto} together with
     * this service's id, and no queue callback is supplied.
     *
     * @param entityId           entity whose UUID is passed along with the message
     * @param tbEntitySubEvent   event to convert and send
     * @param topicPartitionInfo destination partition
     */
    private void pushToQueue(EntityId entityId, TbEntitySubEvent tbEntitySubEvent, TopicPartitionInfo topicPartitionInfo) {
        UUID entityUuid = entityId.getId();
        this.clusterService.pushMsgToCore(
                topicPartitionInfo,
                entityUuid,
                TbSubscriptionUtils.toSubEventProto(this.serviceId, tbEntitySubEvent),
                (TbQueueCallback) null);
    }

    /**
     * Dispatches a subscription to the matching "missed updates" handler based on its type.
     * <p>
     * Subscriptions whose type maps to neither handled index are ignored.
     *
     * @param tbSubscription subscription to check; cast to the concrete subscription class
     *                       expected by the selected handler
     */
    private void checkMissedUpdates(TbSubscription<?> tbSubscription) {
        log.trace("[{}][{}][{}] Check missed updates for subscription: {}",
                tbSubscription.getTenantId(), tbSubscription.getEntityId(), tbSubscription.getSubscriptionId(), tbSubscription);
        // NOTE(review): the synthetic switch map translates the enum ordinal into a case index.
        // Judging by the casts, index 1 is the time-series type and index 2 the attribute type;
        // confirm against TbSubscriptionType.
        switch (AnonymousClass1.$SwitchMap$org$thingsboard$server$service$subscription$TbSubscriptionType[tbSubscription.getType().ordinal()]) {
            case 1:
                handleNewTelemetrySubscription((TbTimeSeriesSubscription) tbSubscription);
                break;
            // Literal index on purpose: this label used to borrow an unrelated constant that
            // merely happened to equal 2, so a change to that constant would have broken dispatch.
            case 2:
                handleNewAttributeSubscription((TbAttributeSubscription) tbSubscription);
                break;
            default:
                break;
        }
    }

    /**
     * Replays attribute updates that a new attribute subscription may have missed.
     * <p>
     * The check is skipped when update info is recorded for the entity, its
     * {@code attributesUpdateTs} is positive, and the subscription's query timestamp is
     * later than it. Otherwise the attributes for the subscription's keys are fetched
     * asynchronously. Every fetched entry whose last-update timestamp is newer than the
     * timestamp stored for its key, and whose value is non-null, is passed to
     * {@code onAttributesUpdate}.
     * <p>
     * When the subscription has no scope, or its scope has no attribute scope,
     * {@link AttributeScope#CLIENT_SCOPE} is used. Fetch failures are logged and not rethrown.
     *
     * @param tbAttributeSubscription the newly registered attribute subscription
     */
    private void handleNewAttributeSubscription(TbAttributeSubscription tbAttributeSubscription) {
        log.trace("[{}][{}][{}] Processing attribute subscription for entity [{}]",
                tbAttributeSubscription.getTenantId(), tbAttributeSubscription.getSessionId(),
                tbAttributeSubscription.getSubscriptionId(), tbAttributeSubscription.getEntityId());
        TbEntityUpdatesInfo updatesInfo = this.entityUpdates.get(tbAttributeSubscription.getEntityId().getId());
        if (updatesInfo != null
                && updatesInfo.attributesUpdateTs > 0
                && tbAttributeSubscription.getQueryTs() > updatesInfo.attributesUpdateTs) {
            log.trace("[{}][{}][{}] No need to check for missed updates [{}]",
                    tbAttributeSubscription.getTenantId(), tbAttributeSubscription.getSessionId(),
                    tbAttributeSubscription.getSubscriptionId(), tbAttributeSubscription.getEntityId());
            return;
        }
        final Map<String, Long> keyStates = tbAttributeSubscription.getKeyStates();
        final AttributeScope attributeScope =
                (tbAttributeSubscription.getScope() == null || tbAttributeSubscription.getScope().getAttributeScope() == null)
                        ? AttributeScope.CLIENT_SCOPE
                        : tbAttributeSubscription.getScope().getAttributeScope();
        DonAsynchron.withCallback(
                this.attrService.find(tbAttributeSubscription.getTenantId(), tbAttributeSubscription.getEntityId(), attributeScope, keyStates.keySet()),
                latestEntries -> {
                    // Same null/empty guard as the telemetry callbacks.
                    if (latestEntries == null || latestEntries.isEmpty()) {
                        return;
                    }
                    List<TsKvEntry> candidates = new ArrayList<>();
                    latestEntries.forEach(latestEntry -> {
                        Long lastSeenTs = keyStates.get(latestEntry.getKey());
                        // Skip keys with no stored state: unboxing a null here would throw
                        // and discard every other missed update in the batch.
                        if (lastSeenTs != null && latestEntry.getLastUpdateTs() > lastSeenTs) {
                            candidates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
                        }
                    });
                    List<TsKvEntry> missedUpdates = candidates.stream()
                            .filter(candidate -> candidate.getValue() != null)
                            .collect(Collectors.toList());
                    if (missedUpdates.isEmpty()) {
                        return;
                    }
                    onAttributesUpdate(tbAttributeSubscription.getEntityId(), attributeScope.name(), missedUpdates, TbCallback.EMPTY);
                },
                th -> log.error("Failed to fetch missed updates.", th),
                this.tsCallBackExecutor);
    }

    /**
     * Replays time-series updates that a new telemetry subscription may have missed.
     * <p>
     * The check is skipped when update info is recorded for the entity, its
     * {@code timeSeriesUpdateTs} is positive, and the subscription's query timestamp is
     * later than it. Otherwise:
     * <ul>
     *   <li>for a "latest values" subscription, the latest entries of all subscribed keys
     *       are fetched;</li>
     *   <li>for any other subscription, one history query is built per key whose stored
     *       timestamp is older than the current time. The query starts just after that
     *       timestamp and ends at the current time, both clamped to the subscription's
     *       start and end time when those are positive.</li>
     * </ul>
     * Fetched entries with a non-null value are passed to {@code onTimeSeriesUpdate}.
     * Fetch failures are logged and not rethrown.
     *
     * @param tbTimeSeriesSubscription the newly registered time-series subscription
     */
    private void handleNewTelemetrySubscription(TbTimeSeriesSubscription tbTimeSeriesSubscription) {
        log.trace("[{}][{}][{}] Processing telemetry subscription for entity [{}]",
                tbTimeSeriesSubscription.getTenantId(), tbTimeSeriesSubscription.getSessionId(),
                tbTimeSeriesSubscription.getSubscriptionId(), tbTimeSeriesSubscription.getEntityId());
        TbEntityUpdatesInfo updatesInfo = this.entityUpdates.get(tbTimeSeriesSubscription.getEntityId().getId());
        if (updatesInfo != null
                && updatesInfo.timeSeriesUpdateTs > 0
                && tbTimeSeriesSubscription.getQueryTs() > updatesInfo.timeSeriesUpdateTs) {
            log.trace("[{}][{}][{}] No need to check for missed updates. time [{}][{}] diff: {}ms",
                    tbTimeSeriesSubscription.getTenantId(), tbTimeSeriesSubscription.getSessionId(),
                    tbTimeSeriesSubscription.getSubscriptionId(), tbTimeSeriesSubscription.getQueryTs(),
                    updatesInfo.timeSeriesUpdateTs,
                    tbTimeSeriesSubscription.getQueryTs() - updatesInfo.timeSeriesUpdateTs);
            return;
        }
        if (tbTimeSeriesSubscription.isLatestValues()) {
            DonAsynchron.withCallback(
                    this.tsService.findLatest(tbTimeSeriesSubscription.getTenantId(), tbTimeSeriesSubscription.getEntityId(), tbTimeSeriesSubscription.getKeyStates().keySet()),
                    latestEntries -> deliverMissedTimeSeries(tbTimeSeriesSubscription, latestEntries),
                    th -> log.error("Failed to fetch missed updates.", th),
                    this.tsCallBackExecutor);
            return;
        }
        final long now = System.currentTimeMillis();
        List<BaseReadTsKvQuery> queries = new ArrayList<>();
        tbTimeSeriesSubscription.getKeyStates().forEach((key, lastSeenTs) -> {
            if (now > lastSeenTs) {
                long fromTs = tbTimeSeriesSubscription.getStartTime() > 0
                        ? Math.max(tbTimeSeriesSubscription.getStartTime(), lastSeenTs + 1)
                        : lastSeenTs + 1;
                long toTs = tbTimeSeriesSubscription.getEndTime() > 0
                        ? Math.min(tbTimeSeriesSubscription.getEndTime(), now)
                        : now;
                // NOTE(review): 0L and 1000 are presumably the aggregation interval and the
                // result limit of BaseReadTsKvQuery — confirm against its constructor.
                queries.add(new BaseReadTsKvQuery(key, fromTs, toTs, 0L, 1000, Aggregation.NONE));
            }
        });
        if (queries.isEmpty()) {
            return;
        }
        DonAsynchron.withCallback(
                // The diamond copy lets the compiler infer the element type findAll() expects.
                // A raw list here would erase the future's type and make the callback
                // parameter an Object.
                this.tsService.findAll(tbTimeSeriesSubscription.getTenantId(), tbTimeSeriesSubscription.getEntityId(), new ArrayList<>(queries)),
                historyEntries -> deliverMissedTimeSeries(tbTimeSeriesSubscription, historyEntries),
                th -> log.error("Failed to fetch missed updates.", th),
                this.tsCallBackExecutor);
    }

    /**
     * Passes fetched time-series entries with a non-null value to {@code onTimeSeriesUpdate};
     * does nothing when the fetched list is null, empty, or contains only null-valued entries.
     *
     * @param subscription subscription whose entity id is reported with the update
     * @param fetched      entries returned by the time-series service; may be {@code null}
     */
    private void deliverMissedTimeSeries(TbTimeSeriesSubscription subscription, List<? extends TsKvEntry> fetched) {
        if (fetched == null || fetched.isEmpty()) {
            return;
        }
        List<TsKvEntry> missedUpdates = fetched.stream()
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toList());
        if (missedUpdates.isEmpty()) {
            return;
        }
        onTimeSeriesUpdate(subscription.getEntityId(), missedUpdates, TbCallback.EMPTY);
    }

    /**
     * Asks the WebSocket service to clean up every known session if it is stale.
     * <p>
     * For each session id in {@code subscriptionsBySessionId}, an arbitrary subscription of
     * that session supplies the tenant id for the call; sessions with no subscriptions are skipped.
     */
    private void cleanupStaleSessions() {
        this.subscriptionsBySessionId.forEach((sessionId, sessionSubscriptions) ->
                sessionSubscriptions.values().stream()
                        .findAny()
                        .ifPresent(anySubscription ->
                                this.webSocketService.cleanupIfStale(anySubscription.getTenantId(), sessionId)));
    }

    /**
     * Reports a rate-limit violation to the client and then aborts by throwing.
     * <p>
     * The error is logged and sent over the WebSocket with
     * {@code SubscriptionErrorCode.BAD_REQUEST}, unless {@code DeduplicationUtil.alreadyProcessed}
     * reports that the same session id and message were already handled within the
     * 15-second window passed to it. The exception is thrown in either case.
     *
     * @param tbSubscription      subscription whose id is attached to the error sent to the client
     * @param webSocketSessionRef session the error is sent to
     * @param str                 error message, also used as the exception message
     * @throws TbRateLimitsException always
     */
    private void handleRateLimitError(TbSubscription<?> tbSubscription, WebSocketSessionRef webSocketSessionRef, String str) {
        String deduplicationKey = webSocketSessionRef.getSessionId() + str;
        long deduplicationWindowMs = TimeUnit.SECONDS.toMillis(15L);
        boolean recentlyReported = DeduplicationUtil.alreadyProcessed(deduplicationKey, deduplicationWindowMs);
        if (!recentlyReported) {
            log.info("{} {}", webSocketSessionRef, str);
            this.webSocketService.sendError(webSocketSessionRef, tbSubscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, str);
        }
        throw new TbRateLimitsException(str);
    }

    /**
     * Returns the update info tracked for the given entity id, creating and registering
     * a new {@code TbEntityUpdatesInfo} constructed with {@code 0L} when none exists yet.
     *
     * @param uuid entity id used as the key in {@code entityUpdates}
     * @return the existing or newly created update info
     */
    private TbEntityUpdatesInfo getEntityUpdatesInfo(UUID uuid) {
        return this.entityUpdates.computeIfAbsent(uuid, missingKey -> new TbEntityUpdatesInfo(0L));
    }
}
