/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.subscription;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.ArrayUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataPageLink;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.subscription.ReadTsKvQueryInfo;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.subscription.SubscriptionServiceStatistics;
import org.thingsboard.server.service.subscription.TbAbstractEntityQuerySubCtx;
import org.thingsboard.server.service.subscription.TbAbstractSubCtx;
import org.thingsboard.server.service.subscription.TbAlarmCountSubCtx;
import org.thingsboard.server.service.subscription.TbAlarmDataSubCtx;
import org.thingsboard.server.service.subscription.TbAlarmStatusSubCtx;
import org.thingsboard.server.service.subscription.TbEntityCountSubCtx;
import org.thingsboard.server.service.subscription.TbEntityDataSubCtx;
import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggKey;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggTimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.GetTsCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;

@TbCoreComponent
@Service
public class DefaultTbEntityDataSubscriptionService
implements TbEntityDataSubscriptionService {
    // Class logger (carries the lombok @Generated marker).
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTbEntityDataSubscriptionService.class);
    // NOTE(review): presumably the fallback used by getLimit(...), which is not visible in this chunk — confirm.
    private static final int DEFAULT_LIMIT = 100;
    // Registry of live subscription contexts: WebSocket session id -> (command id -> context).
    private final ConcurrentMap<String, ConcurrentMap<Integer, TbAbstractSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<String, ConcurrentMap<Integer, TbAbstractSubCtx>>();
    // Used to close sessions on command failure and handed to every subscription context.
    @Autowired
    @Lazy
    private WebSocketService wsService;
    @Autowired
    private EntityService entityService;
    @Autowired
    private AlarmService alarmService;
    @Autowired
    private AttributesService attributesService;
    @Autowired
    @Lazy
    private TbLocalSubscriptionService localSubscriptionService;
    // Source of time-series data for the history / aggregation commands.
    @Autowired
    private TimeseriesService tsService;
    @Autowired
    private TbServiceInfoProvider serviceInfoProvider;
    @Autowired
    private DbCallbackExecutorService dbCallbackExecutor;
    // Runs the periodic refresh tasks; created in initExecutor().
    private ScheduledExecutorService scheduler;
    @Value(value="${database.ts.type}")
    private String databaseTsType;
    // Initial delay and delay between refresh runs; passed to the scheduler with TimeUnit.SECONDS.
    @Value(value="${server.ws.dynamic_page_link.refresh_interval:6}")
    private long dynamicPageLinkRefreshInterval;
    // A value of 1 selects a single-thread scheduler, anything else a pool of this size (see initExecutor()).
    @Value(value="${server.ws.dynamic_page_link.refresh_pool_size:1}")
    private int dynamicPageLinkRefreshPoolSize;
    @Value(value="${server.ws.max_entities_per_data_subscription:1000}")
    private int maxEntitiesPerDataSubscription;
    @Value(value="${server.ws.max_entities_per_alarm_subscription:1000}")
    private int maxEntitiesPerAlarmSubscription;
    @Value(value="${server.ws.dynamic_page_link.max_alarm_queries_per_refresh_interval:10}")
    private int maxAlarmQueriesPerRefreshInterval;
    @Value(value="${ui.dashboard.max_datapoints_limit:50000}")
    private int maxDatapointLimit;
    @Value(value="${server.ws.alarms_per_alarm_status_subscription_cache_size:10}")
    private int alarmsPerAlarmStatusSubscriptionCacheSize;
    // Single-thread executor on which time-series fetch results are post-processed; created in initExecutor().
    private ExecutorService wsCallBackExecutor;
    // True when databaseTsType is "sql" or "timescale" (case-insensitive); computed in initExecutor().
    private boolean tsInSqlDB;
    // Id of this service instance, read from serviceInfoProvider in initExecutor().
    private String serviceId;
    // Query counters and timings; read and reset by printStats().
    private SubscriptionServiceStatistics stats = new SubscriptionServiceStatistics();

    /**
     * Post-construction initialisation: resolves the service id, creates the
     * single-thread callback executor, derives {@code tsInSqlDB} from the configured
     * time-series database type and creates the refresh scheduler (single-threaded
     * when the configured pool size is 1, pooled otherwise).
     */
    @PostConstruct
    public void initExecutor() {
        serviceId = serviceInfoProvider.getServiceId();

        ThreadFactory callbackThreads = ThingsBoardThreadFactory.forName("ws-entity-sub-callback");
        wsCallBackExecutor = Executors.newSingleThreadExecutor(callbackThreads);

        tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale");

        if (dynamicPageLinkRefreshPoolSize == 1) {
            scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("ws-entity-sub-scheduler");
        } else {
            scheduler = ThingsBoardExecutors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, "ws-entity-sub-scheduler");
        }
    }

    /**
     * Pre-destroy hook: calls {@code shutdownNow()} on the callback executor and
     * then on the refresh scheduler, skipping whichever was never created.
     */
    @PreDestroy
    public void shutdownExecutor() {
        // Same order as creation: callback executor first, scheduler second.
        for (ExecutorService executor : Arrays.<ExecutorService>asList(wsCallBackExecutor, scheduler)) {
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Handles an entity data command: creates or updates the subscription context for
     * {@code (sessionId, cmdId)}, optionally (re)resolves and executes the entity query,
     * and then processes the history / aggregation / latest / time-series sub-commands.
     *
     * <p>History and aggregation sub-commands are asynchronous; the regular (latest and
     * time-series) commands are processed only after all of them have completed, on the
     * callback executor. When none are present the regular commands run inline.
     *
     * @param session reference to the WebSocket session that issued the command
     * @param cmd     the command to process
     */
    @Override
    public void handleCmd(WebSocketSessionRef session, final EntityDataCmd cmd) {
        TbEntityDataSubCtx ctx = (TbEntityDataSubCtx)this.getSubCtx(session.getSessionId(), cmd.getCmdId());
        if (ctx != null) {
            log.debug("[{}][{}] Updating existing subscriptions using: {}", new Object[]{session.getSessionId(), cmd.getCmdId(), cmd});
            if (cmd.hasAnyCmd()) {
                ctx.clearEntitySubscriptions();
            }
        } else {
            log.debug("[{}][{}] Creating new subscription using: {}", new Object[]{session.getSessionId(), cmd.getCmdId(), cmd});
            ctx = this.createSubCtx(session, cmd);
        }
        ctx.setCurrentCmd(cmd);
        if (cmd.getQuery() != null) {
            if (ctx.getQuery() == null) {
                log.debug("[{}][{}] Initializing data using query: {}", new Object[]{session.getSessionId(), cmd.getCmdId(), cmd.getQuery()});
            } else {
                log.debug("[{}][{}] Updating data using query: {}", new Object[]{session.getSessionId(), cmd.getCmdId(), cmd.getQuery()});
            }
            ctx.setAndResolveQuery(cmd.getQuery());
            EntityDataQuery query = (EntityDataQuery)ctx.getQuery();
            if (cmd.getLatestCmd() != null) {
                // Make sure every key requested by the latest-values command is part of the query.
                cmd.getLatestCmd().getKeys().forEach(key -> {
                    if (!query.getLatestValues().contains(key)) {
                        query.getLatestValues().add(key);
                    }
                });
            }
            long start = System.currentTimeMillis();
            ctx.fetchData();
            long end = System.currentTimeMillis();
            this.stats.getRegularQueryInvocationCnt().incrementAndGet();
            this.stats.getRegularQueryTimeSpent().addAndGet(end - start);
            // Drop any previously scheduled tasks before (possibly) scheduling a new refresh.
            ctx.cancelTasks();
            if (((EntityDataQuery)ctx.getQuery()).getPageLink().isDynamic()) {
                TbEntityDataSubCtx finalCtx = ctx;
                ScheduledFuture<?> task = this.scheduler.scheduleWithFixedDelay(() -> this.refreshDynamicQuery(finalCtx), this.dynamicPageLinkRefreshInterval, this.dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
                finalCtx.setRefreshTask(task);
            }
        }
        try {
            List<ListenableFuture<TbEntityDataSubCtx>> cmdFutures = new ArrayList<ListenableFuture<TbEntityDataSubCtx>>();
            if (cmd.getAggHistoryCmd() != null) {
                cmdFutures.add(this.handleAggHistoryCmd(ctx, cmd.getAggHistoryCmd()));
            }
            if (cmd.getAggTsCmd() != null) {
                cmdFutures.add(this.handleAggTsCmd(ctx, cmd.getAggTsCmd()));
            }
            if (cmd.getHistoryCmd() != null) {
                cmdFutures.add(this.handleHistoryCmd(ctx, cmd.getHistoryCmd()));
            }
            if (cmdFutures.isEmpty()) {
                this.handleRegularCommands(ctx, cmd);
            } else {
                final TbEntityDataSubCtx finalCtx = ctx;
                Futures.addCallback(Futures.allAsList(cmdFutures), new FutureCallback<List<TbEntityDataSubCtx>>() {

                    @Override
                    public void onSuccess(@Nullable List<TbEntityDataSubCtx> result) {
                        DefaultTbEntityDataSubscriptionService.this.handleRegularCommands(finalCtx, cmd);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        // Pass the throwable as the trailing argument so the cause is logged with its stack trace.
                        log.warn("[{}][{}] Failed to process command", new Object[]{finalCtx.getSessionId(), finalCtx.getCmdId(), t});
                    }
                }, this.wsCallBackExecutor);
            }
        }
        catch (RuntimeException e) {
            this.handleWsCmdRuntimeException(ctx.getSessionId(), e, cmd);
        }
    }

    /**
     * Processes the latest-values and time-series parts of a command. When the
     * command carries neither, only the initial data is sent (if not sent already).
     * Any {@link RuntimeException} is routed to {@code handleWsCmdRuntimeException}.
     *
     * @param ctx subscription context the command belongs to
     * @param cmd the command being processed
     */
    private void handleRegularCommands(TbEntityDataSubCtx ctx, EntityDataCmd cmd) {
        try {
            LatestValueCmd latestCmd = cmd.getLatestCmd();
            TimeSeriesCmd tsCmd = cmd.getTsCmd();
            if (latestCmd == null && tsCmd == null) {
                checkAndSendInitialData(ctx);
                return;
            }
            if (latestCmd != null) {
                handleLatestCmd(ctx, latestCmd);
            }
            if (tsCmd != null) {
                handleTimeSeriesCmd(ctx, tsCmd);
            }
        } catch (RuntimeException e) {
            handleWsCmdRuntimeException(ctx.getSessionId(), e, cmd);
        }
    }

    /**
     * Sends the context's current data to the client as the initial update, unless
     * that has already happened, and marks the context accordingly.
     *
     * @param theCtx subscription context; declared {@code @Nullable}, so a
     *               {@code null} value is treated as "nothing to send"
     */
    private void checkAndSendInitialData(@Nullable TbEntityDataSubCtx theCtx) {
        // Honour the @Nullable contract instead of failing with a NullPointerException.
        if (theCtx == null || theCtx.isInitialDataSent()) {
            return;
        }
        EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription());
        theCtx.sendWsMsg(update);
        theCtx.setInitialDataSent(true);
    }

    /**
     * Builds the aggregation queries for a history command and delegates to
     * {@code handleAggCmd} without subscribing for further updates.
     *
     * <p>For every key a "current" query over {@code [startTs, endTs]} is created
     * unless the key is flagged as previous-value-only. A "previous" query is added
     * when the key defines a previous range whose end is not before its start.
     * Each query uses a single interval spanning its whole range and a limit of 1.
     *
     * @param ctx subscription context
     * @param cmd aggregation history command
     * @return future completing with {@code ctx} once the results have been processed
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggHistoryCmd(TbEntityDataSubCtx ctx, AggHistoryCmd cmd) {
        ConcurrentMap<Integer, ReadTsKvQueryInfo> queriesById = new ConcurrentHashMap<>();
        long startTs = cmd.getStartTs();
        long endTs = cmd.getEndTs();
        for (AggKey aggKey : cmd.getKeys()) {
            Boolean previousOnlyFlag = aggKey.getPreviousValueOnly();
            boolean previousOnly = previousOnlyFlag != null && previousOnlyFlag.booleanValue();
            if (!previousOnly) {
                BaseReadTsKvQuery currentQuery = new BaseReadTsKvQuery(aggKey.getKey(), startTs, endTs, endTs - startTs, 1, aggKey.getAgg());
                queriesById.put(currentQuery.getId(), new ReadTsKvQueryInfo(aggKey, currentQuery, false));
            }
            Long previousStart = aggKey.getPreviousStartTs();
            Long previousEnd = aggKey.getPreviousEndTs();
            boolean hasPreviousRange = previousStart != null && previousEnd != null && previousEnd >= previousStart;
            if (hasPreviousRange) {
                BaseReadTsKvQuery previousQuery = new BaseReadTsKvQuery(aggKey.getKey(), previousStart.longValue(), previousEnd.longValue(), previousEnd - previousStart, 1, aggKey.getAgg());
                queriesById.put(previousQuery.getId(), new ReadTsKvQueryInfo(aggKey, previousQuery, true));
            }
        }
        return handleAggCmd(ctx, cmd.getKeys(), queriesById, startTs, endTs, false);
    }

    /**
     * Builds one aggregation query per key over {@code [startTs, startTs + timeWindow]}
     * (single interval of {@code timeWindow}, limit 1) and delegates to
     * {@code handleAggCmd} with subscription enabled.
     *
     * @param ctx subscription context
     * @param cmd aggregated time-series command
     * @return future completing with {@code ctx} once the results have been processed
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggTsCmd(TbEntityDataSubCtx ctx, AggTimeSeriesCmd cmd) {
        long windowStart = cmd.getStartTs();
        long windowLength = cmd.getTimeWindow();
        long windowEnd = windowStart + windowLength;
        ConcurrentMap<Integer, ReadTsKvQueryInfo> queriesById = new ConcurrentHashMap<>();
        for (AggKey aggKey : cmd.getKeys()) {
            BaseReadTsKvQuery tsQuery = new BaseReadTsKvQuery(aggKey.getKey(), windowStart, windowEnd, windowLength, 1, aggKey.getAgg());
            queriesById.put(tsQuery.getId(), new ReadTsKvQueryInfo(aggKey, tsQuery, false));
        }
        return handleAggCmd(ctx, cmd.getKeys(), queriesById, windowStart, windowEnd, true);
    }

    /**
     * Runs the given aggregation queries for every entity currently held by the
     * context and, once all fetches have completed, merges the results into each
     * entity's aggregated-latest map and pushes an update to the client.
     *
     * @param ctx       subscription context whose entities are queried
     * @param keys      all aggregation keys of the command; keys without a result get an empty comparison value
     * @param queries   query id -> query descriptor, used to map each result back to its key
     * @param startTs   start timestamp forwarded to {@code createTimeSeriesSubscriptions}
     * @param endTs     end timestamp forwarded to {@code createTimeSeriesSubscriptions}
     * @param subscribe whether time-series subscriptions are created after the fetch
     * @return future that completes with {@code ctx} after the update has been sent
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggCmd(TbEntityDataSubCtx ctx, List<AggKey> keys, ConcurrentMap<Integer, ReadTsKvQueryInfo> queries, long startTs, long endTs, boolean subscribe) {
        // entity data -> pending fetch of all queries for that entity
        HashMap fetchResultMap = new HashMap();
        List entityDataList = ctx.getData().getData();
        List queryList = queries.values().stream().map(ReadTsKvQueryInfo::getQuery).collect(Collectors.toList());
        entityDataList.forEach(entityData -> fetchResultMap.put(entityData, this.tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), queryList)));
        // The transformation runs on wsCallBackExecutor once every per-entity fetch is done.
        return Futures.transform((ListenableFuture)Futures.allAsList(fetchResultMap.values()), f -> {
            // entity data -> (key -> timestamp of the last entry of the "current" result)
            HashMap<EntityData, Map<String, Long>> lastTsEntityMap = new HashMap<EntityData, Map<String, Long>>();
            fetchResultMap.forEach((entityData, future) -> {
                try {
                    HashMap<String, Long> lastTsMap = new HashMap<String, Long>();
                    lastTsEntityMap.put((EntityData)entityData, (Map<String, Long>)lastTsMap);
                    List queryResults = (List)future.get();
                    if (queryResults != null) {
                        for (ReadTsKvQueryResult queryResult : queryResults) {
                            ReadTsKvQueryInfo queryInfo = (ReadTsKvQueryInfo)queries.get(queryResult.getQueryId());
                            ComparisonTsValue comparisonTsValue = entityData.getAggLatest().computeIfAbsent(queryInfo.getKey().getId(), agg -> new ComparisonTsValue());
                            // "Previous" results only fill the comparison value; they do not contribute to lastTsMap.
                            if (queryInfo.isPrevious()) {
                                comparisonTsValue.setPrevious(queryResult.toTsValue(queryInfo.getQuery()));
                                continue;
                            }
                            comparisonTsValue.setCurrent(queryResult.toTsValue(queryInfo.getQuery()));
                            lastTsMap.put(queryInfo.getQuery().getKey(), queryResult.getLastEntryTs());
                        }
                    }
                    // Keys that produced no result still get an (empty, empty) comparison entry.
                    keys.forEach(key -> entityData.getAggLatest().putIfAbsent(key.getId(), new ComparisonTsValue(TsValue.EMPTY, TsValue.EMPTY)));
                }
                catch (InterruptedException | ExecutionException e) {
                    log.warn("[{}][{}][{}] Failed to fetch historical data", new Object[]{ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e});
                    ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
                }
            });
            // The initial-data check, subscription creation and send happen under the context's ws lock.
            ctx.getWsLock().lock();
            try {
                EntityDataUpdate update;
                if (!ctx.isInitialDataSent()) {
                    // First message for this command: send the full data set.
                    update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
                    ctx.setInitialDataSent(true);
                } else {
                    // Initial data already delivered: send the entity list as an update.
                    update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription());
                }
                if (subscribe) {
                    ctx.createTimeSeriesSubscriptions(lastTsEntityMap, startTs, endTs, true);
                }
                ctx.sendWsMsg(update);
                // After sending, clear the time-series and aggregation data held by each entity.
                entityDataList.forEach(EntityData::clearTsAndAggData);
            }
            finally {
                ctx.getWsLock().unlock();
            }
            return ctx;
        }, (Executor)this.wsCallBackExecutor);
    }

    /**
     * Logs a command-processing failure at debug level and closes the WebSocket
     * session with {@code SERVICE_RESTARTED}, except for rate-limit violations,
     * which leave the session open.
     *
     * @param sessionId id of the affected session
     * @param e         the failure
     * @param cmd       the command that was being processed
     */
    private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) {
        log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e);
        boolean rateLimited = e instanceof TbRateLimitsException;
        if (!rateLimited) {
            wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED);
        }
    }

    /**
     * Handles an entity count command. A new context is created, its data fetched
     * (with timing recorded in the statistics) and a periodic refresh scheduled.
     * A command whose {@code (sessionId, cmdId)} is already registered is only logged.
     *
     * @param session reference to the WebSocket session that issued the command
     * @param cmd     the entity count command
     */
    @Override
    public void handleCmd(WebSocketSessionRef session, EntityCountCmd cmd) {
        TbEntityCountSubCtx existing = getSubCtx(session.getSessionId(), cmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd);
            return;
        }

        TbEntityCountSubCtx countCtx = createSubCtx(session, cmd);
        long startedAt = System.currentTimeMillis();
        countCtx.fetchData();
        long finishedAt = System.currentTimeMillis();
        stats.getRegularQueryInvocationCnt().incrementAndGet();
        stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        ScheduledFuture<?> refreshTask = scheduler.scheduleWithFixedDelay(
                () -> refreshDynamicQuery(countCtx),
                dynamicPageLinkRefreshInterval,
                dynamicPageLinkRefreshInterval,
                TimeUnit.SECONDS);
        countCtx.setRefreshTask(refreshTask);
    }

    /**
     * Handles an alarm data command: creates the context on first use, resolves the
     * query, fetches the matching entities, resets previous tasks and entity
     * subscriptions, and then either reports an empty result or fetches the alarms,
     * subscribes to the latest values and, for a positive time window, schedules a
     * periodic refresh.
     *
     * @param session reference to the WebSocket session that issued the command
     * @param cmd     the alarm data command
     */
    @Override
    public void handleCmd(WebSocketSessionRef session, AlarmDataCmd cmd) {
        TbAlarmDataSubCtx existing = getSubCtx(session.getSessionId(), cmd.getCmdId());
        final TbAlarmDataSubCtx alarmCtx;
        if (existing == null) {
            log.debug("[{}][{}] Creating new alarm subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
            alarmCtx = createSubCtx(session, cmd);
        } else {
            alarmCtx = existing;
        }

        alarmCtx.setAndResolveQuery(cmd.getQuery());
        AlarmDataQuery resolvedQuery = (AlarmDataQuery) alarmCtx.getQuery();

        long startedAt = System.currentTimeMillis();
        alarmCtx.fetchData();
        long finishedAt = System.currentTimeMillis();
        stats.getRegularQueryInvocationCnt().incrementAndGet();
        stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        List<EntityData> entities = alarmCtx.getEntitiesData();
        alarmCtx.cancelTasks();
        alarmCtx.clearEntitySubscriptions();

        if (entities.isEmpty()) {
            // No matching entities: answer with an empty page and zero counters.
            alarmCtx.sendWsMsg(new AlarmDataUpdate(cmd.getCmdId(), new PageData<AlarmData>(), null, 0L, 0L));
            return;
        }

        alarmCtx.fetchAlarms();
        alarmCtx.createLatestValuesSubscriptions(cmd.getQuery().getLatestValues());
        boolean hasTimeWindow = ((AlarmDataPageLink) resolvedQuery.getPageLink()).getTimeWindow() > 0L;
        if (hasTimeWindow) {
            ScheduledFuture<?> refreshTask = scheduler.scheduleWithFixedDelay(
                    () -> refreshAlarmQuery(alarmCtx),
                    dynamicPageLinkRefreshInterval,
                    dynamicPageLinkRefreshInterval,
                    TimeUnit.SECONDS);
            alarmCtx.setRefreshTask(refreshTask);
        }
    }

    /**
     * Handles an alarm count command. For a new {@code (sessionId, cmdId)} the
     * context is created and its data fetched; an empty (non-null) entity id set
     * yields an immediate zero count, otherwise the count is fetched, alarm
     * subscriptions are created when an entity id set exists, and a periodic refresh
     * is scheduled. An already registered command is only logged.
     *
     * @param session reference to the WebSocket session that issued the command
     * @param cmd     the alarm count command
     */
    @Override
    public void handleCmd(WebSocketSessionRef session, AlarmCountCmd cmd) {
        TbAlarmCountSubCtx existing = getSubCtx(session.getSessionId(), cmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd);
            return;
        }

        TbAlarmCountSubCtx countCtx = createSubCtx(session, cmd);
        long startedAt = System.currentTimeMillis();
        countCtx.fetchData();
        long finishedAt = System.currentTimeMillis();
        stats.getRegularQueryInvocationCnt().incrementAndGet();
        stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        LinkedHashSet<EntityId> entityIds = countCtx.getEntitiesIds();
        countCtx.cancelTasks();
        countCtx.clearAlarmSubscriptions();

        boolean hasEntityFilter = entityIds != null;
        if (hasEntityFilter && entityIds.isEmpty()) {
            // The query resolved to no entities: report zero without querying alarms.
            countCtx.sendWsMsg(new AlarmCountUpdate(cmd.getCmdId(), 0));
            return;
        }

        countCtx.doFetchAlarmCount();
        if (hasEntityFilter) {
            countCtx.createAlarmSubscriptions();
        }
        ScheduledFuture<?> refreshTask = scheduler.scheduleWithFixedDelay(
                () -> refreshDynamicQuery(countCtx),
                dynamicPageLinkRefreshInterval,
                dynamicPageLinkRefreshInterval,
                TimeUnit.SECONDS);
        countCtx.setRefreshTask(refreshTask);
    }

    /**
     * Handles an alarm status command. For a new {@code (sessionId, cmdId)} the
     * context is created, the active alarms are fetched (timing recorded in the
     * alarm-query statistics) and an update is sent. An already registered command
     * is only logged.
     *
     * @param session reference to the WebSocket session that issued the command
     * @param cmd     the alarm status command
     */
    @Override
    public void handleCmd(WebSocketSessionRef session, AlarmStatusCmd cmd) {
        log.debug("[{}] Handling alarm status subscription cmd (cmdId: {})", session.getSessionId(), cmd.getCmdId());
        TbAlarmStatusSubCtx existing = getSubCtx(session.getSessionId(), cmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd);
            return;
        }

        TbAlarmStatusSubCtx statusCtx = createSubCtx(session, cmd);
        long startedAt = System.currentTimeMillis();
        statusCtx.fetchActiveAlarms();
        long finishedAt = System.currentTimeMillis();
        stats.getAlarmQueryInvocationCnt().incrementAndGet();
        stats.getAlarmQueryTimeSpent().addAndGet(finishedAt - startedAt);
        statusCtx.sendUpdate();
    }

    /**
     * Checks whether a context is still eligible for a scheduled refresh: it must
     * not be stopped, its session must still be registered, and its command id must
     * still be present for that session. The first failing check is logged as a warning.
     *
     * @param finalCtx context to check
     * @return {@code true} when all checks pass, {@code false} otherwise
     */
    private boolean validate(TbAbstractSubCtx finalCtx) {
        String problem = null;
        if (finalCtx.isStopped()) {
            problem = "[{}][{}][{}] Received validation task for already stopped context.";
        } else {
            ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.get(finalCtx.getSessionId());
            if (sessionSubs == null) {
                problem = "[{}][{}][{}] Received validation task for already removed session.";
            } else if (!sessionSubs.containsKey(finalCtx.getCmdId())) {
                problem = "[{}][{}][{}] Received validation task for unregistered cmdId.";
            }
        }
        if (problem == null) {
            return true;
        }
        log.warn(problem, finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId());
        return false;
    }

    /**
     * Scheduled refresh of a dynamic query. A context that fails {@code validate}
     * is stopped; otherwise it is updated and the call is counted and timed in the
     * statistics. Exceptions are logged and swallowed.
     *
     * @param finalCtx context to refresh
     */
    private void refreshDynamicQuery(TbAbstractEntityQuerySubCtx<?> finalCtx) {
        try {
            if (!validate(finalCtx)) {
                finalCtx.stop();
                return;
            }
            long startedAt = System.currentTimeMillis();
            finalCtx.update();
            long finishedAt = System.currentTimeMillis();
            log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery());
            stats.getDynamicQueryInvocationCnt().incrementAndGet();
            stats.getDynamicQueryTimeSpent().addAndGet(finishedAt - startedAt);
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
        }
    }

    /**
     * Scheduled refresh of an alarm data subscription. A context that passes
     * {@code validate} has its invocation counter checked and reset; otherwise it
     * is stopped.
     *
     * <p>This method is run via {@code scheduleWithFixedDelay}, which suppresses all
     * subsequent executions once a run throws. Exceptions are therefore caught and
     * logged here, matching {@code refreshDynamicQuery}, so a single failure does
     * not silently end the periodic refresh.
     *
     * @param finalCtx alarm data context to refresh
     */
    private void refreshAlarmQuery(TbAlarmDataSubCtx finalCtx) {
        try {
            if (this.validate(finalCtx)) {
                finalCtx.checkAndResetInvocationCounter();
            } else {
                finalCtx.stop();
            }
        }
        catch (Exception e) {
            log.warn("[{}][{}] Failed to refresh alarm query", new Object[]{finalCtx.getSessionId(), finalCtx.getCmdId(), e});
        }
    }

    /**
     * Periodically reads and resets the query statistics, counts the currently
     * registered dynamic subscriptions, and logs a summary line when any of the
     * invocation counters or the dynamic subscription count is positive.
     */
    @Scheduled(fixedDelayString="${server.ws.dynamic_page_link.stats:10000}")
    public void printStats() {
        // Read-and-reset each counter in turn so the next report starts from zero.
        int alarmCnt = stats.getAlarmQueryInvocationCnt().getAndSet(0);
        long alarmTime = stats.getAlarmQueryTimeSpent().getAndSet(0L);
        int regularCnt = stats.getRegularQueryInvocationCnt().getAndSet(0);
        long regularTime = stats.getRegularQueryTimeSpent().getAndSet(0L);
        int dynamicCnt = stats.getDynamicQueryInvocationCnt().getAndSet(0);
        long dynamicTime = stats.getDynamicQueryTimeSpent().getAndSet(0L);

        long dynamicSubscriptions = 0L;
        for (ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs : subscriptionsBySessionId.values()) {
            for (TbAbstractSubCtx subCtx : sessionSubs.values()) {
                if (subCtx.isDynamic()) {
                    dynamicSubscriptions++;
                }
            }
        }

        boolean nothingToReport = regularCnt <= 0 && dynamicCnt <= 0 && dynamicSubscriptions <= 0L && alarmCnt <= 0;
        if (nothingToReport) {
            return;
        }
        log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], alarmQueryInvocationCnt = [{}], alarmQueryInvocationTime = [{}]",
                regularCnt, regularTime, dynamicSubscriptions, dynamicCnt, dynamicTime, alarmCnt, alarmTime);
    }

    /**
     * Creates an entity data subscription context, resolves the command's query on
     * it when one is present, and registers it under the session and command id.
     *
     * @param sessionRef session the subscription belongs to
     * @param cmd        command that triggered the creation
     * @return the newly registered context
     */
    private TbEntityDataSubCtx createSubCtx(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession =
                subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbEntityDataSubCtx dataCtx = new TbEntityDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
                attributesService, stats, sessionRef, cmd.getCmdId(), maxEntitiesPerDataSubscription);
        if (cmd.getQuery() != null) {
            dataCtx.setAndResolveQuery(cmd.getQuery());
        }
        subsOfSession.put(cmd.getCmdId(), dataCtx);
        return dataCtx;
    }

    /**
     * Creates an entity count subscription context, resolves the command's query on
     * it when one is present, and registers it under the session and command id.
     *
     * @param sessionRef session the subscription belongs to
     * @param cmd        command that triggered the creation
     * @return the newly registered context
     */
    private TbEntityCountSubCtx createSubCtx(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession =
                subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbEntityCountSubCtx countCtx = new TbEntityCountSubCtx(serviceId, wsService, entityService, localSubscriptionService,
                attributesService, stats, sessionRef, cmd.getCmdId());
        if (cmd.getQuery() != null) {
            countCtx.setAndResolveQuery(cmd.getQuery());
        }
        subsOfSession.put(cmd.getCmdId(), countCtx);
        return countCtx;
    }

    /**
     * Creates an alarm data subscription context, resolves the command's query on
     * it, and registers it under the session and command id.
     *
     * @param sessionRef session the subscription belongs to
     * @param cmd        command that triggered the creation
     * @return the newly registered context
     */
    private TbAlarmDataSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession =
                subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbAlarmDataSubCtx alarmCtx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
                attributesService, stats, alarmService, sessionRef, cmd.getCmdId(),
                maxEntitiesPerAlarmSubscription, maxAlarmQueriesPerRefreshInterval);
        alarmCtx.setAndResolveQuery(cmd.getQuery());
        subsOfSession.put(cmd.getCmdId(), alarmCtx);
        return alarmCtx;
    }

    /**
     * Creates an alarm count subscription context, resolves the command's query on
     * it when one is present, and registers it under the session and command id.
     *
     * @param sessionRef session the subscription belongs to
     * @param cmd        command that triggered the creation
     * @return the newly registered context
     */
    private TbAlarmCountSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession =
                subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbAlarmCountSubCtx countCtx = new TbAlarmCountSubCtx(serviceId, wsService, entityService, localSubscriptionService,
                attributesService, stats, alarmService, sessionRef, cmd.getCmdId(),
                maxEntitiesPerAlarmSubscription, maxAlarmQueriesPerRefreshInterval);
        if (cmd.getQuery() != null) {
            countCtx.setAndResolveQuery(cmd.getQuery());
        }
        subsOfSession.put(cmd.getCmdId(), countCtx);
        return countCtx;
    }

    /**
     * Creates an alarm status subscription context, creates its subscription from
     * the command, and registers it under the session and command id.
     *
     * @param sessionRef session the subscription belongs to
     * @param cmd        command that triggered the creation
     * @return the newly registered context
     */
    private TbAlarmStatusSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmStatusCmd cmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession =
                subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbAlarmStatusSubCtx statusCtx = new TbAlarmStatusSubCtx(serviceId, wsService, localSubscriptionService, stats,
                alarmService, alarmsPerAlarmStatusSubscriptionCacheSize, sessionRef, cmd.getCmdId());
        statusCtx.createSubscription(cmd);
        subsOfSession.put(cmd.getCmdId(), statusCtx);
        return statusCtx;
    }

    /**
     * Looks up the subscription context registered for a session and command id.
     *
     * @param sessionId WebSocket session id
     * @param cmdId     command id within that session
     * @param <T>       context type expected by the caller (unchecked cast)
     * @return the registered context, or {@code null} when the session or the
     *         command id is unknown
     */
    private <T extends TbAbstractSubCtx> T getSubCtx(String sessionId, int cmdId) {
        ConcurrentMap<Integer, TbAbstractSubCtx> subsOfSession = subscriptionsBySessionId.get(sessionId);
        if (subsOfSession == null) {
            return null;
        }
        return (T) subsOfSession.get(cmdId);
    }

    /**
     * Handles a time-series command by delegating to {@code handleGetTsCmd} with
     * subscription enabled.
     *
     * @param ctx subscription context
     * @param cmd time-series command
     * @return the future returned by {@code handleGetTsCmd}
     */
    private ListenableFuture<TbEntityDataSubCtx> handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {
        log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})",
                ctx.getSessionId(), ctx.getCmdId(), cmd.getTimeWindow(), cmd.getKeys());
        final boolean subscribeForUpdates = true;
        return handleGetTsCmd(ctx, cmd, subscribeForUpdates);
    }

    /**
     * Handles a history command by delegating to {@code handleGetTsCmd} without
     * subscribing for further updates.
     *
     * @param ctx subscription context
     * @param cmd history command
     * @return the future returned by {@code handleGetTsCmd}
     */
    private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd cmd) {
        log.debug("[{}][{}] Fetching history data for start {} and end {} ms for keys: ({})",
                ctx.getSessionId(), ctx.getCmdId(), cmd.getStartTs(), cmd.getEndTs(), cmd.getKeys());
        final boolean subscribeForUpdates = false;
        return handleGetTsCmd(ctx, cmd, subscribeForUpdates);
    }

    /**
     * Fetches time-series data for every entity currently held by the context and pushes the
     * result to the WebSocket session.
     *
     * <p>One query per key is built for the {@code [startTs, endTs]} range of the command. When
     * the command asks for the latest previous point, one extra query per key is added that
     * covers the 365 days before {@code startTs} and is limited to a single entry.
     *
     * <p>Once all fetches complete, the results are merged into each entity's time-series map,
     * the largest "last entry" timestamp is tracked per key, keys without data get an empty
     * array, and (only when the previous point was requested) every array is sorted by
     * timestamp in descending order. The update is then sent while holding the context's
     * WebSocket lock, and the time-series/aggregation data of the entities is cleared.
     *
     * <p>If reading the result for an entity fails, an error update is sent for the command and
     * processing continues with the remaining entities.
     *
     * @param ctx the entity data subscription context
     * @param cmd the command describing keys, time range, aggregation and limit
     * @param subscribe whether time-series subscriptions are created before the update is sent
     * @return a future that completes with {@code ctx} after the update has been sent
     */
    private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) {
        // Maps the id of each issued query to its key so that results can be attributed later.
        Map<Integer, String> queriesKeys = new ConcurrentHashMap<>();
        List<String> keys = cmd.getKeys();

        List<ReadTsKvQuery> tsKvQueryList = new ArrayList<>();
        for (String key : keys) {
            BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.toAggregationParams(), this.getLimit(cmd.getLimit()));
            queriesKeys.put(query.getId(), query.getKey());
            tsKvQueryList.add(query);
        }
        if (cmd.isFetchLatestPreviousPoint()) {
            // One additional point per key, taken from the year preceding the requested range.
            for (String key : keys) {
                BaseReadTsKvQuery query = new BaseReadTsKvQuery(key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365L), cmd.getStartTs(), cmd.toAggregationParams(), 1);
                queriesKeys.put(query.getId(), query.getKey());
                tsKvQueryList.add(query);
            }
        }

        List<EntityData> entityDataList = ctx.getData().getData();
        Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchResultMap = new HashMap<>();
        for (EntityData entityData : entityDataList) {
            fetchResultMap.put(entityData, this.tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList));
        }

        return Futures.transform(Futures.allAsList(fetchResultMap.values()), ignored -> {
            Map<EntityData, Map<String, Long>> lastTsEntityMap = new HashMap<>();
            fetchResultMap.forEach((entityData, future) -> {
                try {
                    Map<String, Long> lastTsMap = new HashMap<>();
                    lastTsEntityMap.put(entityData, lastTsMap);
                    List<ReadTsKvQueryResult> queryResults = future.get();
                    if (queryResults != null) {
                        for (ReadTsKvQueryResult queryResult : queryResults) {
                            String queryKey = queriesKeys.get(queryResult.getQueryId());
                            if (queryKey == null) {
                                log.warn("ReadTsKvQueryResult for {} {} has queryId not matching the initial query", entityData.getEntityId().getEntityType(), entityData.getEntityId());
                                continue;
                            }
                            // Results of the range query and the previous-point query share a key.
                            entityData.getTimeseries().merge(queryKey, queryResult.toTsValues(), ArrayUtils::addAll);
                            lastTsMap.merge(queryKey, queryResult.getLastEntryTs(), Math::max);
                        }
                    }
                    // Every requested key is present in the update, even when nothing was found.
                    keys.forEach(key -> {
                        if (!entityData.getTimeseries().containsKey(key)) {
                            entityData.getTimeseries().put(key, new TsValue[0]);
                        }
                    });
                    if (cmd.isFetchLatestPreviousPoint()) {
                        // Merged arrays are re-sorted by timestamp, newest first.
                        entityData.getTimeseries().values().forEach(dataArray -> Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs())));
                    }
                } catch (InterruptedException | ExecutionException e) {
                    log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
                    ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
                }
            });
            ctx.getWsLock().lock();
            try {
                EntityDataUpdate update;
                if (!ctx.isInitialDataSent()) {
                    update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
                    ctx.setInitialDataSent(true);
                } else {
                    update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription());
                }
                if (subscribe) {
                    ctx.createTimeSeriesSubscriptions(lastTsEntityMap, cmd.getStartTs(), cmd.getEndTs());
                }
                ctx.sendWsMsg(update);
                // The fetched series have been sent; drop them from the entities held by the context.
                entityDataList.forEach(EntityData::clearTsAndAggData);
            } finally {
                ctx.getWsLock().unlock();
            }
            return ctx;
        }, this.wsCallBackExecutor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a latest-values command for the given context.
     *
     * <p>When {@code tsInSqlDB} is set, latest-value subscriptions are created and the initial
     * data is checked and sent while holding the context's WebSocket lock.
     *
     * <p>Otherwise, for every entity held by the context, the time-series keys requested by the
     * command that are not yet present in the entity's latest {@code TIME_SERIES} values are
     * looked up through the time-series service. When all lookups complete, the values found
     * are added to the entities, subscriptions are created and the update is sent under the
     * WebSocket lock. If the combined lookup fails, an error update is sent for the command.
     *
     * @param ctx the entity data subscription context
     * @param latestCmd the latest-values command
     */
    private void handleLatestCmd(final TbEntityDataSubCtx ctx, final LatestValueCmd latestCmd) {
        log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
        if (this.tsInSqlDB) {
            ctx.getWsLock().lock();
            try {
                ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
                this.checkAndSendInitialData(ctx);
            } finally {
                ctx.getWsLock().unlock();
            }
            return;
        }

        log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
        List<String> allTsKeys = latestCmd.getKeys().stream()
                .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES))
                .map(EntityKey::getKey)
                .collect(Collectors.toList());
        Map<EntityData, ListenableFuture<Map<String, TsValue>>> missingTelemetryFutures = new HashMap<>();
        for (EntityData entityData : ctx.getData().getData()) {
            Map<EntityKeyType, Map<String, TsValue>> latestEntityData = entityData.getLatest();
            // Declared as Map: the stored value is not required to be a HashMap instance.
            Map<String, TsValue> tsEntityData = latestEntityData.get(EntityKeyType.TIME_SERIES);
            LinkedHashSet<String> missingTsKeys = new LinkedHashSet<>(allTsKeys);
            if (tsEntityData != null) {
                missingTsKeys.removeAll(tsEntityData.keySet());
            } else {
                // Make sure there is a map to receive the looked-up values in onSuccess.
                latestEntityData.put(EntityKeyType.TIME_SERIES, new HashMap<>());
            }
            ListenableFuture<List<TsKvEntry>> missingTsData = this.tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys);
            missingTelemetryFutures.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor()));
        }

        Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback<List<Map<String, TsValue>>>() {

            @Override
            public void onSuccess(@Nullable List<Map<String, TsValue>> result) {
                missingTelemetryFutures.forEach((entityData, future) -> {
                    try {
                        entityData.getLatest().get(EntityKeyType.TIME_SERIES).putAll(future.get());
                    } catch (InterruptedException | ExecutionException e) {
                        log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), allTsKeys, e);
                    }
                });
                ctx.getWsLock().lock();
                try {
                    EntityDataUpdate update;
                    ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
                    if (!ctx.isInitialDataSent()) {
                        update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
                        ctx.setInitialDataSent(true);
                    } else {
                        // Follow-up updates carry only the latest values, without time-series arrays.
                        List<EntityData> preparedData = ctx.getData().getData().stream()
                                .map(entityData -> new EntityData(entityData.getEntityId(), entityData.getLatest(), null))
                                .toList();
                        update = new EntityDataUpdate(ctx.getCmdId(), null, preparedData, ctx.getMaxEntitiesPerDataSubscription());
                    }
                    ctx.sendWsMsg(update);
                } finally {
                    ctx.getWsLock().unlock();
                }
            }

            @Override
            public void onFailure(Throwable t) {
                log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), latestCmd, t);
                ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
            }
        }, this.wsCallBackExecutor);
    }

    /**
     * Converts a list of time-series entries into a map from entry key to a {@code TsValue}
     * built from the entry's timestamp and its value rendered as a string.
     *
     * @param data the entries to convert
     * @return a map with one {@code TsValue} per entry key
     */
    private Map<String, TsValue> toTsValue(List<TsKvEntry> data) {
        return data.stream().collect(
                Collectors.toMap(
                        KvEntry::getKey,
                        entry -> new TsValue(entry.getTs(), entry.getValueAsString())));
    }

    /**
     * Cancels the subscription registered under the command id of {@code cmd} in the given
     * session. Does nothing when no such subscription context exists.
     *
     * @param sessionId id of the WebSocket session
     * @param cmd the unsubscribe command carrying the command id
     */
    @Override
    public void cancelSubscription(String sessionId, UnsubscribeCmd cmd) {
        TbAbstractSubCtx target = this.getSubCtx(sessionId, cmd.getCmdId());
        this.cleanupAndCancel(target);
    }

    /**
     * Stops the given context and removes it from its session's subscription map.
     *
     * <p>The context is stopped first. The removal is skipped when the context has no session
     * id or the session has no subscription map.
     *
     * @param ctx the context to cancel; {@code null} is ignored
     */
    private void cleanupAndCancel(TbAbstractSubCtx ctx) {
        if (ctx == null) {
            return;
        }
        ctx.stop();
        if (ctx.getSessionId() == null) {
            return;
        }
        Map subsForSession = (Map)this.subscriptionsBySessionId.get(ctx.getSessionId());
        if (subsForSession != null) {
            subsForSession.remove(ctx.getCmdId());
        }
    }

    /**
     * Removes the subscription map of the given session and cancels every context it holds.
     *
     * <p>A failure while cancelling one context is logged and does not prevent the remaining
     * contexts from being cancelled.
     *
     * @param sessionId id of the WebSocket session whose subscriptions are cancelled
     */
    @Override
    public void cancelAllSessionSubscriptions(String sessionId) {
        Map<?, ?> sessionSubs = (Map<?, ?>)this.subscriptionsBySessionId.remove(sessionId);
        if (sessionSubs == null) {
            return;
        }
        for (Object value : sessionSubs.values()) {
            // Typed explicitly so that the context's accessors are available for logging.
            TbAbstractSubCtx sub = (TbAbstractSubCtx)value;
            try {
                this.cleanupAndCancel(sub);
            } catch (Exception e) {
                log.warn("[{}] Failed to remove subscription {} due to ", sub.getTenantId(), sub, e);
            }
        }
    }

    /**
     * Resolves the effective query limit.
     *
     * @param limit the requested limit
     * @return {@code 100} when {@code limit} is {@code 0}, otherwise {@code limit} unchanged
     */
    private int getLimit(int limit) {
        if (limit != 0) {
            return limit;
        }
        return 100;
    }

    /**
     * Returns the value of the {@code dbCallbackExecutor} field.
     *
     * @return the executor service held by this instance
     */
    @Generated
    public DbCallbackExecutorService getDbCallbackExecutor() {
        return dbCallbackExecutor;
    }
}

