package org.thingsboard.server.service.partition;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;

/* loaded from: input_file:org/thingsboard/server/service/partition/AbstractPartitionBasedService.class */
public abstract class AbstractPartitionBasedService<T extends EntityId> extends TbApplicationEventListener<PartitionChangeEvent> {
    private static final Logger log = LoggerFactory.getLogger(AbstractPartitionBasedService.class);
    protected final ConcurrentMap<TopicPartitionInfo, Set<T>> partitionedEntities = new ConcurrentHashMap();
    protected final ConcurrentMap<TopicPartitionInfo, List<ListenableFuture<?>>> partitionedFetchTasks = new ConcurrentHashMap();
    final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue();

    @Autowired
    protected PartitionService partitionService;
    protected ListeningScheduledExecutorService scheduledExecutor;

    protected abstract String getServiceName();

    protected abstract String getSchedulerExecutorName();

    protected abstract Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> set);

    protected abstract void cleanupEntityOnPartitionRemoval(T t);

    public Set<T> getPartitionedEntities(TopicPartitionInfo topicPartitionInfo) {
        return this.partitionedEntities.get(topicPartitionInfo);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void init() {
        this.scheduledExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newSingleThreadScheduledExecutor(getSchedulerExecutorName()));
    }

    protected ServiceType getServiceType() {
        return ServiceType.TB_CORE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stop() {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        log.debug("onTbApplicationEvent, processing event: {}", partitionChangeEvent);
        this.subscribeQueue.add(partitionChangeEvent.getCorePartitions());
        this.scheduledExecutor.submit(this::pollInitStateFromDB);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean filterTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        return partitionChangeEvent.getServiceType() == getServiceType();
    }

    protected void pollInitStateFromDB() {
        Set<TopicPartitionInfo> latestPartitions = getLatestPartitions();
        if (latestPartitions == null) {
            log.debug("Nothing to do. Partitions are empty.");
        } else {
            initStateFromDB(latestPartitions);
        }
    }

    private void initStateFromDB(Set<TopicPartitionInfo> set) {
        try {
            log.info("[{}] CURRENT PARTITIONS: {}", getServiceName(), this.partitionedEntities.keySet());
            log.info("[{}] NEW PARTITIONS: {}", getServiceName(), set);
            HashSet hashSet = new HashSet(set);
            hashSet.removeAll(this.partitionedEntities.keySet());
            log.info("[{}] ADDED PARTITIONS: {}", getServiceName(), hashSet);
            HashSet<TopicPartitionInfo> hashSet2 = new HashSet(this.partitionedEntities.keySet());
            hashSet2.removeAll(set);
            log.info("[{}] REMOVED PARTITIONS: {}", getServiceName(), hashSet2);
            boolean z = false;
            for (TopicPartitionInfo topicPartitionInfo : hashSet2) {
                Set<T> remove = this.partitionedEntities.remove(topicPartitionInfo);
                if (remove != null) {
                    remove.forEach(this::cleanupEntityOnPartitionRemoval);
                }
                List<ListenableFuture<?>> remove2 = this.partitionedFetchTasks.remove(topicPartitionInfo);
                if (remove2 != null) {
                    remove2.forEach(listenableFuture -> {
                        listenableFuture.cancel(false);
                    });
                }
                z = true;
            }
            onRepartitionEvent();
            hashSet.forEach(topicPartitionInfo2 -> {
                this.partitionedEntities.computeIfAbsent(topicPartitionInfo2, topicPartitionInfo2 -> {
                    return ConcurrentHashMap.newKeySet();
                });
            });
            if (!hashSet.isEmpty()) {
                Map<? extends TopicPartitionInfo, ? extends List<ListenableFuture<?>>> onAddedPartitions = onAddedPartitions(hashSet);
                if (onAddedPartitions != null && !onAddedPartitions.isEmpty()) {
                    this.partitionedFetchTasks.putAll(onAddedPartitions);
                }
                z = true;
            }
            if (z) {
                ArrayList arrayList = new ArrayList();
                Collection<List<ListenableFuture<?>>> values = this.partitionedFetchTasks.values();
                Objects.requireNonNull(arrayList);
                values.forEach((v1) -> {
                    r1.addAll(v1);
                });
                DonAsynchron.withCallback(Futures.allAsList(arrayList), list -> {
                    logPartitions();
                }, this::logFailure);
            }
        } catch (Throwable th) {
            log.warn("[{}] Failed to init entities state from DB", getServiceName(), th);
        }
    }

    private void logFailure(Throwable th) {
        if (th instanceof CancellationException) {
            log.trace("Partition fetch task error", th);
        } else {
            log.error("Partition fetch task error", th);
        }
    }

    private void logPartitions() {
        log.info("[{}] Managing following partitions:", getServiceName());
        this.partitionedEntities.forEach((topicPartitionInfo, set) -> {
            log.info("[{}][{}]: {} entities", new Object[]{getServiceName(), topicPartitionInfo.getFullTopicName(), Integer.valueOf(set.size())});
        });
    }

    protected void onRepartitionEvent() {
    }

    private Set<TopicPartitionInfo> getLatestPartitions() {
        log.debug("getLatestPartitionsFromQueue, queue size {}", Integer.valueOf(this.subscribeQueue.size()));
        Set<TopicPartitionInfo> set = null;
        while (!this.subscribeQueue.isEmpty()) {
            set = this.subscribeQueue.poll();
            log.debug("polled from the queue partitions {}", set);
        }
        log.debug("getLatestPartitionsFromQueue, partitions {}", set);
        return set;
    }
}
