package org.thingsboard.server.service.subscription;

import jakarta.annotation.PostConstruct;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.common.network.NetworkReceive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.class */
public class DefaultSubscriptionManagerService extends TbApplicationEventListener<PartitionChangeEvent> implements SubscriptionManagerService {
    private static final Logger log = LoggerFactory.getLogger(DefaultSubscriptionManagerService.class);

    // Collaborators; every one of them is assigned in the constructor.
    private final TopicService topicService;
    private final PartitionService partitionService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbQueueProducerProvider producerProvider;
    private final TbLocalSubscriptionService localSubscriptionService;
    private final TbClusterService clusterService;
    private final SubscriptionSchedulerComponent scheduler;

    /** Held while onSubEvent and the service-shutdown handler modify {@link #entitySubscriptions}. */
    private final Lock subsLock = new ReentrantLock();
    /** Subscription info per entity, keyed by entity id. */
    private final ConcurrentMap<EntityId, TbEntityRemoteSubsInfo> entitySubscriptions = new ConcurrentHashMap<>();
    /** Latest time-series / attributes update timestamps per entity, keyed by entity id. */
    private final ConcurrentMap<EntityId, TbEntityUpdatesInfo> entityUpdates = new ConcurrentHashMap<>();

    // The three fields below are assigned in initExecutor().
    /** Id of this service instance; compared against subscriber service ids to pick local vs. remote delivery. */
    private String serviceId;
    private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> toCoreNotificationsProducer;
    /** Timestamp passed to newly created TbEntityUpdatesInfo records; moved forward by cleanupEntityUpdates(). */
    private long initTs;

    /**
     * Post-construction initialisation: caches this service's id, records the start
     * timestamp, obtains the core-notifications producer and schedules
     * {@code cleanupEntityUpdates} to run with a fixed delay of one hour.
     */
    @PostConstruct
    public void initExecutor() {
        serviceId = serviceInfoProvider.getServiceId();
        initTs = System.currentTimeMillis();
        toCoreNotificationsProducer = producerProvider.getTbCoreNotificationsMsgProducer();

        final long cleanupPeriodHours = 1L;
        scheduler.scheduleWithFixedDelay(() -> cleanupEntityUpdates(), cleanupPeriodHours, cleanupPeriodHours, TimeUnit.HOURS);
    }

