/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.subscription;

import com.google.common.util.concurrent.ListenableFuture;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DeduplicationUtil;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.SubscriptionModificationResult;
import org.thingsboard.server.service.subscription.TbAttributeSubscription;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbEntityLocalSubsInfo;
import org.thingsboard.server.service.subscription.TbEntitySubEvent;
import org.thingsboard.server.service.subscription.TbEntityUpdatesInfo;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscription;
import org.thingsboard.server.service.subscription.TbSubscriptionType;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;

@TbCoreComponent
@Service
public class DefaultTbLocalSubscriptionService
implements TbLocalSubscriptionService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);
    private final ConcurrentMap<String, ConcurrentMap<Integer, TbSubscription<?>>> subscriptionsBySessionId = new ConcurrentHashMap();
    private final ConcurrentMap<UUID, TbEntityLocalSubsInfo> subscriptionsByEntityId = new ConcurrentHashMap<UUID, TbEntityLocalSubsInfo>();
    private final ConcurrentMap<UUID, TbEntityUpdatesInfo> entityUpdates = new ConcurrentHashMap<UUID, TbEntityUpdatesInfo>();
    private final AttributesService attrService;
    private final TimeseriesService tsService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final PartitionService partitionService;
    private final TbClusterService clusterService;
    private final SubscriptionManagerService subscriptionManagerService;
    private final WebSocketService webSocketService;
    private final RateLimitService rateLimitService;
    private ExecutorService tsCallBackExecutor;
    private ScheduledExecutorService staleSessionCleanupExecutor;
    @Value(value="${server.ws.rate_limits.subscriptions_per_tenant:}")
    private String subscriptionsPerTenantRateLimit;
    @Value(value="${server.ws.rate_limits.subscriptions_per_user:}")
    private String subscriptionsPerUserRateLimit;
    private String serviceId;
    private ExecutorService subscriptionUpdateExecutor;
    private final ConcurrentReferenceHashMap<TenantId, Lock> locks = new ConcurrentReferenceHashMap(16, ConcurrentReferenceHashMap.ReferenceType.SOFT);

    public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService, TbClusterService clusterService, @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService, RateLimitService rateLimitService) {
        this.attrService = attrService;
        this.tsService = tsService;
        this.serviceInfoProvider = serviceInfoProvider;
        this.partitionService = partitionService;
        this.clusterService = clusterService;
        this.subscriptionManagerService = subscriptionManagerService;
        this.webSocketService = webSocketService;
        this.rateLimitService = rateLimitService;
    }

    @PostConstruct
    public void initExecutor() {
        this.subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool((int)20, this.getClass());
        this.tsCallBackExecutor = Executors.newFixedThreadPool(8, (ThreadFactory)ThingsBoardThreadFactory.forName((String)"ts-sub-callback"));
        this.serviceId = this.serviceInfoProvider.getServiceId();
        this.staleSessionCleanupExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor((String)"stale-session-cleanup");
        this.staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60L, 60L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void shutdownExecutor() {
        if (this.subscriptionUpdateExecutor != null) {
            this.subscriptionUpdateExecutor.shutdownNow();
        }
        if (this.tsCallBackExecutor != null) {
            this.tsCallBackExecutor.shutdownNow();
        }
        if (this.staleSessionCleanupExecutor != null) {
            this.staleSessionCleanupExecutor.shutdownNow();
        }
    }

    @Override
    @EventListener(value={ClusterTopologyChangeEvent.class})
    public void onApplicationEvent(ClusterTopologyChangeEvent event) {
        if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals((Object)key.getType()))) {
            HashMap<TenantId, Set> staleSubs = new HashMap<TenantId, Set>();
            this.subscriptionsByEntityId.forEach((id, sub) -> {
                try {
                    this.pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED));
                }
                catch (TenantNotFoundException e) {
                    staleSubs.computeIfAbsent(sub.getTenantId(), key -> new HashSet()).add(id);
                    log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, (Object)sub.getTenantId());
                }
                catch (Exception e) {
                    log.error("Failed to push subscription {} to manager service", sub, (Object)e);
                }
            });
            staleSubs.forEach((tenantId, subs) -> {
                Lock subsLock = this.getSubsLock((TenantId)tenantId);
                subsLock.lock();
                try {
                    subs.forEach(entityId -> {
                        this.subscriptionsByEntityId.remove(entityId);
                        this.entityUpdates.remove(entityId);
                    });
                }
                finally {
                    subsLock.unlock();
                }
            });
        }
    }

    @Override
    public void onCoreStartupMsg(TransportProtos.CoreStartupMsg coreStartupMsg) {
        this.subscriptionUpdateExecutor.submit(() -> {
            HashSet partitions = new HashSet(coreStartupMsg.getPartitionsList());
            AtomicInteger counter = new AtomicInteger();
            this.subscriptionsByEntityId.values().forEach(sub -> {
                TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId());
                if (!tpi.isMyPartition() && partitions.contains(tpi.getPartition().orElse(Integer.MAX_VALUE))) {
                    this.pushToQueue(sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED), tpi);
                    counter.incrementAndGet();
                }
            });
            log.info("[{}] Pushed {} subscriptions to [{}]", new Object[]{this.serviceId, counter.get(), coreStartupMsg.getServiceId()});
        });
    }

    Lock getSubsLock(TenantId tenantId) {
        return (Lock)this.locks.computeIfAbsent((Object)tenantId, x -> new ReentrantLock());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) {
        SubscriptionModificationResult result;
        TenantId tenantId = subscription.getTenantId();
        EntityId entityId = subscription.getEntityId();
        if (!this.rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object)tenantId, this.subscriptionsPerTenantRateLimit)) {
            this.handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per tenant");
            return;
        }
        if (sessionRef.getSecurityCtx() != null && !this.rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object)sessionRef.getSecurityCtx().getId(), this.subscriptionsPerUserRateLimit)) {
            this.handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per user");
            return;
        }
        log.debug("[{}][{}] Register subscription: {}", new Object[]{tenantId, entityId, subscription});
        Lock subsLock = this.getSubsLock(tenantId);
        subsLock.lock();
        try {
            Map sessionSubscriptions = this.subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap());
            sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
            result = this.modifySubscription(tenantId, entityId, subscription, true);
        }
        finally {
            subsLock.unlock();
        }
        if (result.hasEvent()) {
            this.pushSubscriptionEvent(result);
        }
    }

    @Override
    public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) {
        TenantId tenantId = subEventCallback.getTenantIdMSB() == 0L && subEventCallback.getTenantIdLSB() == 0L ? TenantId.SYS_TENANT_ID : TenantId.fromUUID((UUID)new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB()));
        UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB());
        this.onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
    }

    @Override
    public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
        this.onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
        log.debug("[{}][{}][{}] Processing sub event callback: {}.", new Object[]{tenantId, entityId, seqNumber, entityUpdatesInfo});
        this.entityUpdates.put(entityId, entityUpdatesInfo);
        Set<TbSubscription<?>> pendingSubs = null;
        Lock subsLock = this.getSubsLock(tenantId);
        subsLock.lock();
        try {
            TbEntityLocalSubsInfo entitySubs = (TbEntityLocalSubsInfo)this.subscriptionsByEntityId.get(entityId);
            if (entitySubs != null) {
                pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber);
            }
        }
        finally {
            subsLock.unlock();
        }
        if (pendingSubs != null) {
            pendingSubs.forEach(this::checkMissedUpdates);
        }
        callback.onSuccess();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) {
        log.debug("[{}][{}][{}] Going to remove subscription.", new Object[]{tenantId, sessionId, subscriptionId});
        SubscriptionModificationResult result = null;
        Lock subsLock = this.getSubsLock(tenantId);
        subsLock.lock();
        try {
            Map sessionSubscriptions = (Map)this.subscriptionsBySessionId.get(sessionId);
            if (sessionSubscriptions != null) {
                TbSubscription subscription = (TbSubscription)sessionSubscriptions.remove(subscriptionId);
                if (subscription != null) {
                    if (sessionSubscriptions.isEmpty()) {
                        this.subscriptionsBySessionId.remove(sessionId);
                    }
                    result = this.modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
                } else {
                    log.debug("[{}][{}][{}] Subscription not found!", new Object[]{tenantId, sessionId, subscriptionId});
                }
            } else {
                log.debug("[{}][{}] No session subscriptions found!", (Object)tenantId, (Object)sessionId);
            }
        }
        finally {
            subsLock.unlock();
        }
        if (result != null && result.hasEvent()) {
            this.pushSubscriptionEvent(result);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) {
        log.debug("[{}][{}] Going to remove session subscriptions.", (Object)tenantId, (Object)sessionId);
        Lock subsLock = this.getSubsLock(tenantId);
        subsLock.lock();
        try {
            Map sessionSubscriptions = (Map)this.subscriptionsBySessionId.remove(sessionId);
            if (sessionSubscriptions != null) {
                Map<EntityId, List<TbSubscription>> entitySubscriptions = sessionSubscriptions.values().stream().collect(Collectors.groupingBy(TbSubscription::getEntityId));
                entitySubscriptions.forEach((entityId, subscriptions) -> {
                    TbEntitySubEvent event = this.removeAllSubscriptions(tenantId, (EntityId)entityId, (List<TbSubscription<?>>)subscriptions);
                    if (event != null) {
                        this.pushSubscriptionsEvent(tenantId, (EntityId)entityId, event);
                    }
                });
            } else {
                log.debug("[{}][{}] No session subscriptions found!", (Object)tenantId, (Object)sessionId);
            }
        }
        finally {
            subsLock.unlock();
        }
    }

    @Override
    public void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) {
        this.onTimeSeriesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback);
    }

    @Override
    public void onTimeSeriesUpdate(EntityId entityId, List<TsKvEntry> data, TbCallback callback) {
        this.onTimeSeriesUpdate(entityId.getId(), data, callback);
    }

    private void onTimeSeriesUpdate(UUID entityId, List<TsKvEntry> data, TbCallback callback) {
        this.getEntityUpdatesInfo((UUID)entityId).timeSeriesUpdateTs = System.currentTimeMillis();
        this.processSubscriptionData(entityId, (TbSubscription<?> sub) -> TbSubscriptionType.TIMESERIES.equals((Object)sub.getType()), s -> {
            TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription)s;
            ArrayList<TsKvEntry> updateData = null;
            Map<String, Long> keyStates = sub.getKeyStates();
            if (sub.isAllKeys()) {
                if (sub.isLatestValues()) {
                    for (TsKvEntry kv : data) {
                        Long stateTs = keyStates.get(kv.getKey());
                        if (stateTs != null && kv.getTs() < stateTs) continue;
                        if (updateData == null) {
                            updateData = new ArrayList<TsKvEntry>();
                        }
                        updateData.add(kv);
                    }
                } else {
                    updateData = data;
                }
            } else {
                for (TsKvEntry kv : data) {
                    Long stateTs = keyStates.get(kv.getKey());
                    if (stateTs == null || sub.isLatestValues() && kv.getTs() < stateTs) continue;
                    if (updateData == null) {
                        updateData = new ArrayList();
                    }
                    updateData.add(kv);
                }
            }
            if (updateData != null) {
                TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), (List<TsKvEntry>)updateData);
                update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put((String)key, (Long)value));
                this.subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, (TbTimeSeriesSubscription)((Object)update)));
            }
        }, callback);
    }

    @Override
    public void onAttributesUpdate(TransportProtos.TbSubUpdateProto proto, TbCallback callback) {
        this.onAttributesUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), proto.getScope(), TbSubscriptionUtils.fromProto(proto), callback);
    }

    @Override
    public void onAttributesUpdate(EntityId entityId, String scope, List<TsKvEntry> data, TbCallback callback) {
        this.onAttributesUpdate(entityId.getId(), scope, data, callback);
    }

    private void onAttributesUpdate(UUID entityId, String scope, List<TsKvEntry> data, TbCallback callback) {
        this.getEntityUpdatesInfo((UUID)entityId).attributesUpdateTs = System.currentTimeMillis();
        this.processSubscriptionData(entityId, (TbSubscription<?> sub) -> TbSubscriptionType.ATTRIBUTES.equals((Object)sub.getType()), s -> {
            TbAttributeSubscription sub = (TbAttributeSubscription)s;
            if (sub.getScope() == null || TbAttributeSubscriptionScope.ANY_SCOPE.equals((Object)sub.getScope()) || sub.getScope().name().equals(scope)) {
                ArrayList<TsKvEntry> updateData = null;
                if (sub.isAllKeys()) {
                    updateData = data;
                } else {
                    for (TsKvEntry kv : data) {
                        if (!sub.getKeyStates().containsKey(kv.getKey())) continue;
                        if (updateData == null) {
                            updateData = new ArrayList<TsKvEntry>();
                        }
                        updateData.add(kv);
                    }
                }
                if (updateData != null) {
                    TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), (List<TsKvEntry>)updateData);
                    update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put((String)key, (Long)value));
                    this.subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, (TbAttributeSubscription)((Object)update)));
                }
            }
        }, callback);
    }

    @Override
    public void onAlarmUpdate(TransportProtos.TbAlarmSubUpdateProto proto, TbCallback callback) {
        this.onAlarmUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback);
    }

    @Override
    public void onAlarmUpdate(EntityId entityId, AlarmInfo alarm, boolean deleted, TbCallback callback) {
        this.onAlarmUpdate(entityId.getId(), new AlarmSubscriptionUpdate(alarm, deleted), callback);
    }

    private void onAlarmUpdate(UUID entityId, AlarmSubscriptionUpdate update, TbCallback callback) {
        this.processSubscriptionData(entityId, (TbSubscription<?> sub) -> TbSubscriptionType.ALARMS.equals((Object)sub.getType()), update, callback);
    }

    @Override
    public void onNotificationUpdate(TransportProtos.NotificationsSubUpdateProto proto, TbCallback callback) {
        this.onNotificationUpdate(new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.fromProto(proto), callback);
    }

    @Override
    public void onNotificationUpdate(EntityId entityId, NotificationsSubscriptionUpdate update, TbCallback callback) {
        this.onNotificationUpdate(entityId.getId(), update, callback);
    }

    private void onNotificationUpdate(UUID entityId, NotificationsSubscriptionUpdate update, TbCallback callback) {
        this.processSubscriptionData(entityId, (TbSubscription<?> sub) -> TbSubscriptionType.NOTIFICATIONS.equals((Object)sub.getType()) || TbSubscriptionType.NOTIFICATIONS_COUNT.equals((Object)sub.getType()), update, callback);
    }

    @Override
    public void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update, TbCallback callback) {
        log.trace("[{}] Received notification request update: {}", (Object)tenantId, (Object)update);
        NotificationsSubscriptionUpdate theUpdate = new NotificationsSubscriptionUpdate(update);
        this.subscriptionsByEntityId.values().forEach(subInfo -> {
            if (subInfo.isNf() && tenantId.equals((Object)subInfo.getTenantId()) && EntityType.USER.equals((Object)subInfo.getEntityId().getEntityType())) {
                subInfo.getSubs().forEach(s -> {
                    TbSubscription sub = s;
                    this.subscriptionUpdateExecutor.submit(() -> sub.getUpdateProcessor().accept(sub, (TbSubscription)((Object)theUpdate)));
                });
            }
        });
        callback.onSuccess();
    }

    private <T> void processSubscriptionData(UUID entityId, Predicate<TbSubscription<?>> filter, T data, TbCallback callback) {
        log.trace("[{}] Received subscription data: {}", (Object)entityId, data);
        TbEntityLocalSubsInfo subs = (TbEntityLocalSubsInfo)this.subscriptionsByEntityId.get(entityId);
        if (subs != null) {
            subs.getSubs().forEach(s -> {
                if (filter.test((TbSubscription<?>)s)) {
                    this.subscriptionUpdateExecutor.submit(() -> {
                        TbSubscription sub = s;
                        sub.getUpdateProcessor().accept(sub, (TbSubscription)data);
                    });
                }
            });
        }
        callback.onSuccess();
    }

    private void processSubscriptionData(UUID entityId, Predicate<TbSubscription<?>> filter, Consumer<TbSubscription<?>> processor, TbCallback callback) {
        TbEntityLocalSubsInfo subs = (TbEntityLocalSubsInfo)this.subscriptionsByEntityId.get(entityId);
        if (subs != null) {
            subs.getSubs().forEach(s -> {
                if (filter.test((TbSubscription<?>)s)) {
                    processor.accept((TbSubscription<?>)s);
                }
            });
        }
        callback.onSuccess();
    }

    private SubscriptionModificationResult modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription<?> subscription, boolean add) {
        TbSubscription<?> missedUpdatesCandidate = null;
        TbEntitySubEvent event = null;
        try {
            TbEntityLocalSubsInfo entitySubs = this.subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId));
            TbEntitySubEvent tbEntitySubEvent = event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription);
            if (entitySubs.isEmpty()) {
                this.subscriptionsByEntityId.remove(entityId.getId());
                this.entityUpdates.remove(entityId.getId());
            } else if (add) {
                missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event);
            }
        }
        catch (Exception e) {
            log.warn("[{}][{}] Failed to {} subscription {} due to ", new Object[]{tenantId, entityId, add ? "add" : "remove", subscription, e});
        }
        return new SubscriptionModificationResult(tenantId, entityId, subscription, missedUpdatesCandidate, event);
    }

    private TbEntitySubEvent removeAllSubscriptions(TenantId tenantId, EntityId entityId, List<TbSubscription<?>> subscriptions) {
        TbEntitySubEvent event = null;
        try {
            TbEntityLocalSubsInfo entitySubs = (TbEntityLocalSubsInfo)this.subscriptionsByEntityId.get(entityId.getId());
            event = entitySubs.removeAll(subscriptions);
            if (entitySubs.isEmpty()) {
                this.subscriptionsByEntityId.remove(entityId.getId());
                this.entityUpdates.remove(entityId.getId());
            }
        }
        catch (Exception e) {
            log.warn("[{}][{}] Failed to remove all subscriptions {} due to ", new Object[]{tenantId, entityId, subscriptions, e});
        }
        return event;
    }

    private void pushSubscriptionsEvent(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) {
        try {
            log.trace("[{}][{}] Event: {}", new Object[]{tenantId, entityId, event});
            this.pushSubEventToManagerService(tenantId, entityId, event);
        }
        catch (Exception e) {
            log.warn("[{}][{}] Failed to push subscription event {} due to ", new Object[]{tenantId, entityId, event, e});
        }
    }

    private void pushSubscriptionEvent(SubscriptionModificationResult modificationResult) {
        try {
            TbEntitySubEvent event = modificationResult.getEvent();
            log.trace("[{}][{}][{}] Event: {}", new Object[]{modificationResult.getTenantId(), modificationResult.getEntityId(), modificationResult.getSubscription().getSubscriptionId(), event});
            this.pushSubEventToManagerService(modificationResult.getTenantId(), modificationResult.getEntityId(), event);
            TbSubscription<?> missedUpdatesCandidate = modificationResult.getMissedUpdatesCandidate();
            if (missedUpdatesCandidate != null) {
                this.checkMissedUpdates(missedUpdatesCandidate);
            }
        }
        catch (Exception e) {
            log.warn("[{}][{}] Failed to push subscription event {} due to ", new Object[]{modificationResult.getTenantId(), modificationResult.getEntityId(), modificationResult.getEvent(), e});
        }
    }

    private void pushSubEventToManagerService(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
        if (tpi.isMyPartition()) {
            this.subscriptionManagerService.onSubEvent(this.serviceId, event, TbCallback.EMPTY);
        } else {
            this.pushToQueue(entityId, event, tpi);
        }
    }

    private void pushToQueue(EntityId entityId, TbEntitySubEvent event, TopicPartitionInfo tpi) {
        this.clusterService.pushMsgToCore(tpi, entityId.getId(), TbSubscriptionUtils.toSubEventProto(this.serviceId, event), null);
    }

    private void checkMissedUpdates(TbSubscription<?> subscription) {
        log.trace("[{}][{}][{}] Check missed updates for subscription: {}", new Object[]{subscription.getTenantId(), subscription.getEntityId(), subscription.getSubscriptionId(), subscription});
        switch (subscription.getType()) {
            case TIMESERIES: {
                this.handleNewTelemetrySubscription((TbTimeSeriesSubscription)subscription);
                break;
            }
            case ATTRIBUTES: {
                this.handleNewAttributeSubscription((TbAttributeSubscription)subscription);
            }
        }
    }

    private void handleNewAttributeSubscription(TbAttributeSubscription subscription) {
        log.trace("[{}][{}][{}] Processing attribute subscription for entity [{}]", new Object[]{subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()});
        TbEntityUpdatesInfo entityUpdateInfo = (TbEntityUpdatesInfo)this.entityUpdates.get(subscription.getEntityId().getId());
        if (entityUpdateInfo != null && entityUpdateInfo.attributesUpdateTs > 0L && subscription.getQueryTs() > entityUpdateInfo.attributesUpdateTs) {
            log.trace("[{}][{}][{}] No need to check for missed updates [{}]", new Object[]{subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()});
            return;
        }
        Map<String, Long> keyStates = subscription.getKeyStates();
        AttributeScope scope = subscription.getScope() != null && subscription.getScope().getAttributeScope() != null ? subscription.getScope().getAttributeScope() : AttributeScope.CLIENT_SCOPE;
        DonAsynchron.withCallback((ListenableFuture)this.attrService.find(subscription.getTenantId(), subscription.getEntityId(), scope, keyStates.keySet()), values -> {
            ArrayList updates = new ArrayList();
            values.forEach(latestEntry -> {
                if (latestEntry.getLastUpdateTs() > (Long)keyStates.get(latestEntry.getKey())) {
                    updates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), (KvEntry)latestEntry));
                }
            });
            List<TsKvEntry> missedUpdates = updates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList());
            if (!missedUpdates.isEmpty()) {
                this.onAttributesUpdate(subscription.getEntityId(), scope.name(), missedUpdates, TbCallback.EMPTY);
            }
        }, e -> log.error("Failed to fetch missed updates.", e), (Executor)this.tsCallBackExecutor);
    }

    private void handleNewTelemetrySubscription(TbTimeSeriesSubscription subscription) {
        log.trace("[{}][{}][{}] Processing telemetry subscription for entity [{}]", new Object[]{subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()});
        TbEntityUpdatesInfo entityUpdateInfo = (TbEntityUpdatesInfo)this.entityUpdates.get(subscription.getEntityId().getId());
        if (entityUpdateInfo != null && entityUpdateInfo.timeSeriesUpdateTs > 0L && subscription.getQueryTs() > entityUpdateInfo.timeSeriesUpdateTs) {
            log.trace("[{}][{}][{}] No need to check for missed updates. time [{}][{}] diff: {}ms", new Object[]{subscription.getTenantId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getQueryTs(), entityUpdateInfo.timeSeriesUpdateTs, subscription.getQueryTs() - entityUpdateInfo.timeSeriesUpdateTs});
            return;
        }
        long curTs = System.currentTimeMillis();
        if (subscription.isLatestValues()) {
            DonAsynchron.withCallback((ListenableFuture)this.tsService.findLatest(subscription.getTenantId(), subscription.getEntityId(), subscription.getKeyStates().keySet()), missedUpdates -> {
                if (missedUpdates != null && !missedUpdates.isEmpty() && !(missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList())).isEmpty()) {
                    this.onTimeSeriesUpdate(subscription.getEntityId(), (List<TsKvEntry>)missedUpdates, TbCallback.EMPTY);
                }
            }, e -> log.error("Failed to fetch missed updates.", e), (Executor)this.tsCallBackExecutor);
        } else {
            ArrayList queries = new ArrayList();
            subscription.getKeyStates().forEach((key, value) -> {
                if (curTs > value) {
                    long startTs = subscription.getStartTime() > 0L ? Math.max(subscription.getStartTime(), value + 1L) : value + 1L;
                    long endTs = subscription.getEndTime() > 0L ? Math.min(subscription.getEndTime(), curTs) : curTs;
                    queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0L, 1000, Aggregation.NONE));
                }
            });
            if (!queries.isEmpty()) {
                DonAsynchron.withCallback((ListenableFuture)this.tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries), missedUpdates -> {
                    if (missedUpdates != null && !missedUpdates.isEmpty() && !(missedUpdates = missedUpdates.stream().filter(u -> u.getValue() != null).collect(Collectors.toList())).isEmpty()) {
                        this.onTimeSeriesUpdate(subscription.getEntityId(), (List<TsKvEntry>)missedUpdates, TbCallback.EMPTY);
                    }
                }, e -> log.error("Failed to fetch missed updates.", e), (Executor)this.tsCallBackExecutor);
            }
        }
    }

    private void cleanupStaleSessions() {
        this.subscriptionsBySessionId.forEach((sessionId, subscriptions) -> subscriptions.values().stream().findAny().ifPresent(subscription -> this.webSocketService.cleanupIfStale(subscription.getTenantId(), (String)sessionId)));
    }

    private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {
        String deduplicationKey = sessionRef.getSessionId() + message;
        if (!DeduplicationUtil.alreadyProcessed((Object)deduplicationKey, (long)TimeUnit.SECONDS.toMillis(15L))) {
            log.info("{} {}", (Object)sessionRef, (Object)message);
            this.webSocketService.sendError(sessionRef, subscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, message);
        }
        throw new TbRateLimitsException(message);
    }

    private TbEntityUpdatesInfo getEntityUpdatesInfo(UUID entityId) {
        return this.entityUpdates.computeIfAbsent(entityId, id -> new TbEntityUpdatesInfo(0L));
    }
}

