package org.thingsboard.server.queue.discovery;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
import org.thingsboard.server.queue.util.AfterStartUp;

@ConditionalOnProperty(prefix = "zk", value = {"enabled"}, havingValue = "true", matchIfMissing = false)
@Service
/* loaded from: input_file:org/thingsboard/server/queue/discovery/ZkDiscoveryService.class */
public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(ZkDiscoveryService.class);

    // ZooKeeper connect string handed to CuratorFrameworkFactory.newClient; must be non-empty (checked in init()).
    @Value("${zk.url}")
    private String zkUrl;

    // Interval, in milliseconds, used for the RetryForever policy of the Curator client.
    @Value("${zk.retry_interval_ms}")
    private Integer zkRetryInterval;

    // Connection timeout, in milliseconds, passed to the Curator client.
    @Value("${zk.connection_timeout_ms}")
    private Integer zkConnectionTimeout;

    // Session timeout, in milliseconds, passed to the Curator client.
    @Value("${zk.session_timeout_ms}")
    private Integer zkSessionTimeout;

    // Base ZooKeeper directory; service nodes live under "<zkDir>/nodes" (see zkNodesDir).
    @Value("${zk.zk_dir}")
    private String zkDir;

    // Delay, in milliseconds, before partitions are recalculated after another node disappears; defaults to 0.
    @Value("${zk.recalculate_delay:0}")
    private Long recalculateDelay;
    // Used to publish OtherServiceShutdownEvent when another node is removed.
    private final ApplicationEventPublisher applicationEventPublisher;
    // Source of the ServiceInfo describing the current instance.
    private final TbServiceInfoProvider serviceInfoProvider;
    // Receives the current and other servers whenever partitions are recalculated.
    private final PartitionService partitionService;
    // Single-threaded scheduler created in init(); runs reconnects, periodic re-publishing and delayed recalculations.
    private ScheduledExecutorService zkExecutorService;
    // Curator client; replaced with a fresh instance on every initZkClient() call.
    private CuratorFramework client;
    // Cache of the children of zkNodesDir; this service is registered as its listener.
    private PathChildrenCache cache;
    // Path of the ZK node created for the current instance; null until the node has been created.
    private String nodePath;
    // zkDir + "/nodes", computed in init().
    private String zkNodesDir;
    // True until initZkClient() succeeds and again after destroyZkClient(); events are ignored while true.
    private volatile boolean stopped = true;
    // Set for the duration of reconnect().
    private volatile boolean reconnectInProgress = false;
    // Pending delayed partition recalculations, keyed by the service id of the node that was removed.
    protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks = new ConcurrentHashMap<>();

    /* renamed from: org.thingsboard.server.queue.discovery.ZkDiscoveryService$1, reason: invalid class name */
    /* loaded from: input_file:org/thingsboard/server/queue/discovery/ZkDiscoveryService$1.class */
    /**
     * Synthetic lookup table that maps {@link PathChildrenCacheEvent.Type} ordinals to small case
     * indices so that a switch over the enum can be expressed as a switch over ints:
     * CHILD_ADDED maps to 1 and CHILD_REMOVED maps to 2; every other constant keeps the array
     * default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        // Indexed by Type.ordinal(); sized to the number of constants present at class-load time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            // Each assignment is guarded separately: a constant that cannot be resolved at runtime
            // is simply left unmapped (0) instead of failing class initialization.
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Stores the collaborators of the discovery service. No ZooKeeper connection is opened here;
     * the constructor only assigns fields.
     *
     * @param applicationEventPublisher publisher used for shutdown events of other services
     * @param tbServiceInfoProvider     provider of the current instance's service info
     * @param partitionService          service that recalculates partitions
     */
    public ZkDiscoveryService(ApplicationEventPublisher applicationEventPublisher, TbServiceInfoProvider tbServiceInfoProvider, PartitionService partitionService) {
        this.partitionService = partitionService;
        this.serviceInfoProvider = tbServiceInfoProvider;
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Validates the mandatory ZooKeeper properties, creates the single-threaded scheduler,
     * derives the nodes directory and connects the ZooKeeper client.
     *
     * @throws IllegalArgumentException if a mandatory property is missing (raised by {@code Assert})
     * @throws RuntimeException         if the ZooKeeper client cannot be initialized
     */
    @PostConstruct
    public void init() {
        log.info("Initializing...");

        // Fail fast on missing configuration before any resource is created.
        Assert.hasLength(zkUrl, missingProperty("zk.url"));
        Assert.notNull(zkRetryInterval, missingProperty("zk.retry_interval_ms"));
        Assert.notNull(zkConnectionTimeout, missingProperty("zk.connection_timeout_ms"));
        Assert.notNull(zkSessionTimeout, missingProperty("zk.session_timeout_ms"));

        zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
        log.info("Initializing discovery service using ZK connect string: {}", zkUrl);

        zkNodesDir = zkDir + "/nodes";
        initZkClient();
    }

    /**
     * Returns the service infos of all nodes currently present in the children cache, except the
     * node of the current instance.
     *
     * @return decoded service infos of the other nodes
     * @throws RuntimeException if the payload of any node cannot be decoded
     */
    @Override // org.thingsboard.server.queue.discovery.DiscoveryService
    public List<TransportProtos.ServiceInfo> getOtherServers() {
        return cache.getCurrentData().stream()
                .filter(child -> !child.getPath().equals(nodePath))
                .map(this::decodeServiceInfo)
                .collect(Collectors.toList());
    }

    /** Parses the payload of a cached child node; decoding failures are logged and rethrown unchecked. */
    private TransportProtos.ServiceInfo decodeServiceInfo(ChildData child) {
        try {
            return TransportProtos.ServiceInfo.parseFrom(child.getData());
        } catch (NoSuchElementException | InvalidProtocolBufferException e) {
            log.error("Failed to decode ZK node", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Always reports {@code false}: this implementation never identifies itself as a monolith.
     *
     * @return {@code false}
     */
    @Override // org.thingsboard.server.queue.discovery.DiscoveryService
    public boolean isMonolith() {
        return false;
    }

    /**
     * Publishes the current instance to ZooKeeper once the application is ready, triggers an
     * initial partition recalculation and schedules a refresh of the published node every minute.
     * Does nothing when the service is stopped or the ZooKeeper client is not in the STARTED state.
     *
     * @param applicationReadyEvent the readiness event (not inspected)
     */
    @AfterStartUp(order = AfterStartUp.DISCOVERY_SERVICE)
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        if (this.stopped) {
            log.debug("Ignoring application ready event. Service is stopped.");
            return;
        }
        log.info("Received application ready event. Starting current ZK node.");
        if (this.client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", this.client.getState());
            return;
        }
        log.info("Going to publish current server...");
        publishCurrentServer();
        log.info("Going to recalculate partitions...");
        recalculatePartitions();
        // A periodic task that throws is silently cancelled by the executor, so a single failed
        // refresh would stop all future refreshes. Catch and log instead, and let the next run retry.
        this.zkExecutorService.scheduleAtFixedRate(() -> {
            try {
                publishCurrentServer();
            } catch (Exception e) {
                log.warn("Failed to refresh ZK node for current instance, will retry on next run", e);
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Publishes the current instance to ZooKeeper. If a node for this instance already exists with
     * matching content, its data is replaced with freshly generated service info; otherwise a new
     * ephemeral sequential node is created under the nodes directory and a connection-state
     * listener is registered on the client.
     *
     * @throws RuntimeException if the node cannot be updated or created
     */
    public synchronized void publishCurrentServer() {
        TransportProtos.ServiceInfo serviceInfo = this.serviceInfoProvider.getServiceInfo();
        if (currentServerExists()) {
            log.trace("[{}] Updating ZK node for current instance: {}", serviceInfo.getServiceId(), this.nodePath);
            try {
                // forPath declares a checked Exception; surface it unchecked, like the create branch does.
                this.client.setData().forPath(this.nodePath, this.serviceInfoProvider.generateNewServiceInfoWithCurrentSystemInfo().toByteArray());
            } catch (Exception e) {
                log.error("Failed to update ZK node {}", this.nodePath, e);
                throw new RuntimeException(e);
            }
            return;
        }
        try {
            log.info("[{}] Creating ZK node for current instance", serviceInfo.getServiceId());
            this.nodePath = this.client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(this.zkNodesDir + "/", serviceInfo.toByteArray());
            log.info("[{}] Created ZK node for current instance: {}", serviceInfo.getServiceId(), this.nodePath);
            // NOTE(review): a listener is added on every node creation; if the node is re-created on
            // the same client, listeners accumulate - confirm whether that is intended.
            this.client.getConnectionStateListenable().addListener(checkReconnect(serviceInfo));
        } catch (Exception e) {
            log.error("Failed to create ZK node", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks whether the ZK node of the current instance exists and still holds exactly the
     * service info reported by the service info provider.
     *
     * @return {@code true} only if the node was read successfully and its decoded content equals
     *         the current service info; {@code false} if no node was created yet, the node is
     *         missing, or the check itself failed
     */
    private boolean currentServerExists() {
        if (this.nodePath == null) {
            return false;
        }
        try {
            byte[] stored = this.client.getData().forPath(this.nodePath);
            return this.serviceInfoProvider.getServiceInfo().equals(TransportProtos.ServiceInfo.parseFrom(stored));
        } catch (KeeperException.NoNodeException e) {
            // The specific handler must precede the generic one, otherwise it is unreachable.
            log.info("ZK node does not exist: {}", this.nodePath);
            return false;
        } catch (Exception e) {
            log.error("Couldn't check if ZK node exists", e);
            return false;
        }
    }

    /**
     * Builds a connection-state listener that logs every state change and, when the connection
     * is reported as LOST, submits a reconnect to the ZK executor.
     *
     * @param serviceInfo service info whose id is used as the log prefix
     * @return the listener
     */
    private ConnectionStateListener checkReconnect(TransportProtos.ServiceInfo serviceInfo) {
        return (curator, newState) -> {
            log.info("[{}] ZK state changed: {}", serviceInfo.getServiceId(), newState);
            if (newState != ConnectionState.LOST) {
                return;
            }
            // Reconnect off the listener thread, on the dedicated executor.
            zkExecutorService.submit(this::reconnect);
        };
    }

    /**
     * Tears down the current ZooKeeper client, creates a new one and re-publishes the current
     * instance. Failures are logged and not propagated. A call made while another reconnect is
     * flagged as in progress returns immediately.
     */
    private synchronized void reconnect() {
        if (!reconnectInProgress) {
            reconnectInProgress = true;
            try {
                destroyZkClient();
                initZkClient();
                publishCurrentServer();
            } catch (Exception e) {
                log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
            } finally {
                // Always clear the flag so that a later connection loss can trigger a new attempt.
                reconnectInProgress = false;
            }
        }
    }

    /**
     * Creates and starts a new Curator client, blocks until it is connected, then starts a
     * children cache on the nodes directory with this service as its listener and marks the
     * service as running.
     *
     * @throws RuntimeException if any step fails; the cache and client are closed quietly first.
     *         If the failure was an interruption, the thread's interrupt flag is restored.
     */
    private void initZkClient() {
        try {
            this.client = CuratorFrameworkFactory.newClient(this.zkUrl, this.zkSessionTimeout.intValue(), this.zkConnectionTimeout.intValue(), new RetryForever(this.zkRetryInterval.intValue()));
            this.client.start();
            this.client.blockUntilConnected();
            this.cache = new PathChildrenCache(this.client, this.zkNodesDir, true);
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.stopped = false;
            log.info("ZK client connected");
        } catch (Exception e) {
            log.error("Failed to connect to ZK: {}", e.getMessage(), e);
            CloseableUtils.closeQuietly(this.cache);
            CloseableUtils.closeQuietly(this.client);
            if (e instanceof InterruptedException) {
                // Restore the flag only after cleanup so the close calls above are not themselves
                // affected, while callers and the executor can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the ZK node of the current instance, if one was created.
     *
     * @throws RuntimeException if the delete fails
     */
    private void unpublishCurrentServer() {
        if (nodePath == null) {
            // Nothing was ever published.
            return;
        }
        try {
            client.delete().forPath(nodePath);
        } catch (Exception e) {
            log.error("Failed to delete ZK node {}", nodePath, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks the service as stopped, makes a best-effort attempt to remove the current instance's
     * node, and closes the children cache and the client.
     */
    private void destroyZkClient() {
        stopped = true;
        try {
            unpublishCurrentServer();
        } catch (Exception ignored) {
            // Best effort: the failure has already been logged by unpublishCurrentServer().
        }
        CloseableUtils.closeQuietly(cache);
        CloseableUtils.closeQuietly(client);
        log.info("ZK client disconnected");
    }

    /**
     * Shuts the discovery service down: disconnects from ZooKeeper first, then stops the
     * executor immediately via {@code shutdownNow()}.
     */
    @PreDestroy
    public void destroy() {
        // Disconnect before stopping the executor.
        destroyZkClient();
        zkExecutorService.shutdownNow();
        log.info("Stopped discovery service");
    }

    /**
     * Builds the message used when a mandatory configuration property is absent.
     *
     * @param str name of the property
     * @return the message text containing the property name
     */
    public static String missingProperty(String str) {
        StringBuilder message = new StringBuilder();
        message.append("The ").append(str).append(" property need to be set!");
        return message.toString();
    }

    /**
     * Handles a change among the children of the nodes directory.
     *
     * <p>Events are ignored while the service is stopped, while the client is not STARTED, or when
     * the event carries no child data. An event about the current instance's own node is ignored
     * too, except that a removal triggers re-publishing. For other nodes, an added node either
     * triggers a partition recalculation or cancels a pending delayed one, and a removed node
     * publishes an {@link OtherServiceShutdownEvent} and schedules a delayed recalculation.
     *
     * @param curatorFramework       the client that produced the event (not used)
     * @param pathChildrenCacheEvent the cache event
     * @throws Exception if the node payload cannot be decoded, or if re-publishing fails
     */
    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        if (this.stopped) {
            log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
            return;
        }
        if (this.client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, this.client.getState());
            return;
        }
        ChildData data = pathChildrenCacheEvent.getData();
        if (data == null) {
            log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
            return;
        }
        if (data.getData() == null) {
            log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
            return;
        }
        if (this.nodePath != null && this.nodePath.equals(data.getPath())) {
            if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                log.info("ZK node for current instance is somehow deleted.");
                publishCurrentServer();
            }
            log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
            return;
        }

        TransportProtos.ServiceInfo instance;
        try {
            instance = TransportProtos.ServiceInfo.parseFrom(data.getData());
        } catch (InvalidProtocolBufferException e) {
            log.error("Failed to decode server instance for node {}", data.getPath(), e);
            throw e;
        }
        String serviceId = instance.getServiceId();
        ProtocolStringList serviceTypesList = instance.getServiceTypesList();
        log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId);

        // Switch on the enum itself rather than on a synthetic ordinal lookup table.
        switch (pathChildrenCacheEvent.getType()) {
            case CHILD_ADDED:
                onOtherNodeAdded(serviceId, serviceTypesList);
                break;
            case CHILD_REMOVED:
                onOtherNodeRemoved(serviceId, serviceTypesList);
                break;
            default:
                break;
        }
    }

    /** Recalculates partitions for an added node unless a pending delayed recalculation for it was cancelled in time. */
    private void onOtherNodeAdded(String serviceId, ProtocolStringList serviceTypesList) {
        ScheduledFuture<?> pending = this.delayedTasks.remove(serviceId);
        if (pending == null) {
            log.trace("[{}] Going to recalculate partitions due to adding new node [{}].", serviceId, serviceTypesList);
            recalculatePartitions();
        } else if (pending.cancel(false)) {
            // The node came back before its delayed recalculation ran, so nothing has to change.
            log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", serviceId, serviceTypesList);
        } else {
            log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", serviceId, serviceTypesList);
            recalculatePartitions();
        }
    }

    /** Publishes the shutdown event for a removed node and schedules a delayed partition recalculation for it. */
    private void onOtherNodeRemoved(String serviceId, ProtocolStringList serviceTypesList) {
        this.zkExecutorService.submit(() -> {
            this.applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList));
        });
        // NOTE(review): the task is registered in delayedTasks only after it has been scheduled;
        // with a very short recalculateDelay it could in principle run before the put - confirm
        // whether this needs guarding.
        this.delayedTasks.put(serviceId, this.zkExecutorService.schedule(() -> {
            log.debug("[{}] Going to recalculate partitions due to removed node [{}]", serviceId, serviceTypesList);
            if (this.delayedTasks.remove(serviceId) != null) {
                recalculatePartitions();
            }
        }, this.recalculateDelay.longValue(), TimeUnit.MILLISECONDS));
    }

    /**
     * Cancels and forgets every pending delayed recalculation, then asks the partition service to
     * recalculate partitions from the current instance's service info and the other servers.
     */
    synchronized void recalculatePartitions() {
        // Pending tasks are superseded by the recalculation performed below.
        for (ScheduledFuture<?> pending : delayedTasks.values()) {
            pending.cancel(false);
        }
        delayedTasks.clear();
        partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers());
    }
}
