/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.discovery;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.QueueRoutingInfo;
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
import org.thingsboard.server.queue.util.AfterStartUp;

/**
 * {@link PartitionService} implementation that maps entities to queue partitions by hashing
 * their ids and distributes those partitions across the service instances passed to
 * {@link #recalculatePartitions(TransportProtos.ServiceInfo, List)}.
 *
 * <p>Thread-safety: {@link #recalculatePartitions} is {@code synchronized} and publishes its
 * results as complete snapshots through {@code volatile} fields, so the unsynchronized read
 * paths ({@code resolve}, {@code getOtherServices}, {@code countTransportsByType}) never observe
 * a half-built assignment.
 */
@Service
public class HashPartitionService
implements PartitionService {
    private static final Logger log = LoggerFactory.getLogger(HashPartitionService.class);

    /** Number of attempts a transport service makes to load the queue routing info. */
    private static final int QUEUE_ROUTING_INFO_ATTEMPTS = 10;
    /** Pause between two consecutive attempts to load the queue routing info. */
    private static final long QUEUE_ROUTING_INFO_RETRY_DELAY_MS = 10000L;

    @Value("${queue.core.topic}")
    private String coreTopic;
    @Value("${queue.core.partitions:100}")
    private Integer corePartitions;
    @Value("${queue.vc.topic:tb_version_control}")
    private String vcTopic;
    @Value("${queue.vc.partitions:10}")
    private Integer vcPartitions;
    @Value("${queue.partitions.hash_function_name:murmur3_128}")
    private String hashFunctionName;

    private final ApplicationEventPublisher applicationEventPublisher;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TenantRoutingInfoService tenantRoutingInfoService;
    private final QueueRoutingInfoService queueRoutingInfoService;

    /** Partitions owned by the current service, per queue; replaced wholesale on every recalculation. */
    private volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
    private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<QueueKey, Integer> partitionSizesMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
    /** Services grouped by transport type; replaced wholesale on every recalculation and never mutated afterwards. */
    private volatile Map<String, List<TransportProtos.ServiceInfo>> tbTransportServicesByType = new HashMap<>();
    /** Private copy of the other services seen by the last recalculation; {@code null} until the first one. */
    private volatile List<TransportProtos.ServiceInfo> currentOtherServices;
    private HashFunction hashFunction;

    public HashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher, QueueRoutingInfoService queueRoutingInfoService) {
        this.serviceInfoProvider = serviceInfoProvider;
        this.tenantRoutingInfoService = tenantRoutingInfoService;
        this.applicationEventPublisher = applicationEventPublisher;
        this.queueRoutingInfoService = queueRoutingInfoService;
    }

    /**
     * Resolves the configured hash function and registers the core and version-control queues.
     * Non-transport services also load the rule-engine queues here; transport services do so
     * later in {@link #partitionsInit()}.
     *
     * @throws IllegalArgumentException if the configured hash function name is not supported
     */
    @PostConstruct
    public void init() {
        this.hashFunction = forName(this.hashFunctionName);

        QueueKey coreKey = new QueueKey(ServiceType.TB_CORE);
        this.partitionSizesMap.put(coreKey, this.corePartitions);
        this.partitionTopicsMap.put(coreKey, this.coreTopic);

        QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
        this.partitionSizesMap.put(vcKey, this.vcPartitions);
        this.partitionTopicsMap.put(vcKey, this.vcTopic);

        if (!isTransport(this.serviceInfoProvider.getServiceType())) {
            doInitRuleEnginePartitions();
        }
    }

    /**
     * Loads the rule-engine queues when the current service is a transport.
     * NOTE(review): presumably invoked by the framework once start-up has completed
     * (see {@code @AfterStartUp}) - confirm against the annotation's processor.
     */
    @AfterStartUp(order=1)
    public void partitionsInit() {
        if (isTransport(this.serviceInfoProvider.getServiceType())) {
            doInitRuleEnginePartitions();
        }
    }

    /** Registers topic and partition count for every rule-engine queue returned by the routing info service. */
    private void doInitRuleEnginePartitions() {
        for (QueueRoutingInfo queue : getQueueRoutingInfos()) {
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
            this.partitionTopicsMap.put(queueKey, queue.getQueueTopic());
            this.partitionSizesMap.put(queueKey, queue.getPartitions());
        }
    }

    /**
     * Fetches the routing info of all queues. Transport services retry up to
     * {@link #QUEUE_ROUTING_INFO_ATTEMPTS} times with a fixed pause between attempts;
     * every other service type performs a single, unguarded call.
     *
     * @return the routing info of all queues
     * @throws RuntimeException if every attempt failed (the last failure is attached as cause)
     *         or the waiting thread was interrupted
     */
    private List<QueueRoutingInfo> getQueueRoutingInfos() {
        if (!isTransport(this.serviceInfoProvider.getServiceType())) {
            return this.queueRoutingInfoService.getAllQueuesRoutingInfo();
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= QUEUE_ROUTING_INFO_ATTEMPTS; attempt++) {
            log.info("Try to get queue routing info.");
            try {
                return this.queueRoutingInfoService.getAllQueuesRoutingInfo();
            } catch (Exception e) {
                log.info("Failed to get queues routing info: {}!", e.getMessage());
                lastFailure = e;
            }
            if (attempt == QUEUE_ROUTING_INFO_ATTEMPTS) {
                // No point in sleeping when there is no further attempt to wait for.
                break;
            }
            try {
                Thread.sleep(QUEUE_ROUTING_INFO_RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the flag and stop retrying: a further sleep would fail immediately anyway.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to await queues routing info!", e);
            }
        }
        throw new RuntimeException("Failed to await queues routing info!", lastFailure);
    }

    private boolean isTransport(String serviceType) {
        return "tb-transport".equals(serviceType);
    }

    /**
     * Registers or updates the topic and partition count of a rule-engine queue and forgets the
     * partitions currently owned for it.
     */
    @Override
    public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
        TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
        this.partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
        this.partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
        this.myPartitions.remove(queueKey);
    }

    /**
     * Drops all bookkeeping for a rule-engine queue and evicts the cached routing info of its tenant.
     */
    @Override
    public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
        TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
        this.myPartitions.remove(queueKey);
        this.partitionTopicsMap.remove(queueKey);
        this.partitionSizesMap.remove(queueKey);
        this.removeTenant(tenantId);
    }

    /**
     * Resolves the topic partition for an entity. When no queue is registered under the given
     * name, the key built from service type and tenant alone is used instead.
     *
     * @throws IllegalStateException if neither key has a registered partition count
     * @throws RuntimeException if the tenant's routing info cannot be found
     */
    @Override
    public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
        TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
        QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId);
        if (!this.partitionSizesMap.containsKey(queueKey)) {
            queueKey = new QueueKey(serviceType, isolatedOrSystemTenantId);
        }
        return resolve(queueKey, entityId);
    }

    @Override
    public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
        return this.resolve(serviceType, null, tenantId, entityId);
    }

    private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
        Integer partitionSize = this.partitionSizesMap.get(queueKey);
        if (partitionSize == null) {
            // Fail with a descriptive error instead of an anonymous unboxing NullPointerException.
            throw new IllegalStateException("No partitions registered for queue key " + queueKey);
        }
        return buildTopicPartitionInfo(queueKey, hashToPartition(entityId.getId(), partitionSize));
    }

    /**
     * Recomputes which partitions of every known queue belong to {@code currentService} and
     * publishes the resulting events: a {@link PartitionChangeEvent} per queue whose assignment
     * changed, a {@link ClusterTopologyChangeEvent} when the set of other services changed for
     * some queue key (not on the first invocation), and always a {@link ServiceListChangedEvent}.
     *
     * @param currentService the service instance this code runs in
     * @param otherServices all other known service instances; the list is copied, not retained
     */
    @Override
    public synchronized void recalculatePartitions(TransportProtos.ServiceInfo currentService, List<TransportProtos.ServiceInfo> otherServices) {
        logServiceInfo(currentService);
        otherServices.forEach(this::logServiceInfo);

        Map<QueueKey, List<TransportProtos.ServiceInfo>> queueServicesMap = new HashMap<>();
        Map<String, List<TransportProtos.ServiceInfo>> transportsByType = new HashMap<>();
        addNode(queueServicesMap, transportsByType, currentService);
        for (TransportProtos.ServiceInfo other : otherServices) {
            addNode(queueServicesMap, transportsByType, other);
        }
        // Publish the transport index only once it is complete.
        this.tbTransportServicesByType = transportsByType;
        // A stable order makes every node derive the same partition owner.
        queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(TransportProtos.ServiceInfo::getServiceId)));

        // Build the new assignment aside, so concurrent readers keep seeing the previous
        // complete assignment until the new one is ready.
        ConcurrentMap<QueueKey, List<Integer>> newPartitions = new ConcurrentHashMap<>();
        this.partitionSizesMap.forEach((queueKey, size) -> {
            for (int i = 0; i < size; ++i) {
                TransportProtos.ServiceInfo owner = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
                if (currentService.equals(owner)) {
                    newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
                }
            }
        });
        ConcurrentMap<QueueKey, List<Integer>> oldPartitions = this.myPartitions;
        this.myPartitions = newPartitions;

        oldPartitions.forEach((queueKey, partitions) -> {
            if (!newPartitions.containsKey(queueKey)) {
                log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
                this.applicationEventPublisher.publishEvent((ApplicationEvent) new PartitionChangeEvent(this, queueKey, Collections.emptySet()));
            }
        });
        newPartitions.forEach((queueKey, partitions) -> {
            if (!partitions.equals(oldPartitions.get(queueKey))) {
                log.info("[{}] NEW PARTITIONS: {}", queueKey, partitions);
                Set<TopicPartitionInfo> tpiList = partitions.stream()
                        .map(partition -> buildTopicPartitionInfo(queueKey, partition))
                        .collect(Collectors.toSet());
                this.applicationEventPublisher.publishEvent((ApplicationEvent) new PartitionChangeEvent(this, queueKey, tpiList));
            }
        });

        List<TransportProtos.ServiceInfo> previousOtherServices = this.currentOtherServices;
        // Always keep a private copy so later changes to the caller's list cannot leak in.
        this.currentOtherServices = new ArrayList<>(otherServices);
        if (previousOtherServices != null) {
            Set<QueueKey> changes = new HashSet<>();
            Map<QueueKey, List<TransportProtos.ServiceInfo>> currentMap = getServiceKeyListMap(previousOtherServices);
            Map<QueueKey, List<TransportProtos.ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
            currentMap.forEach((key, list) -> {
                if (!list.equals(newMap.get(key))) {
                    changes.add(key);
                }
            });
            // Whatever remains in newMap are keys that had no services before.
            currentMap.keySet().forEach(newMap::remove);
            changes.addAll(newMap.keySet());
            if (!changes.isEmpty()) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent) new ClusterTopologyChangeEvent(this, changes));
            }
        }
        this.applicationEventPublisher.publishEvent((ApplicationEvent) new ServiceListChangedEvent(otherServices, currentService));
    }

    @Override
    public Set<String> getAllServiceIds(ServiceType serviceType) {
        return this.getAllServices(serviceType).stream().map(TransportProtos.ServiceInfo::getServiceId).collect(Collectors.toSet());
    }

    /**
     * Returns the other services of the given type plus the current service when it has that type.
     */
    @Override
    public Set<TransportProtos.ServiceInfo> getAllServices(ServiceType serviceType) {
        Set<TransportProtos.ServiceInfo> result = this.getOtherServices(serviceType);
        TransportProtos.ServiceInfo current = this.serviceInfoProvider.getServiceInfo();
        if (current.getServiceTypesList().contains(serviceType.name())) {
            result.add(current);
        }
        return result;
    }

    /**
     * Returns a new mutable set with the other services (as of the last recalculation) whose
     * service type list contains the name of {@code serviceType}; empty before the first recalculation.
     */
    @Override
    public Set<TransportProtos.ServiceInfo> getOtherServices(ServiceType serviceType) {
        Set<TransportProtos.ServiceInfo> result = new HashSet<>();
        List<TransportProtos.ServiceInfo> others = this.currentOtherServices;
        if (others != null) {
            String typeName = serviceType.name();
            for (TransportProtos.ServiceInfo serviceInfo : others) {
                if (serviceInfo.getServiceTypesList().contains(typeName)) {
                    result.add(serviceInfo);
                }
            }
        }
        return result;
    }

    @Override
    public int resolvePartitionIndex(UUID entityId, int partitions) {
        return hashToPartition(entityId, partitions);
    }

    @Override
    public void removeTenant(TenantId tenantId) {
        this.tenantRoutingInfoMap.remove(tenantId);
    }

    @Override
    public int countTransportsByType(String type) {
        List<TransportProtos.ServiceInfo> list = this.tbTransportServicesByType.get(type);
        return list == null ? 0 : list.size();
    }

    /** Hashes both halves of the id and folds the hash into the range {@code [0, partitions)}. */
    private int hashToPartition(UUID id, int partitions) {
        int hash = this.hashFunction.newHasher()
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .hash().asInt();
        // The remainder lies strictly between -partitions and partitions, so abs cannot overflow.
        return Math.abs(hash % partitions);
    }

    /** Parses a service type name case-insensitively and independently of the default locale. */
    private static ServiceType parseServiceType(String serviceTypeStr) {
        return ServiceType.valueOf(serviceTypeStr.toUpperCase(Locale.ROOT));
    }

    /**
     * Groups the given services by the queue keys they are attributed to, for topology change detection.
     * NOTE(review): a rule-engine service is attributed to every known queue key here, whereas
     * {@code addNode} restricts it to rule-engine keys - confirm whether the difference is intended.
     */
    private Map<QueueKey, List<TransportProtos.ServiceInfo>> getServiceKeyListMap(List<TransportProtos.ServiceInfo> services) {
        Map<QueueKey, List<TransportProtos.ServiceInfo>> result = new HashMap<>();
        for (TransportProtos.ServiceInfo serviceInfo : services) {
            for (String serviceTypeStr : serviceInfo.getServiceTypesList()) {
                ServiceType serviceType = parseServiceType(serviceTypeStr);
                if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
                    for (QueueKey queueKey : this.partitionTopicsMap.keySet()) {
                        result.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo);
                    }
                } else {
                    result.computeIfAbsent(new QueueKey(serviceType), key -> new ArrayList<>()).add(serviceInfo);
                }
            }
        }
        return result;
    }

    /** Builds the partition descriptor, flagging whether the current service owns the partition. */
    private TopicPartitionInfo buildTopicPartitionInfo(QueueKey queueKey, int partition) {
        List<Integer> partitions = this.myPartitions.get(queueKey);
        TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder();
        tpi.topic(this.partitionTopicsMap.get(queueKey));
        tpi.partition(Integer.valueOf(partition));
        tpi.tenantId(queueKey.getTenantId());
        tpi.myPartition(partitions != null && partitions.contains(partition));
        return tpi.build();
    }

    /**
     * Tells whether the tenant is isolated for the given service type. The tenant's routing info
     * is loaded on first use and cached until {@link #removeTenant(TenantId)} evicts it. Only
     * {@link ServiceType#TB_RULE_ENGINE} can be reported as isolated; the system tenant never is.
     *
     * @throws RuntimeException if the routing info service returns no routing info for the tenant
     */
    private boolean isIsolated(ServiceType serviceType, TenantId tenantId) {
        if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
            return false;
        }
        TenantRoutingInfo routingInfo = this.tenantRoutingInfoMap.get(tenantId);
        if (routingInfo == null) {
            synchronized (this.tenantRoutingInfoMap) {
                routingInfo = this.tenantRoutingInfoMap.get(tenantId);
                if (routingInfo == null) {
                    routingInfo = this.tenantRoutingInfoService.getRoutingInfo(tenantId);
                    if (routingInfo == null) {
                        // ConcurrentHashMap rejects null values, so this must be checked before caching.
                        throw new RuntimeException("Tenant not found!");
                    }
                    this.tenantRoutingInfoMap.put(tenantId, routingInfo);
                }
            }
        }
        return ServiceType.TB_RULE_ENGINE == serviceType && routingInfo.isIsolatedTbRuleEngine();
    }

    private TenantId getIsolatedOrSystemTenantId(ServiceType serviceType, TenantId tenantId) {
        return this.isIsolated(serviceType, tenantId) ? tenantId : TenantId.SYS_TENANT_ID;
    }

    private void logServiceInfo(TransportProtos.ServiceInfo server) {
        log.info("[{}] Found common server: [{}]", server.getServiceId(), server.getServiceTypesList());
    }

    /**
     * Adds a service instance to the candidate list of every queue it can serve and to the
     * transport index: rule-engine services serve all known rule-engine queues, core and
     * version-control services serve their single queue, other service types serve none.
     *
     * @param queueServiceList candidate services per queue key, filled in place
     * @param transportsByType services per transport type, filled in place
     * @param instance the service instance to register
     */
    private void addNode(Map<QueueKey, List<TransportProtos.ServiceInfo>> queueServiceList, Map<String, List<TransportProtos.ServiceInfo>> transportsByType, TransportProtos.ServiceInfo instance) {
        for (String serviceTypeStr : instance.getServiceTypesList()) {
            ServiceType serviceType = parseServiceType(serviceTypeStr);
            if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
                for (QueueKey key : this.partitionTopicsMap.keySet()) {
                    if (ServiceType.TB_RULE_ENGINE.equals(key.getType())) {
                        queueServiceList.computeIfAbsent(key, k -> new ArrayList<>()).add(instance);
                    }
                }
            } else if (ServiceType.TB_CORE.equals(serviceType) || ServiceType.TB_VC_EXECUTOR.equals(serviceType)) {
                queueServiceList.computeIfAbsent(new QueueKey(serviceType), k -> new ArrayList<>()).add(instance);
            }
        }
        for (String transportType : instance.getTransportsList()) {
            transportsByType.computeIfAbsent(transportType, t -> new ArrayList<>()).add(instance);
        }
    }

    /**
     * Picks the service responsible for a partition. Non-rule-engine queues and queues of the
     * system tenant are assigned round-robin by partition index; other rule-engine queues get
     * the index shifted by a hash of tenant id and queue name.
     *
     * @param servers candidate services, expected in a stable order; may be {@code null}
     * @param queueKey the queue the partition belongs to
     * @param partition the partition index
     * @return the responsible service, or {@code null} when there are no candidates
     */
    protected TransportProtos.ServiceInfo resolveByPartitionIdx(List<TransportProtos.ServiceInfo> servers, QueueKey queueKey, int partition) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        boolean shiftByTenantHash = ServiceType.TB_RULE_ENGINE.equals(queueKey.getType())
                && !TenantId.SYS_TENANT_ID.equals(queueKey.getTenantId());
        if (!shiftByTenantHash) {
            return servers.get(partition % servers.size());
        }
        UUID tenantUuid = queueKey.getTenantId().getId();
        int hash = this.hashFunction.newHasher()
                .putLong(tenantUuid.getMostSignificantBits())
                .putLong(tenantUuid.getLeastSignificantBits())
                .putString(queueKey.getQueueName(), StandardCharsets.UTF_8)
                .hash().asInt();
        return servers.get(Math.abs((hash + partition) % servers.size()));
    }

    /**
     * Returns the Guava hash function registered under the given name.
     *
     * @param name one of {@code murmur3_32}, {@code murmur3_128} or {@code sha256}
     * @return the matching hash function
     * @throws IllegalArgumentException if the name is not one of the supported values
     */
    public static HashFunction forName(String name) {
        switch (name) {
            case "murmur3_32": {
                return Hashing.murmur3_32();
            }
            case "murmur3_128": {
                return Hashing.murmur3_128();
            }
            case "sha256": {
                return Hashing.sha256();
            }
        }
        throw new IllegalArgumentException("Can't find hash function with name " + name);
    }
}

