package org.thingsboard.server.service.telemetry;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.sub.Subscription;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;

@Service
/* loaded from: input_file:org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.class */
public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTelemetrySubscriptionService.class);

    @Autowired
    private TelemetryWebSocketService wsService;

    @Autowired
    private AttributesService attrService;

    @Autowired
    private TimeseriesService tsService;

    @Autowired
    private ClusterRoutingService routingService;

    @Autowired
    private ClusterRpcService rpcService;

    @Autowired
    private EntityViewService entityViewService;

    @Autowired
    @Lazy
    private DeviceStateService stateService;

    @Autowired
    @Lazy
    private ActorService actorService;
    private ExecutorService tsCallBackExecutor;
    private ExecutorService wsCallBackExecutor;
    private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new ConcurrentHashMap();
    private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.thingsboard.server.service.telemetry.DefaultTelemetrySubscriptionService$3, reason: invalid class name */
    /* loaded from: input_file:org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$thingsboard$server$common$data$kv$DataType = new int[DataType.values().length];

        static {
            try {
                $SwitchMap$org$thingsboard$server$common$data$kv$DataType[DataType.BOOLEAN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$thingsboard$server$common$data$kv$DataType[DataType.LONG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$thingsboard$server$common$data$kv$DataType[DataType.DOUBLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$thingsboard$server$common$data$kv$DataType[DataType.STRING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @PostConstruct
    public void initExecutor() {
        this.tsCallBackExecutor = Executors.newSingleThreadExecutor();
        this.wsCallBackExecutor = Executors.newSingleThreadExecutor();
    }

    @PreDestroy
    public void shutdownExecutor() {
        if (this.tsCallBackExecutor != null) {
            this.tsCallBackExecutor.shutdownNow();
        }
        if (this.wsCallBackExecutor != null) {
            this.wsCallBackExecutor.shutdownNow();
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void addLocalWsSubscription(String str, EntityId entityId, SubscriptionState subscriptionState) {
        Subscription subscription;
        long j = 0;
        long j2 = 0;
        if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TelemetryFeature.TIMESERIES.equals(subscriptionState.getType())) {
            EntityView findEntityViewById = this.entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(entityId.getId()));
            entityId = findEntityViewById.getEntityId();
            j = findEntityViewById.getStartTimeMs();
            j2 = findEntityViewById.getEndTimeMs();
            subscriptionState = getUpdatedSubscriptionState(entityId, subscriptionState, findEntityViewById);
        }
        Optional<ServerAddress> resolveById = this.routingService.resolveById(entityId);
        if (resolveById.isPresent()) {
            ServerAddress serverAddress = resolveById.get();
            log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", new Object[]{str, Integer.valueOf(subscriptionState.getSubscriptionId()), entityId.getEntityType().name(), entityId, serverAddress});
            subscription = new Subscription(subscriptionState, true, serverAddress, j, j2);
            tellNewSubscription(serverAddress, str, subscription);
        } else {
            log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", new Object[]{str, Integer.valueOf(subscriptionState.getSubscriptionId()), entityId.getEntityType().name(), entityId});
            subscription = new Subscription(subscriptionState, true, null, j, j2);
        }
        registerSubscription(str, entityId, subscription);
    }

    private SubscriptionState getUpdatedSubscriptionState(EntityId entityId, SubscriptionState subscriptionState, EntityView entityView) {
        return new SubscriptionState(subscriptionState.getWsSessionId(), subscriptionState.getSubscriptionId(), subscriptionState.getTenantId(), entityId, subscriptionState.getType(), false, subscriptionState.isAllKeys() ? (Map) entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(str -> {
            return str;
        }, str2 -> {
            return 0L;
        })) : (Map) subscriptionState.getKeyStates().entrySet().stream().filter(entry -> {
            return entityView.getKeys().getTimeseries().contains(entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })), subscriptionState.getScope());
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef telemetryWebSocketSessionRef, String str) {
        cleanupLocalWsSessionSubscriptions(str);
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void removeSubscription(String str, int i) {
        log.debug("[{}][{}] Going to remove subscription.", str, Integer.valueOf(i));
        Map<Integer, Subscription> map = this.subscriptionsByWsSessionId.get(str);
        if (map == null) {
            log.debug("[{}] No session subscriptions found!", str);
            return;
        }
        Subscription remove = map.remove(Integer.valueOf(i));
        if (remove != null) {
            processSubscriptionRemoval(str, map, remove);
        } else {
            log.debug("[{}][{}] Subscription not found!", str, Integer.valueOf(i));
        }
    }

    public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> list, FutureCallback<Void> futureCallback) {
        saveAndNotify(tenantId, entityId, list, 0L, futureCallback);
    }

    public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> list, long j, FutureCallback<Void> futureCallback) {
        ListenableFuture<List<Void>> save = this.tsService.save(tenantId, entityId, list, j);
        addMainCallback(save, futureCallback);
        addWsCallback(save, r7 -> {
            onTimeseriesUpdate(entityId, list);
        });
    }

    public void saveAndNotify(TenantId tenantId, EntityId entityId, String str, List<AttributeKvEntry> list, FutureCallback<Void> futureCallback) {
        ListenableFuture<List<Void>> save = this.attrService.save(tenantId, entityId, str, list);
        addMainCallback(save, futureCallback);
        addWsCallback(save, r9 -> {
            onAttributesUpdate(entityId, str, list);
        });
    }

    public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String str, String str2, long j, FutureCallback<Void> futureCallback) {
        saveAndNotify(tenantId, entityId, str, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(str2, Long.valueOf(j)), System.currentTimeMillis())), futureCallback);
    }

    public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String str, String str2, String str3, FutureCallback<Void> futureCallback) {
        saveAndNotify(tenantId, entityId, str, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(str2, str3), System.currentTimeMillis())), futureCallback);
    }

    public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String str, String str2, double d, FutureCallback<Void> futureCallback) {
        saveAndNotify(tenantId, entityId, str, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(str2, Double.valueOf(d)), System.currentTimeMillis())), futureCallback);
    }

    public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String str, String str2, boolean z, FutureCallback<Void> futureCallback) {
        saveAndNotify(tenantId, entityId, str, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(str2, Boolean.valueOf(z)), System.currentTimeMillis())), futureCallback);
    }

    public void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> set) {
        this.actorService.onMsg(new SendToClusterMsg(deviceId, DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, "SHARED_SCOPE", new ArrayList(set))));
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.SubscriptionProto parseFrom = ClusterAPIProtos.SubscriptionProto.parseFrom(bArr);
            addRemoteWsSubscription(serverAddress, parseFrom.getSessionId(), new Subscription(new SubscriptionState(parseFrom.getSessionId(), parseFrom.getSubscriptionId(), new TenantId(UUID.fromString(parseFrom.getTenantId())), EntityIdFactory.getByTypeAndId(parseFrom.getEntityType(), parseFrom.getEntityId()), TelemetryFeature.valueOf(parseFrom.getType()), parseFrom.getAllKeys(), (Map) parseFrom.getKeyStatesList().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getTs();
            })), parseFrom.getScope()), false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort(), serverAddress.getServerType())));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.SubscriptionUpdateProto parseFrom = ClusterAPIProtos.SubscriptionUpdateProto.parseFrom(bArr);
            SubscriptionUpdate convert = convert(parseFrom);
            String sessionId = parseFrom.getSessionId();
            log.trace("[{}] Processing remote subscription onUpdate [{}]", sessionId, convert);
            Optional<Subscription> subscription = getSubscription(sessionId, convert.getSubscriptionId());
            if (subscription.isPresent()) {
                updateSubscriptionState(sessionId, subscription.get(), convert);
                this.wsService.sendWsMsg(sessionId, convert);
            }
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.SubscriptionCloseProto parseFrom = ClusterAPIProtos.SubscriptionCloseProto.parseFrom(bArr);
            removeSubscription(parseFrom.getSessionId(), parseFrom.getSubscriptionId());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onRemoteSessionClose(ServerAddress serverAddress, byte[] bArr) {
        try {
            cleanupRemoteWsSessionSubscriptions(ClusterAPIProtos.SessionCloseProto.parseFrom(bArr).getSessionId());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.AttributeUpdateProto parseFrom = ClusterAPIProtos.AttributeUpdateProto.parseFrom(bArr);
            onAttributesUpdate(EntityIdFactory.getByTypeAndId(parseFrom.getEntityType(), parseFrom.getEntityId()), parseFrom.getScope(), (List) parseFrom.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onRemoteTsUpdate(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.TimeseriesUpdateProto parseFrom = ClusterAPIProtos.TimeseriesUpdateProto.parseFrom(bArr);
            onTimeseriesUpdate(EntityIdFactory.getByTypeAndId(parseFrom.getEntityType(), parseFrom.getEntityId()), (List) parseFrom.getDataList().stream().map(this::toTimeseries).collect(Collectors.toList()));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
    public void onClusterUpdate() {
        log.trace("Processing cluster onUpdate msg!");
        Iterator<Map.Entry<EntityId, Set<Subscription>>> it = this.subscriptionsByEntityId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<EntityId, Set<Subscription>> next = it.next();
            Set<Subscription> value = next.getValue();
            Optional<ServerAddress> resolveById = this.routingService.resolveById(next.getKey());
            if (resolveById.isPresent()) {
                resolveById.ifPresent(serverAddress -> {
                    checkSubscriptionsNewAddress(serverAddress, value);
                });
            } else {
                checkSubscriptionsPrevAddress(value);
            }
            if (value.size() == 0) {
                log.trace("[{}] No more subscriptions for this device on current server.", next.getKey());
                it.remove();
            }
        }
    }

    private void checkSubscriptionsNewAddress(ServerAddress serverAddress, Set<Subscription> set) {
        Iterator<Subscription> it = set.iterator();
        while (it.hasNext()) {
            Subscription next = it.next();
            if (!next.isLocal()) {
                log.trace("[{}] Remote subscription is now handled on new server address: [{}]", next.getWsSessionId(), serverAddress);
                it.remove();
            } else if (!serverAddress.equals(next.getServer())) {
                log.trace("[{}] Local subscription is now handled on new server [{}]", next.getWsSessionId(), serverAddress);
                next.setServer(serverAddress);
                tellNewSubscription(serverAddress, next.getWsSessionId(), next);
            }
        }
    }

    private void checkSubscriptionsPrevAddress(Set<Subscription> set) {
        for (Subscription subscription : set) {
            if (!subscription.isLocal() || subscription.getServer() == null) {
                log.trace("[{}] Remote subscription is on up to date server address.", subscription.getWsSessionId());
            } else {
                log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", subscription.getWsSessionId(), subscription.getServer());
                subscription.setServer(null);
            }
        }
    }

    private void addRemoteWsSubscription(ServerAddress serverAddress, String str, Subscription subscription) {
        EntityId entityId = subscription.getEntityId();
        log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", new Object[]{str, Integer.valueOf(subscription.getSubscriptionId()), entityId, serverAddress});
        registerSubscription(str, entityId, subscription);
        if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
            Map<String, Long> keyStates = subscription.getKeyStates();
            DonAsynchron.withCallback(this.attrService.find(subscription.getSub().getTenantId(), entityId, "CLIENT_SCOPE", keyStates.keySet()), list -> {
                ArrayList arrayList = new ArrayList();
                list.forEach(attributeKvEntry -> {
                    if (attributeKvEntry.getLastUpdateTs() > ((Long) keyStates.get(attributeKvEntry.getKey())).longValue()) {
                        arrayList.add(new BasicTsKvEntry(attributeKvEntry.getLastUpdateTs(), attributeKvEntry));
                    }
                });
                if (arrayList.isEmpty()) {
                    return;
                }
                tellRemoteSubUpdate(serverAddress, str, new SubscriptionUpdate(subscription.getSubscriptionId(), arrayList));
            }, th -> {
                log.error("Failed to fetch missed updates.", th);
            }, this.tsCallBackExecutor);
        } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
            long currentTimeMillis = System.currentTimeMillis();
            ArrayList arrayList = new ArrayList();
            subscription.getKeyStates().entrySet().forEach(entry -> {
                if (currentTimeMillis > ((Long) entry.getValue()).longValue()) {
                    arrayList.add(new BaseReadTsKvQuery((String) entry.getKey(), ((Long) entry.getValue()).longValue() + 1, currentTimeMillis, 0L, 1000, Aggregation.NONE));
                } else {
                    log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", new Object[]{str, subscription, entityId, Long.valueOf(currentTimeMillis)});
                }
            });
            if (arrayList.isEmpty()) {
                return;
            }
            DonAsynchron.withCallback(this.tsService.findAll(subscription.getSub().getTenantId(), entityId, arrayList), list2 -> {
                if (list2 == null || list2.isEmpty()) {
                    return;
                }
                tellRemoteSubUpdate(serverAddress, str, new SubscriptionUpdate(subscription.getSubscriptionId(), (List<TsKvEntry>) list2));
            }, th2 -> {
                log.error("Failed to fetch missed updates.", th2);
            }, this.tsCallBackExecutor);
        }
    }

    private void onAttributesUpdate(EntityId entityId, String str, List<AttributeKvEntry> list) {
        Optional<ServerAddress> resolveById = this.routingService.resolveById(entityId);
        if (resolveById.isPresent()) {
            tellRemoteAttributesUpdate(resolveById.get(), entityId, str, list);
            return;
        }
        onLocalAttributesUpdate(entityId, str, list);
        if (entityId.getEntityType() == EntityType.DEVICE && "SERVER_SCOPE".equalsIgnoreCase(str)) {
            for (AttributeKvEntry attributeKvEntry : list) {
                if (attributeKvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
                    this.stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), ((Long) attributeKvEntry.getLongValue().orElse(0L)).longValue());
                }
            }
        }
    }

    private void onTimeseriesUpdate(EntityId entityId, List<TsKvEntry> list) {
        Optional<ServerAddress> resolveById = this.routingService.resolveById(entityId);
        if (resolveById.isPresent()) {
            tellRemoteTimeseriesUpdate(resolveById.get(), entityId, list);
        } else {
            onLocalTimeseriesUpdate(entityId, list);
        }
    }

    private void onLocalAttributesUpdate(EntityId entityId, String str, List<AttributeKvEntry> list) {
        onLocalSubUpdate(entityId, subscription -> {
            return TelemetryFeature.ATTRIBUTES == subscription.getType() && (StringUtils.isEmpty(subscription.getScope()) || str.equals(subscription.getScope()));
        }, subscription2 -> {
            ArrayList arrayList = null;
            Iterator it = list.iterator();
            while (it.hasNext()) {
                AttributeKvEntry attributeKvEntry = (AttributeKvEntry) it.next();
                if (subscription2.isAllKeys() || subscription2.getKeyStates().containsKey(attributeKvEntry.getKey())) {
                    if (arrayList == null) {
                        arrayList = new ArrayList();
                    }
                    arrayList.add(new BasicTsKvEntry(attributeKvEntry.getLastUpdateTs(), attributeKvEntry));
                }
            }
            return arrayList;
        });
    }

    private void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> list) {
        onLocalSubUpdate(entityId, subscription -> {
            return TelemetryFeature.TIMESERIES == subscription.getType();
        }, subscription2 -> {
            ArrayList arrayList = null;
            Iterator it = list.iterator();
            while (it.hasNext()) {
                TsKvEntry tsKvEntry = (TsKvEntry) it.next();
                if (isInTimeRange(subscription2, tsKvEntry.getTs()) && (subscription2.isAllKeys() || subscription2.getKeyStates().containsKey(tsKvEntry.getKey()))) {
                    if (arrayList == null) {
                        arrayList = new ArrayList();
                    }
                    arrayList.add(tsKvEntry);
                }
            }
            return arrayList;
        });
    }

    private boolean isInTimeRange(Subscription subscription, long j) {
        return (subscription.getStartTime() == 0 || subscription.getStartTime() <= j) && (subscription.getEndTime() == 0 || subscription.getEndTime() >= j);
    }

    private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> predicate, Function<Subscription, List<TsKvEntry>> function) {
        Set<Subscription> set = this.subscriptionsByEntityId.get(entityId);
        if (set != null) {
            set.stream().filter(predicate).forEach(subscription -> {
                String wsSessionId = subscription.getWsSessionId();
                List list = (List) function.apply(subscription);
                if (list == null || list.isEmpty()) {
                    return;
                }
                SubscriptionUpdate subscriptionUpdate = new SubscriptionUpdate(subscription.getSubscriptionId(), (List<TsKvEntry>) list);
                if (!subscription.isLocal()) {
                    tellRemoteSubUpdate(subscription.getServer(), wsSessionId, subscriptionUpdate);
                } else {
                    updateSubscriptionState(wsSessionId, subscription, subscriptionUpdate);
                    this.wsService.sendWsMsg(wsSessionId, subscriptionUpdate);
                }
            });
        } else {
            log.debug("[{}] No device subscriptions to process!", entityId);
        }
    }

    private void updateSubscriptionState(String str, Subscription subscription, SubscriptionUpdate subscriptionUpdate) {
        log.trace("[{}] updating subscription state {} using onUpdate {}", new Object[]{str, subscription, subscriptionUpdate});
        subscriptionUpdate.getLatestValues().entrySet().forEach(entry -> {
            subscription.setKeyState((String) entry.getKey(), ((Long) entry.getValue()).longValue());
        });
    }

    private void registerSubscription(String str, EntityId entityId, Subscription subscription) {
        this.subscriptionsByEntityId.computeIfAbsent(entityId, entityId2 -> {
            return ConcurrentHashMap.newKeySet();
        }).add(subscription);
        this.subscriptionsByWsSessionId.computeIfAbsent(str, str2 -> {
            return new ConcurrentHashMap();
        }).put(Integer.valueOf(subscription.getSubscriptionId()), subscription);
    }

    private void cleanupLocalWsSessionSubscriptions(String str) {
        cleanupWsSessionSubscriptions(str, true);
    }

    private void cleanupRemoteWsSessionSubscriptions(String str) {
        cleanupWsSessionSubscriptions(str, false);
    }

    private void cleanupWsSessionSubscriptions(String str, boolean z) {
        log.debug("[{}] Removing all subscriptions for particular session.", str);
        Map<Integer, Subscription> map = this.subscriptionsByWsSessionId.get(str);
        if (map == null) {
            log.debug("[{}] No subscriptions found!", str);
            return;
        }
        int size = map.size();
        for (Subscription subscription : map.values()) {
            EntityId entityId = subscription.getEntityId();
            Set<Subscription> set = this.subscriptionsByEntityId.get(entityId);
            set.remove(subscription);
            if (set.isEmpty()) {
                this.subscriptionsByEntityId.remove(entityId);
            }
        }
        this.subscriptionsByWsSessionId.remove(str);
        log.debug("[{}] Removed {} subscriptions for particular session.", str, Integer.valueOf(size));
        if (z) {
            notifyWsSubscriptionClosed(str, map);
        }
    }

    private void notifyWsSubscriptionClosed(String str, Map<Integer, Subscription> map) {
        HashSet<ServerAddress> hashSet = new HashSet();
        for (Subscription subscription : map.values()) {
            if (subscription.getServer() != null) {
                hashSet.add(subscription.getServer());
            }
        }
        for (ServerAddress serverAddress : hashSet) {
            log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", str, serverAddress);
            tellRemoteSessionClose(serverAddress, str);
        }
    }

    private void processSubscriptionRemoval(String str, Map<Integer, Subscription> map, Subscription subscription) {
        EntityId entityId = subscription.getEntityId();
        if (subscription.isLocal() && subscription.getServer() != null) {
            tellRemoteSubClose(subscription.getServer(), str, subscription.getSubscriptionId());
        }
        if (map.isEmpty()) {
            log.debug("[{}] Removed last subscription for particular session.", str);
            this.subscriptionsByWsSessionId.remove(str);
        } else {
            log.debug("[{}] Removed session subscription.", str);
        }
        Set<Subscription> set = this.subscriptionsByEntityId.get(entityId);
        if (set == null) {
            log.debug("[{}] No device subscriptions found!", str);
            return;
        }
        if (!set.remove(subscription)) {
            log.debug("[{}] Subscription not found!", str);
        } else if (set.size() != 0) {
            log.debug("[{}] Removed device subscription.", str);
        } else {
            log.debug("[{}] Removed last subscription for particular device.", str);
            this.subscriptionsByEntityId.remove(entityId);
        }
    }

    private void addMainCallback(ListenableFuture<List<Void>> listenableFuture, final FutureCallback<Void> futureCallback) {
        Futures.addCallback(listenableFuture, new FutureCallback<List<Void>>() { // from class: org.thingsboard.server.service.telemetry.DefaultTelemetrySubscriptionService.1
            public void onSuccess(@Nullable List<Void> list) {
                futureCallback.onSuccess((Object) null);
            }

            public void onFailure(Throwable th) {
                futureCallback.onFailure(th);
            }
        }, this.tsCallBackExecutor);
    }

    private void addWsCallback(ListenableFuture<List<Void>> listenableFuture, final Consumer<Void> consumer) {
        Futures.addCallback(listenableFuture, new FutureCallback<List<Void>>() { // from class: org.thingsboard.server.service.telemetry.DefaultTelemetrySubscriptionService.2
            public void onSuccess(@Nullable List<Void> list) {
                consumer.accept(null);
            }

            public void onFailure(Throwable th) {
            }
        }, this.wsCallBackExecutor);
    }

    private void tellNewSubscription(ServerAddress serverAddress, String str, Subscription subscription) {
        ClusterAPIProtos.SubscriptionProto.Builder newBuilder = ClusterAPIProtos.SubscriptionProto.newBuilder();
        newBuilder.setSessionId(str);
        newBuilder.setSubscriptionId(subscription.getSubscriptionId());
        newBuilder.setTenantId(subscription.getSub().getTenantId().getId().toString());
        newBuilder.setEntityType(subscription.getEntityId().getEntityType().name());
        newBuilder.setEntityId(subscription.getEntityId().getId().toString());
        newBuilder.setType(subscription.getType().name());
        newBuilder.setAllKeys(subscription.isAllKeys());
        if (subscription.getScope() != null) {
            newBuilder.setScope(subscription.getScope());
        }
        subscription.getKeyStates().entrySet().forEach(entry -> {
            newBuilder.addKeyStates(ClusterAPIProtos.SubscriptionKetStateProto.newBuilder().setKey((String) entry.getKey()).setTs(((Long) entry.getValue()).longValue()).build());
        });
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE, newBuilder.build().toByteArray());
    }

    private void tellRemoteSubUpdate(ServerAddress serverAddress, String str, SubscriptionUpdate subscriptionUpdate) {
        ClusterAPIProtos.SubscriptionUpdateProto.Builder newBuilder = ClusterAPIProtos.SubscriptionUpdateProto.newBuilder();
        newBuilder.setSessionId(str);
        newBuilder.setSubscriptionId(subscriptionUpdate.getSubscriptionId());
        newBuilder.setErrorCode(subscriptionUpdate.getErrorCode());
        if (subscriptionUpdate.getErrorMsg() != null) {
            newBuilder.setErrorMsg(subscriptionUpdate.getErrorMsg());
        }
        subscriptionUpdate.getData().entrySet().forEach(entry -> {
            ClusterAPIProtos.SubscriptionUpdateValueListProto.Builder newBuilder2 = ClusterAPIProtos.SubscriptionUpdateValueListProto.newBuilder();
            newBuilder2.setKey((String) entry.getKey());
            ((List) entry.getValue()).forEach(obj -> {
                Object[] objArr = (Object[]) obj;
                newBuilder2.addTs(((Long) objArr[0]).longValue());
                newBuilder2.addValue((String) objArr[1]);
            });
            newBuilder.addData(newBuilder2.build());
        });
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE, newBuilder.build().toByteArray());
    }

    private void tellRemoteAttributesUpdate(ServerAddress serverAddress, EntityId entityId, String str, List<AttributeKvEntry> list) {
        ClusterAPIProtos.AttributeUpdateProto.Builder newBuilder = ClusterAPIProtos.AttributeUpdateProto.newBuilder();
        newBuilder.setEntityId(entityId.getId().toString());
        newBuilder.setEntityType(entityId.getEntityType().name());
        newBuilder.setScope(str);
        list.forEach(attributeKvEntry -> {
            newBuilder.addData(toKeyValueProto(attributeKvEntry.getLastUpdateTs(), attributeKvEntry).build());
        });
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE, newBuilder.m88build().toByteArray());
    }

    private void tellRemoteTimeseriesUpdate(ServerAddress serverAddress, EntityId entityId, List<TsKvEntry> list) {
        ClusterAPIProtos.TimeseriesUpdateProto.Builder newBuilder = ClusterAPIProtos.TimeseriesUpdateProto.newBuilder();
        newBuilder.setEntityId(entityId.getId().toString());
        newBuilder.setEntityType(entityId.getEntityType().name());
        list.forEach(tsKvEntry -> {
            newBuilder.addData(toKeyValueProto(tsKvEntry.getTs(), tsKvEntry).build());
        });
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE, newBuilder.build().toByteArray());
    }

    private void tellRemoteSessionClose(ServerAddress serverAddress, String str) {
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE, ClusterAPIProtos.SessionCloseProto.newBuilder().setSessionId(str).build().toByteArray());
    }

    private void tellRemoteSubClose(ServerAddress serverAddress, String str, int i) {
        this.rpcService.tell(serverAddress, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE, ClusterAPIProtos.SubscriptionCloseProto.newBuilder().setSessionId(str).setSubscriptionId(i).build().toByteArray());
    }

    private ClusterAPIProtos.KeyValueProto.Builder toKeyValueProto(long j, KvEntry kvEntry) {
        ClusterAPIProtos.KeyValueProto.Builder newBuilder = ClusterAPIProtos.KeyValueProto.newBuilder();
        newBuilder.setKey(kvEntry.getKey());
        newBuilder.setTs(j);
        newBuilder.setValueType(kvEntry.getDataType().ordinal());
        switch (AnonymousClass3.$SwitchMap$org$thingsboard$server$common$data$kv$DataType[kvEntry.getDataType().ordinal()]) {
            case 1:
                Optional booleanValue = kvEntry.getBooleanValue();
                newBuilder.getClass();
                booleanValue.ifPresent((v1) -> {
                    r1.setBoolValue(v1);
                });
                break;
            case 2:
                Optional longValue = kvEntry.getLongValue();
                newBuilder.getClass();
                longValue.ifPresent((v1) -> {
                    r1.setLongValue(v1);
                });
                break;
            case 3:
                Optional doubleValue = kvEntry.getDoubleValue();
                newBuilder.getClass();
                doubleValue.ifPresent((v1) -> {
                    r1.setDoubleValue(v1);
                });
                break;
            case 4:
                Optional strValue = kvEntry.getStrValue();
                newBuilder.getClass();
                strValue.ifPresent(newBuilder::setStrValue);
                break;
        }
        return newBuilder;
    }

    private AttributeKvEntry toAttribute(ClusterAPIProtos.KeyValueProto keyValueProto) {
        return new BaseAttributeKvEntry(getKvEntry(keyValueProto), keyValueProto.getTs());
    }

    private TsKvEntry toTimeseries(ClusterAPIProtos.KeyValueProto keyValueProto) {
        return new BasicTsKvEntry(keyValueProto.getTs(), getKvEntry(keyValueProto));
    }

    private KvEntry getKvEntry(ClusterAPIProtos.KeyValueProto keyValueProto) {
        BooleanDataEntry booleanDataEntry = null;
        switch (AnonymousClass3.$SwitchMap$org$thingsboard$server$common$data$kv$DataType[DataType.values()[keyValueProto.getValueType()].ordinal()]) {
            case 1:
                booleanDataEntry = new BooleanDataEntry(keyValueProto.getKey(), Boolean.valueOf(keyValueProto.getBoolValue()));
                break;
            case 2:
                booleanDataEntry = new LongDataEntry(keyValueProto.getKey(), Long.valueOf(keyValueProto.getLongValue()));
                break;
            case 3:
                booleanDataEntry = new DoubleDataEntry(keyValueProto.getKey(), Double.valueOf(keyValueProto.getDoubleValue()));
                break;
            case 4:
                booleanDataEntry = new StringDataEntry(keyValueProto.getKey(), keyValueProto.getStrValue());
                break;
        }
        return booleanDataEntry;
    }

    private SubscriptionUpdate convert(ClusterAPIProtos.SubscriptionUpdateProto subscriptionUpdateProto) {
        if (subscriptionUpdateProto.getErrorCode() > 0) {
            return new SubscriptionUpdate(subscriptionUpdateProto.getSubscriptionId(), SubscriptionErrorCode.forCode(subscriptionUpdateProto.getErrorCode()), subscriptionUpdateProto.getErrorMsg());
        }
        TreeMap treeMap = new TreeMap();
        subscriptionUpdateProto.getDataList().forEach(subscriptionUpdateValueListProto -> {
            List list = (List) treeMap.computeIfAbsent(subscriptionUpdateValueListProto.getKey(), str -> {
                return new ArrayList();
            });
            for (int i = 0; i < subscriptionUpdateValueListProto.getTsCount(); i++) {
                list.add(new Object[]{Long.valueOf(subscriptionUpdateValueListProto.getTs(i)), subscriptionUpdateValueListProto.getValue(i)});
            }
        });
        return new SubscriptionUpdate(subscriptionUpdateProto.getSubscriptionId(), treeMap);
    }

    private Optional<Subscription> getSubscription(String str, int i) {
        Subscription subscription = null;
        Map<Integer, Subscription> map = this.subscriptionsByWsSessionId.get(str);
        if (map != null) {
            subscription = map.get(Integer.valueOf(i));
        }
        return Optional.ofNullable(subscription);
    }
}
