/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.discovery;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import java.io.Closeable;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
import org.thingsboard.server.queue.util.AfterStartUp;

@Service
@ConditionalOnProperty(prefix="zk", value={"enabled"}, havingValue="true", matchIfMissing=false)
public class ZkDiscoveryService
implements DiscoveryService,
PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(ZkDiscoveryService.class);
    // ZooKeeper connect string handed to the Curator client factory.
    @Value(value="${zk.url}")
    private String zkUrl;
    // Interval, in milliseconds, passed to the RetryForever retry policy.
    @Value(value="${zk.retry_interval_ms}")
    private Integer zkRetryInterval;
    // Connection timeout, in milliseconds, passed to the Curator client factory.
    @Value(value="${zk.connection_timeout_ms}")
    private Integer zkConnectionTimeout;
    // Session timeout, in milliseconds, passed to the Curator client factory.
    @Value(value="${zk.session_timeout_ms}")
    private Integer zkSessionTimeout;
    // Base ZK directory; the nodes directory is derived from it as zkDir + "/nodes".
    @Value(value="${zk.zk_dir}")
    private String zkDir;
    // Delay, in milliseconds, before partitions are recalculated after another node is removed (defaults to 0).
    @Value(value="${zk.recalculate_delay:0}")
    private Long recalculateDelay;
    // Pending delayed recalculation tasks, keyed by the service id of the removed node.
    protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
    private final ApplicationEventPublisher applicationEventPublisher;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final PartitionService partitionService;
    // Single-threaded scheduler created in init(); runs the periodic publish, reconnects and delayed recalculations.
    private ScheduledExecutorService zkExecutorService;
    // Curator client; replaced with a new instance on every (re)connect.
    private CuratorFramework client;
    // Cache of the children of zkNodesDir; this service is registered as its listener.
    private PathChildrenCache cache;
    // Path of the ephemeral sequential node created for this instance; null until the first successful create.
    private String nodePath;
    // Directory that holds one child node per service instance (zkDir + "/nodes").
    private String zkNodesDir;
    // True until the ZK client has connected and again once the client is being destroyed; events are ignored while true.
    private volatile boolean stopped = true;
    // Set while reconnect() is executing so that an overlapping request is skipped.
    private volatile boolean reconnectInProgress = false;

    /**
     * Creates the discovery service with its collaborators.
     *
     * @param applicationEventPublisher publisher used to emit {@link OtherServiceShutdownEvent}s
     * @param serviceInfoProvider       source of this instance's service info
     * @param partitionService          service asked to recalculate partitions when the set of nodes changes
     */
    public ZkDiscoveryService(ApplicationEventPublisher applicationEventPublisher, TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) {
        this.applicationEventPublisher = applicationEventPublisher;
        this.serviceInfoProvider = serviceInfoProvider;
        this.partitionService = partitionService;
        // Diamond instead of a raw type: keeps the declared type parameters and avoids an unchecked assignment.
        this.delayedTasks = new ConcurrentHashMap<>();
    }

    /**
     * Validates the required ZooKeeper settings, creates the single-threaded scheduler,
     * derives the nodes directory and connects the ZooKeeper client.
     *
     * @throws IllegalArgumentException if a required property is missing
     * @throws RuntimeException         if the ZooKeeper client cannot be connected
     */
    @PostConstruct
    public void init() {
        log.info("Initializing...");
        Assert.hasLength(zkUrl, missingProperty("zk.url"));
        Assert.notNull(zkRetryInterval, missingProperty("zk.retry_interval_ms"));
        Assert.notNull(zkConnectionTimeout, missingProperty("zk.connection_timeout_ms"));
        Assert.notNull(zkSessionTimeout, missingProperty("zk.session_timeout_ms"));

        ThreadFactory discoveryThreads = ThingsBoardThreadFactory.forName("zk-discovery");
        zkExecutorService = Executors.newSingleThreadScheduledExecutor(discoveryThreads);

        log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
        zkNodesDir = zkDir + "/nodes";
        initZkClient();
    }

    /**
     * Lists the services currently present in the children cache, excluding this instance.
     *
     * @return the decoded service info of every cached child whose path differs from this instance's node path
     * @throws RuntimeException if the payload of a child cannot be decoded
     */
    @Override
    public List<TransportProtos.ServiceInfo> getOtherServers() {
        return cache.getCurrentData()
                .stream()
                .filter(child -> !child.getPath().equals(nodePath))
                .map(this::decodeServiceInfo)
                .collect(Collectors.toList());
    }

    /** Decodes the payload of a cached child; a decoding failure is logged and rethrown unchecked. */
    private TransportProtos.ServiceInfo decodeServiceInfo(ChildData child) {
        try {
            return TransportProtos.ServiceInfo.parseFrom(child.getData());
        } catch (InvalidProtocolBufferException | NoSuchElementException e) {
            log.error("Failed to decode ZK node", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Reports whether this discovery service represents a monolith deployment.
     *
     * @return always {@code false} for the ZooKeeper-backed implementation
     */
    @Override
    public boolean isMonolith() {
        return false;
    }

    /**
     * Publishes this instance to ZooKeeper once the application is ready, recalculates
     * partitions and schedules a once-a-minute refresh of the published node.
     * <p>
     * Nothing is done when the service is stopped or the ZK client is not in the
     * {@code STARTED} state.
     *
     * @param event the application-ready event (not inspected)
     * @throws RuntimeException if the initial publication fails
     */
    @AfterStartUp(order=2)
    public void onApplicationEvent(ApplicationReadyEvent event) {
        if (stopped) {
            log.debug("Ignoring application ready event. Service is stopped.");
            return;
        }
        log.info("Received application ready event. Starting current ZK node.");
        if (client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState());
            return;
        }
        log.info("Going to publish current server...");
        publishCurrentServer();
        log.info("Going to recalculate partitions...");
        recalculatePartitions();
        // scheduleAtFixedRate suppresses every subsequent execution once a run throws,
        // so a single transient ZK failure must not escape the periodic task.
        zkExecutorService.scheduleAtFixedRate(() -> {
            try {
                publishCurrentServer();
            } catch (Exception e) {
                log.warn("Failed to refresh ZK node for current instance, will retry on next run", e);
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Publishes this instance in ZooKeeper.
     * <p>
     * When a node matching the current service info already exists, its data is replaced
     * with freshly generated service info. Otherwise a new ephemeral sequential node is
     * created under the nodes directory and a connection-state listener is registered
     * on the client.
     *
     * @throws RuntimeException if the node can be neither updated nor created
     */
    public synchronized void publishCurrentServer() {
        TransportProtos.ServiceInfo self = serviceInfoProvider.getServiceInfo();
        if (currentServerExists()) {
            try {
                log.trace("[{}] Updating ZK node for current instance: {}", self.getServiceId(), nodePath);
                client.setData().forPath(nodePath, serviceInfoProvider.generateNewServiceInfoWithCurrentSystemInfo().toByteArray());
            } catch (Exception e) {
                // forPath declares a checked Exception; report it the same way as the create branch.
                log.error("Failed to update ZK node {}", nodePath, e);
                throw new RuntimeException(e);
            }
            return;
        }
        try {
            log.info("[{}] Creating ZK node for current instance", self.getServiceId());
            nodePath = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(zkNodesDir + "/", self.toByteArray());
            log.info("[{}] Created ZK node for current instance: {}", self.getServiceId(), nodePath);
            // NOTE(review): a new listener is registered on every successful create; if the node is
            // re-created on the same client (childEvent does this when the own node is removed),
            // listeners appear to accumulate - confirm whether that is intended.
            client.getConnectionStateListenable().addListener(checkReconnect(self));
        } catch (Exception e) {
            log.error("Failed to create ZK node", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks whether the node previously created for this instance is still present in
     * ZooKeeper and holds data equal to the current service info.
     *
     * @return {@code true} only when the node exists and its decoded payload equals the current
     *         service info; {@code false} when no node was created yet, the node is missing,
     *         the payload differs, or the check fails
     */
    private boolean currentServerExists() {
        if (nodePath == null) {
            return false;
        }
        boolean registered = false;
        try {
            TransportProtos.ServiceInfo current = serviceInfoProvider.getServiceInfo();
            byte[] stored = client.getData().forPath(nodePath);
            registered = current.equals(TransportProtos.ServiceInfo.parseFrom(stored));
        } catch (KeeperException.NoNodeException e) {
            log.info("ZK node does not exist: {}", nodePath);
        } catch (Exception e) {
            log.error("Couldn't check if ZK node exists", e);
        }
        return registered;
    }

    /**
     * Builds a connection-state listener that logs every state change and submits a
     * reconnect to the scheduler when the connection is reported as {@code LOST}.
     *
     * @param self service info whose service id is used as the log prefix
     * @return the listener to register on the Curator client
     */
    private ConnectionStateListener checkReconnect(TransportProtos.ServiceInfo self) {
        return (curator, state) -> {
            log.info("[{}] ZK state changed: {}", self.getServiceId(), state);
            if (state != ConnectionState.LOST) {
                return;
            }
            zkExecutorService.submit(this::reconnect);
        };
    }

    /**
     * Tears down the current ZK client, creates a new one and publishes this instance again.
     * <p>
     * The call is skipped while another reconnect is in progress. Failures are logged and
     * not propagated.
     */
    private synchronized void reconnect() {
        if (reconnectInProgress) {
            return;
        }
        reconnectInProgress = true;
        try {
            destroyZkClient();
            initZkClient();
            publishCurrentServer();
        } catch (Exception e) {
            log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
        } finally {
            reconnectInProgress = false;
        }
    }

    /**
     * Creates and starts a Curator client, blocks until it is connected, then starts the
     * children cache for the nodes directory with this service registered as listener.
     * On success the service is marked as not stopped.
     *
     * @throws RuntimeException if the client or the cache cannot be started; the cache and the
     *                          client are closed before the exception is thrown
     */
    private void initZkClient() {
        try {
            client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval));
            client.start();
            client.blockUntilConnected();
            cache = new PathChildrenCache(client, zkNodesDir, true);
            cache.getListenable().addListener(this);
            cache.start();
            stopped = false;
            log.info("ZK client connected");
        } catch (Exception e) {
            log.error("Failed to connect to ZK: {}", e.getMessage(), e);
            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag (after cleanup, so closing is not disturbed)
                // so that callers up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the ZK node of this instance, if one was created.
     *
     * @throws RuntimeException if the delete request fails; the failure is logged first
     */
    private void unpublishCurrentServer() {
        if (nodePath == null) {
            // Nothing was published yet.
            return;
        }
        try {
            client.delete().forPath(nodePath);
        } catch (Exception e) {
            log.error("Failed to delete ZK node {}", nodePath, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks the service as stopped, removes this instance's node on a best-effort basis
     * and closes the children cache and the Curator client.
     */
    private void destroyZkClient() {
        stopped = true;
        try {
            unpublishCurrentServer();
        } catch (Exception ignored) {
            // Best effort: unpublishCurrentServer() has already logged the failure,
            // and the cache and client still have to be closed.
        }
        CloseableUtils.closeQuietly(cache);
        CloseableUtils.closeQuietly(client);
        log.info("ZK client disconnected");
    }

    /**
     * Shuts the discovery service down: disconnects from ZooKeeper first, then stops the
     * scheduler without waiting for queued tasks.
     */
    @PreDestroy
    public void destroy() {
        destroyZkClient();
        zkExecutorService.shutdownNow();
        log.info("Stopped discovery service");
    }

    /**
     * Builds the message used when a required configuration property is absent.
     *
     * @param propertyName name of the missing property
     * @return the message text containing the property name
     */
    public static String missingProperty(String propertyName) {
        return String.format("The %s property need to be set!", propertyName);
    }

    /**
     * Reacts to changes of the children of the nodes directory.
     * <p>
     * Events are ignored while the service is stopped, while the ZK client is not in the
     * {@code STARTED} state, or when the event carries no child data. An event about this
     * instance's own node is ignored as well, except that a removal re-publishes the node.
     * For any other node:
     * <ul>
     *   <li>{@code CHILD_ADDED}: a pending delayed recalculation for the same service id is
     *       cancelled; partitions are recalculated unless that cancellation succeeded
     *       (i.e. the service came back before the delay elapsed);</li>
     *   <li>{@code CHILD_REMOVED}: an {@link OtherServiceShutdownEvent} is published on the
     *       scheduler thread and a recalculation is scheduled after the configured delay.</li>
     * </ul>
     *
     * @param curatorFramework       the client that produced the event (not used)
     * @param pathChildrenCacheEvent the cache event to process
     * @throws Exception if the payload of the affected node cannot be decoded
     */
    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        if (stopped) {
            log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
            return;
        }
        if (client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
            return;
        }
        ChildData data = pathChildrenCacheEvent.getData();
        if (data == null) {
            log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
            return;
        }
        if (data.getData() == null) {
            log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
            return;
        }
        if (nodePath != null && nodePath.equals(data.getPath())) {
            if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                log.info("ZK node for current instance is somehow deleted.");
                publishCurrentServer();
            }
            log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
            return;
        }
        TransportProtos.ServiceInfo instance;
        try {
            instance = TransportProtos.ServiceInfo.parseFrom(data.getData());
        } catch (InvalidProtocolBufferException e) {
            log.error("Failed to decode server instance for node {}", data.getPath(), e);
            throw e;
        }
        String serviceId = instance.getServiceId();
        ProtocolStringList serviceTypesList = instance.getServiceTypesList();
        log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId);
        switch (pathChildrenCacheEvent.getType()) {
            case CHILD_ADDED: {
                ScheduledFuture<?> task = delayedTasks.remove(serviceId);
                if (task != null) {
                    if (task.cancel(false)) {
                        log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", serviceId, serviceTypesList);
                        break;
                    }
                    log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", serviceId, serviceTypesList);
                    recalculatePartitions();
                    break;
                }
                log.trace("[{}] Going to recalculate partitions due to adding new node [{}].", serviceId, serviceTypesList);
                recalculatePartitions();
                break;
            }
            case CHILD_REMOVED: {
                zkExecutorService.submit(() -> applicationEventPublisher.publishEvent(new OtherServiceShutdownEvent(this, serviceId, serviceTypesList)));
                // Schedule and register the task atomically. With a plain schedule-then-put the task
                // could run (the delay may be 0) before its future was stored; it would then find no
                // entry, skip the recalculation and leave a stale future in the map. compute() makes
                // the task's own remove(serviceId) wait until the future has been stored.
                delayedTasks.compute(serviceId, (id, previous) -> zkExecutorService.schedule(() -> {
                    log.debug("[{}] Going to recalculate partitions due to removed node [{}]", serviceId, serviceTypesList);
                    ScheduledFuture<?> removedTask = delayedTasks.remove(serviceId);
                    if (removedTask != null) {
                        recalculatePartitions();
                    }
                }, recalculateDelay, TimeUnit.MILLISECONDS));
                break;
            }
            default:
                // Other event types do not affect partitioning.
                break;
        }
    }

    /**
     * Cancels and forgets every pending delayed recalculation, then asks the partition
     * service to recalculate partitions for this instance and the other known servers.
     */
    synchronized void recalculatePartitions() {
        for (ScheduledFuture<?> pending : delayedTasks.values()) {
            pending.cancel(false);
        }
        delayedTasks.clear();
        partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers());
    }
}

