package org.thingsboard.server.service.cluster.discovery;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.MiscUtils;

@ConditionalOnProperty(prefix = "zk", value = {"enabled"}, havingValue = "true", matchIfMissing = false)
@Service
/* loaded from: input_file:org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.class */
public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(ZkDiscoveryService.class);

    @Value("${zk.url}")
    private String zkUrl;

    @Value("${zk.retry_interval_ms}")
    private Integer zkRetryInterval;

    @Value("${zk.connection_timeout_ms}")
    private Integer zkConnectionTimeout;

    @Value("${zk.session_timeout_ms}")
    private Integer zkSessionTimeout;

    @Value("${zk.zk_dir}")
    private String zkDir;
    private String zkNodesDir;

    @Autowired
    private ServerInstanceService serverInstance;

    @Autowired
    @Lazy
    private TelemetrySubscriptionService tsSubService;

    @Autowired
    @Lazy
    private DeviceStateService deviceStateService;

    @Autowired
    @Lazy
    private ActorService actorService;

    @Autowired
    @Lazy
    private ClusterRoutingService routingService;
    private ExecutorService reconnectExecutorService;
    private CuratorFramework client;
    private PathChildrenCache cache;
    private String nodePath;
    private volatile boolean stopped = true;
    private volatile boolean reconnectInProgress = false;

    /* renamed from: org.thingsboard.server.service.cluster.discovery.ZkDiscoveryService$1, reason: invalid class name */
    /* loaded from: input_file:org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @PostConstruct
    public void init() {
        log.info("Initializing...");
        Assert.hasLength(this.zkUrl, MiscUtils.missingProperty("zk.url"));
        Assert.notNull(this.zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
        Assert.notNull(this.zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
        Assert.notNull(this.zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
        this.reconnectExecutorService = Executors.newSingleThreadExecutor();
        log.info("Initializing discovery service using ZK connect string: {}", this.zkUrl);
        this.zkNodesDir = this.zkDir + "/nodes";
        initZkClient();
    }

    private void initZkClient() {
        try {
            this.client = CuratorFrameworkFactory.newClient(this.zkUrl, this.zkSessionTimeout.intValue(), this.zkConnectionTimeout.intValue(), new RetryForever(this.zkRetryInterval.intValue()));
            this.client.start();
            this.client.blockUntilConnected();
            this.cache = new PathChildrenCache(this.client, this.zkNodesDir, true);
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.stopped = false;
            log.info("ZK client connected");
        } catch (Exception e) {
            log.error("Failed to connect to ZK: {}", e.getMessage(), e);
            CloseableUtils.closeQuietly(this.cache);
            CloseableUtils.closeQuietly(this.client);
            throw new RuntimeException(e);
        }
    }

    private void destroyZkClient() {
        this.stopped = true;
        try {
            unpublishCurrentServer();
        } catch (Exception e) {
        }
        CloseableUtils.closeQuietly(this.cache);
        CloseableUtils.closeQuietly(this.client);
        log.info("ZK client disconnected");
    }

    @PreDestroy
    public void destroy() {
        destroyZkClient();
        this.reconnectExecutorService.shutdownNow();
        log.info("Stopped discovery service");
    }

    @Override // org.thingsboard.server.service.cluster.discovery.DiscoveryService
    public synchronized void publishCurrentServer() {
        ServerInstance self = this.serverInstance.getSelf();
        if (currentServerExists()) {
            log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", new Object[]{self.getHost(), Integer.valueOf(self.getPort()), this.nodePath});
            return;
        }
        try {
            log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), Integer.valueOf(self.getPort()));
            this.nodePath = (String) ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(this.zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
            log.info("[{}:{}] Created ZK node for current instance: {}", new Object[]{self.getHost(), Integer.valueOf(self.getPort()), this.nodePath});
            this.client.getConnectionStateListenable().addListener(checkReconnect(self));
        } catch (Exception e) {
            log.error("Failed to create ZK node", e);
            throw new RuntimeException(e);
        }
    }

    private boolean currentServerExists() {
        if (this.nodePath == null) {
            return false;
        }
        try {
            ServerInstance self = this.serverInstance.getSelf();
            ServerAddress serverAddress = (ServerAddress) SerializationUtils.deserialize((byte[]) this.client.getData().forPath(this.nodePath));
            if (self.getServerAddress() != null) {
                return self.getServerAddress().equals(serverAddress);
            }
            return false;
        } catch (KeeperException.NoNodeException e) {
            log.info("ZK node does not exist: {}", this.nodePath);
            return false;
        } catch (Exception e2) {
            log.error("Couldn't check if ZK node exists", e2);
            return false;
        }
    }

    private ConnectionStateListener checkReconnect(ServerInstance serverInstance) {
        return (curatorFramework, connectionState) -> {
            log.info("[{}:{}] ZK state changed: {}", new Object[]{serverInstance.getHost(), Integer.valueOf(serverInstance.getPort()), connectionState});
            if (connectionState == ConnectionState.LOST) {
                this.reconnectExecutorService.submit(this::reconnect);
            }
        };
    }

    private synchronized void reconnect() {
        if (this.reconnectInProgress) {
            return;
        }
        this.reconnectInProgress = true;
        try {
            destroyZkClient();
            initZkClient();
            publishCurrentServer();
        } catch (Exception e) {
            log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
        } finally {
            this.reconnectInProgress = false;
        }
    }

    @Override // org.thingsboard.server.service.cluster.discovery.DiscoveryService
    public void unpublishCurrentServer() {
        try {
            if (this.nodePath != null) {
                this.client.delete().forPath(this.nodePath);
            }
        } catch (Exception e) {
            log.error("Failed to delete ZK node {}", this.nodePath, e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.thingsboard.server.service.cluster.discovery.DiscoveryService
    public ServerInstance getCurrentServer() {
        return this.serverInstance.getSelf();
    }

    @Override // org.thingsboard.server.service.cluster.discovery.DiscoveryService
    public List<ServerInstance> getOtherServers() {
        return (List) this.cache.getCurrentData().stream().filter(childData -> {
            return !childData.getPath().equals(this.nodePath);
        }).map(childData2 -> {
            try {
                return new ServerInstance((ServerAddress) SerializationUtils.deserialize(childData2.getData()));
            } catch (NoSuchElementException e) {
                log.error("Failed to decode ZK node", e);
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    @EventListener({ApplicationReadyEvent.class})
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        log.info("Received application ready event. Starting current ZK node.");
        if (this.stopped) {
            log.debug("Ignoring application ready event. Service is stopped.");
        } else if (this.client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", this.client.getState());
        } else {
            publishCurrentServer();
            getOtherServers().forEach(serverInstance -> {
                log.info("Found active server: [{}:{}]", serverInstance.getHost(), Integer.valueOf(serverInstance.getPort()));
            });
        }
    }

    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        if (this.stopped) {
            log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
            return;
        }
        if (this.client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, this.client.getState());
            return;
        }
        ChildData data = pathChildrenCacheEvent.getData();
        if (data == null) {
            log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
            return;
        }
        if (data.getData() == null) {
            log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
            return;
        }
        if (this.nodePath != null && this.nodePath.equals(data.getPath())) {
            if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                log.info("ZK node for current instance is somehow deleted.");
                publishCurrentServer();
            }
            log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
            return;
        }
        try {
            ServerInstance serverInstance = new ServerInstance((ServerAddress) SerializationUtils.deserialize(data.getData()));
            log.info("Processing [{}] event for [{}:{}]", new Object[]{pathChildrenCacheEvent.getType(), serverInstance.getHost(), Integer.valueOf(serverInstance.getPort())});
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case 1:
                    this.routingService.onServerAdded(serverInstance);
                    this.tsSubService.onClusterUpdate();
                    this.deviceStateService.onClusterUpdate();
                    this.actorService.onServerAdded(serverInstance);
                    return;
                case 2:
                    this.routingService.onServerUpdated(serverInstance);
                    this.actorService.onServerUpdated(serverInstance);
                    return;
                case 3:
                    this.routingService.onServerRemoved(serverInstance);
                    this.tsSubService.onClusterUpdate();
                    this.deviceStateService.onClusterUpdate();
                    this.actorService.onServerRemoved(serverInstance);
                    return;
                default:
                    return;
            }
        } catch (SerializationException e) {
            log.error("Failed to decode server instance for node {}", data.getPath(), e);
            throw e;
        }
    }
}
