package org.thingsboard.server.service.subscription;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggKey;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggTimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.GetTsCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.class */
public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbEntityDataSubscriptionService.class);
    // NOTE(review): not referenced in the visible part of this class — presumably a default page/query limit; confirm usage.
    private static final int DEFAULT_LIMIT = 100;

    // WebSocket facade; handed to every subscription context and used here to close sessions on command failure.
    @Autowired
    @Lazy
    private WebSocketService wsService;

    // Passed to the entity-query based subscription contexts.
    @Autowired
    private EntityService entityService;

    // Passed to the alarm data / alarm count / alarm status subscription contexts.
    @Autowired
    private AlarmService alarmService;

    // Passed to the entity-query based subscription contexts.
    @Autowired
    private AttributesService attributesService;

    // Passed to every subscription context.
    @Autowired
    @Lazy
    private TbLocalSubscriptionService localSubscriptionService;

    // Used to run the time-series queries behind aggregation commands.
    @Autowired
    private TimeseriesService tsService;

    // Source of the service id captured in initExecutor().
    @Autowired
    private TbServiceInfoProvider serviceInfoProvider;

    // NOTE(review): not referenced in the visible part of this class.
    @Autowired
    private DbCallbackExecutorService dbCallbackExecutor;
    // Runs the periodic query refresh tasks; created in initExecutor(), shut down in shutdownExecutor().
    private ScheduledExecutorService scheduler;

    // Time-series storage type; compared against "sql" and "timescale" in initExecutor().
    @Value("${database.ts.type}")
    private String databaseTsType;

    // Initial delay and delay between refresh runs; scheduled with TimeUnit.SECONDS.
    @Value("${server.ws.dynamic_page_link.refresh_interval:6}")
    private long dynamicPageLinkRefreshInterval;

    // Thread count of the refresh scheduler; a value of 1 selects a single-thread scheduler.
    @Value("${server.ws.dynamic_page_link.refresh_pool_size:1}")
    private int dynamicPageLinkRefreshPoolSize;

    // Forwarded to each TbEntityDataSubCtx on construction.
    @Value("${server.ws.max_entities_per_data_subscription:1000}")
    private int maxEntitiesPerDataSubscription;

    // Forwarded to each alarm data / alarm count context on construction.
    @Value("${server.ws.max_entities_per_alarm_subscription:1000}")
    private int maxEntitiesPerAlarmSubscription;

    // Forwarded to each alarm data / alarm count context on construction.
    @Value("${server.ws.dynamic_page_link.max_alarm_queries_per_refresh_interval:10}")
    private int maxAlarmQueriesPerRefreshInterval;

    // NOTE(review): not referenced in the visible part of this class.
    @Value("${ui.dashboard.max_datapoints_limit:50000}")
    private int maxDatapointLimit;

    // Forwarded to each TbAlarmStatusSubCtx on construction.
    @Value("${server.ws.alarms_per_alarm_status_subscription_cache_size:10}")
    private int alarmsPerAlarmStatusSubscriptionCacheSize;
    // Single-thread executor on which aggregation/history future callbacks run.
    private ExecutorService wsCallBackExecutor;
    // True when databaseTsType is "sql" or "timescale" (case-insensitive); set in initExecutor().
    private boolean tsInSqlDB;
    // Id of this service instance, read from serviceInfoProvider in initExecutor().
    private String serviceId;
    // Registry of live subscription contexts: session id -> (command id -> context).
    private final ConcurrentMap<String, ConcurrentMap<Integer, TbAbstractSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap();
    // Query counters/timers; shared with every context and periodically drained by printStats().
    private SubscriptionServiceStatistics stats = new SubscriptionServiceStatistics();

    /**
     * Prepares runtime state after dependency injection: captures the service id, creates the
     * single-thread websocket callback executor, derives the SQL time-series flag and builds the
     * refresh scheduler (single-threaded when the configured pool size is exactly 1).
     */
    @PostConstruct
    public void initExecutor() {
        this.serviceId = this.serviceInfoProvider.getServiceId();
        this.wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback"));

        final String tsType = this.databaseTsType;
        this.tsInSqlDB = tsType.equalsIgnoreCase("sql") || tsType.equalsIgnoreCase("timescale");

        final int poolSize = this.dynamicPageLinkRefreshPoolSize;
        this.scheduler = poolSize == 1
                ? ThingsBoardExecutors.newSingleThreadScheduledExecutor("ws-entity-sub-scheduler")
                : ThingsBoardExecutors.newScheduledThreadPool(poolSize, "ws-entity-sub-scheduler");
    }

    /**
     * Stops both executors immediately on bean destruction. Either may still be {@code null}
     * if initialization never ran, hence the guards.
     */
    @PreDestroy
    public void shutdownExecutor() {
        final ExecutorService callbackPool = this.wsCallBackExecutor;
        if (callbackPool != null) {
            callbackPool.shutdownNow();
        }
        final ScheduledExecutorService refreshPool = this.scheduler;
        if (refreshPool != null) {
            refreshPool.shutdownNow();
        }
    }

    /**
     * Handles an entity-data command.
     * <ol>
     *   <li>Looks up the subscription context for the (session, cmdId) pair, creating it when absent;
     *       an existing context has its entity subscriptions cleared if the command carries any sub-command.</li>
     *   <li>If the command carries a query, resolves it, merges the latest-value keys into it, fetches the
     *       data (recording timing stats), cancels previous tasks and schedules a periodic refresh when the
     *       page link is dynamic.</li>
     *   <li>Runs the aggregation/history sub-commands asynchronously and, once all of them complete,
     *       processes the latest/time-series sub-commands. Without any async sub-command the latter run
     *       immediately on the calling thread.</li>
     * </ol>
     *
     * @param webSocketSessionRef session that issued the command
     * @param entityDataCmd       command to process
     */
    @Override // org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService
    public void handleCmd(WebSocketSessionRef webSocketSessionRef, final EntityDataCmd entityDataCmd) {
        final String sessionId = webSocketSessionRef.getSessionId();
        final int cmdId = entityDataCmd.getCmdId();

        final TbEntityDataSubCtx ctx;
        final TbEntityDataSubCtx existingCtx = (TbEntityDataSubCtx) getSubCtx(sessionId, cmdId);
        if (existingCtx != null) {
            log.debug("[{}][{}] Updating existing subscriptions using: {}", sessionId, Integer.valueOf(cmdId), entityDataCmd);
            if (entityDataCmd.hasAnyCmd()) {
                existingCtx.clearEntitySubscriptions();
            }
            ctx = existingCtx;
        } else {
            log.debug("[{}][{}] Creating new subscription using: {}", sessionId, Integer.valueOf(cmdId), entityDataCmd);
            ctx = createSubCtx(webSocketSessionRef, entityDataCmd);
        }
        ctx.setCurrentCmd(entityDataCmd);

        if (entityDataCmd.getQuery() != null) {
            // A context without a query has never been initialized; compare against null, not a numeric literal.
            if (ctx.getQuery() == null) {
                log.debug("[{}][{}] Initializing data using query: {}", sessionId, Integer.valueOf(cmdId), entityDataCmd.getQuery());
            } else {
                log.debug("[{}][{}] Updating data using query: {}", sessionId, Integer.valueOf(cmdId), entityDataCmd.getQuery());
            }
            ctx.setAndResolveQuery(entityDataCmd.getQuery());
            final EntityDataQuery resolvedQuery = (EntityDataQuery) ctx.getQuery();
            if (entityDataCmd.getLatestCmd() != null) {
                // Make sure every key requested by the latest-value command is part of the query.
                entityDataCmd.getLatestCmd().getKeys().forEach(entityKey -> {
                    if (!resolvedQuery.getLatestValues().contains(entityKey)) {
                        resolvedQuery.getLatestValues().add(entityKey);
                    }
                });
            }
            final long fetchStartedAt = System.currentTimeMillis();
            ctx.fetchData();
            final long fetchFinishedAt = System.currentTimeMillis();
            this.stats.getRegularQueryInvocationCnt().incrementAndGet();
            this.stats.getRegularQueryTimeSpent().addAndGet(fetchFinishedAt - fetchStartedAt);
            ctx.cancelTasks();
            if (((EntityDataQuery) ctx.getQuery()).getPageLink().isDynamic()) {
                ctx.setRefreshTask(this.scheduler.scheduleWithFixedDelay(
                        () -> refreshDynamicQuery(ctx),
                        this.dynamicPageLinkRefreshInterval, this.dynamicPageLinkRefreshInterval, TimeUnit.SECONDS));
            }
        }

        try {
            final List<ListenableFuture<TbEntityDataSubCtx>> asyncCommands = new ArrayList<>();
            if (entityDataCmd.getAggHistoryCmd() != null) {
                asyncCommands.add(handleAggHistoryCmd(ctx, entityDataCmd.getAggHistoryCmd()));
            }
            if (entityDataCmd.getAggTsCmd() != null) {
                asyncCommands.add(handleAggTsCmd(ctx, entityDataCmd.getAggTsCmd()));
            }
            if (entityDataCmd.getHistoryCmd() != null) {
                asyncCommands.add(handleHistoryCmd(ctx, entityDataCmd.getHistoryCmd()));
            }
            if (asyncCommands.isEmpty()) {
                handleRegularCommands(ctx, entityDataCmd);
            } else {
                Futures.addCallback(Futures.allAsList(asyncCommands), new FutureCallback<List<TbEntityDataSubCtx>>() {
                    @Override
                    public void onSuccess(List<TbEntityDataSubCtx> results) {
                        DefaultTbEntityDataSubscriptionService.this.handleRegularCommands(ctx, entityDataCmd);
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        // Pass the throwable as the trailing argument so the cause ends up in the log.
                        DefaultTbEntityDataSubscriptionService.log.warn("[{}][{}] Failed to process command", ctx.getSessionId(), Integer.valueOf(ctx.getCmdId()), th);
                    }
                }, this.wsCallBackExecutor);
            }
        } catch (RuntimeException e) {
            handleWsCmdRuntimeException(ctx.getSessionId(), e, entityDataCmd);
        }
    }

    /**
     * Processes the latest-value and time-series sub-commands of {@code entityDataCmd}, in that order.
     * When the command carries neither, only the initial data snapshot is sent (if not sent already).
     * Runtime failures are routed to {@link #handleWsCmdRuntimeException}.
     */
    private void handleRegularCommands(TbEntityDataSubCtx tbEntityDataSubCtx, EntityDataCmd entityDataCmd) {
        try {
            if (entityDataCmd.getLatestCmd() != null || entityDataCmd.getTsCmd() != null) {
                if (entityDataCmd.getLatestCmd() != null) {
                    handleLatestCmd(tbEntityDataSubCtx, entityDataCmd.getLatestCmd());
                }
                if (entityDataCmd.getTsCmd() != null) {
                    handleTimeSeriesCmd(tbEntityDataSubCtx, entityDataCmd.getTsCmd());
                }
                return;
            }
            checkAndSendInitialData(tbEntityDataSubCtx);
        } catch (RuntimeException failure) {
            handleWsCmdRuntimeException(tbEntityDataSubCtx.getSessionId(), failure, entityDataCmd);
        }
    }

    /**
     * Sends the full data snapshot of the context to the client exactly once: nothing happens
     * if the context is already flagged as having sent its initial data.
     */
    private void checkAndSendInitialData(TbEntityDataSubCtx tbEntityDataSubCtx) {
        if (!tbEntityDataSubCtx.isInitialDataSent()) {
            final EntityDataUpdate snapshot = new EntityDataUpdate(
                    tbEntityDataSubCtx.getCmdId(),
                    tbEntityDataSubCtx.getData(),
                    null,
                    tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
            tbEntityDataSubCtx.sendWsMsg(snapshot);
            tbEntityDataSubCtx.setInitialDataSent(true);
        }
    }

    /**
     * Builds the time-series queries for an aggregated-history command and delegates to
     * {@link #handleAggCmd} without creating live subscriptions.
     * <p>
     * Per key: a "current" query over the command interval is added unless the key asks for the
     * previous value only, and a "previous" query is added when both previous bounds are present
     * and ordered. Each query spans its whole interval as a single aggregation bucket (limit 1).
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggHistoryCmd(TbEntityDataSubCtx tbEntityDataSubCtx, AggHistoryCmd aggHistoryCmd) {
        final ConcurrentMap<Integer, ReadTsKvQueryInfo> queriesById = new ConcurrentHashMap<>();
        final long startTs = aggHistoryCmd.getStartTs();
        final long endTs = aggHistoryCmd.getEndTs();
        for (AggKey key : aggHistoryCmd.getKeys()) {
            final boolean previousOnly = Boolean.TRUE.equals(key.getPreviousValueOnly());
            if (!previousOnly) {
                final BaseReadTsKvQuery currentQuery = new BaseReadTsKvQuery(key.getKey(), startTs, endTs, endTs - startTs, 1, key.getAgg());
                queriesById.put(Integer.valueOf(currentQuery.getId()), new ReadTsKvQueryInfo(key, currentQuery, false));
            }
            final Long previousStart = key.getPreviousStartTs();
            final Long previousEnd = key.getPreviousEndTs();
            if (previousStart != null && previousEnd != null && previousEnd.longValue() >= previousStart.longValue()) {
                final long from = previousStart.longValue();
                final long to = previousEnd.longValue();
                final BaseReadTsKvQuery previousQuery = new BaseReadTsKvQuery(key.getKey(), from, to, to - from, 1, key.getAgg());
                queriesById.put(Integer.valueOf(previousQuery.getId()), new ReadTsKvQueryInfo(key, previousQuery, true));
            }
        }
        return handleAggCmd(tbEntityDataSubCtx, aggHistoryCmd.getKeys(), queriesById, startTs, endTs, false);
    }

    /**
     * Builds one single-bucket query per key over {@code [startTs, startTs + timeWindow]} and
     * delegates to {@link #handleAggCmd} with subscription creation enabled.
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggTsCmd(TbEntityDataSubCtx tbEntityDataSubCtx, AggTimeSeriesCmd aggTimeSeriesCmd) {
        final ConcurrentMap<Integer, ReadTsKvQueryInfo> queriesById = new ConcurrentHashMap<>();
        final long startTs = aggTimeSeriesCmd.getStartTs();
        final long timeWindow = aggTimeSeriesCmd.getTimeWindow();
        final long endTs = startTs + timeWindow;
        for (AggKey key : aggTimeSeriesCmd.getKeys()) {
            final BaseReadTsKvQuery query = new BaseReadTsKvQuery(key.getKey(), startTs, endTs, timeWindow, 1, key.getAgg());
            queriesById.put(Integer.valueOf(query.getId()), new ReadTsKvQueryInfo(key, query, false));
        }
        return handleAggCmd(tbEntityDataSubCtx, aggTimeSeriesCmd.getKeys(), queriesById, startTs, endTs, true);
    }

    /**
     * Runs the given aggregation queries for every entity currently held by the context and, once all
     * of them complete, folds the results into each entity's aggregated-latest map and pushes a single
     * update to the client.
     *
     * @param tbEntityDataSubCtx subscription context whose entities are queried
     * @param list               aggregation keys of the command; keys without a result get an empty comparison value
     * @param concurrentMap      query id -> query descriptor (key, query, previous flag)
     * @param j                  start timestamp, forwarded to the time-series subscription creation
     * @param j2                 end timestamp, forwarded to the time-series subscription creation
     * @param z                  whether to create time-series subscriptions after the fetch
     * @return future completing with the same context once the update has been sent
     */
    private ListenableFuture<TbEntityDataSubCtx> handleAggCmd(TbEntityDataSubCtx tbEntityDataSubCtx, List<AggKey> list, ConcurrentMap<Integer, ReadTsKvQueryInfo> concurrentMap, long j, long j2, boolean z) {
        final Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchResultMap = new HashMap<>();
        final List<EntityData> entityDataList = tbEntityDataSubCtx.getData().getData();
        // NOTE(review): kept as a raw list, as in the original, because the query interface type is not
        // imported in this file; give it a proper element type once that type is confirmed.
        final List queryList = concurrentMap.values().stream().map(ReadTsKvQueryInfo::getQuery).collect(Collectors.toList());
        entityDataList.forEach(entity ->
                fetchResultMap.put(entity, this.tsService.findAllByQueries(tbEntityDataSubCtx.getTenantId(), entity.getEntityId(), queryList)));

        return Futures.transform(Futures.allAsList(fetchResultMap.values()), ignored -> {
            // Last entry timestamp per key, per entity; consumed when subscriptions are created.
            final Map<EntityData, Map<String, Long>> lastTsEntityMap = new HashMap<>();
            fetchResultMap.forEach((entityData, future) -> {
                try {
                    final Map<String, Long> lastTsMap = new HashMap<>();
                    lastTsEntityMap.put(entityData, lastTsMap);
                    final List<ReadTsKvQueryResult> queryResults = future.get();
                    if (queryResults != null) {
                        for (ReadTsKvQueryResult queryResult : queryResults) {
                            final ReadTsKvQueryInfo queryInfo = concurrentMap.get(Integer.valueOf(queryResult.getQueryId()));
                            final ComparisonTsValue comparison = entityData.getAggLatest()
                                    .computeIfAbsent(Integer.valueOf(queryInfo.getKey().getId()), keyId -> new ComparisonTsValue());
                            if (queryInfo.isPrevious()) {
                                comparison.setPrevious(queryResult.toTsValue(queryInfo.getQuery()));
                            } else {
                                comparison.setCurrent(queryResult.toTsValue(queryInfo.getQuery()));
                                lastTsMap.put(queryInfo.getQuery().getKey(), Long.valueOf(queryResult.getLastEntryTs()));
                            }
                        }
                    }
                    // Keys that produced no result still need an (empty) entry.
                    list.forEach(aggKey ->
                            entityData.getAggLatest().putIfAbsent(Integer.valueOf(aggKey.getId()), new ComparisonTsValue(TsValue.EMPTY, TsValue.EMPTY)));
                } catch (InterruptedException | ExecutionException e) {
                    if (e instanceof InterruptedException) {
                        // Do not swallow the interruption: restore the flag for the executor.
                        Thread.currentThread().interrupt();
                    }
                    log.warn("[{}][{}][{}] Failed to fetch historical data", tbEntityDataSubCtx.getSessionId(), Integer.valueOf(tbEntityDataSubCtx.getCmdId()), entityData.getEntityId(), e);
                    tbEntityDataSubCtx.sendWsMsg(new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
                }
            });

            tbEntityDataSubCtx.getWsLock().lock();
            try {
                final EntityDataUpdate update;
                if (tbEntityDataSubCtx.isInitialDataSent()) {
                    update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), null, entityDataList, tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                } else {
                    update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), tbEntityDataSubCtx.getData(), null, tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                    tbEntityDataSubCtx.setInitialDataSent(true);
                }
                if (z) {
                    tbEntityDataSubCtx.createTimeSeriesSubscriptions(lastTsEntityMap, j, j2, true);
                }
                tbEntityDataSubCtx.sendWsMsg(update);
                // The time-series/aggregation payload has been sent; drop it from the entities.
                entityDataList.forEach(EntityData::clearTsAndAggData);
                return tbEntityDataSubCtx;
            } finally {
                tbEntityDataSubCtx.getWsLock().unlock();
            }
        }, this.wsCallBackExecutor);
    }

    /**
     * Logs a failed websocket command at debug level and closes the session with
     * {@code SERVICE_RESTARTED}, except for rate-limit violations, which leave the session open.
     */
    private void handleWsCmdRuntimeException(String str, RuntimeException runtimeException, EntityDataCmd entityDataCmd) {
        log.debug("[{}] Failed to process ws cmd: {}", str, entityDataCmd, runtimeException);
        final boolean rateLimited = runtimeException instanceof TbRateLimitsException;
        if (!rateLimited) {
            this.wsService.close(str, CloseStatus.SERVICE_RESTARTED);
        }
    }

    /**
     * Handles an entity-count command. A command id that already has a context is treated as a
     * duplicate and ignored; otherwise a context is created, the count is fetched (recording timing
     * stats) and a periodic refresh is scheduled.
     */
    @Override // org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService
    public void handleCmd(WebSocketSessionRef webSocketSessionRef, EntityCountCmd entityCountCmd) {
        final TbEntityCountSubCtx existing = (TbEntityCountSubCtx) getSubCtx(webSocketSessionRef.getSessionId(), entityCountCmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", webSocketSessionRef.getSessionId(), Integer.valueOf(entityCountCmd.getCmdId()), entityCountCmd);
            return;
        }

        final TbEntityCountSubCtx ctx = createSubCtx(webSocketSessionRef, entityCountCmd);
        final long startedAt = System.currentTimeMillis();
        ctx.fetchData();
        final long finishedAt = System.currentTimeMillis();
        this.stats.getRegularQueryInvocationCnt().incrementAndGet();
        this.stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        final long refreshInterval = this.dynamicPageLinkRefreshInterval;
        ctx.setRefreshTask(this.scheduler.scheduleWithFixedDelay(() -> refreshDynamicQuery(ctx), refreshInterval, refreshInterval, TimeUnit.SECONDS));
    }

    /**
     * Handles an alarm-data command: reuses or creates the context, re-resolves the query, fetches
     * the matching entities (recording timing stats) and resets previous tasks and subscriptions.
     * With no matching entities an empty update is sent; otherwise alarms are fetched, latest-value
     * subscriptions are created and, for a positive time window, a periodic refresh is scheduled.
     */
    @Override // org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService
    public void handleCmd(WebSocketSessionRef webSocketSessionRef, AlarmDataCmd alarmDataCmd) {
        final TbAlarmDataSubCtx existing = (TbAlarmDataSubCtx) getSubCtx(webSocketSessionRef.getSessionId(), alarmDataCmd.getCmdId());
        final TbAlarmDataSubCtx ctx;
        if (existing != null) {
            ctx = existing;
        } else {
            log.debug("[{}][{}] Creating new alarm subscription using: {}", webSocketSessionRef.getSessionId(), Integer.valueOf(alarmDataCmd.getCmdId()), alarmDataCmd);
            ctx = createSubCtx(webSocketSessionRef, alarmDataCmd);
        }

        ctx.setAndResolveQuery(alarmDataCmd.getQuery());
        final AlarmDataQuery resolvedQuery = (AlarmDataQuery) ctx.getQuery();

        final long startedAt = System.currentTimeMillis();
        ctx.fetchData();
        final long finishedAt = System.currentTimeMillis();
        this.stats.getRegularQueryInvocationCnt().incrementAndGet();
        this.stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        final List<EntityData> matchedEntities = ctx.getEntitiesData();
        ctx.cancelTasks();
        ctx.clearEntitySubscriptions();

        if (!matchedEntities.isEmpty()) {
            ctx.fetchAlarms();
            ctx.createLatestValuesSubscriptions(alarmDataCmd.getQuery().getLatestValues());
            if (resolvedQuery.getPageLink().getTimeWindow() > 0) {
                final long refreshInterval = this.dynamicPageLinkRefreshInterval;
                ctx.setRefreshTask(this.scheduler.scheduleWithFixedDelay(() -> refreshAlarmQuery(ctx), refreshInterval, refreshInterval, TimeUnit.SECONDS));
            }
            return;
        }
        // Nothing matched: answer with an empty page.
        ctx.sendWsMsg(new AlarmDataUpdate(alarmDataCmd.getCmdId(), new PageData(), null, 0L, 0L));
    }

    /**
     * Handles an alarm-count command. Duplicate command ids are ignored. For a new command the
     * context is created and its data fetched (recording timing stats); a non-null but empty entity
     * id set yields an immediate zero count. Otherwise the count is fetched, alarm subscriptions are
     * created when an entity id set exists, and a periodic refresh is scheduled.
     */
    @Override // org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService
    public void handleCmd(WebSocketSessionRef webSocketSessionRef, AlarmCountCmd alarmCountCmd) {
        final TbAlarmCountSubCtx existing = (TbAlarmCountSubCtx) getSubCtx(webSocketSessionRef.getSessionId(), alarmCountCmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", webSocketSessionRef.getSessionId(), Integer.valueOf(alarmCountCmd.getCmdId()), alarmCountCmd);
            return;
        }

        final TbAlarmCountSubCtx ctx = createSubCtx(webSocketSessionRef, alarmCountCmd);
        final long startedAt = System.currentTimeMillis();
        ctx.fetchData();
        final long finishedAt = System.currentTimeMillis();
        this.stats.getRegularQueryInvocationCnt().incrementAndGet();
        this.stats.getRegularQueryTimeSpent().addAndGet(finishedAt - startedAt);

        final LinkedHashSet<EntityId> entityIds = ctx.getEntitiesIds();
        final boolean hasEntityFilter = entityIds != null;
        ctx.cancelTasks();
        ctx.clearAlarmSubscriptions();
        if (hasEntityFilter && entityIds.isEmpty()) {
            ctx.sendWsMsg(new AlarmCountUpdate(alarmCountCmd.getCmdId(), 0));
            return;
        }

        ctx.doFetchAlarmCount();
        if (hasEntityFilter) {
            ctx.createAlarmSubscriptions();
        }
        final long refreshInterval = this.dynamicPageLinkRefreshInterval;
        ctx.setRefreshTask(this.scheduler.scheduleWithFixedDelay(() -> refreshDynamicQuery(ctx), refreshInterval, refreshInterval, TimeUnit.SECONDS));
    }

    /**
     * Handles an alarm-status command. Duplicate command ids are ignored; for a new command the
     * context is created, active alarms are fetched (recorded in the alarm query stats) and the
     * resulting status is sent to the client.
     */
    @Override // org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService
    public void handleCmd(WebSocketSessionRef webSocketSessionRef, AlarmStatusCmd alarmStatusCmd) {
        log.debug("[{}] Handling alarm status subscription cmd (cmdId: {})", webSocketSessionRef.getSessionId(), Integer.valueOf(alarmStatusCmd.getCmdId()));
        final TbAlarmStatusSubCtx existing = (TbAlarmStatusSubCtx) getSubCtx(webSocketSessionRef.getSessionId(), alarmStatusCmd.getCmdId());
        if (existing != null) {
            log.debug("[{}][{}] Received duplicate command: {}", webSocketSessionRef.getSessionId(), Integer.valueOf(alarmStatusCmd.getCmdId()), alarmStatusCmd);
            return;
        }

        final TbAlarmStatusSubCtx ctx = createSubCtx(webSocketSessionRef, alarmStatusCmd);
        final long startedAt = System.currentTimeMillis();
        ctx.fetchActiveAlarms();
        final long elapsed = System.currentTimeMillis() - startedAt;
        this.stats.getAlarmQueryInvocationCnt().incrementAndGet();
        this.stats.getAlarmQueryTimeSpent().addAndGet(elapsed);
        ctx.sendUpdate();
    }

    /**
     * Checks that a context is still eligible for scheduled work: it must not be stopped, its
     * session must still be registered and its command id must still be present for that session.
     * Each failing condition is logged at warn level.
     *
     * @return {@code true} when all three conditions hold
     */
    private boolean validate(TbAbstractSubCtx tbAbstractSubCtx) {
        if (tbAbstractSubCtx.isStopped()) {
            log.warn("[{}][{}][{}] Received validation task for already stopped context.", tbAbstractSubCtx.getTenantId(), tbAbstractSubCtx.getSessionId(), Integer.valueOf(tbAbstractSubCtx.getCmdId()));
            return false;
        }
        final ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubscriptions = this.subscriptionsBySessionId.get(tbAbstractSubCtx.getSessionId());
        if (sessionSubscriptions == null) {
            log.warn("[{}][{}][{}] Received validation task for already removed session.", tbAbstractSubCtx.getTenantId(), tbAbstractSubCtx.getSessionId(), Integer.valueOf(tbAbstractSubCtx.getCmdId()));
            return false;
        }
        final boolean registered = sessionSubscriptions.containsKey(Integer.valueOf(tbAbstractSubCtx.getCmdId()));
        if (!registered) {
            log.warn("[{}][{}][{}] Received validation task for unregistered cmdId.", tbAbstractSubCtx.getTenantId(), tbAbstractSubCtx.getSessionId(), Integer.valueOf(tbAbstractSubCtx.getCmdId()));
        }
        return registered;
    }

    /**
     * Scheduled refresh of a query-based subscription. A context that no longer validates is
     * stopped; otherwise it is updated and the dynamic-query stats are recorded. Any exception is
     * logged and swallowed here rather than propagated to the scheduler.
     */
    private void refreshDynamicQuery(TbAbstractEntityQuerySubCtx<?> tbAbstractEntityQuerySubCtx) {
        try {
            if (!validate(tbAbstractEntityQuerySubCtx)) {
                tbAbstractEntityQuerySubCtx.stop();
                return;
            }
            final long startedAt = System.currentTimeMillis();
            tbAbstractEntityQuerySubCtx.update();
            final long finishedAt = System.currentTimeMillis();
            log.trace("[{}][{}] Executing query: {}", tbAbstractEntityQuerySubCtx.getSessionId(), Integer.valueOf(tbAbstractEntityQuerySubCtx.getCmdId()), tbAbstractEntityQuerySubCtx.getQuery());
            this.stats.getDynamicQueryInvocationCnt().incrementAndGet();
            this.stats.getDynamicQueryTimeSpent().addAndGet(finishedAt - startedAt);
        } catch (Exception failure) {
            log.warn("[{}][{}] Failed to refresh query", tbAbstractEntityQuerySubCtx.getSessionId(), Integer.valueOf(tbAbstractEntityQuerySubCtx.getCmdId()), failure);
        }
    }

    /**
     * Scheduled refresh of an alarm-data subscription: resets the per-interval invocation counter of
     * a still-valid context, or stops a context that no longer validates.
     * <p>
     * This method is the body of a {@code scheduleWithFixedDelay} task, and an exception escaping
     * such a task suppresses all of its subsequent executions. Failures are therefore logged and
     * swallowed, matching {@code refreshDynamicQuery}.
     */
    private void refreshAlarmQuery(TbAlarmDataSubCtx tbAlarmDataSubCtx) {
        try {
            if (validate(tbAlarmDataSubCtx)) {
                tbAlarmDataSubCtx.checkAndResetInvocationCounter();
            } else {
                tbAlarmDataSubCtx.stop();
            }
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to refresh alarm query", tbAlarmDataSubCtx.getSessionId(), Integer.valueOf(tbAlarmDataSubCtx.getCmdId()), e);
        }
    }

    /**
     * Periodically drains the query counters (resetting each to zero) and counts the currently
     * registered dynamic subscriptions. A summary line is logged only when at least one of the
     * invocation counters or the dynamic subscription count is positive.
     */
    @Scheduled(fixedDelayString = "${server.ws.dynamic_page_link.stats:10000}")
    public void printStats() {
        final int alarmQueryCnt = this.stats.getAlarmQueryInvocationCnt().getAndSet(0);
        final long alarmQueryTime = this.stats.getAlarmQueryTimeSpent().getAndSet(0L);
        final int regularQueryCnt = this.stats.getRegularQueryInvocationCnt().getAndSet(0);
        final long regularQueryTime = this.stats.getRegularQueryTimeSpent().getAndSet(0L);
        final int dynamicQueryInvocationCnt = this.stats.getDynamicQueryInvocationCnt().getAndSet(0);
        final long dynamicQueryTime = this.stats.getDynamicQueryTimeSpent().getAndSet(0L);

        final long dynamicQueryCnt = this.subscriptionsBySessionId.values().stream()
                .flatMap(sessionSubscriptions -> sessionSubscriptions.values().stream())
                .filter(ctx -> ctx.isDynamic())
                .count();

        final boolean nothingToReport = regularQueryCnt <= 0 && dynamicQueryInvocationCnt <= 0 && dynamicQueryCnt <= 0 && alarmQueryCnt <= 0;
        if (nothingToReport) {
            return;
        }
        log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], alarmQueryInvocationCnt = [{}], alarmQueryInvocationTime = [{}]",
                Integer.valueOf(regularQueryCnt), Long.valueOf(regularQueryTime), Long.valueOf(dynamicQueryCnt), Integer.valueOf(dynamicQueryInvocationCnt), Long.valueOf(dynamicQueryTime), Integer.valueOf(alarmQueryCnt), Long.valueOf(alarmQueryTime));
    }

    /**
     * Creates an entity-data subscription context, resolves the command's query when present and
     * registers the context under the session id and command id (replacing any previous entry).
     */
    private TbEntityDataSubCtx createSubCtx(WebSocketSessionRef webSocketSessionRef, EntityDataCmd entityDataCmd) {
        final ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubscriptions =
                this.subscriptionsBySessionId.computeIfAbsent(webSocketSessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        final TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(
                this.serviceId, this.wsService, this.entityService, this.localSubscriptionService,
                this.attributesService, this.stats, webSocketSessionRef, entityDataCmd.getCmdId(),
                this.maxEntitiesPerDataSubscription);
        if (entityDataCmd.getQuery() != null) {
            ctx.setAndResolveQuery(entityDataCmd.getQuery());
        }
        sessionSubscriptions.put(Integer.valueOf(entityDataCmd.getCmdId()), ctx);
        return ctx;
    }

    /**
     * Creates an entity-count subscription context, resolves the command's query when present and
     * registers the context under the session id and command id.
     */
    private TbEntityCountSubCtx createSubCtx(WebSocketSessionRef webSocketSessionRef, EntityCountCmd entityCountCmd) {
        final ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubscriptions =
                this.subscriptionsBySessionId.computeIfAbsent(webSocketSessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        final TbEntityCountSubCtx ctx = new TbEntityCountSubCtx(
                this.serviceId, this.wsService, this.entityService, this.localSubscriptionService,
                this.attributesService, this.stats, webSocketSessionRef, entityCountCmd.getCmdId());
        if (entityCountCmd.getQuery() != null) {
            ctx.setAndResolveQuery(entityCountCmd.getQuery());
        }
        sessionSubscriptions.put(Integer.valueOf(entityCountCmd.getCmdId()), ctx);
        return ctx;
    }

    /**
     * Creates an alarm-data subscription context, resolves the command's query unconditionally and
     * registers the context under the session id and command id.
     */
    private TbAlarmDataSubCtx createSubCtx(WebSocketSessionRef webSocketSessionRef, AlarmDataCmd alarmDataCmd) {
        final ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubscriptions =
                this.subscriptionsBySessionId.computeIfAbsent(webSocketSessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        final TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(
                this.serviceId, this.wsService, this.entityService, this.localSubscriptionService,
                this.attributesService, this.stats, this.alarmService, webSocketSessionRef,
                alarmDataCmd.getCmdId(), this.maxEntitiesPerAlarmSubscription, this.maxAlarmQueriesPerRefreshInterval);
        ctx.setAndResolveQuery(alarmDataCmd.getQuery());
        sessionSubscriptions.put(Integer.valueOf(alarmDataCmd.getCmdId()), ctx);
        return ctx;
    }

    /**
     * Builds an alarm-count subscription context for the given command and stores it in the
     * session's subscription map under the command id.
     *
     * <p>The per-session map is created on first use. The query is resolved only when the command
     * actually carries one.
     *
     * @param webSocketSessionRef session the subscription belongs to
     * @param alarmCountCmd command supplying the command id and, optionally, the query
     * @return the newly created context
     */
    private TbAlarmCountSubCtx createSubCtx(WebSocketSessionRef webSocketSessionRef, AlarmCountCmd alarmCountCmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = this.subscriptionsBySessionId.computeIfAbsent(
                webSocketSessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbAlarmCountSubCtx ctx = new TbAlarmCountSubCtx(
                this.serviceId,
                this.wsService,
                this.entityService,
                this.localSubscriptionService,
                this.attributesService,
                this.stats,
                this.alarmService,
                webSocketSessionRef,
                alarmCountCmd.getCmdId(),
                this.maxEntitiesPerAlarmSubscription,
                this.maxAlarmQueriesPerRefreshInterval);
        if (alarmCountCmd.getQuery() != null) {
            ctx.setAndResolveQuery(alarmCountCmd.getQuery());
        }
        sessionSubs.put(alarmCountCmd.getCmdId(), ctx);
        return ctx;
    }

    /**
     * Builds an alarm-status subscription context for the given command, lets it create its
     * subscription, and stores it in the session's subscription map under the command id.
     *
     * <p>The per-session map is created on first use. {@code createSubscription} runs before the
     * context is put into the map.
     *
     * @param webSocketSessionRef session the subscription belongs to
     * @param alarmStatusCmd command supplying the command id and the subscription parameters
     * @return the newly created context
     */
    private TbAlarmStatusSubCtx createSubCtx(WebSocketSessionRef webSocketSessionRef, AlarmStatusCmd alarmStatusCmd) {
        ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = this.subscriptionsBySessionId.computeIfAbsent(
                webSocketSessionRef.getSessionId(), sessionId -> new ConcurrentHashMap<>());
        TbAlarmStatusSubCtx ctx = new TbAlarmStatusSubCtx(
                this.serviceId,
                this.wsService,
                this.localSubscriptionService,
                this.stats,
                this.alarmService,
                this.alarmsPerAlarmStatusSubscriptionCacheSize,
                webSocketSessionRef,
                alarmStatusCmd.getCmdId());
        ctx.createSubscription(alarmStatusCmd);
        sessionSubs.put(alarmStatusCmd.getCmdId(), ctx);
        return ctx;
    }

    /**
     * Looks up the subscription context registered for a session and command id.
     *
     * @param str session id
     * @param i command id
     * @param <T> context type the caller expects; the cast is unchecked, so a mismatch only
     *            surfaces at the call site
     * @return the registered context, or {@code null} when the session has no subscription map or
     *         the map holds no entry for the command id
     */
    @SuppressWarnings("unchecked")
    private <T extends TbAbstractSubCtx> T getSubCtx(String str, int i) {
        ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = this.subscriptionsBySessionId.get(str);
        if (sessionSubs == null) {
            return null;
        }
        return (T) sessionSubs.get(i);
    }

    /**
     * Handles a time-series command: logs the request and delegates to
     * {@code handleGetTsCmd} with the subscribe flag set to {@code true}.
     *
     * @param tbEntityDataSubCtx subscription context the command applies to
     * @param timeSeriesCmd command carrying the time window and keys
     * @return the future produced by {@code handleGetTsCmd}
     */
    private ListenableFuture<TbEntityDataSubCtx> handleTimeSeriesCmd(TbEntityDataSubCtx tbEntityDataSubCtx, TimeSeriesCmd timeSeriesCmd) {
        log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})",
                tbEntityDataSubCtx.getSessionId(),
                tbEntityDataSubCtx.getCmdId(),
                timeSeriesCmd.getTimeWindow(),
                timeSeriesCmd.getKeys());
        final boolean subscribe = true;
        return handleGetTsCmd(tbEntityDataSubCtx, timeSeriesCmd, subscribe);
    }

    /**
     * Handles a history command: logs the request and delegates to
     * {@code handleGetTsCmd} with the subscribe flag set to {@code false}.
     *
     * @param tbEntityDataSubCtx subscription context the command applies to
     * @param entityHistoryCmd command carrying the start/end timestamps and keys
     * @return the future produced by {@code handleGetTsCmd}
     */
    private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx tbEntityDataSubCtx, EntityHistoryCmd entityHistoryCmd) {
        log.debug("[{}][{}] Fetching history data for start {} and end {} ms for keys: ({})",
                tbEntityDataSubCtx.getSessionId(),
                tbEntityDataSubCtx.getCmdId(),
                entityHistoryCmd.getStartTs(),
                entityHistoryCmd.getEndTs(),
                entityHistoryCmd.getKeys());
        final boolean subscribe = false;
        return handleGetTsCmd(tbEntityDataSubCtx, entityHistoryCmd, subscribe);
    }

    /**
     * Fetches time-series data for every entity held by the subscription context, sends the result
     * to the websocket session and, if requested, creates time-series subscriptions.
     *
     * <p>One query per key is built for the command's start/end range, using the command's limit
     * (or the default when it is 0). When {@code getTsCmd.isFetchLatestPreviousPoint()} is set, a
     * second query per key with limit 1 is added for the 365 days preceding the start timestamp,
     * and each entity's value arrays are sorted by descending timestamp after merging.
     *
     * <p>Once all fetches have completed, the results are merged into each entity's time-series map,
     * keys without data get an empty array, and the update is sent while holding the context's
     * websocket lock. A failed fetch for one entity is logged and reported to the session as an
     * internal error; processing of the remaining entities and the final update still continue.
     *
     * @param tbEntityDataSubCtx subscription context whose entities are queried and through which
     *                           messages are sent
     * @param getTsCmd command supplying keys, time range, aggregation parameters and limit
     * @param z when {@code true}, time-series subscriptions are created before the update is sent
     * @return a future that yields {@code tbEntityDataSubCtx} after the update has been sent
     */
    private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx tbEntityDataSubCtx, GetTsCmd getTsCmd, boolean z) {
        List<String> keys = getTsCmd.getKeys();

        // Range queries first, then the optional "previous point" queries — same order as before.
        List<BaseReadTsKvQuery> typedQueries = new ArrayList<>();
        for (String key : keys) {
            typedQueries.add(new BaseReadTsKvQuery(key, getTsCmd.getStartTs(), getTsCmd.getEndTs(), getTsCmd.toAggregationParams(), getLimit(getTsCmd.getLimit())));
        }
        if (getTsCmd.isFetchLatestPreviousPoint()) {
            for (String key : keys) {
                typedQueries.add(new BaseReadTsKvQuery(key, getTsCmd.getStartTs() - TimeUnit.DAYS.toMillis(365L), getTsCmd.getStartTs(), getTsCmd.toAggregationParams(), 1));
            }
        }

        // Query id -> key, used to route each result back to the key its query was built for.
        Map<Integer, String> keyByQueryId = new ConcurrentHashMap<>();
        for (BaseReadTsKvQuery query : typedQueries) {
            keyByQueryId.put(query.getId(), query.getKey());
        }

        // NOTE(review): handed over as a raw list, exactly as before — the element type declared by
        // findAllByQueries is not referenced in this method; tighten once confirmed.
        @SuppressWarnings("rawtypes")
        List queries = typedQueries;

        List<EntityData> entities = tbEntityDataSubCtx.getData().getData();
        Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchByEntity = new HashMap<>();
        for (EntityData entity : entities) {
            fetchByEntity.put(entity, this.tsService.findAllByQueries(tbEntityDataSubCtx.getTenantId(), entity.getEntityId(), queries));
        }

        return Futures.transform(Futures.allAsList(fetchByEntity.values()), ignored -> {
            // Last entry timestamp per key, per entity; passed on when subscriptions are created.
            Map<EntityData, Map<String, Long>> lastTsByEntity = new HashMap<>();
            fetchByEntity.forEach((fetchedEntity, fetchFuture) -> {
                try {
                    Map<String, Long> lastTsByKey = new HashMap<>();
                    lastTsByEntity.put(fetchedEntity, lastTsByKey);
                    List<ReadTsKvQueryResult> queryResults = fetchFuture.get();
                    if (queryResults != null) {
                        for (ReadTsKvQueryResult queryResult : queryResults) {
                            String queryKey = keyByQueryId.get(queryResult.getQueryId());
                            if (queryKey != null) {
                                fetchedEntity.getTimeseries().merge(queryKey, queryResult.toTsValues(), ArrayUtils::addAll);
                                lastTsByKey.merge(queryKey, queryResult.getLastEntryTs(), Math::max);
                            } else {
                                log.warn("ReadTsKvQueryResult for {} {} has queryId not matching the initial query", fetchedEntity.getEntityId().getEntityType(), fetchedEntity.getEntityId());
                            }
                        }
                    }
                    // Every requested key must be present in the update, even without data.
                    for (String key : keys) {
                        if (!fetchedEntity.getTimeseries().containsKey(key)) {
                            fetchedEntity.getTimeseries().put(key, new TsValue[0]);
                        }
                    }
                    if (getTsCmd.isFetchLatestPreviousPoint()) {
                        // Range and previous-point results were concatenated; order newest first.
                        for (TsValue[] values : fetchedEntity.getTimeseries().values()) {
                            Arrays.sort(values, (left, right) -> Long.compare(right.getTs(), left.getTs()));
                        }
                    }
                } catch (InterruptedException | ExecutionException e) {
                    log.warn("[{}][{}][{}] Failed to fetch historical data", tbEntityDataSubCtx.getSessionId(), tbEntityDataSubCtx.getCmdId(), fetchedEntity.getEntityId(), e);
                    tbEntityDataSubCtx.sendWsMsg(new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
                }
            });
            tbEntityDataSubCtx.getWsLock().lock();
            try {
                EntityDataUpdate update;
                if (tbEntityDataSubCtx.isInitialDataSent()) {
                    update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), null, entities, tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                } else {
                    update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), tbEntityDataSubCtx.getData(), null, tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                    tbEntityDataSubCtx.setInitialDataSent(true);
                }
                if (z) {
                    tbEntityDataSubCtx.createTimeSeriesSubscriptions(lastTsByEntity, getTsCmd.getStartTs(), getTsCmd.getEndTs());
                }
                tbEntityDataSubCtx.sendWsMsg(update);
                // Clear only after the message has been sent, while still holding the lock.
                entities.forEach(EntityData::clearTsAndAggData);
            } finally {
                tbEntityDataSubCtx.getWsLock().unlock();
            }
            return tbEntityDataSubCtx;
        }, this.wsCallBackExecutor);
    }

    /**
     * Processes a latest-values command for the given subscription context.
     *
     * <p>When {@code tsInSqlDB} is set, the latest-values subscriptions are created and
     * {@code checkAndSendInitialData} is called under the context's websocket lock; nothing else is
     * fetched.
     *
     * <p>Otherwise, for every entity in the context the time-series keys requested by the command
     * that are not yet present in the entity's latest {@code TIME_SERIES} map are looked up through
     * {@code tsService.findLatest}. Once all lookups have completed, the values are merged into the
     * entities, the subscriptions are created and an update is sent, all under the websocket lock.
     * A failed lookup for a single entity is only logged; a failure of the combined future is
     * logged and reported to the session as an internal error.
     *
     * @param tbEntityDataSubCtx subscription context whose entities are updated and through which
     *                           messages are sent
     * @param latestValueCmd command listing the keys to subscribe to
     */
    private void handleLatestCmd(final TbEntityDataSubCtx tbEntityDataSubCtx, final LatestValueCmd latestValueCmd) {
        log.trace("[{}][{}] Going to process latest command: {}", tbEntityDataSubCtx.getSessionId(), tbEntityDataSubCtx.getCmdId(), latestValueCmd);
        if (this.tsInSqlDB) {
            tbEntityDataSubCtx.getWsLock().lock();
            try {
                tbEntityDataSubCtx.createLatestValuesSubscriptions(latestValueCmd.getKeys());
                checkAndSendInitialData(tbEntityDataSubCtx);
            } finally {
                tbEntityDataSubCtx.getWsLock().unlock();
            }
            return;
        }
        log.trace("[{}][{}] Going to fetch missing latest values: {}", tbEntityDataSubCtx.getSessionId(), tbEntityDataSubCtx.getCmdId(), latestValueCmd);
        final List<String> tsKeys = latestValueCmd.getKeys().stream()
                .filter(entityKey -> entityKey.getType().equals(EntityKeyType.TIME_SERIES))
                .map(entityKey -> entityKey.getKey())
                .collect(Collectors.toList());
        final Map<EntityData, ListenableFuture<Map<String, TsValue>>> lookupByEntity = new HashMap<>();
        for (EntityData entityData : tbEntityDataSubCtx.getData().getData()) {
            Map<EntityKeyType, Map<String, TsValue>> latest = entityData.getLatest();
            Map<String, TsValue> knownTsValues = latest.get(EntityKeyType.TIME_SERIES);
            LinkedHashSet<String> missingKeys = new LinkedHashSet<>(tsKeys);
            if (knownTsValues != null) {
                missingKeys.removeAll(knownTsValues.keySet());
            } else {
                // Make sure the merge in onSuccess has a map to put the fetched values into.
                latest.put(EntityKeyType.TIME_SERIES, new HashMap<>());
            }
            lookupByEntity.put(entityData, Futures.transform(this.tsService.findLatest(tbEntityDataSubCtx.getTenantId(), entityData.getEntityId(), missingKeys), this::toTsValue, MoreExecutors.directExecutor()));
        }
        Futures.addCallback(Futures.allAsList(lookupByEntity.values()), new FutureCallback<List<Map<String, TsValue>>>() {
            @Override
            public void onSuccess(List<Map<String, TsValue>> ignored) {
                lookupByEntity.forEach((entityData, lookup) -> {
                    try {
                        entityData.getLatest().get(EntityKeyType.TIME_SERIES).putAll(lookup.get());
                    } catch (InterruptedException | ExecutionException e) {
                        DefaultTbEntityDataSubscriptionService.log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", tbEntityDataSubCtx.getSessionId(), tbEntityDataSubCtx.getCmdId(), entityData.getEntityId(), tsKeys, e);
                    }
                });
                tbEntityDataSubCtx.getWsLock().lock();
                try {
                    tbEntityDataSubCtx.createLatestValuesSubscriptions(latestValueCmd.getKeys());
                    EntityDataUpdate update;
                    if (tbEntityDataSubCtx.isInitialDataSent()) {
                        // Follow-up update: entity id and latest values only, no time-series payload.
                        update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), null, tbEntityDataSubCtx.getData().getData().stream()
                                .map(entityData -> new EntityData(entityData.getEntityId(), entityData.getLatest(), (Map) null))
                                .toList(), tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                    } else {
                        update = new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), tbEntityDataSubCtx.getData(), null, tbEntityDataSubCtx.getMaxEntitiesPerDataSubscription());
                        tbEntityDataSubCtx.setInitialDataSent(true);
                    }
                    tbEntityDataSubCtx.sendWsMsg(update);
                } finally {
                    tbEntityDataSubCtx.getWsLock().unlock();
                }
            }

            @Override
            public void onFailure(Throwable failure) {
                DefaultTbEntityDataSubscriptionService.log.warn("[{}][{}] Failed to process websocket command: {}:{}", tbEntityDataSubCtx.getSessionId(), tbEntityDataSubCtx.getCmdId(), tbEntityDataSubCtx.getQuery(), latestValueCmd, failure);
                tbEntityDataSubCtx.sendWsMsg(new EntityDataUpdate(tbEntityDataSubCtx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
            }
        }, this.wsCallBackExecutor);
    }

    /**
     * Converts time-series entries into a map from entry key to a {@code TsValue} holding the
     * entry's timestamp and its value rendered as a string.
     *
     * @param list entries to convert
     * @return map keyed by entry key; {@code Collectors.toMap} rejects duplicate keys
     */
    private Map<String, TsValue> toTsValue(List<TsKvEntry> list) {
        return list.stream().collect(Collectors.toMap(
                TsKvEntry::getKey,
                entry -> new TsValue(entry.getTs(), entry.getValueAsString())));
    }

    /**
     * Cancels the subscription registered for the given session under the command id carried by
     * the unsubscribe command. Nothing happens when no such subscription is registered.
     *
     * @param str session id
     * @param unsubscribeCmd command naming the subscription to cancel
     */
    @Override
    public void cancelSubscription(String str, UnsubscribeCmd unsubscribeCmd) {
        TbAbstractSubCtx ctx = getSubCtx(str, unsubscribeCmd.getCmdId());
        cleanupAndCancel(ctx);
    }

    /**
     * Stops a subscription context and removes it from its session's subscription map.
     *
     * <p>A {@code null} context is ignored. The context is stopped first; the removal is skipped
     * when the context has no session id or the session has no subscription map.
     *
     * @param tbAbstractSubCtx context to stop and unregister, may be {@code null}
     */
    private void cleanupAndCancel(TbAbstractSubCtx tbAbstractSubCtx) {
        if (tbAbstractSubCtx == null) {
            return;
        }
        tbAbstractSubCtx.stop();
        if (tbAbstractSubCtx.getSessionId() == null) {
            return;
        }
        ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = this.subscriptionsBySessionId.get(tbAbstractSubCtx.getSessionId());
        if (sessionSubs != null) {
            sessionSubs.remove(tbAbstractSubCtx.getCmdId());
        }
    }

    /**
     * Removes the session's subscription map and cancels every subscription it held.
     *
     * <p>A failure while cancelling one subscription is logged and does not prevent the remaining
     * ones from being cancelled.
     *
     * @param str session id
     */
    @Override
    public void cancelAllSessionSubscriptions(String str) {
        ConcurrentMap<Integer, TbAbstractSubCtx> sessionSubs = this.subscriptionsBySessionId.remove(str);
        if (sessionSubs == null) {
            return;
        }
        for (TbAbstractSubCtx ctx : sessionSubs.values()) {
            try {
                cleanupAndCancel(ctx);
            } catch (Exception e) {
                log.warn("[{}] Failed to remove subscription {} due to ", ctx.getTenantId(), ctx, e);
            }
        }
    }

    /**
     * Returns the limit to use for a query.
     *
     * @param i requested limit
     * @return {@code DEFAULT_LIMIT} when {@code i} is 0, otherwise {@code i} unchanged
     */
    private int getLimit(int i) {
        if (i == 0) {
            return DEFAULT_LIMIT;
        }
        return i;
    }

    /**
     * Returns the database callback executor held by this service.
     *
     * @return the {@code dbCallbackExecutor} field
     */
    public DbCallbackExecutorService getDbCallbackExecutor() {
        return dbCallbackExecutor;
    }
}
