package org.thingsboard.server.service.edge.rpc;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.common.network.NetworkReceive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldEntityMessageProcessor;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.ResourceUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeConnectionTrigger;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;

@TbCoreComponent
@ConditionalOnProperty(prefix = "edges", value = {"enabled"}, havingValue = "true")
@Service
/* loaded from: input_file:org/thingsboard/server/service/edge/rpc/EdgeGrpcService.class */
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
    private static final Logger log = LoggerFactory.getLogger(EdgeGrpcService.class);

    /** Currently registered edge sessions, keyed by edge id. */
    private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();

    /** One lock per edge, taken around updates of that edge's "new events" flag. */
    private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();

    /**
     * Per-edge flag telling the periodic check whether there may be new events to process.
     * The locks above are per edge, so entries of different edges are written concurrently
     * (and the processing callback writes without any lock); the map itself therefore has
     * to be thread-safe.
     */
    private final Map<EdgeId, Boolean> sessionNewEvents = new ConcurrentHashMap<>();

    /** The pending (not yet executed) edge event check of every edge. */
    private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();

    /** Callbacks of locally issued sync requests that are still waiting for a response, keyed by request id. */
    private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();

    /** Whether the edge event migration has already completed for an edge. */
    private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();

    // Port the Edge gRPC server listens on (see init()).
    @Value("${edges.rpc.port}")
    private int rpcPort;

    // When true, init() configures transport security from the certificate and private key below.
    @Value("${edges.rpc.ssl.enabled}")
    private boolean sslEnabled;

    // Resource location of the server certificate chain, read only when SSL is enabled.
    @Value("${edges.rpc.ssl.cert}")
    private String certFileResource;

    // Resource location of the server private key, read only when SSL is enabled.
    @Value("${edges.rpc.ssl.private_key}")
    private String privateKeyResource;

    // When true, edge state is saved as timeseries; otherwise as server-scope attributes (see save()).
    @Value("${edges.state.persistToTelemetry:false}")
    private boolean persistToTelemetry;

    // Passed to NettyServerBuilder.permitKeepAliveTime, in seconds.
    @Value("${edges.rpc.client_max_keep_alive_time_sec:1}")
    private int clientMaxKeepAliveTimeSec;

    // Maximum inbound gRPC message size; also handed to every created session.
    @Value("${edges.rpc.max_inbound_message_size:4194304}")
    private int maxInboundMessageSize;

    // Passed to NettyServerBuilder.keepAliveTime, in seconds.
    @Value("${edges.rpc.keep_alive_time_sec:10}")
    private int keepAliveTimeSec;

    // Passed to NettyServerBuilder.keepAliveTimeout, in seconds.
    @Value("${edges.rpc.keep_alive_timeout_sec:5}")
    private int keepAliveTimeoutSec;

    // Thread count of the "edge-event-check-scheduler" pool.
    @Value("${edges.scheduler_pool_size}")
    private int schedulerPoolSize;

    // Thread count of the "edge-send-scheduler" pool.
    @Value("${edges.send_scheduler_pool_size}")
    private int sendSchedulerPoolSize;

    // Handed to every created session as its high priority queue limit.
    @Value("${edges.max_high_priority_queue_size_per_session:10000}")
    private int maxHighPriorityQueueSizePerSession;

    // Lazily injected; used here for the rule processor, the edge event storage settings
    // and the gRPC callback executor, and passed on to every session.
    @Autowired
    @Lazy
    private EdgeContextComponent ctx;

    // Persists edge state values as timeseries or attributes.
    @Autowired
    private TelemetrySubscriptionService tsSubService;

    // Used to push sync requests/responses and rule engine messages.
    @Autowired
    private TbClusterService clusterService;

    // Supplies the id of this service instance.
    @Autowired
    private TbServiceInfoProvider serviceInfoProvider;

    // Maps an edge id to the id of the service instance recorded for it on connect.
    @Autowired
    private TbTransactionalCache<EdgeId, String> edgeIdServiceIdCache;

    // Only passed through to Kafka-backed sessions.
    @Autowired
    private TopicService topicService;

    // Only passed through to Kafka-backed sessions.
    @Autowired
    private TbCoreQueueFactory tbCoreQueueFactory;

    // Kafka-backed sessions are created only when both Kafka beans below are present.
    @Autowired
    private Optional<TbKafkaSettings> kafkaSettings;

    @Autowired
    private Optional<TbKafkaTopicConfigs> kafkaTopicConfigs;
    // The gRPC server built and started in init().
    private Server server;
    // Runs the periodic per-edge event checks.
    private ScheduledExecutorService edgeEventProcessingExecutorService;
    // Handed to every session together with the connect/disconnect callbacks.
    private ScheduledExecutorService sendDownlinkExecutorService;
    // Single-thread scheduler used for sync request timeouts.
    private ScheduledExecutorService executorService;

    /**
     * Compiler-style lookup table translating {@link MsgType} ordinals into the dense
     * labels 1..4. Slots of all other constants stay at 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$thingsboard$server$common$msg$MsgType = buildSwitchMap();

        /** Fills the table; a constant missing at runtime simply keeps its slot at 0. */
        private static int[] buildSwitchMap() {
            int[] labels = new int[MsgType.values().length];
            try {
                labels[MsgType.EDGE_HIGH_PRIORITY_TO_EDGE_SESSION_MSG.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present: no label assigned
            }
            try {
                labels[MsgType.EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present: no label assigned
            }
            try {
                labels[MsgType.EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present: no label assigned
            }
            try {
                labels[MsgType.EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not present: no label assigned
            }
            return labels;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/thingsboard/server/service/edge/rpc/EdgeGrpcService$AttributeSaveCallback.class */
    public static class AttributeSaveCallback implements FutureCallback<Void> {
        private final TenantId tenantId;
        private final EdgeId edgeId;
        private final String key;
        private final Object value;

        AttributeSaveCallback(TenantId tenantId, EdgeId edgeId, String str, Object obj) {
            this.tenantId = tenantId;
            this.edgeId = edgeId;
            this.key = str;
            this.value = obj;
        }

        public void onSuccess(@Nullable Void r8) {
            EdgeGrpcService.log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", new Object[]{this.tenantId, this.edgeId, this.key, this.value});
        }

        public void onFailure(Throwable th) {
            EdgeGrpcService.log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", new Object[]{this.tenantId, this.edgeId, this.key, this.value, th});
        }
    }

    /**
     * Builds and starts the Edge gRPC server and creates the executors used by this service.
     *
     * @throws RuntimeException if the SSL context cannot be set up or the server fails to
     *                          start; the original failure is attached as the cause
     */
    @PostConstruct
    public void init() {
        log.info("Initializing Edge RPC service!");
        NettyServerBuilder builder = NettyServerBuilder.forPort(this.rpcPort)
                .permitKeepAliveTime(this.clientMaxKeepAliveTimeSec, TimeUnit.SECONDS)
                .keepAliveTime(this.keepAliveTimeSec, TimeUnit.SECONDS)
                .keepAliveTimeout(this.keepAliveTimeoutSec, TimeUnit.SECONDS)
                .permitKeepAliveWithoutCalls(true)
                .maxInboundMessageSize(this.maxInboundMessageSize)
                .addService(this);
        if (this.sslEnabled) {
            // The builder reads both streams while building the SSL context, so they can
            // (and must) be closed as soon as useTransportSecurity returns.
            try (java.io.InputStream certChain = ResourceUtils.getInputStream(this, this.certFileResource);
                 java.io.InputStream privateKey = ResourceUtils.getInputStream(this, this.privateKeyResource)) {
                builder.useTransportSecurity(certChain, privateKey);
            } catch (Exception e) {
                log.error("Unable to set up SSL context. Reason: " + e.getMessage(), e);
                throw new RuntimeException("Unable to set up SSL context!", e);
            }
        }
        this.server = builder.build();
        // Create the executors before the port is opened: a session created by an edge that
        // connects right after start() captures sendDownlinkExecutorService.
        this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(this.schedulerPoolSize, "edge-event-check-scheduler");
        this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(this.sendSchedulerPoolSize, "edge-send-scheduler");
        this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
        log.info("Going to start Edge RPC server using port: {}", this.rpcPort);
        try {
            this.server.start();
        } catch (IOException e) {
            log.error("Failed to start Edge RPC server!", e);
            // Release the executors created above; nothing else will do it once init has failed.
            destroy();
            throw new RuntimeException("Failed to start Edge RPC server!", e);
        }
        log.info("Edge RPC service initialized!");
    }

    /**
     * Stops the gRPC server, cancels every pending edge event check and shuts down
     * all executors owned by this service.
     */
    @PreDestroy
    public void destroy() {
        if (this.server != null) {
            this.server.shutdownNow();
        }
        this.sessionEdgeEventChecks.forEach((edgeId, check) -> {
            boolean pending = check != null && !check.isCancelled() && !check.isDone();
            if (pending) {
                check.cancel(true);
                this.sessionEdgeEventChecks.remove(edgeId);
            }
        });
        ScheduledExecutorService[] pools = {
                this.edgeEventProcessingExecutorService,
                this.sendDownlinkExecutorService,
                this.executorService
        };
        for (ScheduledExecutorService pool : pools) {
            if (pool != null) {
                pool.shutdownNow();
            }
        }
    }

    /**
     * Opens a new edge session for the given response stream.
     *
     * @param streamObserver stream used to send responses
     * @return the input stream observer of the newly created session
     */
    public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> streamObserver) {
        EdgeGrpcSession session = createEdgeGrpcSession(streamObserver);
        return session.getInputStream();
    }

    /**
     * Creates the session implementation matching the available beans: a Kafka-backed
     * session when both Kafka beans are present, a Postgres-backed one otherwise.
     *
     * @param streamObserver stream used by the session to send responses
     * @return the new, not yet registered session
     */
    private EdgeGrpcSession createEdgeGrpcSession(StreamObserver<ResponseMsg> streamObserver) {
        boolean kafkaAvailable = this.kafkaSettings.isPresent() && this.kafkaTopicConfigs.isPresent();
        if (kafkaAvailable) {
            return new KafkaEdgeGrpcSession(this.ctx, this.topicService, this.tbCoreQueueFactory,
                    this.kafkaSettings.get(), this.kafkaTopicConfigs.get(), streamObserver,
                    this::onEdgeConnect, this::onEdgeDisconnect, this.sendDownlinkExecutorService,
                    this.maxInboundMessageSize, this.maxHighPriorityQueueSizePerSession);
        }
        return new PostgresEdgeGrpcSession(this.ctx, streamObserver,
                this::onEdgeConnect, this::onEdgeDisconnect, this.sendDownlinkExecutorService,
                this.maxInboundMessageSize, this.maxHighPriorityQueueSizePerSession);
    }

    /**
     * Dispatches a message addressed to an edge session to the matching handler.
     * Message types other than the four handled below are ignored.
     *
     * @param tenantId       tenant the message belongs to
     * @param edgeSessionMsg the message to dispatch
     */
    @Override
    public void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg edgeSessionMsg) {
        // Switch on the enum itself rather than on an ordinal lookup table.
        switch (edgeSessionMsg.getMsgType()) {
            case EDGE_HIGH_PRIORITY_TO_EDGE_SESSION_MSG:
                log.trace("[{}] edgeEventMsg [{}]", tenantId, edgeSessionMsg);
                onEdgeHighPriorityEvent((EdgeHighPriorityMsg) edgeSessionMsg);
                break;
            case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
                log.trace("[{}] onToEdgeEventUpdateMsg [{}]", tenantId, edgeSessionMsg);
                onEdgeEventUpdate(tenantId, ((EdgeEventUpdateMsg) edgeSessionMsg).getEdgeId());
                break;
            case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: {
                ToEdgeSyncRequest syncRequest = (ToEdgeSyncRequest) edgeSessionMsg;
                log.trace("[{}] toEdgeSyncRequest [{}]", tenantId, edgeSessionMsg);
                startSyncProcess(tenantId, syncRequest.getEdgeId(), syncRequest.getId(), syncRequest.getServiceId());
                break;
            }
            case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
                log.trace("[{}] fromEdgeSyncResponse [{}]", tenantId, edgeSessionMsg);
                processSyncResponse((FromEdgeSyncResponse) edgeSessionMsg);
                break;
            default:
                break;
        }
    }

    /**
     * Forwards an updated edge configuration to the edge's session, if one is connected.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edge     the updated edge
     */
    @Override
    public void updateEdge(TenantId tenantId, Edge edge) {
        EdgeGrpcSession session = this.sessions.get(edge.getId());
        if (session != null && session.isConnected()) {
            log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
            session.onConfigurationUpdate(edge);
            return;
        }
        log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
    }

    /**
     * Closes and unregisters the connected session of a deleted edge. Does nothing when
     * the edge has no connected session.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edgeId   id of the deleted edge
     */
    @Override
    public void deleteEdge(TenantId tenantId, EdgeId edgeId) {
        EdgeGrpcSession session = this.sessions.get(edgeId);
        if (session == null || !session.isConnected()) {
            return;
        }
        log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
        session.destroy();
        session.cleanUp();
        session.close();
        this.sessions.remove(edgeId);
        Lock newEventsLock = this.sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
        newEventsLock.lock();
        try {
            this.sessionNewEvents.remove(edgeId);
        } finally {
            // Released exactly once; the cancel below runs outside the lock, so a failure
            // there can no longer trigger a second unlock().
            newEventsLock.unlock();
        }
        cancelScheduleEdgeEventsCheck(edgeId);
    }

    /**
     * Marks the edge as having new events, provided it has a connected session.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edgeId   edge that received new events
     */
    private void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
        EdgeGrpcSession session = this.sessions.get(edgeId);
        boolean connected = session != null && session.isConnected();
        if (connected) {
            log.trace("[{}] onEdgeEventUpdate [{}]", tenantId, edgeId.getId());
            updateSessionEventsFlag(tenantId, edgeId);
        }
    }

    /**
     * Queues a high priority event on the target edge's session and marks the edge as
     * having new events. Ignored when the edge has no connected session.
     *
     * @param edgeHighPriorityMsg message carrying the tenant and the edge event
     */
    private void onEdgeHighPriorityEvent(EdgeHighPriorityMsg edgeHighPriorityMsg) {
        TenantId tenantId = edgeHighPriorityMsg.getTenantId();
        EdgeEvent event = edgeHighPriorityMsg.getEdgeEvent();
        EdgeId edgeId = event.getEdgeId();
        EdgeGrpcSession session = this.sessions.get(edgeId);
        boolean connected = session != null && session.isConnected();
        if (connected) {
            log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId);
            session.addEventToHighPriorityQueue(event);
            updateSessionEventsFlag(tenantId, edgeId);
        }
    }

    /**
     * Flips the edge's "new events" flag from false to true under the edge's lock.
     * An absent flag or one that is already true is left untouched.
     *
     * @param tenantId tenant owning the edge (used for logging)
     * @param edgeId   edge whose flag is updated
     */
    private void updateSessionEventsFlag(TenantId tenantId, EdgeId edgeId) {
        Lock newEventsLock = this.sessionNewEventsLocks.computeIfAbsent(edgeId, key -> new ReentrantLock());
        newEventsLock.lock();
        try {
            Boolean current = this.sessionNewEvents.get(edgeId);
            boolean currentlyFalse = current != null && !current;
            if (currentlyFalse) {
                log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
                this.sessionNewEvents.put(edgeId, Boolean.TRUE);
            }
        } finally {
            newEventsLock.unlock();
        }
    }

    /**
     * Registers a freshly connected session, records the edge as active, notifies the
     * rule engine and (re)starts the periodic event check for the edge.
     *
     * @param edgeId          id of the connected edge
     * @param edgeGrpcSession the session that just connected
     */
    private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
        Edge edge = edgeGrpcSession.getEdge();
        TenantId tenantId = edge.getTenantId();
        log.info("[{}][{}] edge [{}] connected successfully.", tenantId, edgeGrpcSession.getSessionId(), edgeId);
        this.sessions.put(edgeId, edgeGrpcSession);
        Lock newEventsLock = this.sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
        newEventsLock.lock();
        try {
            this.sessionNewEvents.put(edgeId, true);
        } finally {
            // Released exactly once; everything below runs outside the lock, so a failure
            // there can no longer trigger a second unlock().
            newEventsLock.unlock();
        }
        save(tenantId, edgeId, "active", true);
        long lastConnectTs = System.currentTimeMillis();
        save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs);
        this.edgeIdServiceIdCache.put(edgeId, this.serviceInfoProvider.getServiceId());
        pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT);
        cancelScheduleEdgeEventsCheck(edgeId);
        // Keeps an existing "migration processed" value from an earlier connection.
        this.edgeEventsMigrationProcessed.putIfAbsent(edgeId, Boolean.FALSE);
        scheduleEdgeEventsCheck(edgeGrpcSession);
    }

    /**
     * Handles a sync request for an edge and reports the outcome to the requesting service.
     * No response is sent when this service has no session for the edge.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge to synchronize
     * @param uuid     id of the sync request, echoed in the response
     * @param str      id of the service the response is pushed to
     */
    private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID uuid, String str) {
        EdgeGrpcSession session = this.sessions.get(edgeId);
        if (session == null) {
            return;
        }
        if (session.isSyncInProgress()) {
            this.clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(uuid, tenantId, edgeId, false, "Sync process is active at the moment"), str);
            return;
        }
        boolean started = false;
        if (session.isConnected()) {
            session.startSyncProcess(true);
            started = true;
        }
        // Empty error message; written as a plain literal instead of borrowing an
        // unrelated Kafka constant that merely happens to be the empty string.
        this.clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(uuid, tenantId, edgeId, started, ""), str);
    }

    /**
     * Issues a sync request for an edge. The consumer receives either an immediate
     * rejection (a local session is already syncing), the response delivered later, or a
     * timeout response.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge to synchronize
     * @param consumer receives the outcome of the request
     */
    @Override
    public void processSyncRequest(TenantId tenantId, EdgeId edgeId, Consumer<FromEdgeSyncResponse> consumer) {
        ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId, this.serviceInfoProvider.getServiceId());
        UUID requestId = request.getId();
        EdgeGrpcSession session = this.sessions.get(request.getEdgeId());
        boolean alreadySyncing = session != null && session.isSyncInProgress();
        if (alreadySyncing) {
            FromEdgeSyncResponse rejection = new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment");
            consumer.accept(rejection);
            return;
        }
        log.trace("[{}][{}] Processing sync edge request [{}], serviceId [{}]", request.getTenantId(), request.getId(), request.getEdgeId(), request.getServiceId());
        // Register the callback before publishing so a response can always find it.
        this.localSyncEdgeRequests.put(requestId, consumer);
        this.clusterService.pushEdgeSyncRequestToEdge(request);
        scheduleSyncRequestTimeout(request, requestId);
    }

    /**
     * Schedules a task that, 20 seconds from now, answers the request with
     * "Edge is not connected" unless its callback has already been consumed.
     *
     * @param toEdgeSyncRequest the pending request
     * @param uuid              id under which the callback is registered
     */
    private void scheduleSyncRequestTimeout(ToEdgeSyncRequest toEdgeSyncRequest, UUID uuid) {
        log.trace("[{}] scheduling sync edge request", uuid);
        Runnable timeoutTask = () -> {
            log.trace("[{}] checking if sync edge request is not processed...", uuid);
            Consumer<FromEdgeSyncResponse> pending = this.localSyncEdgeRequests.remove(uuid);
            if (pending == null) {
                return;
            }
            log.trace("[{}] timeout for processing sync edge request.", uuid);
            pending.accept(new FromEdgeSyncResponse(uuid, toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest.getEdgeId(), false, "Edge is not connected"));
        };
        this.executorService.schedule(timeoutTask, 20L, TimeUnit.SECONDS);
    }

    /**
     * Hands a sync response to the callback registered for its request id. Responses
     * without a registered callback are only logged.
     *
     * @param fromEdgeSyncResponse the received response
     */
    private void processSyncResponse(FromEdgeSyncResponse fromEdgeSyncResponse) {
        log.trace("[{}] Received response from sync service: [{}]", fromEdgeSyncResponse.getId(), fromEdgeSyncResponse);
        UUID requestId = fromEdgeSyncResponse.getId();
        Consumer<FromEdgeSyncResponse> pending = this.localSyncEdgeRequests.remove(requestId);
        if (pending == null) {
            log.trace("[{}] Unknown or stale sync response received [{}]", requestId, fromEdgeSyncResponse);
            return;
        }
        pending.accept(fromEdgeSyncResponse);
    }

    /**
     * Replaces the edge's pending event check with a new one that runs after the
     * configured "no records" sleep interval. Nothing is scheduled when the edge no
     * longer has a registered session.
     *
     * @param edgeGrpcSession session whose events are to be checked
     */
    private void scheduleEdgeEventsCheck(EdgeGrpcSession edgeGrpcSession) {
        EdgeId id = edgeGrpcSession.getEdge().getId();
        TenantId tenantId = edgeGrpcSession.getEdge().getTenantId();
        cancelScheduleEdgeEventsCheck(id);
        if (!this.sessions.containsKey(id)) {
            log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]", tenantId, id.getId());
            return;
        }
        ScheduledFuture<?> check = this.edgeEventProcessingExecutorService.schedule(
                () -> runEdgeEventsCheck(edgeGrpcSession, id, tenantId),
                this.ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS);
        this.sessionEdgeEventChecks.put(id, check);
        log.trace("[{}] Check edge event scheduled for edge [{}]", tenantId, id.getId());
    }

    /**
     * Body of one scheduled check. Under the edge's lock: when the "new events" flag is
     * set, clears it, processes high priority events, runs the migration step and, once
     * the migration is marked processed, starts regular event processing whose callback
     * schedules the next check. In every other case the next check is scheduled directly.
     * Exceptions are logged and not rethrown.
     */
    private void runEdgeEventsCheck(EdgeGrpcSession edgeGrpcSession, EdgeId id, TenantId tenantId) {
        try {
            Lock newEventsLock = this.sessionNewEventsLocks.computeIfAbsent(id, key -> new ReentrantLock());
            newEventsLock.lock();
            try {
                if (!Boolean.TRUE.equals(this.sessionNewEvents.get(id))) {
                    scheduleEdgeEventsCheck(edgeGrpcSession);
                    return;
                }
                log.trace("[{}][{}] set session new events flag to false", tenantId, id.getId());
                this.sessionNewEvents.put(id, false);
                edgeGrpcSession.processHighPriorityEvents();
                processEdgeEventMigrationIfNeeded(edgeGrpcSession, id);
                if (!Boolean.TRUE.equals(this.edgeEventsMigrationProcessed.get(id))) {
                    scheduleEdgeEventsCheck(edgeGrpcSession);
                    return;
                }
                Futures.addCallback(edgeGrpcSession.processEdgeEvents(), new FutureCallback<Boolean>() {
                    @Override
                    public void onSuccess(Boolean bool) {
                        if (Boolean.TRUE.equals(bool)) {
                            EdgeGrpcService.log.trace("[{}][{}] new events added. set session new events flag to true", tenantId, id.getId());
                            EdgeGrpcService.this.sessionNewEvents.put(id, true);
                        }
                        EdgeGrpcService.this.scheduleEdgeEventsCheck(edgeGrpcSession);
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        EdgeGrpcService.log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, edgeGrpcSession.getEdge().getId().getId(), th);
                        EdgeGrpcService.this.scheduleEdgeEventsCheck(edgeGrpcSession);
                    }
                }, this.ctx.getGrpcCallbackExecutorService());
            } finally {
                newEventsLock.unlock();
            }
        } catch (Exception e) {
            log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, edgeGrpcSession.getEdge().getId().getId(), e);
        }
    }

    /**
     * Runs the session's edge event migration unless it is already marked as processed
     * for this edge, waiting for its result. A {@code true} result sets the "new events"
     * flag and schedules another check; a {@code false} result marks the migration as
     * processed; a {@code null} result changes nothing.
     *
     * @param edgeGrpcSession session performing the migration
     * @param edgeId          edge being migrated
     * @throws Exception if waiting for the migration result fails
     */
    private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession edgeGrpcSession, EdgeId edgeId) throws Exception {
        boolean alreadyProcessed = Boolean.TRUE.equals(this.edgeEventsMigrationProcessed.get(edgeId));
        if (alreadyProcessed) {
            return;
        }
        Boolean result = (Boolean) edgeGrpcSession.migrateEdgeEvents().get();
        if (result == null) {
            return;
        }
        if (result) {
            this.sessionNewEvents.put(edgeId, true);
            scheduleEdgeEventsCheck(edgeGrpcSession);
        } else {
            this.edgeEventsMigrationProcessed.put(edgeId, true);
        }
    }

    /**
     * Cancels and unregisters the edge's event check if it is still pending. Checks that
     * are missing, already cancelled or already done are left as they are.
     *
     * @param edgeId edge whose check is cancelled
     */
    private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) {
        log.trace("[{}] cancelling edge event check for edge", edgeId);
        ScheduledFuture<?> check = this.sessionEdgeEventChecks.get(edgeId);
        boolean pending = check != null && !check.isCancelled() && !check.isDone();
        if (pending) {
            check.cancel(true);
            this.sessionEdgeEventChecks.remove(edgeId);
        }
    }

    /**
     * Handles the disconnect of an edge session. When the given session is still the one
     * registered for the edge, it is unregistered and destroyed, the edge is recorded as
     * inactive and the rule engine is notified. The edge's service-id cache entry is
     * evicted in every case.
     *
     * @param edge the disconnected edge
     * @param uuid id of the session that disconnected
     */
    private void onEdgeDisconnect(Edge edge, UUID uuid) {
        EdgeId id = edge.getId();
        log.info("[{}][{}] edge disconnected!", id, uuid);
        // The session may already be gone (e.g. removed by deleteEdge), so guard against null.
        EdgeGrpcSession current = this.sessions.get(id);
        boolean ownsRegistration = current != null
                && current.getSessionId().equals(uuid)
                // Conditional remove: never unregisters a session that replaced this one meanwhile.
                && this.sessions.remove(id, current);
        if (ownsRegistration) {
            Lock newEventsLock = this.sessionNewEventsLocks.computeIfAbsent(id, edgeId -> new ReentrantLock());
            newEventsLock.lock();
            try {
                this.sessionNewEvents.remove(id);
            } finally {
                // Released exactly once; the work below runs outside the lock, so a failure
                // there can no longer trigger a second unlock().
                newEventsLock.unlock();
            }
            current.destroy();
            TenantId tenantId = current.getEdge().getTenantId();
            save(tenantId, id, "active", false);
            long lastDisconnectTs = System.currentTimeMillis();
            save(tenantId, id, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs);
            pushRuleEngineMessage(tenantId, edge, lastDisconnectTs, TbMsgType.DISCONNECT_EVENT);
            cancelScheduleEdgeEventsCheck(id);
        } else {
            log.debug("[{}] edge session [{}] is not available anymore, nothing to remove. most probably this session is already outdated!", id, uuid);
        }
        this.edgeIdServiceIdCache.evict(id);
    }

    /**
     * Stores a long edge state value: as timeseries when {@code persistToTelemetry} is
     * set, otherwise as a server-scope attribute. The outcome is logged by the callback.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge the value belongs to
     * @param str      key of the value
     * @param j        value to store
     */
    private void save(TenantId tenantId, EdgeId edgeId, String str, long j) {
        log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, str, j);
        LongDataEntry entry = new LongDataEntry(str, Long.valueOf(j));
        AttributeSaveCallback callback = new AttributeSaveCallback(tenantId, edgeId, str, Long.valueOf(j));
        if (!this.persistToTelemetry) {
            this.tsSubService.saveAttributes(AttributesSaveRequest.builder()
                    .tenantId(tenantId)
                    .entityId(edgeId)
                    .scope(AttributeScope.SERVER_SCOPE)
                    .entry(entry)
                    .callback(callback)
                    .build());
            return;
        }
        this.tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
                .tenantId(tenantId)
                .entityId(edgeId)
                .entry(entry)
                .callback(callback)
                .build());
    }

    /**
     * Stores a boolean edge state value: as timeseries when {@code persistToTelemetry} is
     * set, otherwise as a server-scope attribute. The outcome is logged by the callback.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge the value belongs to
     * @param str      key of the value
     * @param z        value to store
     */
    private void save(TenantId tenantId, EdgeId edgeId, String str, boolean z) {
        log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, str, z);
        BooleanDataEntry entry = new BooleanDataEntry(str, Boolean.valueOf(z));
        AttributeSaveCallback callback = new AttributeSaveCallback(tenantId, edgeId, str, Boolean.valueOf(z));
        if (!this.persistToTelemetry) {
            this.tsSubService.saveAttributes(AttributesSaveRequest.builder()
                    .tenantId(tenantId)
                    .entityId(edgeId)
                    .scope(AttributeScope.SERVER_SCOPE)
                    .entry(entry)
                    .callback(callback)
                    .build());
            return;
        }
        this.tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
                .tenantId(tenantId)
                .entityId(edgeId)
                .entry(entry)
                .callback(callback)
                .build());
    }

    /**
     * Notifies the rule processor about the connection change and pushes a JSON message
     * with the edge's activity state to the rule engine. Any failure is logged and not
     * propagated.
     *
     * @param tenantId  tenant owning the edge
     * @param edge      edge that connected or disconnected
     * @param j         timestamp of the event in milliseconds
     * @param tbMsgType type of the pushed message; CONNECT_EVENT is treated as "connected",
     *                  anything else as "disconnected"
     */
    private void pushRuleEngineMessage(TenantId tenantId, Edge edge, long j, TbMsgType tbMsgType) {
        try {
            EdgeId edgeId = edge.getId();
            boolean connected = TbMsgType.CONNECT_EVENT.equals(tbMsgType);
            String timeKey = connected
                    ? DefaultDeviceStateService.LAST_CONNECT_TIME
                    : DefaultDeviceStateService.LAST_DISCONNECT_TIME;
            ObjectNode payload = JacksonUtil.newObjectNode();
            payload.put("active", connected);
            payload.put(timeKey, j);

            EdgeConnectionTrigger trigger = EdgeConnectionTrigger.builder()
                    .tenantId(tenantId)
                    .customerId(edge.getCustomerId())
                    .edgeId(edgeId)
                    .edgeName(edge.getName())
                    .connected(connected)
                    .build();
            this.ctx.getRuleProcessor().process(trigger);

            String data = JacksonUtil.toString(payload);
            TbMsgMetaData metaData = new TbMsgMetaData();
            // Attribute-related metadata is only attached when state is stored as attributes.
            if (!this.persistToTelemetry) {
                metaData.putValue("scope", "SERVER_SCOPE");
                metaData.putValue("edgeName", edge.getName());
                metaData.putValue("edgeType", edge.getType());
            }
            TbMsg msg = TbMsg.newMsg()
                    .type(tbMsgType)
                    .originator(edgeId)
                    .copyMetaData(metaData)
                    .dataType(TbMsgDataType.JSON)
                    .data(data)
                    .build();
            this.clusterService.pushMsgToRuleEngine(tenantId, edgeId, msg, (TbQueueCallback) null);
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to push {}", tenantId, edge.getId(), tbMsgType, e);
        }
    }
}