    /**
     * Applies a subscription event sent by the service identified by {@code str}.
     *
     * <p>If the entity does not resolve to a partition owned by this node, the callback is
     * failed and nothing is changed. Otherwise the per-entity subscription record is updated
     * under {@code subsLock} (and removed once it reports empty), the callback is completed,
     * and, when the event carries time-series or attribute subscriptions, the current
     * update timestamps are sent back to the subscribing service.
     *
     * @param str              id of the service that sent the event
     * @param tbEntitySubEvent the subscription event
     * @param tbCallback       completed with success or failure as described above
     */
    @Override
    public void onSubEvent(String str, TbEntitySubEvent tbEntitySubEvent, TbCallback tbCallback) {
        final TenantId tenantId = tbEntitySubEvent.getTenantId();
        final EntityId entityId = tbEntitySubEvent.getEntityId();
        log.trace("[{}][{}][{}] Processing subscription event {}", tenantId, entityId, str, tbEntitySubEvent);

        final TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
        if (tpi.isMyPartition()) {
            subsLock.lock();
            try {
                TbEntityRemoteSubsInfo subsInfo =
                        entitySubscriptions.computeIfAbsent(entityId, key -> new TbEntityRemoteSubsInfo(tenantId, entityId));
                boolean nothingLeft = subsInfo.updateAndCheckIsEmpty(str, tbEntitySubEvent);
                if (nothingLeft) {
                    entitySubscriptions.remove(entityId);
                }
                tbCallback.onSuccess();
                if (tbEntitySubEvent.hasTsOrAttrSub()) {
                    sendSubEventCallback(tenantId, str, entityId, tbEntitySubEvent.getSeqNumber());
                }
            } finally {
                subsLock.unlock();
            }
        } else {
            log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}", tenantId, entityId, str, tpi.getFullTopicName());
            tbCallback.onFailure(new RuntimeException("Entity belongs to external partition " + tpi.getFullTopicName() + "!"));
        }
    }

    /**
     * Removes the subscription state kept on behalf of another service that has shut down.
     *
     * <p>Events whose service types are {@code null} or do not include {@code TB_CORE} are
     * ignored. Otherwise, under {@code subsLock}, the stopped service is removed from every
     * entity record, and records that report empty afterwards are dropped from the map.
     *
     * @param otherServiceShutdownEvent the shutdown event of the other service
     */
    @Override
    @EventListener({OtherServiceShutdownEvent.class})
    public void onApplicationEvent(OtherServiceShutdownEvent otherServiceShutdownEvent) {
        if (otherServiceShutdownEvent.getServiceTypes() == null
                || !otherServiceShutdownEvent.getServiceTypes().contains(ServiceType.TB_CORE)) {
            return;
        }
        final String stoppedServiceId = otherServiceShutdownEvent.getServiceId();
        subsLock.lock();
        try {
            int sizeBeforeCleanup = entitySubscriptions.size();
            entitySubscriptions.entrySet().removeIf(entry -> entry.getValue().removeAndCheckIsEmpty(stoppedServiceId));
            // Removed = before - after; the reverse subtraction logged a non-positive number.
            int removed = sizeBeforeCleanup - entitySubscriptions.size();
            log.info("[{}][{}] Removed {} entity subscription records due to server shutdown.", serviceId, stoppedServiceId, removed);
        } finally {
            subsLock.unlock();
        }
    }

    /**
     * Sends the entity's current update timestamps back to the subscribing service:
     * directly to the local subscription service when {@code str} is this service's id,
     * otherwise as a core notification addressed to {@code str}.
     *
     * @param tenantId tenant of the entity
     * @param str      id of the subscribing service
     * @param entityId entity the subscription refers to
     * @param i        sequence number taken from the subscription event
     */
    private void sendSubEventCallback(TenantId tenantId, String str, EntityId entityId, int i) {
        final TbEntityUpdatesInfo updatesInfo = getEntityUpdatesInfo(entityId);
        final boolean remoteSubscriber = !serviceId.equals(str);
        if (remoteSubscriber) {
            sendCoreNotification(str, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), i, updatesInfo));
            return;
        }
        localSubscriptionService.onSubEventCallback(tenantId, entityId, i, updatesInfo, TbCallback.EMPTY);
    }

    /**
     * Handles a partition change: for {@code TB_CORE} events, drops every entity
     * subscription record whose entity no longer belongs to a partition of this node.
     * Events for other service types are ignored.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier of this method was
     * changed from {@code protected}.
     *
     * @param partitionChangeEvent the partition change event
     */
    public void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        if (!ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
            return;
        }
        entitySubscriptions.values().removeIf(subsInfo ->
                !partitionService.isMyPartition(ServiceType.TB_CORE, subsInfo.getTenantId(), subsInfo.getEntityId()));
    }

    /**
     * Entry point for a time-series update of an entity: forwards the entries to the
     * subscribed services and then completes the callback successfully.
     *
     * @param tenantId   tenant of the entity (not used by this method)
     * @param entityId   entity whose time-series changed
     * @param list       updated time-series entries
     * @param tbCallback completed with success once the update has been dispatched
     */
    @Override
    public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> list, TbCallback tbCallback) {
        this.onTimeSeriesUpdate(entityId, list);
        tbCallback.onSuccess();
    }

    /**
     * Entry point for deletion of time-series keys. Each deleted key is reported to
     * subscribers as an entry with timestamp {@code 0} and an empty string value, after
     * which the callback is completed successfully.
     *
     * @param tenantId   tenant of the entity (not used by this method)
     * @param entityId   entity whose time-series keys were deleted
     * @param list       names of the deleted keys
     * @param tbCallback completed with success once the deletion has been dispatched
     */
    @Override
    public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> list, TbCallback tbCallback) {
        // The placeholder value is a plain empty string; the decompiled code borrowed an
        // unrelated Kafka constant (NetworkReceive.UNKNOWN_SOURCE, which is "") for it.
        List<TsKvEntry> deletedEntries = list.stream()
                .map(key -> new BasicTsKvEntry(0L, new StringDataEntry(key, "")))
                .collect(Collectors.toList());
        onTimeSeriesUpdate(entityId, deletedEntries);
        tbCallback.onSuccess();
    }

    /**
     * Records the time of the time-series update for the entity and fans the entries out
     * to every subscribed service. A service subscribed to all keys receives the full list;
     * a service subscribed to specific keys receives only the matching entries and is
     * skipped when none match.
     *
     * @param entityId entity whose time-series changed
     * @param list     updated time-series entries
     */
    private void onTimeSeriesUpdate(EntityId entityId, List<TsKvEntry> list) {
        getEntityUpdatesInfo(entityId).timeSeriesUpdateTs = System.currentTimeMillis();

        final TbEntityRemoteSubsInfo subsInfo = entitySubscriptions.get(entityId);
        if (subsInfo == null) {
            log.trace("[{}] No time-series subscriptions for entity.", entityId);
            return;
        }

        log.trace("[{}] Handling time-series update: {}", entityId, list);
        subsInfo.getSubs().forEach((targetServiceId, subscription) -> {
            if (subscription.tsAllKeys) {
                onTimeSeriesUpdate(targetServiceId, entityId, list);
            } else if (subscription.tsKeys != null) {
                List<TsKvEntry> matching = getSubList(list, subscription.tsKeys);
                if (matching != null) {
                    onTimeSeriesUpdate(targetServiceId, entityId, matching);
                }
            }
        });
    }

    /**
     * Delivers time-series entries to one subscribed service: locally when {@code str} is
     * this service's id, otherwise through a core notification addressed to {@code str}.
     *
     * @param str      id of the subscribed service
     * @param entityId entity whose time-series changed
     * @param list     entries to deliver
     */
    private void onTimeSeriesUpdate(String str, EntityId entityId, List<TsKvEntry> list) {
        if (!serviceId.equals(str)) {
            sendCoreNotification(str, entityId, TbSubscriptionUtils.toProto(entityId, list));
            return;
        }
        localSubscriptionService.onTimeSeriesUpdate(entityId, list, TbCallback.EMPTY);
    }

    /**
     * Entry point for an attributes update: records the update time for the entity,
     * forwards the entries to the subscribed services and completes the callback.
     *
     * @param tenantId   tenant of the entity (not used by this method)
     * @param entityId   entity whose attributes changed
     * @param str        attribute scope, passed through to subscribers
     * @param list       updated attribute entries
     * @param tbCallback completed with success once the update has been dispatched
     */
    @Override
    public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String str, List<AttributeKvEntry> list, TbCallback tbCallback) {
        final TbEntityUpdatesInfo updatesInfo = getEntityUpdatesInfo(entityId);
        updatesInfo.attributesUpdateTs = System.currentTimeMillis();
        processAttributesUpdate(entityId, str, list);
        tbCallback.onSuccess();
    }

    /**
     * Convenience overload of the six-argument {@code onAttributesDelete} that passes
     * {@code false} for its boolean flag.
     *
     * @param tenantId   tenant of the entity
     * @param entityId   entity whose attributes were deleted
     * @param str        attribute scope
     * @param list       names of the deleted attribute keys
     * @param tbCallback completed by the delegated call
     */
    @Override
    public void onAttributesDelete(TenantId tenantId, EntityId entityId, String str, List<String> list, TbCallback tbCallback) {
        final boolean notifyDevice = false;
        onAttributesDelete(tenantId, entityId, str, list, notifyDevice, tbCallback);
    }

    /**
     * Entry point for deletion of attribute keys. Each deleted key is reported to
     * subscribers as an entry with timestamp {@code 0} and an empty string value. When the
     * entity is a device, the scope is the shared scope (case-insensitive) and {@code z} is
     * set, a device attributes delete notification is additionally pushed to the core
     * through the cluster service. The callback is then completed successfully.
     *
     * @param tenantId   tenant of the entity
     * @param entityId   entity whose attributes were deleted
     * @param str        attribute scope
     * @param list       names of the deleted attribute keys
     * @param z          whether the device notification described above may be sent
     * @param tbCallback completed with success once processing is done
     */
    @Override
    public void onAttributesDelete(TenantId tenantId, EntityId entityId, String str, List<String> list, boolean z, TbCallback tbCallback) {
        // The placeholder value is a plain empty string; the decompiled code borrowed an
        // unrelated Kafka constant (NetworkReceive.UNKNOWN_SOURCE, which is "") for it.
        List<AttributeKvEntry> deletedEntries = list.stream()
                .map(key -> new BaseAttributeKvEntry(0L, new StringDataEntry(key, "")))
                .collect(Collectors.toList());
        processAttributesUpdate(entityId, str, deletedEntries);

        boolean sharedDeviceAttributes = entityId.getEntityType() == EntityType.DEVICE
                && TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(str);
        if (sharedDeviceAttributes && z) {
            // The cast on null selects the pushMsgToCore overload that takes a TbQueueCallback.
            clusterService.pushMsgToCore(
                    DeviceAttributesEventNotificationMsg.onDelete(tenantId, new DeviceId(entityId.getId()), str, list),
                    (TbQueueCallback) null);
        }
        tbCallback.onSuccess();
    }

    /**
     * Fans attribute entries out to every service subscribed to the entity. A service
     * subscribed to all attribute keys receives the full list; a service subscribed to
     * specific keys receives only the matching entries and is skipped when none match.
     *
     * @param entityId entity whose attributes changed
     * @param str      attribute scope, passed through to subscribers
     * @param list     changed attribute entries
     */
    private void processAttributesUpdate(EntityId entityId, String str, List<AttributeKvEntry> list) {
        final TbEntityRemoteSubsInfo subsInfo = entitySubscriptions.get(entityId);
        if (subsInfo == null) {
            log.trace("[{}] No attributes subscriptions for entity.", entityId);
            return;
        }

        log.trace("[{}] Handling attributes update: {}", entityId, list);
        subsInfo.getSubs().forEach((targetServiceId, subscription) -> {
            if (subscription.attrAllKeys) {
                processAttributesUpdate(targetServiceId, entityId, str, list);
            } else if (subscription.attrKeys != null) {
                List<AttributeKvEntry> matching = getSubList(list, subscription.attrKeys);
                if (matching != null) {
                    processAttributesUpdate(targetServiceId, entityId, str, matching);
                }
            }
        });
    }

    /**
     * Delivers attribute entries to one subscribed service. Each attribute is wrapped in a
     * time-series entry stamped with the attribute's last-update time; the result is handed
     * to the local subscription service when {@code str} is this service's id, otherwise it
     * is sent as a core notification addressed to {@code str}.
     *
     * @param str      id of the subscribed service
     * @param entityId entity whose attributes changed
     * @param str2     attribute scope
     * @param list     attribute entries to deliver
     */
    private void processAttributesUpdate(String str, EntityId entityId, String str2, List<AttributeKvEntry> list) {
        // Typed directly as List<TsKvEntry>; no raw-type cast is needed for the collector.
        List<TsKvEntry> tsEntries = list.stream()
                .map(attribute -> new BasicTsKvEntry(attribute.getLastUpdateTs(), attribute))
                .collect(Collectors.toList());
        if (serviceId.equals(str)) {
            localSubscriptionService.onAttributesUpdate(entityId, str2, tsEntries, TbCallback.EMPTY);
        } else {
            sendCoreNotification(str, entityId, TbSubscriptionUtils.toProto(str2, entityId, tsEntries));
        }
    }

    /**
     * Entry point for an alarm create/update: dispatches it to alarm subscribers with the
     * "deleted" flag cleared.
     *
     * @param tenantId   tenant of the entity
     * @param entityId   entity the alarm is dispatched for
     * @param alarmInfo  the alarm
     * @param tbCallback completed by the delegated call
     */
    @Override
    public void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarmInfo, TbCallback tbCallback) {
        final boolean deleted = false;
        onAlarmSubUpdate(tenantId, entityId, alarmInfo, deleted, tbCallback);
    }

    /**
     * Entry point for an alarm deletion: dispatches it to alarm subscribers with the
     * "deleted" flag set.
     *
     * @param tenantId   tenant of the entity
     * @param entityId   entity the alarm is dispatched for
     * @param alarmInfo  the deleted alarm
     * @param tbCallback completed by the delegated call
     */
    @Override
    public void onAlarmDeleted(TenantId tenantId, EntityId entityId, AlarmInfo alarmInfo, TbCallback tbCallback) {
        final boolean deleted = true;
        onAlarmSubUpdate(tenantId, entityId, alarmInfo, deleted, tbCallback);
    }

    /**
     * Dispatches an alarm change to every service whose subscription for the entity has
     * alarms enabled, then completes the callback successfully. The callback is completed
     * even when the entity has no subscription record.
     *
     * @param tenantId   tenant of the entity (used for logging only)
     * @param entityId   entity the alarm is dispatched for
     * @param alarmInfo  the alarm
     * @param z          {@code true} when the alarm was deleted
     * @param tbCallback completed with success after dispatching
     */
    private void onAlarmSubUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarmInfo, boolean z, TbCallback tbCallback) {
        final TbEntityRemoteSubsInfo subsInfo = entitySubscriptions.get(entityId);
        if (subsInfo != null) {
            log.trace("[{}][{}] Handling alarm update {}: {}", tenantId, entityId, alarmInfo, z);
            subsInfo.getSubs().forEach((targetServiceId, subscription) -> {
                if (subscription.alarms) {
                    onAlarmSubUpdate(targetServiceId, entityId, alarmInfo, z);
                }
            });
        }
        tbCallback.onSuccess();
    }

    /**
     * Delivers an alarm change to one subscribed service. A {@code null} alarm is logged
     * and dropped; otherwise the change goes to the local subscription service when
     * {@code str} is this service's id, or out as a core notification addressed to
     * {@code str}.
     *
     * @param str       id of the subscribed service
     * @param entityId  entity the alarm is dispatched for
     * @param alarmInfo the alarm, may be {@code null}
     * @param z         {@code true} when the alarm was deleted
     */
    private void onAlarmSubUpdate(String str, EntityId entityId, AlarmInfo alarmInfo, boolean z) {
        if (alarmInfo == null) {
            log.warn("[{}] empty alarm update!", entityId);
            return;
        }
        if (serviceId.equals(str)) {
            log.trace("[{}] Forwarding to local service: {} deleted: {}", entityId, alarmInfo, z);
            localSubscriptionService.onAlarmUpdate(entityId, alarmInfo, z, TbCallback.EMPTY);
            return;
        }
        sendCoreNotification(str, entityId, TbSubscriptionUtils.toAlarmSubUpdateToProto(entityId, alarmInfo, z));
    }

    /**
     * Sends a notification message to another core service through the core-notifications
     * producer. The target topic is the notifications topic of service {@code str}; the
     * queue message is keyed by the entity's UUID. No send callback is registered.
     *
     * @param str                   id of the target service
     * @param entityId              entity the notification relates to
     * @param toCoreNotificationMsg message to send
     */
    private void sendCoreNotification(String str, EntityId entityId, TransportProtos.ToCoreNotificationMsg toCoreNotificationMsg) {
        log.trace("[{}] Forwarding to remote service [{}]: {}", entityId, str, toCoreNotificationMsg);
        TopicPartitionInfo targetTopic = topicService.getNotificationsTopic(ServiceType.TB_CORE, str);
        // Diamond instead of the raw TbProtoQueueMsg type keeps the message type checked.
        toCoreNotificationsProducer.send(targetTopic, new TbProtoQueueMsg<>(entityId.getId(), toCoreNotificationMsg), (TbQueueCallback) null);
    }

    /**
     * Dispatches a notification update for a user to every service whose subscription for
     * that user has notifications enabled, then completes the callback successfully. The
     * callback is completed even when the user has no subscription record.
     *
     * @param tenantId           tenant of the user (used for logging only)
     * @param userId             user the update is addressed to
     * @param notificationUpdate the update to deliver
     * @param tbCallback         completed with success after dispatching
     */
    @Override
    public void onNotificationUpdate(TenantId tenantId, UserId userId, NotificationUpdate notificationUpdate, TbCallback tbCallback) {
        final TbEntityRemoteSubsInfo subsInfo = entitySubscriptions.get(userId);
        if (subsInfo != null) {
            final NotificationsSubscriptionUpdate subscriptionUpdate = new NotificationsSubscriptionUpdate(notificationUpdate);
            log.trace("[{}][{}] Handling notificationUpdate for user {}", tenantId, userId, notificationUpdate);
            subsInfo.getSubs().forEach((targetServiceId, subscription) -> {
                if (subscription.notifications) {
                    onNotificationsSubUpdate(targetServiceId, userId, subscriptionUpdate);
                }
            });
        }
        tbCallback.onSuccess();
    }

    /**
     * Delivers a notifications subscription update to one subscribed service: locally when
     * {@code str} is this service's id, otherwise as a core notification addressed to
     * {@code str}.
     *
     * @param str                             id of the subscribed service
     * @param entityId                        entity (user) the update is addressed to
     * @param notificationsSubscriptionUpdate update to deliver
     */
    private void onNotificationsSubUpdate(String str, EntityId entityId, NotificationsSubscriptionUpdate notificationsSubscriptionUpdate) {
        if (serviceId.equals(str)) {
            log.trace("[{}] Forwarding to local service: {}", entityId, notificationsSubscriptionUpdate);
            localSubscriptionService.onNotificationUpdate(entityId, notificationsSubscriptionUpdate, TbCallback.EMPTY);
            return;
        }
        sendCoreNotification(str, entityId, TbSubscriptionUtils.notificationsSubUpdateToProto(entityId, notificationsSubscriptionUpdate));
    }

    /**
     * Selects the entries whose key is contained in {@code set}, preserving input order.
     *
     * @param list entries to filter
     * @param set  keys of interest
     * @param <T>  entry type
     * @return a new list with the matching entries, or {@code null} when nothing matches;
     *         callers in this class rely on {@code null} to skip the dispatch entirely
     */
    private static <T extends KvEntry> List<T> getSubList(List<T> list, Set<String> set) {
        // Allocated lazily so that the common "no match" case creates no garbage.
        List<T> matching = null;
        for (T entry : list) {
            if (!set.contains(entry.getKey())) {
                continue;
            }
            if (matching == null) {
                matching = new ArrayList<>(list.size());
            }
            matching.add(entry);
        }
        return matching;
    }

    /**
     * Returns the update-timestamps record for the entity, creating one initialised with
     * the current {@code initTs} when the entity has none yet.
     *
     * @param entityId entity to look up
     * @return the existing or newly created record
     */
    private TbEntityUpdatesInfo getEntityUpdatesInfo(EntityId entityId) {
        return entityUpdates.computeIfAbsent(entityId, key -> new TbEntityUpdatesInfo(initTs));
    }

    /**
     * Periodic cleanup of {@code entityUpdates}. Moves {@code initTs} to one hour before
     * now and removes every record whose attributes and time-series update timestamps are
     * both older than that threshold, then logs how many records were removed.
     */
    private void cleanupEntityUpdates() {
        final long threshold = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L);
        this.initTs = threshold;
        int sizeBeforeCleanup = entityUpdates.size();
        entityUpdates.values().removeIf(info ->
                threshold > info.attributesUpdateTs && threshold > info.timeSeriesUpdateTs);
        // Removed = before - after; the reverse subtraction logged a non-positive number.
        int removed = sizeBeforeCleanup - entityUpdates.size();
        log.info("Removed {} old entity update records.", removed);
    }

    /**
     * Creates the service from its collaborators; each argument is stored in the final
     * field of the corresponding type. No other work is done here.
     */
    @ConstructorProperties({"topicService", "partitionService", "serviceInfoProvider", "producerProvider", "localSubscriptionService", "clusterService", "scheduler"})
    public DefaultSubscriptionManagerService(TopicService topicService, PartitionService partitionService, TbServiceInfoProvider tbServiceInfoProvider, TbQueueProducerProvider tbQueueProducerProvider, TbLocalSubscriptionService tbLocalSubscriptionService, TbClusterService tbClusterService, SubscriptionSchedulerComponent subscriptionSchedulerComponent) {
        // Queue / discovery infrastructure.
        this.topicService = topicService;
        this.partitionService = partitionService;
        this.serviceInfoProvider = tbServiceInfoProvider;
        this.producerProvider = tbQueueProducerProvider;

        // Delivery targets and scheduling.
        this.localSubscriptionService = tbLocalSubscriptionService;
        this.clusterService = tbClusterService;
        this.scheduler = subscriptionSchedulerComponent;
    }
}
