package org.thingsboard.server.service.ws;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldEntityMessageProcessor;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.TenantRateLimitException;
import org.thingsboard.server.exception.UnauthorizedException;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.ValidationCallback;
import org.thingsboard.server.service.security.ValidationResult;
import org.thingsboard.server.service.security.ValidationResultCode;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.subscription.TbAttributeSubscription;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
import org.thingsboard.server.service.ws.SessionEvent;
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TelemetryPluginCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/ws/DefaultWebSocketService.class */
public class DefaultWebSocketService implements WebSocketService {
    public static final int NUMBER_OF_PING_ATTEMPTS = 3;
    private static final int DEFAULT_LIMIT = 100;
    private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
    private static final String PROCESSING_MSG = "[{}] Processing: {}";
    private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
    private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
    private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
    private final TbLocalSubscriptionService oldSubService;
    private final TbEntityDataSubscriptionService entityDataSubService;
    private final NotificationCommandsHandler notificationCmdsHandler;
    private final WebSocketMsgEndpoint msgEndpoint;
    private final AccessValidator accessValidator;
    private final AttributesService attributesService;
    private final TimeseriesService tsService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbTenantProfileCache tenantProfileCache;

    @Value("${server.ws.ping_timeout:30000}")
    private long pingTimeout;
    private ExecutorService executor;
    private ScheduledExecutorService pingExecutor;
    private String serviceId;
    private Map<WsCmdType, WsCmdHandler<? extends WsCmd>> cmdsHandlers;
    private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketService.class);
    private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
    private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, Map<Integer, Integer>> sessionCmdMap = new ConcurrentHashMap();

    /* renamed from: org.thingsboard.server.service.ws.DefaultWebSocketService$11, reason: invalid class name */
    /* loaded from: input_file:org/thingsboard/server/service/ws/DefaultWebSocketService$11.class */
    static /* synthetic */ class AnonymousClass11 {
        static final /* synthetic */ int[] $SwitchMap$org$thingsboard$server$service$ws$SessionEvent$SessionEventType = new int[SessionEvent.SessionEventType.values().length];

        static {
            try {
                $SwitchMap$org$thingsboard$server$service$ws$SessionEvent$SessionEventType[SessionEvent.SessionEventType.ESTABLISHED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$thingsboard$server$service$ws$SessionEvent$SessionEventType[SessionEvent.SessionEventType.ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$thingsboard$server$service$ws$SessionEvent$SessionEventType[SessionEvent.SessionEventType.CLOSED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/thingsboard/server/service/ws/DefaultWebSocketService$WsCmdHandler.class */
    public static class WsCmdHandler<C extends WsCmd> {
        protected final BiConsumer<WebSocketSessionRef, C> handler;

        public void handle(WebSocketSessionRef webSocketSessionRef, WsCmd wsCmd) {
            this.handler.accept(webSocketSessionRef, wsCmd);
        }

        @ConstructorProperties({"handler"})
        public WsCmdHandler(BiConsumer<WebSocketSessionRef, C> biConsumer) {
            this.handler = biConsumer;
        }

        public BiConsumer<WebSocketSessionRef, C> getHandler() {
            return this.handler;
        }
    }

    @PostConstruct
    public void init() {
        this.serviceId = this.serviceInfoProvider.getServiceId();
        this.executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
        this.pingExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("telemetry-web-socket-ping");
        this.pingExecutor.scheduleWithFixedDelay(this::sendPing, this.pingTimeout / 3, this.pingTimeout / 3, TimeUnit.MILLISECONDS);
        this.cmdsHandlers = new EnumMap(WsCmdType.class);
        this.cmdsHandlers.put(WsCmdType.ATTRIBUTES, newCmdHandler(this::handleWsAttributesSubscriptionCmd));
        this.cmdsHandlers.put(WsCmdType.TIMESERIES, newCmdHandler(this::handleWsTimeseriesSubscriptionCmd));
        this.cmdsHandlers.put(WsCmdType.TIMESERIES_HISTORY, newCmdHandler(this::handleWsHistoryCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_DATA, newCmdHandler(this::handleWsEntityDataCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_DATA, newCmdHandler(this::handleWsAlarmDataCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_COUNT, newCmdHandler(this::handleWsEntityCountCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_COUNT, newCmdHandler(this::handleWsAlarmCountCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_STATUS, newCmdHandler(this::handleWsAlarmsStatusCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_STATUS_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
        Map<WsCmdType, WsCmdHandler<? extends WsCmd>> map = this.cmdsHandlers;
        WsCmdType wsCmdType = WsCmdType.NOTIFICATIONS;
        NotificationCommandsHandler notificationCommandsHandler = this.notificationCmdsHandler;
        Objects.requireNonNull(notificationCommandsHandler);
        map.put(wsCmdType, newCmdHandler(notificationCommandsHandler::handleUnreadNotificationsSubCmd));
        Map<WsCmdType, WsCmdHandler<? extends WsCmd>> map2 = this.cmdsHandlers;
        WsCmdType wsCmdType2 = WsCmdType.NOTIFICATIONS_COUNT;
        NotificationCommandsHandler notificationCommandsHandler2 = this.notificationCmdsHandler;
        Objects.requireNonNull(notificationCommandsHandler2);
        map2.put(wsCmdType2, newCmdHandler(notificationCommandsHandler2::handleUnreadNotificationsCountSubCmd));
        Map<WsCmdType, WsCmdHandler<? extends WsCmd>> map3 = this.cmdsHandlers;
        WsCmdType wsCmdType3 = WsCmdType.MARK_NOTIFICATIONS_AS_READ;
        NotificationCommandsHandler notificationCommandsHandler3 = this.notificationCmdsHandler;
        Objects.requireNonNull(notificationCommandsHandler3);
        map3.put(wsCmdType3, newCmdHandler(notificationCommandsHandler3::handleMarkAsReadCmd));
        Map<WsCmdType, WsCmdHandler<? extends WsCmd>> map4 = this.cmdsHandlers;
        WsCmdType wsCmdType4 = WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ;
        NotificationCommandsHandler notificationCommandsHandler4 = this.notificationCmdsHandler;
        Objects.requireNonNull(notificationCommandsHandler4);
        map4.put(wsCmdType4, newCmdHandler(notificationCommandsHandler4::handleMarkAllAsReadCmd));
        Map<WsCmdType, WsCmdHandler<? extends WsCmd>> map5 = this.cmdsHandlers;
        WsCmdType wsCmdType5 = WsCmdType.NOTIFICATIONS_UNSUBSCRIBE;
        NotificationCommandsHandler notificationCommandsHandler5 = this.notificationCmdsHandler;
        Objects.requireNonNull(notificationCommandsHandler5);
        map5.put(wsCmdType5, newCmdHandler(notificationCommandsHandler5::handleUnsubCmd));
    }

    @PreDestroy
    public void shutdownExecutor() {
        if (this.pingExecutor != null) {
            this.pingExecutor.shutdownNow();
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void handleSessionEvent(WebSocketSessionRef webSocketSessionRef, SessionEvent sessionEvent) {
        String sessionId = webSocketSessionRef.getSessionId();
        TenantId tenantId = webSocketSessionRef.getSecurityCtx().getTenantId();
        log.debug(PROCESSING_MSG, sessionId, sessionEvent);
        switch (AnonymousClass11.$SwitchMap$org$thingsboard$server$service$ws$SessionEvent$SessionEventType[sessionEvent.getEventType().ordinal()]) {
            case 1:
                this.wsSessionsMap.put(sessionId, new WsSessionMetaData(webSocketSessionRef));
                return;
            case CalculatedFieldEntityMessageProcessor.CALLBACKS_PER_CF /* 2 */:
                log.debug("[{}][{}] Unknown websocket session error: ", new Object[]{tenantId, sessionId, sessionEvent.getError().orElse(new RuntimeException("No error specified"))});
                return;
            case 3:
                cleanupSessionById(tenantId, sessionId);
                processSessionClose(webSocketSessionRef);
                return;
            default:
                return;
        }
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void handleCommands(WebSocketSessionRef webSocketSessionRef, WsCommandsWrapper wsCommandsWrapper) {
        if (wsCommandsWrapper == null || CollectionUtils.isEmpty(wsCommandsWrapper.getCmds())) {
            return;
        }
        String sessionId = webSocketSessionRef.getSessionId();
        if (validateSessionMetadata(webSocketSessionRef, UNKNOWN_SUBSCRIPTION_ID, sessionId)) {
            for (WsCmd wsCmd : wsCommandsWrapper.getCmds()) {
                log.debug("[{}][{}][{}] Processing cmd: {}", new Object[]{sessionId, wsCmd.getType(), Integer.valueOf(wsCmd.getCmdId()), wsCmd});
                try {
                    Optional.ofNullable(this.cmdsHandlers.get(wsCmd.getType())).ifPresent(wsCmdHandler -> {
                        wsCmdHandler.handle(webSocketSessionRef, wsCmd);
                    });
                } catch (Exception e) {
                    sendError(webSocketSessionRef, wsCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, e.getMessage());
                    log.error("{} Failed to handle WS cmd: {}", new Object[]{webSocketSessionRef, wsCmd, e});
                } catch (TbRateLimitsException e2) {
                    log.debug("{} Failed to handle WS cmd: {}", new Object[]{webSocketSessionRef, wsCmd, e2});
                }
            }
        }
    }

    private void handleWsEntityDataCmd(WebSocketSessionRef webSocketSessionRef, EntityDataCmd entityDataCmd) {
        if (validateSubscriptionCmd(webSocketSessionRef, entityDataCmd)) {
            this.entityDataSubService.handleCmd(webSocketSessionRef, entityDataCmd);
        }
    }

    private void handleWsEntityCountCmd(WebSocketSessionRef webSocketSessionRef, EntityCountCmd entityCountCmd) {
        if (validateSubscriptionCmd(webSocketSessionRef, entityCountCmd)) {
            this.entityDataSubService.handleCmd(webSocketSessionRef, entityCountCmd);
        }
    }

    private void handleWsAlarmDataCmd(WebSocketSessionRef webSocketSessionRef, AlarmDataCmd alarmDataCmd) {
        if (validateSubscriptionCmd(webSocketSessionRef, alarmDataCmd)) {
            this.entityDataSubService.handleCmd(webSocketSessionRef, alarmDataCmd);
        }
    }

    private void handleWsDataUnsubscribeCmd(WebSocketSessionRef webSocketSessionRef, UnsubscribeCmd unsubscribeCmd) {
        this.entityDataSubService.cancelSubscription(webSocketSessionRef.getSessionId(), unsubscribeCmd);
    }

    private void handleWsAlarmCountCmd(WebSocketSessionRef webSocketSessionRef, AlarmCountCmd alarmCountCmd) {
        if (validateCmd(webSocketSessionRef, alarmCountCmd)) {
            this.entityDataSubService.handleCmd(webSocketSessionRef, alarmCountCmd);
        }
    }

    private void handleWsAlarmsStatusCmd(WebSocketSessionRef webSocketSessionRef, AlarmStatusCmd alarmStatusCmd) {
        if (validateCmd(webSocketSessionRef, alarmStatusCmd)) {
            this.entityDataSubService.handleCmd(webSocketSessionRef, alarmStatusCmd);
        }
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void sendUpdate(String str, int i, TelemetrySubscriptionUpdate telemetrySubscriptionUpdate) {
        doSendUpdate(str, i, telemetrySubscriptionUpdate.copyWithNewSubscriptionId(i));
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void sendUpdate(String str, CmdUpdate cmdUpdate) {
        doSendUpdate(str, cmdUpdate.getCmdId(), cmdUpdate);
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void sendError(WebSocketSessionRef webSocketSessionRef, int i, SubscriptionErrorCode subscriptionErrorCode, String str) {
        sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(i, subscriptionErrorCode, str));
    }

    private <T> void doSendUpdate(String str, int i, T t) {
        WsSessionMetaData wsSessionMetaData = this.wsSessionsMap.get(str);
        if (wsSessionMetaData != null) {
            sendUpdate(wsSessionMetaData.getSessionRef(), i, t);
        }
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void close(String str, CloseStatus closeStatus) {
        WsSessionMetaData wsSessionMetaData = this.wsSessionsMap.get(str);
        if (wsSessionMetaData != null) {
            try {
                this.msgEndpoint.close(wsSessionMetaData.getSessionRef(), closeStatus);
            } catch (IOException e) {
                log.warn("[{}] Failed to send session close", str, e);
            }
        }
    }

    @Override // org.thingsboard.server.service.ws.WebSocketService
    public void cleanupIfStale(TenantId tenantId, String str) {
        if (this.msgEndpoint.isOpen(str)) {
            return;
        }
        log.info("[{}] Cleaning up stale session ", str);
        cleanupSessionById(tenantId, str);
    }

    private void processSessionClose(WebSocketSessionRef webSocketSessionRef) {
        DefaultTenantProfileConfiguration tenantProfileConfiguration = getTenantProfileConfiguration(webSocketSessionRef);
        if (tenantProfileConfiguration != null) {
            String str = "[" + webSocketSessionRef.getSessionId() + "]";
            if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
                Set<String> computeIfAbsent = this.tenantSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getTenantId(), tenantId -> {
                    return ConcurrentHashMap.newKeySet();
                });
                synchronized (computeIfAbsent) {
                    computeIfAbsent.removeIf(str2 -> {
                        return str2.startsWith(str);
                    });
                }
            }
            if (webSocketSessionRef.getSecurityCtx().isCustomerUser()) {
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
                    Set<String> computeIfAbsent2 = this.customerSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getCustomerId(), customerId -> {
                        return ConcurrentHashMap.newKeySet();
                    });
                    synchronized (computeIfAbsent2) {
                        computeIfAbsent2.removeIf(str3 -> {
                            return str3.startsWith(str);
                        });
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(webSocketSessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set<String> computeIfAbsent3 = this.regularUserSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getId(), userId -> {
                        return ConcurrentHashMap.newKeySet();
                    });
                    synchronized (computeIfAbsent3) {
                        computeIfAbsent3.removeIf(str4 -> {
                            return str4.startsWith(str);
                        });
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() <= 0 || !UserPrincipal.Type.PUBLIC_ID.equals(webSocketSessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    return;
                }
                Set<String> computeIfAbsent4 = this.publicUserSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getId(), userId2 -> {
                    return ConcurrentHashMap.newKeySet();
                });
                synchronized (computeIfAbsent4) {
                    computeIfAbsent4.removeIf(str5 -> {
                        return str5.startsWith(str);
                    });
                }
            }
        }
    }

    private boolean processSubscription(WebSocketSessionRef webSocketSessionRef, SubscriptionCmd subscriptionCmd) {
        DefaultTenantProfileConfiguration tenantProfileConfiguration = getTenantProfileConfiguration(webSocketSessionRef);
        if (tenantProfileConfiguration == null) {
            return true;
        }
        String str = "[" + webSocketSessionRef.getSessionId() + "]:[" + subscriptionCmd.getCmdId() + "]";
        try {
            if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
                Set<String> computeIfAbsent = this.tenantSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getTenantId(), tenantId -> {
                    return ConcurrentHashMap.newKeySet();
                });
                synchronized (computeIfAbsent) {
                    if (subscriptionCmd.isUnsubscribe()) {
                        computeIfAbsent.remove(str);
                    } else {
                        if (computeIfAbsent.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) {
                            log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached", new Object[]{webSocketSessionRef.getSecurityCtx().getTenantId(), webSocketSessionRef.getSecurityCtx().getId(), str});
                            this.msgEndpoint.close(webSocketSessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max tenant subscriptions limit reached!"));
                            return false;
                        }
                        computeIfAbsent.add(str);
                    }
                }
            }
            if (webSocketSessionRef.getSecurityCtx().isCustomerUser()) {
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
                    Set<String> computeIfAbsent2 = this.customerSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getCustomerId(), customerId -> {
                        return ConcurrentHashMap.newKeySet();
                    });
                    synchronized (computeIfAbsent2) {
                        if (subscriptionCmd.isUnsubscribe()) {
                            computeIfAbsent2.remove(str);
                        } else {
                            if (computeIfAbsent2.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) {
                                log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached", new Object[]{webSocketSessionRef.getSecurityCtx().getTenantId(), webSocketSessionRef.getSecurityCtx().getId(), str});
                                this.msgEndpoint.close(webSocketSessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max customer subscriptions limit reached"));
                                return false;
                            }
                            computeIfAbsent2.add(str);
                        }
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(webSocketSessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set<String> computeIfAbsent3 = this.regularUserSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getId(), userId -> {
                        return ConcurrentHashMap.newKeySet();
                    });
                    synchronized (computeIfAbsent3) {
                        if (computeIfAbsent3.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) {
                            log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached", new Object[]{webSocketSessionRef.getSecurityCtx().getTenantId(), webSocketSessionRef.getSecurityCtx().getId(), str});
                            this.msgEndpoint.close(webSocketSessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max regular user subscriptions limit reached"));
                            return false;
                        }
                        computeIfAbsent3.add(str);
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(webSocketSessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set<String> computeIfAbsent4 = this.publicUserSubscriptionsMap.computeIfAbsent(webSocketSessionRef.getSecurityCtx().getId(), userId2 -> {
                        return ConcurrentHashMap.newKeySet();
                    });
                    synchronized (computeIfAbsent4) {
                        if (computeIfAbsent4.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
                            log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached", new Object[]{webSocketSessionRef.getSecurityCtx().getTenantId(), webSocketSessionRef.getSecurityCtx().getId(), str});
                            this.msgEndpoint.close(webSocketSessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max public user subscriptions limit reached"));
                            return false;
                        }
                        computeIfAbsent4.add(str);
                    }
                }
            }
            return true;
        } catch (IOException e) {
            log.warn("[{}] Failed to send session close:", webSocketSessionRef.getSessionId(), e);
            return false;
        }
    }

    private void handleWsAttributesSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, AttributesSubscriptionCmd attributesSubscriptionCmd) {
        if (processSubscription(webSocketSessionRef, attributesSubscriptionCmd)) {
            String sessionId = webSocketSessionRef.getSessionId();
            if (attributesSubscriptionCmd.isUnsubscribe()) {
                unsubscribe(webSocketSessionRef, attributesSubscriptionCmd, sessionId);
                return;
            }
            if (validateSubscriptionCmd(webSocketSessionRef, attributesSubscriptionCmd)) {
                EntityId byTypeAndId = EntityIdFactory.getByTypeAndId(attributesSubscriptionCmd.getEntityType(), attributesSubscriptionCmd.getEntityId());
                log.debug("[{}] fetching latest attributes ({}) values for device: {}", new Object[]{sessionId, attributesSubscriptionCmd.getKeys(), byTypeAndId});
                Optional<Set<String>> keys = getKeys(attributesSubscriptionCmd);
                if (keys.isPresent()) {
                    handleWsAttributesSubscriptionByKeys(webSocketSessionRef, attributesSubscriptionCmd, sessionId, byTypeAndId, new ArrayList(keys.get()));
                } else {
                    handleWsAttributesSubscription(webSocketSessionRef, attributesSubscriptionCmd, sessionId, byTypeAndId);
                }
            }
        }
    }

    private void handleWsAttributesSubscriptionByKeys(final WebSocketSessionRef webSocketSessionRef, final AttributesSubscriptionCmd attributesSubscriptionCmd, final String str, final EntityId entityId, final List<String> list) {
        final long currentTimeMillis = System.currentTimeMillis();
        FutureCallback<List<AttributeKvEntry>> futureCallback = new FutureCallback<List<AttributeKvEntry>>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.1
            public void onSuccess(List<AttributeKvEntry> list2) {
                List list3 = (List) list2.stream().map(attributeKvEntry -> {
                    return new BasicTsKvEntry(attributeKvEntry.getLastUpdateTs(), attributeKvEntry);
                }).collect(Collectors.toList());
                HashMap hashMap = new HashMap(list.size());
                list.forEach(str2 -> {
                    hashMap.put(str2, 0L);
                });
                list3.forEach(tsKvEntry -> {
                    hashMap.put(tsKvEntry.getKey(), Long.valueOf(tsKvEntry.getTs()));
                });
                TbAttributeSubscriptionScope valueOf = StringUtils.isEmpty(attributesSubscriptionCmd.getScope()) ? TbAttributeSubscriptionScope.ANY_SCOPE : TbAttributeSubscriptionScope.valueOf(attributesSubscriptionCmd.getScope());
                ReentrantLock reentrantLock = new ReentrantLock();
                TbAttributeSubscription.TbAttributeSubscriptionBuilder scope = TbAttributeSubscription.builder().serviceId(DefaultWebSocketService.this.serviceId).sessionId(str).subscriptionId(DefaultWebSocketService.this.registerNewSessionSubId(str, webSocketSessionRef, attributesSubscriptionCmd.getCmdId())).tenantId(webSocketSessionRef.getSecurityCtx().getTenantId()).entityId(entityId).queryTs(currentTimeMillis).allKeys(false).keyStates(hashMap).scope(valueOf);
                AttributesSubscriptionCmd attributesSubscriptionCmd2 = attributesSubscriptionCmd;
                TbAttributeSubscription build = scope.updateProcessor((tbSubscription, telemetrySubscriptionUpdate) -> {
                    reentrantLock.lock();
                    try {
                        DefaultWebSocketService.this.sendUpdate(tbSubscription.getSessionId(), attributesSubscriptionCmd2.getCmdId(), telemetrySubscriptionUpdate);
                        reentrantLock.unlock();
                    } catch (Throwable th) {
                        reentrantLock.unlock();
                        throw th;
                    }
                }).build();
                reentrantLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(build, webSocketSessionRef);
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(attributesSubscriptionCmd.getCmdId(), (List<TsKvEntry>) list3));
                    reentrantLock.unlock();
                } catch (Throwable th) {
                    reentrantLock.unlock();
                    throw th;
                }
            }

            public void onFailure(Throwable th) {
                DefaultWebSocketService.log.error(DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES, th);
                DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, th instanceof UnauthorizedException ? new TelemetrySubscriptionUpdate(attributesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(attributesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES));
            }
        };
        if (StringUtils.isEmpty(attributesSubscriptionCmd.getScope())) {
            this.accessValidator.validate(webSocketSessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, getAttributesFetchCallback(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, list, futureCallback));
        } else {
            this.accessValidator.validate(webSocketSessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, getAttributesFetchCallback(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, attributesSubscriptionCmd.getScope(), list, futureCallback));
        }
    }

    private int registerNewSessionSubId(String str, WebSocketSessionRef webSocketSessionRef, int i) {
        Map<Integer, Integer> computeIfAbsent = this.sessionCmdMap.computeIfAbsent(str, str2 -> {
            return new ConcurrentHashMap();
        });
        int incrementAndGet = webSocketSessionRef.getSessionSubIdSeq().incrementAndGet();
        computeIfAbsent.put(Integer.valueOf(i), Integer.valueOf(incrementAndGet));
        return incrementAndGet;
    }

    private void handleWsHistoryCmd(final WebSocketSessionRef webSocketSessionRef, final GetHistoryCmd getHistoryCmd) {
        if (validateCmd(webSocketSessionRef, getHistoryCmd, () -> {
            if (getHistoryCmd.getEntityId() == null || getHistoryCmd.getEntityId().isEmpty() || getHistoryCmd.getEntityType() == null || getHistoryCmd.getEntityType().isEmpty()) {
                throw new IllegalArgumentException("Device id is empty!");
            }
            if (getHistoryCmd.getKeys() == null || getHistoryCmd.getKeys().isEmpty()) {
                throw new IllegalArgumentException("Keys are empty!");
            }
        })) {
            EntityId byTypeAndId = EntityIdFactory.getByTypeAndId(getHistoryCmd.getEntityType(), getHistoryCmd.getEntityId());
            List list = (List) new ArrayList(getKeys(getHistoryCmd).orElse(Collections.emptySet())).stream().map(str -> {
                return new BaseReadTsKvQuery(str, getHistoryCmd.getStartTs(), getHistoryCmd.getEndTs(), getHistoryCmd.getInterval(), getLimit(getHistoryCmd.getLimit()), getAggregation(getHistoryCmd.getAgg()));
            }).collect(Collectors.toList());
            FutureCallback<List<TsKvEntry>> futureCallback = new FutureCallback<List<TsKvEntry>>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.2
                public void onSuccess(List<TsKvEntry> list2) {
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(getHistoryCmd.getCmdId(), list2));
                }

                public void onFailure(Throwable th) {
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, UnauthorizedException.class.isInstance(th) ? new TelemetrySubscriptionUpdate(getHistoryCmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(getHistoryCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA));
                }
            };
            AccessValidator accessValidator = this.accessValidator;
            SecurityUser securityCtx = webSocketSessionRef.getSecurityCtx();
            Operation operation = Operation.READ_TELEMETRY;
            Consumer<Void> consumer = r10 -> {
                Futures.addCallback(this.tsService.findAll(webSocketSessionRef.getSecurityCtx().getTenantId(), byTypeAndId, list), futureCallback, this.executor);
            };
            Objects.requireNonNull(futureCallback);
            accessValidator.validate(securityCtx, operation, byTypeAndId, on(consumer, futureCallback::onFailure));
        }
    }

    private void handleWsAttributesSubscription(final WebSocketSessionRef webSocketSessionRef, final AttributesSubscriptionCmd attributesSubscriptionCmd, final String str, final EntityId entityId) {
        final long currentTimeMillis = System.currentTimeMillis();
        FutureCallback<List<AttributeKvEntry>> futureCallback = new FutureCallback<List<AttributeKvEntry>>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.3
            public void onSuccess(List<AttributeKvEntry> list) {
                List list2 = (List) list.stream().map(attributeKvEntry -> {
                    return new BasicTsKvEntry(attributeKvEntry.getLastUpdateTs(), attributeKvEntry);
                }).collect(Collectors.toList());
                HashMap hashMap = new HashMap(list2.size());
                list2.forEach(tsKvEntry -> {
                    hashMap.put(tsKvEntry.getKey(), Long.valueOf(tsKvEntry.getTs()));
                });
                TbAttributeSubscriptionScope valueOf = StringUtils.isEmpty(attributesSubscriptionCmd.getScope()) ? TbAttributeSubscriptionScope.ANY_SCOPE : TbAttributeSubscriptionScope.valueOf(attributesSubscriptionCmd.getScope());
                ReentrantLock reentrantLock = new ReentrantLock();
                TbAttributeSubscription.TbAttributeSubscriptionBuilder keyStates = TbAttributeSubscription.builder().serviceId(DefaultWebSocketService.this.serviceId).sessionId(str).subscriptionId(DefaultWebSocketService.this.registerNewSessionSubId(str, webSocketSessionRef, attributesSubscriptionCmd.getCmdId())).tenantId(webSocketSessionRef.getSecurityCtx().getTenantId()).entityId(entityId).queryTs(currentTimeMillis).allKeys(true).keyStates(hashMap);
                AttributesSubscriptionCmd attributesSubscriptionCmd2 = attributesSubscriptionCmd;
                TbAttributeSubscription build = keyStates.updateProcessor((tbSubscription, telemetrySubscriptionUpdate) -> {
                    reentrantLock.lock();
                    try {
                        DefaultWebSocketService.this.sendUpdate(tbSubscription.getSessionId(), attributesSubscriptionCmd2.getCmdId(), telemetrySubscriptionUpdate);
                        reentrantLock.unlock();
                    } catch (Throwable th) {
                        reentrantLock.unlock();
                        throw th;
                    }
                }).scope(valueOf).build();
                reentrantLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(build, webSocketSessionRef);
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(attributesSubscriptionCmd.getCmdId(), (List<TsKvEntry>) list2));
                    reentrantLock.unlock();
                } catch (Throwable th) {
                    reentrantLock.unlock();
                    throw th;
                }
            }

            public void onFailure(Throwable th) {
                DefaultWebSocketService.log.error(DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES, th);
                DefaultWebSocketService.this.sendError(webSocketSessionRef, attributesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES);
            }
        };
        if (StringUtils.isEmpty(attributesSubscriptionCmd.getScope())) {
            this.accessValidator.validate(webSocketSessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, getAttributesFetchCallback(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, futureCallback));
        } else {
            this.accessValidator.validate(webSocketSessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, getAttributesFetchCallback(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, attributesSubscriptionCmd.getScope(), futureCallback));
        }
    }

    private void handleWsTimeseriesSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, TimeseriesSubscriptionCmd timeseriesSubscriptionCmd) {
        if (processSubscription(webSocketSessionRef, timeseriesSubscriptionCmd)) {
            String sessionId = webSocketSessionRef.getSessionId();
            if (timeseriesSubscriptionCmd.isUnsubscribe()) {
                unsubscribe(webSocketSessionRef, timeseriesSubscriptionCmd, sessionId);
                return;
            }
            if (validateSubscriptionCmd(webSocketSessionRef, timeseriesSubscriptionCmd)) {
                EntityId byTypeAndId = EntityIdFactory.getByTypeAndId(timeseriesSubscriptionCmd.getEntityType(), timeseriesSubscriptionCmd.getEntityId());
                if (getKeys(timeseriesSubscriptionCmd).isPresent()) {
                    handleWsTimeSeriesSubscriptionByKeys(webSocketSessionRef, timeseriesSubscriptionCmd, sessionId, byTypeAndId);
                } else {
                    handleWsTimeSeriesSubscription(webSocketSessionRef, timeseriesSubscriptionCmd, sessionId, byTypeAndId);
                }
            }
        }
    }

    private void handleWsTimeSeriesSubscriptionByKeys(WebSocketSessionRef webSocketSessionRef, TimeseriesSubscriptionCmd timeseriesSubscriptionCmd, String str, EntityId entityId) {
        long currentTimeMillis = System.currentTimeMillis();
        if (timeseriesSubscriptionCmd.getTimeWindow() <= 0) {
            ArrayList arrayList = new ArrayList(getKeys(timeseriesSubscriptionCmd).orElse(Collections.emptySet()));
            long currentTimeMillis2 = System.currentTimeMillis();
            log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", new Object[]{str, timeseriesSubscriptionCmd.getKeys(), entityId});
            FutureCallback<List<TsKvEntry>> subscriptionCallback = getSubscriptionCallback(webSocketSessionRef, timeseriesSubscriptionCmd, str, entityId, currentTimeMillis, currentTimeMillis2, arrayList);
            AccessValidator accessValidator = this.accessValidator;
            SecurityUser securityCtx = webSocketSessionRef.getSecurityCtx();
            Operation operation = Operation.READ_TELEMETRY;
            Consumer<Void> consumer = r10 -> {
                Futures.addCallback(this.tsService.findLatest(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, arrayList), subscriptionCallback, this.executor);
            };
            Objects.requireNonNull(subscriptionCallback);
            accessValidator.validate(securityCtx, operation, entityId, on(consumer, subscriptionCallback::onFailure));
            return;
        }
        ArrayList arrayList2 = new ArrayList(getKeys(timeseriesSubscriptionCmd).orElse(Collections.emptySet()));
        log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", new Object[]{str, Long.valueOf(timeseriesSubscriptionCmd.getTimeWindow()), timeseriesSubscriptionCmd.getKeys(), entityId});
        long startTs = timeseriesSubscriptionCmd.getStartTs();
        long startTs2 = timeseriesSubscriptionCmd.getStartTs() + timeseriesSubscriptionCmd.getTimeWindow();
        List list = (List) arrayList2.stream().map(str2 -> {
            return new BaseReadTsKvQuery(str2, startTs, startTs2, timeseriesSubscriptionCmd.getInterval(), getLimit(timeseriesSubscriptionCmd.getLimit()), getAggregation(timeseriesSubscriptionCmd.getAgg()));
        }).collect(Collectors.toList());
        FutureCallback<List<TsKvEntry>> subscriptionCallback2 = getSubscriptionCallback(webSocketSessionRef, timeseriesSubscriptionCmd, str, entityId, currentTimeMillis, startTs, arrayList2);
        AccessValidator accessValidator2 = this.accessValidator;
        SecurityUser securityCtx2 = webSocketSessionRef.getSecurityCtx();
        Operation operation2 = Operation.READ_TELEMETRY;
        Consumer<Void> consumer2 = r102 -> {
            Futures.addCallback(this.tsService.findAll(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, list), subscriptionCallback2, this.executor);
        };
        Objects.requireNonNull(subscriptionCallback2);
        accessValidator2.validate(securityCtx2, operation2, entityId, on(consumer2, subscriptionCallback2::onFailure));
    }

    private void handleWsTimeSeriesSubscription(final WebSocketSessionRef webSocketSessionRef, final TimeseriesSubscriptionCmd timeseriesSubscriptionCmd, final String str, final EntityId entityId) {
        final long currentTimeMillis = System.currentTimeMillis();
        FutureCallback<List<TsKvEntry>> futureCallback = new FutureCallback<List<TsKvEntry>>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.4
            public void onSuccess(List<TsKvEntry> list) {
                HashMap hashMap = new HashMap(list.size());
                list.forEach(tsKvEntry -> {
                    hashMap.put(tsKvEntry.getKey(), Long.valueOf(tsKvEntry.getTs()));
                });
                ReentrantLock reentrantLock = new ReentrantLock();
                TbTimeSeriesSubscription tsSubscription = DefaultWebSocketService.this.getTsSubscription(hashMap, reentrantLock, str, webSocketSessionRef, timeseriesSubscriptionCmd, entityId, currentTimeMillis, true);
                reentrantLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(tsSubscription, webSocketSessionRef);
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(timeseriesSubscriptionCmd.getCmdId(), list));
                    reentrantLock.unlock();
                } catch (Throwable th) {
                    reentrantLock.unlock();
                    throw th;
                }
            }

            public void onFailure(Throwable th) {
                DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, UnauthorizedException.class.isInstance(th) ? new TelemetrySubscriptionUpdate(timeseriesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(timeseriesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA));
            }
        };
        AccessValidator accessValidator = this.accessValidator;
        SecurityUser securityCtx = webSocketSessionRef.getSecurityCtx();
        Operation operation = Operation.READ_TELEMETRY;
        Consumer<Void> consumer = r8 -> {
            Futures.addCallback(this.tsService.findAllLatest(webSocketSessionRef.getSecurityCtx().getTenantId(), entityId), futureCallback, this.executor);
        };
        Objects.requireNonNull(futureCallback);
        accessValidator.validate(securityCtx, operation, entityId, on(consumer, futureCallback::onFailure));
    }

    private TbTimeSeriesSubscription getTsSubscription(Map<String, Long> map, Lock lock, String str, WebSocketSessionRef webSocketSessionRef, TimeseriesSubscriptionCmd timeseriesSubscriptionCmd, EntityId entityId, long j, boolean z) {
        return TbTimeSeriesSubscription.builder().serviceId(this.serviceId).sessionId(str).subscriptionId(registerNewSessionSubId(str, webSocketSessionRef, timeseriesSubscriptionCmd.getCmdId())).tenantId(webSocketSessionRef.getSecurityCtx().getTenantId()).entityId(entityId).updateProcessor((tbSubscription, telemetrySubscriptionUpdate) -> {
            lock.lock();
            try {
                sendUpdate(tbSubscription.getSessionId(), timeseriesSubscriptionCmd.getCmdId(), telemetrySubscriptionUpdate);
                lock.unlock();
            } catch (Throwable th) {
                lock.unlock();
                throw th;
            }
        }).queryTs(j).allKeys(z).keyStates(map).latestValues("LATEST_TELEMETRY".equals(timeseriesSubscriptionCmd.getScope())).build();
    }

    private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final WebSocketSessionRef webSocketSessionRef, final TimeseriesSubscriptionCmd timeseriesSubscriptionCmd, final String str, final EntityId entityId, final long j, final long j2, final List<String> list) {
        return new FutureCallback<List<TsKvEntry>>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.5
            public void onSuccess(List<TsKvEntry> list2) {
                HashMap hashMap = new HashMap(list.size());
                List list3 = list;
                long j3 = j2;
                list3.forEach(str2 -> {
                    hashMap.put(str2, Long.valueOf(j3));
                });
                list2.forEach(tsKvEntry -> {
                    hashMap.put(tsKvEntry.getKey(), Long.valueOf(tsKvEntry.getTs()));
                });
                ReentrantLock reentrantLock = new ReentrantLock();
                TbTimeSeriesSubscription tsSubscription = DefaultWebSocketService.this.getTsSubscription(hashMap, reentrantLock, str, webSocketSessionRef, timeseriesSubscriptionCmd, entityId, j, false);
                reentrantLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(tsSubscription, webSocketSessionRef);
                    DefaultWebSocketService.this.sendUpdate(webSocketSessionRef, new TelemetrySubscriptionUpdate(timeseriesSubscriptionCmd.getCmdId(), list2));
                    reentrantLock.unlock();
                } catch (Throwable th) {
                    reentrantLock.unlock();
                    throw th;
                }
            }

            public void onFailure(Throwable th) {
                if ((th instanceof TenantRateLimitException) || (th.getCause() instanceof TenantRateLimitException)) {
                    DefaultWebSocketService.log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", new Object[]{webSocketSessionRef.getSecurityCtx().getTenantId(), entityId, timeseriesSubscriptionCmd});
                } else {
                    DefaultWebSocketService.log.info(DefaultWebSocketService.FAILED_TO_FETCH_DATA, th);
                }
                DefaultWebSocketService.this.sendError(webSocketSessionRef, timeseriesSubscriptionCmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA);
            }
        };
    }

    private void unsubscribe(WebSocketSessionRef webSocketSessionRef, SubscriptionCmd subscriptionCmd, String str) {
        TenantId tenantId = webSocketSessionRef.getSecurityCtx().getTenantId();
        if (subscriptionCmd.getEntityId() == null || subscriptionCmd.getEntityId().isEmpty()) {
            log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", new Object[]{tenantId, str, Integer.valueOf(subscriptionCmd.getCmdId())});
            cleanupSessionById(tenantId, str);
            return;
        }
        Integer remove = this.sessionCmdMap.getOrDefault(str, Collections.emptyMap()).remove(Integer.valueOf(subscriptionCmd.getCmdId()));
        if (remove == null) {
            log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", new Object[]{tenantId, str, Integer.valueOf(subscriptionCmd.getCmdId())});
            remove = Integer.valueOf(subscriptionCmd.getCmdId());
        }
        this.oldSubService.cancelSubscription(tenantId, str, remove.intValue());
    }

    private void cleanupSessionById(TenantId tenantId, String str) {
        this.wsSessionsMap.remove(str);
        this.oldSubService.cancelAllSessionSubscriptions(tenantId, str);
        this.sessionCmdMap.remove(str);
        this.entityDataSubService.cancelAllSessionSubscriptions(str);
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, EntityDataCmd entityDataCmd) {
        return validateCmd(webSocketSessionRef, entityDataCmd, () -> {
            if (entityDataCmd.getQuery() == null && !entityDataCmd.hasAnyCmd()) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, EntityCountCmd entityCountCmd) {
        return validateCmd(webSocketSessionRef, entityCountCmd, () -> {
            if (entityCountCmd.getQuery() == null) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, AlarmDataCmd alarmDataCmd) {
        return validateCmd(webSocketSessionRef, alarmDataCmd, () -> {
            if (alarmDataCmd.getQuery() == null) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef webSocketSessionRef, SubscriptionCmd subscriptionCmd) {
        return validateCmd(webSocketSessionRef, subscriptionCmd, () -> {
            if (subscriptionCmd.getEntityId() == null || subscriptionCmd.getEntityId().isEmpty()) {
                throw new IllegalArgumentException("Device id is empty!");
            }
        });
    }

    private boolean validateSessionMetadata(WebSocketSessionRef webSocketSessionRef, int i, String str) {
        if (this.wsSessionsMap.get(str) != null) {
            return true;
        }
        log.warn("[{}] Session meta data not found. ", str);
        sendError(webSocketSessionRef, i, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
        return false;
    }

    private boolean validateCmd(WebSocketSessionRef webSocketSessionRef, WsCmd wsCmd) {
        return validateCmd(webSocketSessionRef, wsCmd, null);
    }

    private <C extends WsCmd> boolean validateCmd(WebSocketSessionRef webSocketSessionRef, C c, Runnable runnable) {
        if (c.getCmdId() < 0) {
            sendError(webSocketSessionRef, c.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Cmd id is negative value!");
            return false;
        }
        if (runnable != null) {
            try {
                runnable.run();
            } catch (Exception e) {
                sendError(webSocketSessionRef, c.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, e.getMessage());
                return false;
            }
        }
        return true;
    }

    private void sendUpdate(WebSocketSessionRef webSocketSessionRef, EntityDataUpdate entityDataUpdate) {
        sendUpdate(webSocketSessionRef, entityDataUpdate.getCmdId(), entityDataUpdate);
    }

    private void sendUpdate(WebSocketSessionRef webSocketSessionRef, TelemetrySubscriptionUpdate telemetrySubscriptionUpdate) {
        sendUpdate(webSocketSessionRef, telemetrySubscriptionUpdate.getSubscriptionId(), telemetrySubscriptionUpdate);
    }

    private void sendUpdate(WebSocketSessionRef webSocketSessionRef, int i, Object obj) {
        try {
            String writeValueAsString = JacksonUtil.OBJECT_MAPPER.writeValueAsString(obj);
            this.executor.submit(() -> {
                try {
                    this.msgEndpoint.send(webSocketSessionRef, i, writeValueAsString);
                } catch (IOException e) {
                    log.warn("[{}] Failed to send reply: {}", new Object[]{webSocketSessionRef.getSessionId(), obj, e});
                }
            });
        } catch (JsonProcessingException e) {
            log.warn("[{}] Failed to encode reply: {}", new Object[]{webSocketSessionRef.getSessionId(), obj, e});
        }
    }

    private void sendPing() {
        long currentTimeMillis = System.currentTimeMillis();
        this.wsSessionsMap.values().forEach(wsSessionMetaData -> {
            this.executor.submit(() -> {
                try {
                    this.msgEndpoint.sendPing(wsSessionMetaData.getSessionRef(), currentTimeMillis);
                } catch (IOException e) {
                    log.warn("[{}] Failed to send ping:", wsSessionMetaData.getSessionRef().getSessionId(), e);
                }
            });
        });
    }

    private static Optional<Set<String>> getKeys(TelemetryPluginCmd telemetryPluginCmd) {
        if (StringUtils.isEmpty(telemetryPluginCmd.getKeys())) {
            return Optional.empty();
        }
        HashSet hashSet = new HashSet();
        Collections.addAll(hashSet, telemetryPluginCmd.getKeys().split(","));
        return Optional.of(hashSet);
    }

    private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> list) {
        return Futures.transform(Futures.successfulAsList(list), list2 -> {
            ArrayList arrayList = new ArrayList();
            if (list2 != null) {
                Objects.requireNonNull(arrayList);
                list2.forEach((v1) -> {
                    r1.addAll(v1);
                });
            }
            return arrayList;
        }, this.executor);
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final List<String> list, final FutureCallback<List<AttributeKvEntry>> futureCallback) {
        return new FutureCallback<ValidationResult>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.6
            public void onSuccess(@Nullable ValidationResult validationResult) {
                ArrayList arrayList = new ArrayList();
                AttributeScope[] values = AttributeScope.values();
                int length = values.length;
                for (int i = DefaultWebSocketService.UNKNOWN_SUBSCRIPTION_ID; i < length; i++) {
                    arrayList.add(DefaultWebSocketService.this.attributesService.find(tenantId, entityId, values[i], list));
                }
                Futures.addCallback(DefaultWebSocketService.this.mergeAllAttributesFutures(arrayList), futureCallback, MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable th) {
                futureCallback.onFailure(th);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final String str, final List<String> list, final FutureCallback<List<AttributeKvEntry>> futureCallback) {
        return new FutureCallback<ValidationResult>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.7
            public void onSuccess(@Nullable ValidationResult validationResult) {
                Futures.addCallback(DefaultWebSocketService.this.attributesService.find(tenantId, entityId, AttributeScope.valueOf(str), list), futureCallback, MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable th) {
                futureCallback.onFailure(th);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final FutureCallback<List<AttributeKvEntry>> futureCallback) {
        return new FutureCallback<ValidationResult>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.8
            public void onSuccess(@Nullable ValidationResult validationResult) {
                ArrayList arrayList = new ArrayList();
                AttributeScope[] values = AttributeScope.values();
                int length = values.length;
                for (int i = DefaultWebSocketService.UNKNOWN_SUBSCRIPTION_ID; i < length; i++) {
                    arrayList.add(DefaultWebSocketService.this.attributesService.findAll(tenantId, entityId, values[i]));
                }
                Futures.addCallback(DefaultWebSocketService.this.mergeAllAttributesFutures(arrayList), futureCallback, MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable th) {
                futureCallback.onFailure(th);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final String str, final FutureCallback<List<AttributeKvEntry>> futureCallback) {
        return new FutureCallback<ValidationResult>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.9
            public void onSuccess(@Nullable ValidationResult validationResult) {
                Futures.addCallback(DefaultWebSocketService.this.attributesService.findAll(tenantId, entityId, AttributeScope.valueOf(str)), futureCallback, MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable th) {
                futureCallback.onFailure(th);
            }
        };
    }

    private FutureCallback<ValidationResult> on(final Consumer<Void> consumer, final Consumer<Throwable> consumer2) {
        return new FutureCallback<ValidationResult>() { // from class: org.thingsboard.server.service.ws.DefaultWebSocketService.10
            public void onSuccess(@Nullable ValidationResult validationResult) {
                if (validationResult.getResultCode() == ValidationResultCode.OK) {
                    consumer.accept(null);
                } else {
                    onFailure(ValidationCallback.getException(validationResult));
                }
            }

            public void onFailure(Throwable th) {
                consumer2.accept(th);
            }
        };
    }

    public static Aggregation getAggregation(String str) {
        return StringUtils.isEmpty(str) ? DEFAULT_AGGREGATION : Aggregation.valueOf(str);
    }

    private int getLimit(int i) {
        return i == 0 ? DEFAULT_LIMIT : i;
    }

    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(WebSocketSessionRef webSocketSessionRef) {
        return (DefaultTenantProfileConfiguration) Optional.ofNullable(this.tenantProfileCache.get(webSocketSessionRef.getSecurityCtx().getTenantId())).map((v0) -> {
            return v0.getDefaultProfileConfiguration();
        }).orElse(null);
    }

    public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(BiConsumer<WebSocketSessionRef, C> biConsumer) {
        return new WsCmdHandler<>(biConsumer);
    }

    @ConstructorProperties({"oldSubService", "entityDataSubService", "notificationCmdsHandler", "msgEndpoint", "accessValidator", "attributesService", "tsService", "serviceInfoProvider", "tenantProfileCache"})
    public DefaultWebSocketService(TbLocalSubscriptionService tbLocalSubscriptionService, TbEntityDataSubscriptionService tbEntityDataSubscriptionService, NotificationCommandsHandler notificationCommandsHandler, WebSocketMsgEndpoint webSocketMsgEndpoint, AccessValidator accessValidator, AttributesService attributesService, TimeseriesService timeseriesService, TbServiceInfoProvider tbServiceInfoProvider, TbTenantProfileCache tbTenantProfileCache) {
        this.oldSubService = tbLocalSubscriptionService;
        this.entityDataSubService = tbEntityDataSubscriptionService;
        this.notificationCmdsHandler = notificationCommandsHandler;
        this.msgEndpoint = webSocketMsgEndpoint;
        this.accessValidator = accessValidator;
        this.attributesService = attributesService;
        this.tsService = timeseriesService;
        this.serviceInfoProvider = tbServiceInfoProvider;
        this.tenantProfileCache = tbTenantProfileCache;
    }
}
