/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.edge.rpc;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.ResourceUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeConnectionTrigger;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.EdgeGrpcSession;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.edge.rpc.KafkaEdgeGrpcSession;
import org.thingsboard.server.service.edge.rpc.PostgresEdgeGrpcSession;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;

@Service
@ConditionalOnProperty(prefix="edges", value={"enabled"}, havingValue="true")
@TbCoreComponent
public class EdgeGrpcService
extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase
implements EdgeRpcService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EdgeGrpcService.class);

    /** Session currently registered for each edge, keyed by edge id. */
    private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
    /** Every registered session keyed by its own session id; used to find sessions that are no longer current. */
    private final ConcurrentMap<UUID, EdgeGrpcSession> sessionsById = new ConcurrentHashMap<>();
    /** One lock per edge, taken around reads/writes of that edge's entry in {@link #sessionNewEvents}. */
    private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
    /**
     * Per-edge "new events pending" flag.
     * <p>
     * Must be a thread-safe map: the per-edge locks only serialize access to a single edge's
     * entry, so entries of different edges are mutated concurrently, and the edge-event
     * processing callback updates the flag without taking a lock at all.
     */
    private final Map<EdgeId, Boolean> sessionNewEvents = new ConcurrentHashMap<>();
    /** Pending edge-event check task per edge. */
    private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
    /** Callbacks of sync requests issued by this node that are still awaiting a response, keyed by request id. */
    private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
    /** Whether the edge-event migration has completed for a given edge. */
    private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
    /**
     * Sessions whose destroy attempt failed and that are retried by the periodic cleanup task.
     * <p>
     * Thread-safe list: sessions are appended while handling connect/disconnect/delete, whereas
     * the cleanup task scheduled on the service executor iterates and prunes the list.
     */
    private final List<EdgeGrpcSession> zombieSessions = new CopyOnWriteArrayList<>();

    @Value(value="${edges.rpc.port}")
    private int rpcPort;
    @Value(value="${edges.rpc.ssl.enabled}")
    private boolean sslEnabled;
    @Value(value="${edges.rpc.ssl.cert}")
    private String certFileResource;
    @Value(value="${edges.rpc.ssl.private_key}")
    private String privateKeyResource;
    @Value(value="${edges.state.persistToTelemetry:false}")
    private boolean persistToTelemetry;
    @Value(value="${edges.rpc.client_max_keep_alive_time_sec:1}")
    private int clientMaxKeepAliveTimeSec;
    @Value(value="${edges.rpc.max_inbound_message_size:4194304}")
    private int maxInboundMessageSize;
    @Value(value="${edges.rpc.keep_alive_time_sec:10}")
    private int keepAliveTimeSec;
    @Value(value="${edges.rpc.keep_alive_timeout_sec:5}")
    private int keepAliveTimeoutSec;
    @Value(value="${edges.scheduler_pool_size}")
    private int schedulerPoolSize;
    @Value(value="${edges.send_scheduler_pool_size}")
    private int sendSchedulerPoolSize;
    @Value(value="${edges.max_high_priority_queue_size_per_session:10000}")
    private int maxHighPriorityQueueSizePerSession;

    @Autowired
    @Lazy
    private EdgeContextComponent ctx;
    @Autowired
    private TelemetrySubscriptionService tsSubService;
    @Autowired
    private TbClusterService clusterService;
    @Autowired
    private TbServiceInfoProvider serviceInfoProvider;
    @Autowired
    private TbTransactionalCache<EdgeId, String> edgeIdServiceIdCache;
    @Autowired
    private TopicService topicService;
    @Autowired
    private TbCoreQueueFactory tbCoreQueueFactory;
    /** Present only when a Kafka admin bean exists; selects the Kafka-backed session implementation. */
    @Autowired
    private Optional<KafkaAdmin> kafkaAdmin;

    /** gRPC server accepting edge connections; created in {@code onStartUp()}. */
    private Server server;
    /** Runs the per-edge event check tasks. */
    private ScheduledExecutorService edgeEventProcessingExecutorService;
    /** Handed to every session for sending downlink messages. */
    private ScheduledExecutorService sendDownlinkExecutorService;
    /** Single-threaded executor for zombie-session cleanup and sync-request timeouts. */
    private ScheduledExecutorService executorService;


    /**
     * Configures and starts the Edge gRPC server and the executors used by edge sessions.
     * <p>
     * The executors are created before the server starts listening, so that a session
     * opened immediately after start-up never observes a {@code null} executor. If the
     * server fails to start, the already created executors are shut down by
     * {@link #destroy()}.
     *
     * @throws RuntimeException if the SSL context cannot be set up or the server cannot be started
     */
    @AfterStartUp(order=11)
    public void onStartUp() {
        log.info("Initializing Edge RPC service!");
        NettyServerBuilder builder = (NettyServerBuilder) NettyServerBuilder.forPort(this.rpcPort)
                .permitKeepAliveTime(this.clientMaxKeepAliveTimeSec, TimeUnit.SECONDS)
                .keepAliveTime(this.keepAliveTimeSec, TimeUnit.SECONDS)
                .keepAliveTimeout(this.keepAliveTimeoutSec, TimeUnit.SECONDS)
                .permitKeepAliveWithoutCalls(true)
                .maxInboundMessageSize(this.maxInboundMessageSize)
                .addService((BindableService) this);
        if (this.sslEnabled) {
            // The builder consumes both streams while building the SSL context, so they can
            // (and must) be closed as soon as useTransportSecurity returns.
            try (InputStream certFileIs = ResourceUtils.getInputStream((Object) this, this.certFileResource);
                 InputStream privateKeyFileIs = ResourceUtils.getInputStream((Object) this, this.privateKeyResource)) {
                builder.useTransportSecurity(certFileIs, privateKeyFileIs);
            } catch (Exception e) {
                log.error("Unable to set up SSL context. Reason: " + e.getMessage(), e);
                throw new RuntimeException("Unable to set up SSL context!", e);
            }
        }
        this.server = builder.build();

        this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(this.schedulerPoolSize, "edge-event-check-scheduler");
        this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(this.sendSchedulerPoolSize, "edge-send-scheduler");
        this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");

        log.info("Going to start Edge RPC server using port: {}", this.rpcPort);
        try {
            this.server.start();
        } catch (IOException e) {
            log.error("Failed to start Edge RPC server!", e);
            // Keep the cause so the real reason for the failure is not lost.
            throw new RuntimeException("Failed to start Edge RPC server!", e);
        }
        this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60L, 60L, TimeUnit.SECONDS);
        log.info("Edge RPC service initialized!");
    }

    /**
     * Stops the gRPC server, cancels every edge-event check that has not finished yet
     * and shuts down all executors owned by this service.
     */
    @PreDestroy
    public void destroy() {
        if (server != null) {
            server.shutdownNow();
        }
        sessionEdgeEventChecks.forEach((edgeId, pendingCheck) -> {
            boolean stillPending = pendingCheck != null && !pendingCheck.isCancelled() && !pendingCheck.isDone();
            if (stillPending) {
                pendingCheck.cancel(true);
                sessionEdgeEventChecks.remove(edgeId);
            }
        });
        ScheduledExecutorService[] pools = {edgeEventProcessingExecutorService, sendDownlinkExecutorService, executorService};
        for (ScheduledExecutorService pool : pools) {
            // Any of these may still be null when start-up did not complete.
            if (pool != null) {
                pool.shutdownNow();
            }
        }
    }

    /**
     * gRPC entry point for a new edge stream: creates a session bound to the given
     * response stream and returns that session's request observer.
     *
     * @param outputStream observer used to send responses to the edge
     * @return observer that receives the edge's requests
     */
    public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
        return createEdgeGrpcSession(outputStream).getInputStream();
    }

    /**
     * Creates the session implementation matching the deployment: Kafka-backed when a
     * {@code KafkaAdmin} is available, Postgres-backed otherwise.
     *
     * @param outputStream observer used by the session to send responses to the edge
     * @return the newly created session
     */
    private EdgeGrpcSession createEdgeGrpcSession(StreamObserver<ResponseMsg> outputStream) {
        if (kafkaAdmin.isPresent()) {
            return new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaAdmin.get(), outputStream,
                    this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService,
                    maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
        }
        return new PostgresEdgeGrpcSession(ctx, outputStream,
                this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService,
                maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
    }

    /**
     * Dispatches a message addressed to an edge session to the handler for its type.
     * Message types not listed here are ignored.
     *
     * @param tenantId tenant the message belongs to
     * @param msg      message to dispatch; cast to the concrete class implied by its type
     */
    @Override
    public void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg) {
        switch (msg.getMsgType()) {
            case EDGE_HIGH_PRIORITY_TO_EDGE_SESSION_MSG:
                EdgeHighPriorityMsg highPriority = (EdgeHighPriorityMsg) msg;
                log.trace("[{}] edgeEventMsg [{}]", tenantId, msg);
                onEdgeHighPriorityEvent(highPriority);
                break;
            case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
                EdgeEventUpdateMsg eventUpdate = (EdgeEventUpdateMsg) msg;
                log.trace("[{}] onToEdgeEventUpdateMsg [{}]", tenantId, msg);
                onEdgeEventUpdate(tenantId, eventUpdate.getEdgeId());
                break;
            case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
                ToEdgeSyncRequest syncRequest = (ToEdgeSyncRequest) msg;
                log.trace("[{}] toEdgeSyncRequest [{}]", tenantId, msg);
                startSyncProcess(tenantId, syncRequest.getEdgeId(), syncRequest.getId(), syncRequest.getServiceId());
                break;
            case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
                FromEdgeSyncResponse syncResponse = (FromEdgeSyncResponse) msg;
                log.trace("[{}] fromEdgeSyncResponse [{}]", tenantId, msg);
                processSyncResponse(syncResponse);
                break;
            default:
                break;
        }
    }

    /**
     * Forwards an updated edge configuration to the edge's session, if that session
     * exists and is connected.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edge     updated edge; {@code null} is logged and ignored
     */
    @Override
    public void updateEdge(TenantId tenantId, Edge edge) {
        if (edge == null) {
            log.warn("[{}] Edge is null - edge is removed and outdated notification is in process!", tenantId);
            return;
        }
        EdgeGrpcSession session = sessions.get(edge.getId());
        boolean connected = session != null && session.isConnected();
        if (!connected) {
            log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
            return;
        }
        log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
        session.onConfigurationUpdate(edge);
    }

    /**
     * Tears down the connected session of a deleted edge: destroys and cleans up the
     * session, unregisters it from both session maps, drops the edge's new-events flag
     * and cancels its pending event check. Does nothing when the edge has no connected
     * session.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edgeId   id of the deleted edge
     */
    @Override
    public void deleteEdge(TenantId tenantId, EdgeId edgeId) {
        EdgeGrpcSession session = sessions.get(edgeId);
        if (session == null || !session.isConnected()) {
            return;
        }
        log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
        destroySession(session);
        session.cleanUp();
        sessions.remove(edgeId);
        sessionsById.remove(session.getSessionId());

        Lock flagLock = sessionNewEventsLocks.computeIfAbsent(edgeId, key -> new ReentrantLock());
        flagLock.lock();
        try {
            sessionNewEvents.remove(edgeId);
        } finally {
            flagLock.unlock();
        }
        cancelScheduleEdgeEventsCheck(edgeId);
    }

    /**
     * Marks the edge as having new events, provided it has a connected session.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge for which new events were reported
     */
    private void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
        EdgeGrpcSession session = sessions.get(edgeId);
        if (session == null || !session.isConnected()) {
            return;
        }
        log.trace("[{}] onEdgeEventUpdate [{}]", tenantId, edgeId.getId());
        updateSessionEventsFlag(tenantId, edgeId);
    }

    /**
     * Queues a high-priority event on the target edge's session and marks the edge as
     * having new events. The event is ignored when the edge has no connected session.
     *
     * @param msg message carrying the tenant id and the edge event
     */
    private void onEdgeHighPriorityEvent(EdgeHighPriorityMsg msg) {
        TenantId tenantId = msg.getTenantId();
        EdgeEvent event = msg.getEdgeEvent();
        EdgeId edgeId = event.getEdgeId();
        EdgeGrpcSession session = sessions.get(edgeId);
        if (session == null || !session.isConnected()) {
            return;
        }
        log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId);
        session.addEventToHighPriorityQueue(event);
        updateSessionEventsFlag(tenantId, edgeId);
    }

    /**
     * Flips the edge's new-events flag from {@code false} to {@code true} under the
     * edge's lock. An absent flag or a flag that is already {@code true} is left untouched.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edgeId   edge whose flag is updated
     */
    private void updateSessionEventsFlag(TenantId tenantId, EdgeId edgeId) {
        Lock flagLock = sessionNewEventsLocks.computeIfAbsent(edgeId, key -> new ReentrantLock());
        flagLock.lock();
        try {
            Boolean hasNewEvents = sessionNewEvents.get(edgeId);
            if (!Boolean.FALSE.equals(hasNewEvents)) {
                return;
            }
            log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
            sessionNewEvents.put(edgeId, true);
        } finally {
            flagLock.unlock();
        }
    }

    /**
     * Session callback invoked once an edge has connected.
     * <p>
     * Replaces (and destroys) any session previously registered for the edge, registers
     * the new one, raises the new-events flag, persists the "active"/"lastConnectTime"
     * state, records this service as the edge's owner in the cache, emits a connect
     * event to the rule engine and (re)starts the periodic edge-event check.
     *
     * @param edgeId          id of the connected edge
     * @param edgeGrpcSession session that has just connected
     */
    private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
        Edge edge = edgeGrpcSession.getEdge();
        TenantId tenantId = edge.getTenantId();
        log.info("[{}][{}] edge [{}] connected successfully.", tenantId, edgeGrpcSession.getSessionId(), edgeId);

        EdgeGrpcSession previous = sessions.get(edgeId);
        if (previous != null) {
            log.info("[{}][{}] Replacing existing session [{}] for edge [{}]", tenantId, edgeGrpcSession.getSessionId(), previous.getSessionId(), edgeId);
            destroySession(previous);
            sessionsById.remove(previous.getSessionId());
        }
        sessions.put(edgeId, edgeGrpcSession);
        sessionsById.put(edgeGrpcSession.getSessionId(), edgeGrpcSession);

        Lock flagLock = sessionNewEventsLocks.computeIfAbsent(edgeId, key -> new ReentrantLock());
        flagLock.lock();
        try {
            sessionNewEvents.put(edgeId, true);
        } finally {
            flagLock.unlock();
        }

        save(tenantId, edgeId, "active", true);
        long connectedAt = System.currentTimeMillis();
        save(tenantId, edgeId, "lastConnectTime", connectedAt);
        edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId());
        pushRuleEngineMessage(tenantId, edge, connectedAt, TbMsgType.CONNECT_EVENT);

        cancelScheduleEdgeEventsCheck(edgeId);
        edgeEventsMigrationProcessed.putIfAbsent(edgeId, Boolean.FALSE);
        scheduleEdgeEventsCheck(edgeGrpcSession);
    }

    /**
     * Handles a sync request for an edge whose session lives on this node and reports
     * the outcome to the requesting service. Nothing is sent when this node holds no
     * session for the edge.
     *
     * @param tenantId         tenant owning the edge
     * @param edgeId           edge to synchronize
     * @param requestId        id of the sync request, echoed in the response
     * @param requestServiceId service id the response is pushed to
     */
    private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId, String requestServiceId) {
        EdgeGrpcSession session = sessions.get(edgeId);
        if (session == null) {
            return;
        }
        FromEdgeSyncResponse response;
        if (session.isSyncInProgress()) {
            response = new FromEdgeSyncResponse(requestId, tenantId, edgeId, false, "Sync process is active at the moment");
        } else {
            // The sync is started only for a connected session; otherwise a failure is reported.
            boolean started = session.isConnected();
            if (started) {
                session.startSyncProcess(true);
            }
            response = new FromEdgeSyncResponse(requestId, tenantId, edgeId, started, "");
        }
        clusterService.pushEdgeSyncResponseToCore(response, requestServiceId);
    }

    /**
     * Issues a sync request for an edge and arranges for {@code responseConsumer} to
     * receive the result.
     * <p>
     * When a local session already has a sync in progress the consumer is answered
     * immediately with a failure. Otherwise the consumer is registered, the request is
     * pushed through the cluster service and a timeout is scheduled.
     *
     * @param tenantId         tenant owning the edge
     * @param edgeId           edge to synchronize
     * @param responseConsumer callback receiving the sync response
     */
    @Override
    public void processSyncRequest(TenantId tenantId, EdgeId edgeId, Consumer<FromEdgeSyncResponse> responseConsumer) {
        ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId, serviceInfoProvider.getServiceId());
        UUID requestId = request.getId();
        EdgeGrpcSession session = sessions.get(request.getEdgeId());
        boolean syncAlreadyRunning = session != null && session.isSyncInProgress();
        if (syncAlreadyRunning) {
            responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment"));
            return;
        }
        log.trace("[{}][{}] Processing sync edge request [{}], serviceId [{}]", request.getTenantId(), request.getId(), request.getEdgeId(), request.getServiceId());
        localSyncEdgeRequests.put(requestId, responseConsumer);
        clusterService.pushEdgeSyncRequestToEdge(request);
        scheduleSyncRequestTimeout(request, requestId);
    }

    /**
     * Schedules a check, 20 seconds from now, that fails the sync request with
     * "Edge is not connected" if its consumer is still registered at that time.
     *
     * @param request   the sync request being tracked
     * @param requestId id under which the consumer was registered
     */
    private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
        log.trace("[{}] scheduling sync edge request", requestId);
        executorService.schedule(() -> {
            log.trace("[{}] checking if sync edge request is not processed...", requestId);
            Consumer<FromEdgeSyncResponse> pending = localSyncEdgeRequests.remove(requestId);
            if (pending == null) {
                // Already answered: nothing left to time out.
                return;
            }
            log.trace("[{}] timeout for processing sync edge request.", requestId);
            pending.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Edge is not connected"));
        }, 20L, TimeUnit.SECONDS);
    }

    /**
     * Delivers a sync response to the consumer registered for its request id and
     * unregisters that consumer. Responses without a registered consumer are only logged.
     *
     * @param response response received for a previously issued sync request
     */
    private void processSyncResponse(FromEdgeSyncResponse response) {
        log.trace("[{}] Received response from sync service: [{}]", response.getId(), response);
        UUID requestId = response.getId();
        Consumer<FromEdgeSyncResponse> pending = localSyncEdgeRequests.remove(requestId);
        if (pending == null) {
            log.trace("[{}] Unknown or stale sync response received [{}]", requestId, response);
            return;
        }
        pending.accept(response);
    }

    /**
     * Schedules a one-shot edge-event check for the session's edge, replacing any check
     * that is still pending for that edge.
     * <p>
     * The check runs after the "no records sleep interval" taken from the edge event
     * storage settings. If the edge's new-events flag is set, the task clears it,
     * processes high-priority events, runs the edge-event migration when needed and,
     * once migration is marked as processed, processes regular edge events
     * asynchronously. On every non-exceptional path the task schedules the next check
     * by calling this method again. Nothing is scheduled when the edge no longer has a
     * registered session.
     *
     * @param session session whose edge events are to be checked
     */
    private void scheduleEdgeEventsCheck(final EdgeGrpcSession session) {
        final EdgeId edgeId = session.getEdge().getId();
        final TenantId tenantId = session.getEdge().getTenantId();
        // Drop a still-pending check first so only one is tracked per edge.
        this.cancelScheduleEdgeEventsCheck(edgeId);
        if (this.sessions.containsKey(edgeId)) {
            ScheduledFuture<?> edgeEventCheckTask = this.edgeEventProcessingExecutorService.schedule(() -> {
                try {
                    Lock newEventLock = this.sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
                    newEventLock.lock();
                    try {
                        if (Boolean.TRUE.equals(this.sessionNewEvents.get(edgeId))) {
                            log.trace("[{}][{}] set session new events flag to false", (Object)tenantId, (Object)edgeId.getId());
                            // Clear the flag before processing; it is raised again below if more events show up.
                            this.sessionNewEvents.put(edgeId, false);
                            session.processHighPriorityEvents();
                            this.processEdgeEventMigrationIfNeeded(session, edgeId);
                            if (Boolean.TRUE.equals(this.edgeEventsMigrationProcessed.get(edgeId))) {
                                // Regular events are processed asynchronously; the next check is scheduled from the callback.
                                Futures.addCallback(session.processEdgeEvents(), (FutureCallback)new FutureCallback<Boolean>(){

                                    public void onSuccess(Boolean newEventsAdded) {
                                        if (Boolean.TRUE.equals(newEventsAdded)) {
                                            log.trace("[{}][{}] new events added. set session new events flag to true", (Object)tenantId, (Object)edgeId.getId());
                                            // NOTE(review): this write is made from the callback without taking newEventLock.
                                            EdgeGrpcService.this.sessionNewEvents.put(edgeId, true);
                                        }
                                        EdgeGrpcService.this.scheduleEdgeEventsCheck(session);
                                    }

                                    public void onFailure(Throwable t) {
                                        log.warn("[{}] Failed to process edge events for edge [{}]!", new Object[]{tenantId, session.getEdge().getId().getId(), t});
                                        // Keep polling even though this round failed.
                                        EdgeGrpcService.this.scheduleEdgeEventsCheck(session);
                                    }
                                }, (Executor)((Object)this.ctx.getGrpcCallbackExecutorService()));
                            } else {
                                // Migration not finished yet: try again on the next check.
                                this.scheduleEdgeEventsCheck(session);
                            }
                        } else {
                            // No new events flagged: just schedule the next check.
                            this.scheduleEdgeEventsCheck(session);
                        }
                    }
                    finally {
                        newEventLock.unlock();
                    }
                }
                catch (Exception e) {
                    // NOTE(review): no new check is scheduled on this path, so unless the failing call
                    // had already rescheduled, checking stops until something else schedules it again — confirm intended.
                    log.warn("[{}] Failed to process edge events for edge [{}]!", new Object[]{tenantId, session.getEdge().getId().getId(), e});
                }
            }, this.ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS);
            // Remember the task so it can be cancelled on disconnect, delete or reschedule.
            this.sessionEdgeEventChecks.put(edgeId, edgeEventCheckTask);
            log.trace("[{}] Check edge event scheduled for edge [{}]", (Object)tenantId, (Object)edgeId.getId());
        } else {
            log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]", (Object)tenantId, (Object)edgeId.getId());
        }
    }

    /**
     * Runs one round of edge-event migration for the edge unless migration has already
     * been marked as processed.
     * <p>
     * Blocks on the session's migration future. A {@code true} result means events
     * still exist: the new-events flag is raised and another check is scheduled. A
     * {@code false} result marks the migration as processed. A {@code null} result
     * changes nothing.
     *
     * @param session session performing the migration
     * @param edgeId  edge being migrated
     * @throws Exception if waiting for the migration result fails
     */
    private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception {
        boolean alreadyMigrated = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE);
        if (alreadyMigrated) {
            return;
        }
        log.info("Starting edge event migration for edge [{}]", edgeId.getId());
        Boolean eventsRemain = (Boolean) session.migrateEdgeEvents().get();
        if (eventsRemain == null) {
            return;
        }
        if (eventsRemain) {
            log.info("Migration still in progress for edge [{}]", edgeId.getId());
            sessionNewEvents.put(edgeId, true);
            scheduleEdgeEventsCheck(session);
        } else {
            log.info("Migration completed for edge [{}]", edgeId.getId());
            edgeEventsMigrationProcessed.put(edgeId, true);
        }
    }

    /**
     * Cancels and forgets the edge's event check if one is tracked and has neither
     * finished nor been cancelled yet.
     *
     * @param edgeId edge whose pending check is cancelled
     */
    private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) {
        log.trace("[{}] cancelling edge event check for edge", edgeId);
        ScheduledFuture<?> pendingCheck = sessionEdgeEventChecks.get(edgeId);
        if (pendingCheck == null || pendingCheck.isCancelled() || pendingCheck.isDone()) {
            return;
        }
        pendingCheck.cancel(true);
        sessionEdgeEventChecks.remove(edgeId);
    }

    /**
     * Session callback invoked when an edge session has disconnected.
     * <p>
     * If the disconnected session is the one currently registered for the edge, it is
     * unregistered and destroyed, the edge's state ("active", "lastDisconnectTime") is
     * persisted, a disconnect event is sent to the rule engine and the pending event
     * check is cancelled. Otherwise the session is treated as stale and only looked up
     * and destroyed by its session id. In both cases the edge's cache entry is evicted.
     *
     * @param edge      edge that disconnected
     * @param sessionId id of the session that disconnected
     */
    private void onEdgeDisconnect(Edge edge, UUID sessionId) {
        EdgeId edgeId = edge.getId();
        log.info("[{}][{}] edge disconnected!", edgeId, sessionId);
        EdgeGrpcSession current = sessions.get(edgeId);
        boolean isCurrentSession = current != null && current.getSessionId().equals(sessionId);
        if (!isCurrentSession) {
            log.info("[{}] edge session [{}] is not current anymore. Attempting to destroy it by sessionId.", edgeId, sessionId);
            EdgeGrpcSession stale = sessionsById.remove(sessionId);
            if (stale == null) {
                log.debug("[{}] No session found by sessionId [{}] to destroy", edgeId, sessionId);
            } else {
                try {
                    destroySession(stale);
                    log.info("[{}][{}] Successfully destroyed stale session for edge [{}]", stale.getTenantId(), sessionId, edgeId);
                } catch (Exception e) {
                    log.warn("[{}][{}] Failed to destroy stale session for edge [{}]", stale.getTenantId(), sessionId, edgeId, e);
                }
            }
        } else {
            EdgeGrpcSession toRemove = sessions.remove(edgeId);
            Lock flagLock = sessionNewEventsLocks.computeIfAbsent(edgeId, key -> new ReentrantLock());
            flagLock.lock();
            try {
                sessionNewEvents.remove(edgeId);
            } finally {
                flagLock.unlock();
            }
            destroySession(toRemove);
            sessionsById.remove(sessionId);

            TenantId tenantId = toRemove.getEdge().getTenantId();
            save(tenantId, edgeId, "active", false);
            long disconnectedAt = System.currentTimeMillis();
            save(tenantId, edgeId, "lastDisconnectTime", disconnectedAt);
            pushRuleEngineMessage(toRemove.getEdge().getTenantId(), edge, disconnectedAt, TbMsgType.DISCONNECT_EVENT);
            cancelScheduleEdgeEventsCheck(edgeId);
        }
        edgeIdServiceIdCache.evict(edgeId);
    }

    /**
     * Destroys the session and then closes it. A session whose destroy call reports
     * failure is queued in {@code zombieSessions} for a later retry. Exceptions are
     * logged and not propagated.
     *
     * @param session session to destroy
     */
    private void destroySession(EdgeGrpcSession session) {
        // try-with-resources guarantees close() runs after the destroy attempt.
        try (EdgeGrpcSession closeable = session) {
            boolean destroyed = session.destroy();
            if (destroyed) {
                return;
            }
            log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId());
            zombieSessions.add(session);
        } catch (Exception e) {
            log.warn("[{}][{}] Exception during session destroy for edge [{}] with session id [{}]", session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId(), e);
        }
    }

    /**
     * Persists a long-valued edge state entry, either as time series or as a
     * server-scope attribute depending on {@code persistToTelemetry}.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge the value belongs to
     * @param key      key to store the value under
     * @param value    value to store
     */
    private void save(TenantId tenantId, EdgeId edgeId, String key, long value) {
        log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
        if (!persistToTelemetry) {
            tsSubService.saveAttributes(AttributesSaveRequest.builder()
                    .tenantId(tenantId)
                    .entityId(edgeId)
                    .scope(AttributeScope.SERVER_SCOPE)
                    .entry(new LongDataEntry(key, value))
                    .callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
                    .build());
            return;
        }
        tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
                .tenantId(tenantId)
                .entityId(edgeId)
                .entry(new LongDataEntry(key, value))
                .callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
                .build());
    }

    /**
     * Persists a boolean-valued edge state entry, either as time series or as a
     * server-scope attribute depending on {@code persistToTelemetry}.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge the value belongs to
     * @param key      key to store the value under
     * @param value    value to store
     */
    private void save(TenantId tenantId, EdgeId edgeId, String key, boolean value) {
        log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
        if (!persistToTelemetry) {
            tsSubService.saveAttributes(AttributesSaveRequest.builder()
                    .tenantId(tenantId)
                    .entityId(edgeId)
                    .scope(AttributeScope.SERVER_SCOPE)
                    .entry(new BooleanDataEntry(key, value))
                    .callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
                    .build());
            return;
        }
        tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
                .tenantId(tenantId)
                .entityId(edgeId)
                .entry(new BooleanDataEntry(key, value))
                .callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
                .build());
    }

    /**
     * Publishes an edge connect/disconnect event: triggers the edge-connection
     * notification rule and pushes a JSON message describing the edge state to the rule
     * engine. Failures are logged and not propagated.
     *
     * @param tenantId tenant owning the edge
     * @param edge     edge whose connection state changed
     * @param ts       timestamp of the state change, in milliseconds
     * @param msgType  {@code CONNECT_EVENT} for a connect; any other type is reported as a disconnect
     */
    private void pushRuleEngineMessage(TenantId tenantId, Edge edge, long ts, TbMsgType msgType) {
        try {
            EdgeId edgeId = edge.getId();
            boolean connected = TbMsgType.CONNECT_EVENT.equals(msgType);

            ObjectNode state = JacksonUtil.newObjectNode();
            state.put("active", connected);
            state.put(connected ? "lastConnectTime" : "lastDisconnectTime", ts);

            EdgeConnectionTrigger trigger = EdgeConnectionTrigger.builder()
                    .tenantId(tenantId)
                    .customerId(edge.getCustomerId())
                    .edgeId(edgeId)
                    .edgeName(edge.getName())
                    .connected(connected)
                    .build();
            ctx.getRuleProcessor().process(trigger);

            String payload = JacksonUtil.toString(state);
            TbMsgMetaData metaData = new TbMsgMetaData();
            // Attribute-related metadata is attached only when state is stored as attributes.
            if (!persistToTelemetry) {
                metaData.putValue("scope", "SERVER_SCOPE");
                metaData.putValue("edgeName", edge.getName());
                metaData.putValue("edgeType", edge.getType());
            }
            TbMsg tbMsg = TbMsg.newMsg()
                    .type(msgType)
                    .originator(edgeId)
                    .copyMetaData(metaData)
                    .dataType(TbMsgDataType.JSON)
                    .data(payload)
                    .build();
            clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null);
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to push {}", tenantId, edge.getId(), msgType, e);
        }
    }

    /**
     * Periodic cleanup task: destroys zombie Kafka sessions still present in either
     * session map and retries the sessions queued in {@code zombieSessions}, dropping
     * those whose destroy call now succeeds. Any exception is logged and swallowed.
     */
    private void cleanupZombieSessions() {
        try {
            List<EdgeGrpcSession> zombiesByEdge = getZombieSessions(sessions.values());
            tryToDestroyZombieSessions(zombiesByEdge, s -> sessions.remove(s.getEdge().getId()));

            List<EdgeGrpcSession> zombiesBySessionId = getZombieSessions(sessionsById.values());
            tryToDestroyZombieSessions(zombiesBySessionId, s -> sessionsById.remove(s.getSessionId()));

            zombieSessions.removeIf(zombie -> {
                boolean destroyed = zombie.destroy();
                if (destroyed) {
                    log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].", zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
                } else {
                    log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].", zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
                }
                return destroyed;
            });
        } catch (Exception e) {
            log.warn("Failed to cleanup kafka sessions", e);
        }
    }

    /**
     * Returns the sessions from the given collection that are zombie Kafka sessions.
     *
     * @param sessions sessions to inspect
     * @return new mutable list holding the zombie sessions, in iteration order
     */
    private List<EdgeGrpcSession> getZombieSessions(Collection<EdgeGrpcSession> sessions) {
        // Snapshot the collection, then discard everything that is not a zombie.
        ArrayList<EdgeGrpcSession> zombies = new ArrayList<>(sessions);
        zombies.removeIf(candidate -> !isKafkaSessionAndZombie(candidate));
        return zombies;
    }

    /**
     * Attempts to destroy each given session and, only when the destroy succeeds,
     * applies {@code removeFunc} to unregister it.
     *
     * @param sessionsToRemove sessions to destroy
     * @param removeFunc       function that removes a destroyed session from its registry
     */
    private void tryToDestroyZombieSessions(List<EdgeGrpcSession> sessionsToRemove, Function<EdgeGrpcSession, EdgeGrpcSession> removeFunc) {
        sessionsToRemove.forEach(zombie -> {
            log.info("[{}] Destroying session for edge because edge is not connected", zombie.getEdge().getId());
            if (zombie.destroy()) {
                removeFunc.apply(zombie);
            }
        });
    }

    /**
     * Tells whether the session is a Kafka session that is no longer connected while
     * its underlying consumer exists and has not been stopped.
     *
     * @param session session to inspect
     * @return {@code true} for a disconnected Kafka session with a consumer that is not stopped
     */
    private boolean isKafkaSessionAndZombie(EdgeGrpcSession session) {
        if (!(session instanceof KafkaEdgeGrpcSession)) {
            return false;
        }
        KafkaEdgeGrpcSession kafkaSession = (KafkaEdgeGrpcSession) session;
        EdgeId edgeId = kafkaSession.getEdge().getId();
        boolean connectedForLog = kafkaSession.isConnected();
        // null in the log means there is no consumer (or no inner consumer) to ask.
        Boolean stoppedForLog = null;
        if (kafkaSession.getConsumer() != null && kafkaSession.getConsumer().getConsumer() != null) {
            stoppedForLog = kafkaSession.getConsumer().getConsumer().isStopped();
        }
        log.debug("[{}] kafkaSession.isConnected() = {}, kafkaSession.getConsumer().getConsumer().isStopped() = {}", edgeId, connectedForLog, stoppedForLog);

        if (kafkaSession.isConnected()) {
            return false;
        }
        if (kafkaSession.getConsumer() == null || kafkaSession.getConsumer().getConsumer() == null) {
            return false;
        }
        return !kafkaSession.getConsumer().getConsumer().isStopped();
    }

    /**
     * Callback that logs the outcome of saving a single edge state value; it holds the
     * identifiers and the value purely for logging.
     */
    private static class AttributeSaveCallback
    implements FutureCallback<Void> {
        private final TenantId tenantId;
        private final EdgeId edgeId;
        private final String key;
        private final Object value;

        /**
         * @param tenantId tenant owning the edge
         * @param edgeId   edge the value was saved for
         * @param key      key of the saved value
         * @param value    saved value
         */
        AttributeSaveCallback(TenantId tenantId, EdgeId edgeId, String key, Object value) {
            this.tenantId = tenantId;
            this.edgeId = edgeId;
            this.key = key;
            this.value = value;
        }

        /** Logs the successful update at trace level. */
        public void onSuccess(@Nullable Void result) {
            log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", tenantId, edgeId, key, value);
        }

        /** Logs the failed update, including its cause, at warn level. */
        public void onFailure(Throwable t) {
            log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", tenantId, edgeId, key, value, t);
        }
    }
}

