/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.subscription;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.subscription.SubscriptionServiceStatistics;
import org.thingsboard.server.service.subscription.TbAbstractDataSubCtx;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscription;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;

/**
 * Data subscription context for an {@link EntityDataQuery}.
 *
 * <p>Receives {@link TelemetrySubscriptionUpdate}s for the entities of the current page, filters them
 * against the values already known to this context and forwards what is left to the client as
 * {@link EntityDataUpdate} messages.
 *
 * <p>Two value caches are consulted when filtering:
 * <ul>
 *   <li>the {@code latest} map of each {@link EntityData} in the current page (latest-value updates);</li>
 *   <li>{@link #latestTsEntityData}, a per-entity snapshot of the {@link EntityKeyType#TIME_SERIES}
 *       latest values (time-series updates).</li>
 * </ul>
 */
public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbEntityDataSubCtx.class);
    private volatile boolean initialDataSent;
    private TimeSeriesCmd curTsCmd;
    private LatestValueCmd latestValueCmd;
    private final int maxEntitiesPerDataSubscription;
    /** Entity id -> (time-series key -> most recent value seen); rebuilt by {@link #updateLatestTsData}. */
    private Map<EntityId, Map<String, TsValue>> latestTsEntityData;

    public TbEntityDataSubCtx(String serviceId, WebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, WebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerDataSubscription) {
        super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
        this.maxEntitiesPerDataSubscription = maxEntitiesPerDataSubscription;
    }

    /** Delegates to the parent fetch and then rebuilds the time-series cache from the fetched page. */
    @Override
    public void fetchData() {
        super.fetchData();
        this.updateLatestTsData(this.data);
    }

    /**
     * Routes a subscription update to the latest-value or time-series handler.
     * Updates whose subscription id is no longer mapped to an entity are only traced.
     *
     * @param sessionId            session identifier, used for logging
     * @param subscriptionUpdate   the incoming update
     * @param keyType              key type the update belongs to
     * @param resultToLatestValues {@code true} to treat the update as latest values,
     *                             {@code false} to treat it as time-series data
     */
    @Override
    protected void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues) {
        EntityId entityId = this.subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
        if (entityId == null) {
            log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
            return;
        }
        log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
        if (resultToLatestValues) {
            this.sendLatestWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
        } else {
            this.sendTsWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
        }
    }

    /**
     * @return the aggregation of the current time-series command, or {@link Aggregation#NONE}
     *         when there is no such command or it carries no aggregation
     */
    @Override
    protected Aggregation getCurrentAggregation() {
        if (this.curTsCmd == null || this.curTsCmd.getAgg() == null) {
            return Aggregation.NONE;
        }
        return this.curTsCmd.getAgg();
    }

    /**
     * Handles a latest-value update: reduces every key to its latest value, drops entries that are
     * older than or identical to the value cached on the entity, merges the remainder into that
     * cache and sends it to the client.
     */
    private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
        Map<String, TsValue> latestUpdate = new HashMap<>();
        subscriptionUpdate.getValues().forEach((key, values) -> latestUpdate.put(key, this.getLatest(values)));

        EntityData entityData = this.getDataForEntity(entityId);
        if (entityData != null && entityData.getLatest() != null) {
            Map<String, TsValue> latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, ignored -> new HashMap<>());
            log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
            latestCtxValues.forEach((key, cached) -> {
                TsValue update = latestUpdate.get(key);
                if (update == null) {
                    return;
                }
                // ts == 0 together with a null/empty value is logged as a delete notification and passed through.
                boolean hasPayload = update.getTs() != 0L || (update.getValue() != null && !update.getValue().isEmpty());
                if (!hasPayload) {
                    log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), key);
                } else if (update.getTs() < cached.getTs()) {
                    log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs());
                    latestUpdate.remove(key);
                } else if (update.getTs() == cached.getTs() && Objects.equals(update.getValue(), cached.getValue())) {
                    // Objects.equals: the value may be null even when ts != 0 (see the payload check above).
                    log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs());
                    latestUpdate.remove(key);
                }
            });
            latestCtxValues.putAll(latestUpdate);
        }

        if (!latestUpdate.isEmpty()) {
            Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
            EntityData update = new EntityData(entityId, latestMap, null);
            this.sendWsMsg(new EntityDataUpdate(this.cmdId, null, Collections.singletonList(update), this.maxEntitiesPerDataSubscription));
        }
    }

    /**
     * Handles a time-series update: drops values identical to the cached latest value for the same
     * key, advances the cache to the newest remaining value per key and sends the remainder to the
     * client.
     */
    private void sendTsWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
        Map<String, List<TsValue>> tsUpdate = new HashMap<>();
        // Copy each list so that filtering below never touches the lists owned by the update object.
        subscriptionUpdate.getValues().forEach((key, values) -> tsUpdate.put(key, new ArrayList<>(values)));

        Map<String, TsValue> latestCtxValues = this.getLatestTsValuesForEntity(entityId);
        log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
        if (latestCtxValues != null) {
            latestCtxValues.forEach((key, latest) -> {
                List<TsValue> updateList = tsUpdate.get(key);
                if (updateList == null) {
                    return;
                }
                // Iterate over a snapshot because updateList is modified inside the loop.
                for (TsValue update : new ArrayList<>(updateList)) {
                    if (update.getTs() < latest.getTs()) {
                        // NOTE(review): despite the message, the stale value is NOT removed and is still
                        // forwarded to the client. Behaviour kept as-is; confirm whether this is intended.
                        log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs());
                    } else if (update.getTs() == latest.getTs() && Objects.equals(update.getValue(), latest.getValue())) {
                        log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, this.cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs());
                        updateList.remove(update);
                    }
                    // Checked per element (not after the loop) so a key whose list arrived empty is left in place.
                    if (updateList.isEmpty()) {
                        tsUpdate.remove(key);
                    }
                }
            });
            // Remember the newest surviving value per key for the next comparison.
            tsUpdate.forEach((key, values) -> values.stream()
                    .max(Comparator.comparingLong(TsValue::getTs))
                    .ifPresent(newest -> latestCtxValues.put(key, newest)));
        }

        if (!tsUpdate.isEmpty()) {
            Map<String, TsValue[]> tsMap = new HashMap<>();
            tsUpdate.forEach((key, values) -> tsMap.put(key, values.toArray(new TsValue[0])));
            EntityData update = new EntityData(entityId, null, tsMap);
            this.sendWsMsg(new EntityDataUpdate(this.cmdId, null, Collections.singletonList(update), this.maxEntitiesPerDataSubscription));
        }
    }

    /** @return the first entry of the current page with the given entity id, or {@code null} if none matches */
    private EntityData getDataForEntity(EntityId entityId) {
        return this.data.getData().stream()
                .filter(item -> item.getEntityId().equals(entityId))
                .findFirst()
                .orElse(null);
    }

    /**
     * @return the cached time-series values for the entity, or {@code null} when the entity is not
     *         cached or the cache has not been built yet
     */
    private Map<String, TsValue> getLatestTsValuesForEntity(EntityId entityId) {
        Map<EntityId, Map<String, TsValue>> cache = this.latestTsEntityData;
        return cache == null ? null : cache.get(entityId);
    }

    /**
     * Rebuilds {@link #latestTsEntityData} from the given page. Every entity gets an entry (possibly
     * empty); the {@link EntityKeyType#TIME_SERIES} latest values are copied into it when present.
     */
    private void updateLatestTsData(PageData<EntityData> data) {
        // Populate a local map first and assign the field once, so the field never refers to a half-filled map.
        Map<EntityId, Map<String, TsValue>> rebuilt = new HashMap<>();
        for (EntityData entityData : data.getData()) {
            Map<String, TsValue> latestTsMap = new HashMap<>();
            rebuilt.put(entityData.getEntityId(), latestTsMap);
            if (entityData.getLatest() == null) {
                continue;
            }
            Map<String, TsValue> latestTsValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
            if (latestTsValues != null) {
                latestTsMap.putAll(latestTsValues);
            }
        }
        this.latestTsEntityData = rebuilt;
    }

    /**
     * Reconciles the active subscriptions with a new set of entities: subscriptions for entities
     * missing from {@code newDataMap} are cancelled, latest-value subscriptions are added for newly
     * appeared entities (only when no time-series command is set and the latest-value command has
     * keys), and the current page is re-sent to the client.
     *
     * @param newDataMap entities of the refreshed page, keyed by entity id
     */
    @Override
    public synchronized void doUpdate(Map<EntityId, EntityData> newDataMap) {
        this.updateLatestTsData(this.data);

        List<Integer> subIdsToCancel = new ArrayList<>();
        List<TbSubscription<?>> subsToAdd = new ArrayList<>();
        Set<EntityId> currentSubs = new HashSet<>();
        this.subToEntityIdMap.forEach((subId, entityId) -> {
            if (newDataMap.containsKey(entityId)) {
                currentSubs.add(entityId);
            } else {
                subIdsToCancel.add(subId);
            }
        });
        log.trace("[{}][{}] Subscriptions that are invalid: {}", this.sessionRef.getSessionId(), this.cmdId, subIdsToCancel);
        subIdsToCancel.forEach(this.subToEntityIdMap::remove);

        List<EntityData> newSubsList = newDataMap.entrySet().stream()
                .filter(entry -> !currentSubs.contains(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        if (!newSubsList.isEmpty() && this.curTsCmd == null && this.latestValueCmd != null) {
            List<EntityKey> keys = this.latestValueCmd.getKeys();
            if (keys != null && !keys.isEmpty()) {
                Map<EntityKeyType, List<EntityKey>> keysByType = this.getEntityKeyByTypeMap(keys);
                for (EntityData entity : newSubsList) {
                    log.trace("[{}][{}] Found new subscription for entity: {}", this.sessionRef.getSessionId(), this.cmdId, entity.getEntityId());
                    for (TbSubscription<?> subscription : this.addSubscriptions(entity, keysByType, true, 0L, 0L)) {
                        subsToAdd.add(subscription);
                    }
                }
            }
        }

        subIdsToCancel.forEach(subId -> this.localSubscriptionService.cancelSubscription(this.getTenantId(), this.getSessionId(), subId));
        subsToAdd.forEach(subscription -> this.localSubscriptionService.addSubscription(subscription, this.sessionRef));
        this.sendWsMsg(new EntityDataUpdate(this.cmdId, this.data, null, this.maxEntitiesPerDataSubscription));
    }

    /** Stores the time-series and latest-value parts of the given command as the current ones. */
    public void setCurrentCmd(EntityDataCmd cmd) {
        this.curTsCmd = cmd.getTsCmd();
        this.latestValueCmd = cmd.getLatestCmd();
    }

    /** @return the query held by this context, unchanged */
    @Override
    protected EntityDataQuery buildEntityDataQuery() {
        return this.query;
    }

    @Generated
    public boolean isInitialDataSent() {
        return this.initialDataSent;
    }

    @Generated
    public void setInitialDataSent(boolean initialDataSent) {
        this.initialDataSent = initialDataSent;
    }

    @Generated
    public int getMaxEntitiesPerDataSubscription() {
        return this.maxEntitiesPerDataSubscription;
    }
}

