/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.ws;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.exception.RateLimitExceededException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.UnauthorizedException;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.ValidationCallback;
import org.thingsboard.server.service.security.ValidationResult;
import org.thingsboard.server.service.security.ValidationResultCode;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.subscription.TbAttributeSubscription;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
import org.thingsboard.server.service.ws.SessionEvent;
import org.thingsboard.server.service.ws.WebSocketMsgEndpoint;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.WsCmd;
import org.thingsboard.server.service.ws.WsCmdType;
import org.thingsboard.server.service.ws.WsCommandsWrapper;
import org.thingsboard.server.service.ws.WsSessionMetaData;
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TelemetryPluginCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;

@Service
@TbCoreComponent
public class DefaultWebSocketService
implements WebSocketService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketService.class);
    public static final int NUMBER_OF_PING_ATTEMPTS = 3;
    private static final int DEFAULT_LIMIT = 100;
    private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
    private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
    private static final String PROCESSING_MSG = "[{}] Processing: {}";
    private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
    private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
    private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
    private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<String, WsSessionMetaData>();
    private final TbLocalSubscriptionService oldSubService;
    private final TbEntityDataSubscriptionService entityDataSubService;
    private final NotificationCommandsHandler notificationCmdsHandler;
    private final WebSocketMsgEndpoint msgEndpoint;
    private final AccessValidator accessValidator;
    private final AttributesService attributesService;
    private final TimeseriesService tsService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbTenantProfileCache tenantProfileCache;
    @Value(value="${server.ws.ping_timeout:30000}")
    private long pingTimeout;
    private final ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<TenantId, Set<String>>();
    private final ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<CustomerId, Set<String>>();
    private final ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<UserId, Set<String>>();
    private final ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<UserId, Set<String>>();
    private final ConcurrentMap<String, Map<Integer, Integer>> sessionCmdMap = new ConcurrentHashMap<String, Map<Integer, Integer>>();
    private ExecutorService executor;
    private ScheduledExecutorService pingExecutor;
    private String serviceId;
    private Map<WsCmdType, WsCmdHandler<? extends WsCmd>> cmdsHandlers;

    @PostConstruct
    public void init() {
        this.serviceId = this.serviceInfoProvider.getServiceId();
        this.executor = ThingsBoardExecutors.newWorkStealingPool((int)50, this.getClass());
        this.pingExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor((String)"telemetry-web-socket-ping");
        this.pingExecutor.scheduleWithFixedDelay(this::sendPing, this.pingTimeout / 3L, this.pingTimeout / 3L, TimeUnit.MILLISECONDS);
        this.cmdsHandlers = new EnumMap<WsCmdType, WsCmdHandler<? extends WsCmd>>(WsCmdType.class);
        this.cmdsHandlers.put(WsCmdType.ATTRIBUTES, DefaultWebSocketService.newCmdHandler(this::handleWsAttributesSubscriptionCmd));
        this.cmdsHandlers.put(WsCmdType.TIMESERIES, DefaultWebSocketService.newCmdHandler(this::handleWsTimeseriesSubscriptionCmd));
        this.cmdsHandlers.put(WsCmdType.TIMESERIES_HISTORY, DefaultWebSocketService.newCmdHandler(this::handleWsHistoryCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_DATA, DefaultWebSocketService.newCmdHandler(this::handleWsEntityDataCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_DATA, DefaultWebSocketService.newCmdHandler(this::handleWsAlarmDataCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_COUNT, DefaultWebSocketService.newCmdHandler(this::handleWsEntityCountCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_COUNT, DefaultWebSocketService.newCmdHandler(this::handleWsAlarmCountCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_STATUS, DefaultWebSocketService.newCmdHandler(this::handleWsAlarmsStatusCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_DATA_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.ALARM_STATUS_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this::handleWsDataUnsubscribeCmd));
        this.cmdsHandlers.put(WsCmdType.NOTIFICATIONS, DefaultWebSocketService.newCmdHandler(this.notificationCmdsHandler::handleUnreadNotificationsSubCmd));
        this.cmdsHandlers.put(WsCmdType.NOTIFICATIONS_COUNT, DefaultWebSocketService.newCmdHandler(this.notificationCmdsHandler::handleUnreadNotificationsCountSubCmd));
        this.cmdsHandlers.put(WsCmdType.MARK_NOTIFICATIONS_AS_READ, DefaultWebSocketService.newCmdHandler(this.notificationCmdsHandler::handleMarkAsReadCmd));
        this.cmdsHandlers.put(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, DefaultWebSocketService.newCmdHandler(this.notificationCmdsHandler::handleMarkAllAsReadCmd));
        this.cmdsHandlers.put(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, DefaultWebSocketService.newCmdHandler(this.notificationCmdsHandler::handleUnsubCmd));
    }

    @PreDestroy
    public void shutdownExecutor() {
        if (this.pingExecutor != null) {
            this.pingExecutor.shutdownNow();
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    @Override
    public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) {
        String sessionId = sessionRef.getSessionId();
        TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
        log.debug(PROCESSING_MSG, (Object)sessionId, (Object)event);
        switch (event.getEventType()) {
            case ESTABLISHED: {
                this.wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
                break;
            }
            case ERROR: {
                log.debug("[{}][{}] Unknown websocket session error: ", new Object[]{tenantId, sessionId, event.getError().orElse(new RuntimeException("No error specified"))});
                break;
            }
            case CLOSED: {
                this.cleanupSessionById(tenantId, sessionId);
                this.processSessionClose(sessionRef);
            }
        }
    }

    @Override
    public void handleCommands(WebSocketSessionRef sessionRef, WsCommandsWrapper commandsWrapper) {
        if (commandsWrapper == null || CollectionUtils.isEmpty(commandsWrapper.getCmds())) {
            return;
        }
        String sessionId = sessionRef.getSessionId();
        if (!this.validateSessionMetadata(sessionRef, 0, sessionId)) {
            return;
        }
        for (WsCmd cmd : commandsWrapper.getCmds()) {
            log.debug("[{}][{}][{}] Processing cmd: {}", new Object[]{sessionId, cmd.getType(), cmd.getCmdId(), cmd});
            try {
                Optional.ofNullable(this.cmdsHandlers.get((Object)cmd.getType())).ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
            }
            catch (TbRateLimitsException e) {
                log.debug("{} Failed to handle WS cmd: {}", new Object[]{sessionRef, cmd, e});
            }
            catch (Exception e) {
                this.sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, e.getMessage());
                log.error("{} Failed to handle WS cmd: {}", new Object[]{sessionRef, cmd, e});
            }
        }
    }

    private void handleWsEntityDataCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
        if (this.validateSubscriptionCmd(sessionRef, cmd)) {
            this.entityDataSubService.handleCmd(sessionRef, cmd);
        }
    }

    private void handleWsEntityCountCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
        if (this.validateSubscriptionCmd(sessionRef, cmd)) {
            this.entityDataSubService.handleCmd(sessionRef, cmd);
        }
    }

    private void handleWsAlarmDataCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
        if (this.validateSubscriptionCmd(sessionRef, cmd)) {
            this.entityDataSubService.handleCmd(sessionRef, cmd);
        }
    }

    private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
        this.entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd);
    }

    private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
        if (this.validateCmd(sessionRef, cmd)) {
            this.entityDataSubService.handleCmd(sessionRef, cmd);
        }
    }

    private void handleWsAlarmsStatusCmd(WebSocketSessionRef sessionRef, AlarmStatusCmd cmd) {
        if (this.validateCmd(sessionRef, cmd)) {
            this.entityDataSubService.handleCmd(sessionRef, cmd);
        }
    }

    @Override
    public void sendUpdate(String sessionId, int cmdId, TelemetrySubscriptionUpdate update) {
        this.doSendUpdate(sessionId, cmdId, update.withSubscriptionId(cmdId));
    }

    @Override
    public void sendUpdate(String sessionId, CmdUpdate update) {
        this.doSendUpdate(sessionId, update.getCmdId(), update);
    }

    @Override
    public void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg) {
        TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(subId, errorCode, errorMsg);
        this.sendUpdate(sessionRef, update);
    }

    private <T> void doSendUpdate(String sessionId, int cmdId, T update) {
        WsSessionMetaData md = (WsSessionMetaData)this.wsSessionsMap.get(sessionId);
        if (md != null) {
            this.sendUpdate(md.getSessionRef(), cmdId, update);
        }
    }

    @Override
    public void close(String sessionId, CloseStatus status) {
        WsSessionMetaData md = (WsSessionMetaData)this.wsSessionsMap.get(sessionId);
        if (md != null) {
            try {
                this.msgEndpoint.close(md.getSessionRef(), status);
            }
            catch (IOException e) {
                log.warn("[{}] Failed to send session close", (Object)sessionId, (Object)e);
            }
        }
    }

    @Override
    public void cleanupIfStale(TenantId tenantId, String sessionId) {
        if (!this.msgEndpoint.isOpen(sessionId)) {
            log.info("[{}] Cleaning up stale session ", (Object)sessionId);
            this.cleanupSessionById(tenantId, sessionId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processSessionClose(WebSocketSessionRef sessionRef) {
        DefaultTenantProfileConfiguration tenantProfileConfiguration = this.getTenantProfileConfiguration(sessionRef);
        if (tenantProfileConfiguration != null) {
            Set set;
            String sessionId = "[" + sessionRef.getSessionId() + "]";
            if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0L) {
                Set tenantSubscriptions;
                set = tenantSubscriptions = this.tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
                synchronized (set) {
                    tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId));
                }
            }
            if (sessionRef.getSecurityCtx().isCustomerUser()) {
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0L) {
                    Set customerSessions;
                    set = customerSessions = this.customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        customerSessions.removeIf(subId -> subId.startsWith(sessionId));
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0L && UserPrincipal.Type.USER_NAME.equals((Object)sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set regularUserSessions;
                    set = regularUserSessions = this.regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        regularUserSessions.removeIf(subId -> subId.startsWith(sessionId));
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0L && UserPrincipal.Type.PUBLIC_ID.equals((Object)sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set publicUserSessions;
                    set = publicUserSessions = this.publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        publicUserSessions.removeIf(subId -> subId.startsWith(sessionId));
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processSubscription(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
        block28: {
            DefaultTenantProfileConfiguration tenantProfileConfiguration = this.getTenantProfileConfiguration(sessionRef);
            if (tenantProfileConfiguration == null) {
                return true;
            }
            String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]";
            try {
                Set publicUserSessions;
                Set set;
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0L) {
                    Set tenantSubscriptions;
                    set = tenantSubscriptions = this.tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        if (cmd.isUnsubscribe()) {
                            tenantSubscriptions.remove(subId);
                        } else if ((long)tenantSubscriptions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) {
                            tenantSubscriptions.add(subId);
                        } else {
                            log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached", new Object[]{sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), subId});
                            this.msgEndpoint.close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max tenant subscriptions limit reached!"));
                            return false;
                        }
                    }
                }
                if (!sessionRef.getSecurityCtx().isCustomerUser()) break block28;
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0L) {
                    Set customerSessions;
                    set = customerSessions = this.customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        if (cmd.isUnsubscribe()) {
                            customerSessions.remove(subId);
                        } else if ((long)customerSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) {
                            customerSessions.add(subId);
                        } else {
                            log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached", new Object[]{sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), subId});
                            this.msgEndpoint.close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max customer subscriptions limit reached"));
                            return false;
                        }
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0L && UserPrincipal.Type.USER_NAME.equals((Object)sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
                    Set regularUserSessions;
                    set = regularUserSessions = this.regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
                    synchronized (set) {
                        if ((long)regularUserSessions.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) {
                            log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached", new Object[]{sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), subId});
                            this.msgEndpoint.close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max regular user subscriptions limit reached"));
                            return false;
                        }
                        regularUserSessions.add(subId);
                    }
                }
                if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() <= 0L || !UserPrincipal.Type.PUBLIC_ID.equals((Object)sessionRef.getSecurityCtx().getUserPrincipal().getType())) break block28;
                set = publicUserSessions = this.publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
                synchronized (set) {
                    if ((long)publicUserSessions.size() >= tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
                        log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached", new Object[]{sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), subId});
                        this.msgEndpoint.close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max public user subscriptions limit reached"));
                        return false;
                    }
                    publicUserSessions.add(subId);
                }
            }
            catch (IOException e) {
                log.warn("[{}] Failed to send session close:", (Object)sessionRef.getSessionId(), (Object)e);
                return false;
            }
        }
        return true;
    }

    private void handleWsAttributesSubscriptionCmd(WebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
        if (!this.processSubscription(sessionRef, cmd)) {
            return;
        }
        String sessionId = sessionRef.getSessionId();
        if (cmd.isUnsubscribe()) {
            this.unsubscribe(sessionRef, cmd, sessionId);
        } else if (this.validateSubscriptionCmd(sessionRef, cmd)) {
            EntityId entityId = EntityIdFactory.getByTypeAndId((String)cmd.getEntityType(), (String)cmd.getEntityId());
            log.debug("[{}] fetching latest attributes ({}) values for device: {}", new Object[]{sessionId, cmd.getKeys(), entityId});
            Optional<Set<String>> keysOptional = DefaultWebSocketService.getKeys(cmd);
            if (keysOptional.isPresent()) {
                ArrayList<String> keys = new ArrayList<String>((Collection)keysOptional.get());
                this.handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys);
            } else {
                this.handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId);
            }
        }
    }

    private void handleWsAttributesSubscriptionByKeys(final WebSocketSessionRef sessionRef, final AttributesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final List<String> keys) {
        final long queryTs = System.currentTimeMillis();
        FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onSuccess(List<AttributeKvEntry> data) {
                List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), (KvEntry)d)).collect(Collectors.toList());
                HashMap<String, Long> subState = new HashMap<String, Long>(keys.size());
                keys.forEach(key -> subState.put((String)key, 0L));
                attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
                TbAttributeSubscriptionScope scope = StringUtils.isEmpty((String)cmd.getScope()) ? TbAttributeSubscriptionScope.ANY_SCOPE : TbAttributeSubscriptionScope.valueOf(cmd.getScope());
                ReentrantLock subLock = new ReentrantLock();
                TbAttributeSubscription sub = TbAttributeSubscription.builder().serviceId(DefaultWebSocketService.this.serviceId).sessionId(sessionId).subscriptionId(DefaultWebSocketService.this.registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())).tenantId(sessionRef.getSecurityCtx().getTenantId()).entityId(entityId).queryTs(queryTs).allKeys(false).keyStates(subState).scope(scope).updateProcessor((subscription, update) -> {
                    subLock.lock();
                    try {
                        DefaultWebSocketService.this.sendUpdate(subscription.getSessionId(), cmd.getCmdId(), (TelemetrySubscriptionUpdate)update);
                    }
                    finally {
                        subLock.unlock();
                    }
                }).build();
                subLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(sub, sessionRef);
                    DefaultWebSocketService.this.sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
                }
                finally {
                    subLock.unlock();
                }
            }

            public void onFailure(Throwable e) {
                log.error(DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES, e);
                TelemetrySubscriptionUpdate update = e instanceof UnauthorizedException ? new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES);
                DefaultWebSocketService.this.sendUpdate(sessionRef, update);
            }
        };
        if (StringUtils.isEmpty((String)cmd.getScope())) {
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, this.getAttributesFetchCallback(sessionRef.getSecurityCtx().getTenantId(), entityId, keys, callback));
        } else {
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, this.getAttributesFetchCallback(sessionRef.getSecurityCtx().getTenantId(), entityId, cmd.getScope(), keys, callback));
        }
    }

    private int registerNewSessionSubId(String sessionId, WebSocketSessionRef sessionRef, int cmdId) {
        Map cmdMap = this.sessionCmdMap.computeIfAbsent(sessionId, id -> new ConcurrentHashMap());
        int subId = sessionRef.getSessionSubIdSeq().incrementAndGet();
        cmdMap.put(cmdId, subId);
        return subId;
    }

    private void handleWsHistoryCmd(final WebSocketSessionRef sessionRef, final GetHistoryCmd cmd) {
        if (!this.validateCmd(sessionRef, cmd, () -> {
            if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
                throw new IllegalArgumentException("Device id is empty!");
            }
            if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
                throw new IllegalArgumentException("Keys are empty!");
            }
        })) {
            return;
        }
        EntityId entityId = EntityIdFactory.getByTypeAndId((String)cmd.getEntityType(), (String)cmd.getEntityId());
        ArrayList keys = new ArrayList(DefaultWebSocketService.getKeys(cmd).orElse(Collections.emptySet()));
        List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), this.getLimit(cmd.getLimit()), DefaultWebSocketService.getAggregation(cmd.getAgg()))).collect(Collectors.toList());
        FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>(){

            public void onSuccess(List<TsKvEntry> data) {
                DefaultWebSocketService.this.sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
            }

            public void onFailure(Throwable e) {
                TelemetrySubscriptionUpdate update = UnauthorizedException.class.isInstance(e) ? new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA);
                DefaultWebSocketService.this.sendUpdate(sessionRef, update);
            }
        };
        this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, this.on(arg_0 -> this.lambda$handleWsHistoryCmd$16(sessionRef, entityId, queries, (FutureCallback)callback, arg_0), arg_0 -> ((FutureCallback)callback).onFailure(arg_0)));
    }

    private void handleWsAttributesSubscription(final WebSocketSessionRef sessionRef, final AttributesSubscriptionCmd cmd, final String sessionId, final EntityId entityId) {
        final long queryTs = System.currentTimeMillis();
        FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onSuccess(List<AttributeKvEntry> data) {
                List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), (KvEntry)d)).collect(Collectors.toList());
                HashMap<String, Long> subState = new HashMap<String, Long>(attributesData.size());
                attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
                TbAttributeSubscriptionScope scope = StringUtils.isEmpty((String)cmd.getScope()) ? TbAttributeSubscriptionScope.ANY_SCOPE : TbAttributeSubscriptionScope.valueOf(cmd.getScope());
                ReentrantLock subLock = new ReentrantLock();
                TbAttributeSubscription sub = TbAttributeSubscription.builder().serviceId(DefaultWebSocketService.this.serviceId).sessionId(sessionId).subscriptionId(DefaultWebSocketService.this.registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())).tenantId(sessionRef.getSecurityCtx().getTenantId()).entityId(entityId).queryTs(queryTs).allKeys(true).keyStates(subState).updateProcessor((subscription, update) -> {
                    subLock.lock();
                    try {
                        DefaultWebSocketService.this.sendUpdate(subscription.getSessionId(), cmd.getCmdId(), (TelemetrySubscriptionUpdate)update);
                    }
                    finally {
                        subLock.unlock();
                    }
                }).scope(scope).build();
                subLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(sub, sessionRef);
                    DefaultWebSocketService.this.sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
                }
                finally {
                    subLock.unlock();
                }
            }

            public void onFailure(Throwable e) {
                log.error(DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES, e);
                DefaultWebSocketService.this.sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_ATTRIBUTES);
            }
        };
        if (StringUtils.isEmpty((String)cmd.getScope())) {
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, this.getAttributesFetchCallback(sessionRef.getSecurityCtx().getTenantId(), entityId, callback));
        } else {
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_ATTRIBUTES, entityId, this.getAttributesFetchCallback(sessionRef.getSecurityCtx().getTenantId(), entityId, cmd.getScope(), callback));
        }
    }

    private void handleWsTimeseriesSubscriptionCmd(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd) {
        if (!this.processSubscription(sessionRef, cmd)) {
            return;
        }
        String sessionId = sessionRef.getSessionId();
        if (cmd.isUnsubscribe()) {
            this.unsubscribe(sessionRef, cmd, sessionId);
        } else if (this.validateSubscriptionCmd(sessionRef, cmd)) {
            EntityId entityId = EntityIdFactory.getByTypeAndId((String)cmd.getEntityType(), (String)cmd.getEntityId());
            Optional<Set<String>> keysOptional = DefaultWebSocketService.getKeys(cmd);
            if (keysOptional.isPresent()) {
                this.handleWsTimeSeriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId);
            } else {
                this.handleWsTimeSeriesSubscription(sessionRef, cmd, sessionId, entityId);
            }
        }
    }

    private void handleWsTimeSeriesSubscriptionByKeys(WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
        long queryTs = System.currentTimeMillis();
        if (cmd.getTimeWindow() > 0L) {
            ArrayList<String> keys = new ArrayList<String>(DefaultWebSocketService.getKeys(cmd).orElse(Collections.emptySet()));
            log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", new Object[]{sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId});
            long startTs = cmd.getStartTs();
            long endTs = cmd.getStartTs() + cmd.getTimeWindow();
            List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(), this.getLimit(cmd.getLimit()), DefaultWebSocketService.getAggregation(cmd.getAgg()))).collect(Collectors.toList());
            FutureCallback<List<TsKvEntry>> callback = this.getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys);
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, this.on(r -> Futures.addCallback((ListenableFuture)this.tsService.findAll(sessionRef.getSecurityCtx().getTenantId(), entityId, queries), (FutureCallback)callback, (Executor)this.executor), arg_0 -> callback.onFailure(arg_0)));
        } else {
            ArrayList<String> keys = new ArrayList<String>(DefaultWebSocketService.getKeys(cmd).orElse(Collections.emptySet()));
            long startTs = System.currentTimeMillis();
            log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", new Object[]{sessionId, cmd.getKeys(), entityId});
            FutureCallback<List<TsKvEntry>> callback = this.getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, queryTs, startTs, keys);
            this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, this.on(r -> Futures.addCallback((ListenableFuture)this.tsService.findLatest(sessionRef.getSecurityCtx().getTenantId(), entityId, (Collection)keys), (FutureCallback)callback, (Executor)this.executor), arg_0 -> callback.onFailure(arg_0)));
        }
    }

    private void handleWsTimeSeriesSubscription(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId) {
        final long queryTs = System.currentTimeMillis();
        FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onSuccess(List<TsKvEntry> data) {
                HashMap<String, Long> subState = new HashMap<String, Long>(data.size());
                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
                ReentrantLock subLock = new ReentrantLock();
                TbTimeSeriesSubscription sub = DefaultWebSocketService.this.getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, true);
                subLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(sub, sessionRef);
                    DefaultWebSocketService.this.sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
                }
                finally {
                    subLock.unlock();
                }
            }

            public void onFailure(Throwable e) {
                TelemetrySubscriptionUpdate update = UnauthorizedException.class.isInstance(e) ? new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()) : new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA);
                DefaultWebSocketService.this.sendUpdate(sessionRef, update);
            }
        };
        this.accessValidator.validate(sessionRef.getSecurityCtx(), Operation.READ_TELEMETRY, entityId, this.on(arg_0 -> this.lambda$handleWsTimeSeriesSubscription$20(sessionRef, entityId, (FutureCallback)callback, arg_0), arg_0 -> ((FutureCallback)callback).onFailure(arg_0)));
    }

    private TbTimeSeriesSubscription getTsSubscription(Map<String, Long> subState, Lock subLock, String sessionId, WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, EntityId entityId, long queryTs, boolean allKeys) {
        return TbTimeSeriesSubscription.builder().serviceId(this.serviceId).sessionId(sessionId).subscriptionId(this.registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())).tenantId(sessionRef.getSecurityCtx().getTenantId()).entityId(entityId).updateProcessor((subscription, update) -> {
            subLock.lock();
            try {
                this.sendUpdate(subscription.getSessionId(), cmd.getCmdId(), (TelemetrySubscriptionUpdate)update);
            }
            finally {
                subLock.unlock();
            }
        }).queryTs(queryTs).allKeys(allKeys).keyStates(subState).latestValues("LATEST_TELEMETRY".equals(cmd.getScope())).build();
    }

    private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long queryTs, final long startTs, final List<String> keys) {
        return new FutureCallback<List<TsKvEntry>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onSuccess(List<TsKvEntry> data) {
                HashMap<String, Long> subState = new HashMap<String, Long>(keys.size());
                keys.forEach(key -> subState.put((String)key, startTs));
                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
                ReentrantLock subLock = new ReentrantLock();
                TbTimeSeriesSubscription sub = DefaultWebSocketService.this.getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, false);
                subLock.lock();
                try {
                    DefaultWebSocketService.this.oldSubService.addSubscription(sub, sessionRef);
                    DefaultWebSocketService.this.sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
                }
                finally {
                    subLock.unlock();
                }
            }

            public void onFailure(Throwable e) {
                if (e instanceof RateLimitExceededException || e.getCause() instanceof RateLimitExceededException) {
                    log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", new Object[]{sessionRef.getSecurityCtx().getTenantId(), entityId, cmd});
                } else {
                    log.info(DefaultWebSocketService.FAILED_TO_FETCH_DATA, e);
                }
                DefaultWebSocketService.this.sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, DefaultWebSocketService.FAILED_TO_FETCH_DATA);
            }
        };
    }

    private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
        TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
        if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
            log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", new Object[]{tenantId, sessionId, cmd.getCmdId()});
            this.cleanupSessionById(tenantId, sessionId);
        } else {
            Integer subId = (Integer)this.sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId());
            if (subId == null) {
                log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", new Object[]{tenantId, sessionId, cmd.getCmdId()});
                subId = cmd.getCmdId();
            }
            this.oldSubService.cancelSubscription(tenantId, sessionId, subId);
        }
    }

    private void cleanupSessionById(TenantId tenantId, String sessionId) {
        this.wsSessionsMap.remove(sessionId);
        this.oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId);
        this.sessionCmdMap.remove(sessionId);
        this.entityDataSubService.cancelAllSessionSubscriptions(sessionId);
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
        return this.validateCmd(sessionRef, cmd, () -> {
            if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
        return this.validateCmd(sessionRef, cmd, () -> {
            if (cmd.getQuery() == null) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
        return this.validateCmd(sessionRef, cmd, () -> {
            if (cmd.getQuery() == null) {
                throw new IllegalArgumentException("Query is empty!");
            }
        });
    }

    private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
        return this.validateCmd(sessionRef, cmd, () -> {
            if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
                throw new IllegalArgumentException("Device id is empty!");
            }
        });
    }

    private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, int cmdId, String sessionId) {
        WsSessionMetaData sessionMD = (WsSessionMetaData)this.wsSessionsMap.get(sessionId);
        if (sessionMD == null) {
            log.warn("[{}] Session meta data not found. ", (Object)sessionId);
            this.sendError(sessionRef, cmdId, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
            return false;
        }
        return true;
    }

    private boolean validateCmd(WebSocketSessionRef sessionRef, WsCmd cmd) {
        return this.validateCmd(sessionRef, cmd, null);
    }

    private <C extends WsCmd> boolean validateCmd(WebSocketSessionRef sessionRef, C cmd, Runnable validator) {
        if (cmd.getCmdId() < 0) {
            this.sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Cmd id is negative value!");
            return false;
        }
        try {
            if (validator != null) {
                validator.run();
            }
        }
        catch (Exception e) {
            this.sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, e.getMessage());
            return false;
        }
        return true;
    }

    private void sendUpdate(WebSocketSessionRef sessionRef, EntityDataUpdate update) {
        this.sendUpdate(sessionRef, update.getCmdId(), update);
    }

    private void sendUpdate(WebSocketSessionRef sessionRef, TelemetrySubscriptionUpdate update) {
        this.sendUpdate(sessionRef, update.getSubscriptionId(), (Object)update);
    }

    private void sendUpdate(WebSocketSessionRef sessionRef, int cmdId, Object update) {
        try {
            String msg = JacksonUtil.OBJECT_MAPPER.writeValueAsString(update);
            this.executor.submit(() -> {
                try {
                    this.msgEndpoint.send(sessionRef, cmdId, msg);
                }
                catch (IOException e) {
                    log.warn("[{}] Failed to send reply: {}", new Object[]{sessionRef.getSessionId(), update, e});
                }
            });
        }
        catch (JsonProcessingException e) {
            log.warn("[{}] Failed to encode reply: {}", new Object[]{sessionRef.getSessionId(), update, e});
        }
    }

    private void sendPing() {
        long currentTime = System.currentTimeMillis();
        this.wsSessionsMap.values().forEach(md -> this.executor.submit(() -> {
            try {
                this.msgEndpoint.sendPing(md.getSessionRef(), currentTime);
            }
            catch (IOException e) {
                log.warn("[{}] Failed to send ping:", (Object)md.getSessionRef().getSessionId(), (Object)e);
            }
        }));
    }

    private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
        if (!StringUtils.isEmpty((String)cmd.getKeys())) {
            HashSet keys = new HashSet();
            Collections.addAll(keys, cmd.getKeys().split(","));
            return Optional.of(keys);
        }
        return Optional.empty();
    }

    private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
        return Futures.transform((ListenableFuture)Futures.successfulAsList(futures), input -> {
            ArrayList tmp = new ArrayList();
            if (input != null) {
                input.forEach(tmp::addAll);
            }
            return tmp;
        }, (Executor)this.executor);
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
        return new FutureCallback<ValidationResult>(){

            public void onSuccess(@Nullable ValidationResult result) {
                ArrayList<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<ListenableFuture<List<AttributeKvEntry>>>();
                for (AttributeScope scope : AttributeScope.values()) {
                    futures.add((ListenableFuture<List<AttributeKvEntry>>)DefaultWebSocketService.this.attributesService.find(tenantId, entityId, scope, (Collection)keys));
                }
                ListenableFuture<List<AttributeKvEntry>> future = DefaultWebSocketService.this.mergeAllAttributesFutures(futures);
                Futures.addCallback(future, (FutureCallback)callback, (Executor)MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable t) {
                callback.onFailure(t);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final String scope, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
        return new FutureCallback<ValidationResult>(){

            public void onSuccess(@Nullable ValidationResult result) {
                Futures.addCallback((ListenableFuture)DefaultWebSocketService.this.attributesService.find(tenantId, entityId, AttributeScope.valueOf((String)scope), (Collection)keys), (FutureCallback)callback, (Executor)MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable t) {
                callback.onFailure(t);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final FutureCallback<List<AttributeKvEntry>> callback) {
        return new FutureCallback<ValidationResult>(){

            public void onSuccess(@Nullable ValidationResult result) {
                ArrayList<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<ListenableFuture<List<AttributeKvEntry>>>();
                for (AttributeScope scope : AttributeScope.values()) {
                    futures.add((ListenableFuture<List<AttributeKvEntry>>)DefaultWebSocketService.this.attributesService.findAll(tenantId, entityId, scope));
                }
                ListenableFuture<List<AttributeKvEntry>> future = DefaultWebSocketService.this.mergeAllAttributesFutures(futures);
                Futures.addCallback(future, (FutureCallback)callback, (Executor)MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable t) {
                callback.onFailure(t);
            }
        };
    }

    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final TenantId tenantId, final EntityId entityId, final String scope, final FutureCallback<List<AttributeKvEntry>> callback) {
        return new FutureCallback<ValidationResult>(){

            public void onSuccess(@Nullable ValidationResult result) {
                Futures.addCallback((ListenableFuture)DefaultWebSocketService.this.attributesService.findAll(tenantId, entityId, AttributeScope.valueOf((String)scope)), (FutureCallback)callback, (Executor)MoreExecutors.directExecutor());
            }

            public void onFailure(Throwable t) {
                callback.onFailure(t);
            }
        };
    }

    private FutureCallback<ValidationResult> on(final Consumer<Void> success, final Consumer<Throwable> failure) {
        return new FutureCallback<ValidationResult>(){

            public void onSuccess(@Nullable ValidationResult result) {
                ValidationResultCode resultCode = result.getResultCode();
                if (resultCode == ValidationResultCode.OK) {
                    success.accept(null);
                } else {
                    this.onFailure(ValidationCallback.getException(result));
                }
            }

            public void onFailure(Throwable t) {
                failure.accept(t);
            }
        };
    }

    public static Aggregation getAggregation(String agg) {
        return StringUtils.isEmpty((String)agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf((String)agg);
    }

    private int getLimit(int limit) {
        return limit == 0 ? 100 : limit;
    }

    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(WebSocketSessionRef sessionRef) {
        return Optional.ofNullable(this.tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId())).map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
    }

    public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(BiConsumer<WebSocketSessionRef, C> handler) {
        return new WsCmdHandler<C>(handler);
    }

    @ConstructorProperties(value={"oldSubService", "entityDataSubService", "notificationCmdsHandler", "msgEndpoint", "accessValidator", "attributesService", "tsService", "serviceInfoProvider", "tenantProfileCache"})
    @Generated
    public DefaultWebSocketService(TbLocalSubscriptionService oldSubService, TbEntityDataSubscriptionService entityDataSubService, NotificationCommandsHandler notificationCmdsHandler, WebSocketMsgEndpoint msgEndpoint, AccessValidator accessValidator, AttributesService attributesService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, TbTenantProfileCache tenantProfileCache) {
        this.oldSubService = oldSubService;
        this.entityDataSubService = entityDataSubService;
        this.notificationCmdsHandler = notificationCmdsHandler;
        this.msgEndpoint = msgEndpoint;
        this.accessValidator = accessValidator;
        this.attributesService = attributesService;
        this.tsService = tsService;
        this.serviceInfoProvider = serviceInfoProvider;
        this.tenantProfileCache = tenantProfileCache;
    }

    private /* synthetic */ void lambda$handleWsTimeSeriesSubscription$20(WebSocketSessionRef sessionRef, EntityId entityId, FutureCallback callback, Void r) {
        Futures.addCallback((ListenableFuture)this.tsService.findAllLatest(sessionRef.getSecurityCtx().getTenantId(), entityId), (FutureCallback)callback, (Executor)this.executor);
    }

    private /* synthetic */ void lambda$handleWsHistoryCmd$16(WebSocketSessionRef sessionRef, EntityId entityId, List queries, FutureCallback callback, Void r) {
        Futures.addCallback((ListenableFuture)this.tsService.findAll(sessionRef.getSecurityCtx().getTenantId(), entityId, queries), (FutureCallback)callback, (Executor)this.executor);
    }

    public static class WsCmdHandler<C extends WsCmd> {
        protected final BiConsumer<WebSocketSessionRef, C> handler;

        public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) {
            this.handler.accept(sessionRef, cmd);
        }

        @ConstructorProperties(value={"handler"})
        @Generated
        public WsCmdHandler(BiConsumer<WebSocketSessionRef, C> handler) {
            this.handler = handler;
        }

        @Generated
        public BiConsumer<WebSocketSessionRef, C> getHandler() {
            return this.handler;
        }
    }
}

