/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.EdgeConsumerStats;
import org.thingsboard.server.service.queue.PendingMsgHolder;
import org.thingsboard.server.service.queue.TbEdgeConsumerService;
import org.thingsboard.server.service.queue.TbPackCallback;
import org.thingsboard.server.service.queue.TbPackProcessingContext;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;

/**
 * Core-side consumer service for edge traffic.
 *
 * <p>Two message flows are handled here:
 * <ul>
 *   <li>{@code ToEdgeMsg} packs read by {@link #mainConsumer} from the {@code "Edge"} queue;
 *       each contained edge notification is handed to the processor selected by its
 *       {@link EdgeEventType} (see {@link #pushNotificationToEdge}).</li>
 *   <li>{@code ToEdgeNotificationMsg} notifications delivered through the
 *       {@link AbstractConsumerService} notification consumer and dispatched to the
 *       {@link EdgeRpcService} (see {@link #handleNotification}).</li>
 * </ul>
 */
@Service
@TbCoreComponent
public class DefaultTbEdgeConsumerService
extends AbstractConsumerService<TransportProtos.ToEdgeNotificationMsg>
implements TbEdgeConsumerService {
    // NOTE(review): the property key is spelled "pool-interval" (looks like a typo of "poll").
    // It is kept as is because existing configuration is bound to this exact key.
    @Value(value="${queue.edge.pool-interval:25}")
    private int pollInterval;
    /** Maximum time, in milliseconds, to wait for a pack of edge messages to be processed. */
    @Value(value="${queue.edge.pack-processing-timeout:10000}")
    private int packProcessingTimeout;
    @Value(value="${queue.edge.consumer-per-partition:false}")
    private boolean consumerPerPartition;
    /** Retry limit passed to {@link #pushNotificationToEdge} for every message of a pack. */
    @Value(value="${queue.edge.pack-processing-retries:3}")
    private int packProcessingRetries;
    /** When {@code true}, processed messages are recorded in {@link #stats} and periodically printed. */
    @Value(value="${queue.edge.stats.enabled:false}")
    private boolean statsEnabled;
    private final TbCoreQueueFactory queueFactory;
    private final EdgeContextComponent edgeCtx;
    private final EdgeConsumerStats stats;
    private MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>, QueueConfig> mainConsumer;

    /**
     * Creates the service.
     *
     * @param tbCoreQueueFactory factory used to create the edge and edge-notification consumers
     * @param actorContext       actor system context forwarded to the superclass
     * @param statsFactory       factory backing the {@link EdgeConsumerStats} instance
     * @param edgeCtx            access point to edge services and processors
     */
    public DefaultTbEdgeConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, StatsFactory statsFactory, EdgeContextComponent edgeCtx) {
        // Only the actor context is supplied; the remaining superclass dependencies are passed as null.
        // NOTE(review): presumably they are not needed by this consumer - confirm against AbstractConsumerService.
        super(actorContext, null, null, null, null, null, null, null, null);
        this.edgeCtx = edgeCtx;
        this.stats = new EdgeConsumerStats(statsFactory);
        this.queueFactory = tbCoreQueueFactory;
    }

    /**
     * Initializes the superclass with the {@code "tb-edge"} prefix and builds the main consumer
     * for the {@code "Edge"} queue, wiring {@link #processMsgs} as its pack processor.
     */
    @PostConstruct
    public void init() {
        super.init("tb-edge");
        this.mainConsumer = MainQueueConsumerManager.builder().queueKey((Object)new QueueKey(ServiceType.TB_CORE).withQueueName("Edge")).config(QueueConfig.of((boolean)this.consumerPerPartition, (long)this.pollInterval)).msgPackProcessor(this::processMsgs).consumerCreator((config, tpi) -> this.queueFactory.createEdgeMsgConsumer()).consumerExecutor(this.consumersExecutor).scheduler(this.scheduler).taskExecutor(this.mgmtExecutor).build();
    }

    /** Delegates shutdown to the superclass. */
    @Override
    @PreDestroy
    public void destroy() {
        super.destroy();
    }

    /** Delegates consumer start-up to the superclass. */
    @Override
    protected void startConsumers() {
        super.startConsumers();
    }

    /**
     * Re-subscribes the main consumer to the edge partitions carried by the event.
     *
     * @param event partition change event
     */
    protected void onTbApplicationEvent(PartitionChangeEvent event) {
        Set partitions = event.getEdgePartitions();
        this.log.debug("Subscribing to partitions: {}", (Object)partitions);
        this.mainConsumer.update(partitions);
    }

    /**
     * Processes one pack of edge messages and commits the consumer afterwards.
     *
     * <p>Messages are processed in order on {@code consumersExecutor}, while the calling thread
     * waits up to {@link #packProcessingTimeout} milliseconds on the pack latch. On timeout the
     * still running task is cancelled and the failed messages are logged. The consumer is
     * committed in every case.
     *
     * @param msgs            messages of the pack
     * @param consumer        consumer to commit once the pack is done or timed out
     * @param consumerKey     not used by this method
     * @param edgeQueueConfig not used by this method
     * @throws InterruptedException if the calling thread is interrupted while waiting for the pack
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> consumer, Object consumerKey, QueueConfig edgeQueueConfig) throws InterruptedException {
        // Give every message a random id so callbacks can be matched to their pending entries.
        List<IdMsgPair> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair(UUID.randomUUID(), msg)).toList();
        ConcurrentMap<UUID, TbProtoQueueMsg> pendingMap = orderedMsgList.stream().collect(Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
        CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
        TbPackProcessingContext<TbProtoQueueMsg> ctx = new TbPackProcessingContext<TbProtoQueueMsg>(processingTimeoutLatch, pendingMap, new ConcurrentHashMap());
        // Remembers the message handed over last, so a timeout can be attributed to it in the log.
        PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
        Future<?> submitFuture = this.consumersExecutor.submit(() -> orderedMsgList.forEach(element -> {
            UUID id = element.getUuid();
            TbProtoQueueMsg msg = element.getMsg();
            TbPackCallback callback = new TbPackCallback(id, ctx);
            try {
                TransportProtos.ToEdgeMsg toEdgeMsg = (TransportProtos.ToEdgeMsg)msg.getValue();
                pendingMsgHolder.setMsg(toEdgeMsg);
                // NOTE(review): a message without an edge notification never invokes the callback here;
                // presumably such a pack only completes through the timeout below - confirm.
                if (toEdgeMsg.hasEdgeNotificationMsg()) {
                    this.pushNotificationToEdge(toEdgeMsg.getEdgeNotificationMsg(), 0, this.packProcessingRetries, callback);
                }
                if (this.statsEnabled) {
                    this.stats.log(toEdgeMsg);
                }
            }
            catch (Throwable e) {
                this.log.warn("[{}] Failed to process message: {}", new Object[]{id, msg, e});
                callback.onFailure(e);
            }
        }));
        if (!processingTimeoutLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS)) {
            if (!submitFuture.isDone()) {
                submitFuture.cancel(true);
                this.log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
            }
            ctx.getFailedMap().forEach((id, msg) -> this.log.warn("[{}] Failed to process message: {}", id, (Object)msg.getValue()));
        }
        consumer.commit();
    }

    /** @return {@link ServiceType#TB_CORE} */
    @Override
    protected ServiceType getServiceType() {
        return ServiceType.TB_CORE;
    }

    /** @return the configured {@link #pollInterval} */
    @Override
    protected long getNotificationPollDuration() {
        return this.pollInterval;
    }

    /** @return the configured {@link #packProcessingTimeout} */
    @Override
    protected long getNotificationPackProcessingTimeout() {
        return this.packProcessingTimeout;
    }

    /** @return the number of available processors, but never less than 4 */
    @Override
    protected int getMgmtThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors(), 4);
    }

    /** @return a new consumer for to-edge notification messages, created by the queue factory */
    @Override
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createNotificationsConsumer() {
        return this.queueFactory.createToEdgeNotificationsMsgConsumer();
    }

    /**
     * Dispatches a single edge notification to the {@link EdgeRpcService}.
     *
     * <p>Session-level payloads (high priority, event update, sync request, sync response) are
     * forwarded via {@code onToEdgeSessionMsg}. Component lifecycle payloads delete or update the
     * edge identified by the entity id. If no {@link EdgeRpcService} is available, the message is
     * acknowledged and ignored. Any exception is logged and reported through
     * {@code callback.onFailure}.
     *
     * @param id       id of the notification (not used by this method)
     * @param msg      queue message wrapping the notification
     * @param callback callback to acknowledge or fail the notification
     */
    @Override
    protected void handleNotification(UUID id, TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg> msg, TbCallback callback) {
        TransportProtos.ToEdgeNotificationMsg toEdgeNotificationMsg = (TransportProtos.ToEdgeNotificationMsg)msg.getValue();
        try {
            EdgeRpcService edgeRpcService = this.edgeCtx.getEdgeRpcService();
            if (edgeRpcService == null) {
                this.log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", (Object)toEdgeNotificationMsg);
                callback.onSuccess();
                return;
            }
            // NOTE(review): a notification matching none of the branches below never invokes the callback - confirm this is intended.
            if (toEdgeNotificationMsg.hasEdgeHighPriority()) {
                EdgeHighPriorityMsg edgeSessionMsg = ProtoUtils.fromProto((TransportProtos.EdgeHighPriorityMsgProto)toEdgeNotificationMsg.getEdgeHighPriority());
                edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), (EdgeSessionMsg)edgeSessionMsg);
                callback.onSuccess();
            } else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) {
                EdgeEventUpdateMsg edgeSessionMsg = ProtoUtils.fromProto((TransportProtos.EdgeEventUpdateMsgProto)toEdgeNotificationMsg.getEdgeEventUpdate());
                edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), (EdgeSessionMsg)edgeSessionMsg);
                callback.onSuccess();
            } else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) {
                ToEdgeSyncRequest edgeSessionMsg = ProtoUtils.fromProto((TransportProtos.ToEdgeSyncRequestMsgProto)toEdgeNotificationMsg.getToEdgeSyncRequest());
                edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), (EdgeSessionMsg)edgeSessionMsg);
                callback.onSuccess();
            } else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) {
                FromEdgeSyncResponse edgeSessionMsg = ProtoUtils.fromProto((TransportProtos.FromEdgeSyncResponseMsgProto)toEdgeNotificationMsg.getFromEdgeSyncResponse());
                edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), (EdgeSessionMsg)edgeSessionMsg);
                callback.onSuccess();
            } else if (toEdgeNotificationMsg.hasComponentLifecycle()) {
                ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto((TransportProtos.ComponentLifecycleMsgProto)toEdgeNotificationMsg.getComponentLifecycle());
                TenantId tenantId = componentLifecycle.getTenantId();
                EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId());
                // Only DELETED and UPDATED trigger an action; other lifecycle events are just acknowledged.
                if (ComponentLifecycleEvent.DELETED.equals((Object)componentLifecycle.getEvent())) {
                    edgeRpcService.deleteEdge(tenantId, edgeId);
                } else if (ComponentLifecycleEvent.UPDATED.equals((Object)componentLifecycle.getEvent())) {
                    Edge edge = this.edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId);
                    edgeRpcService.updateEdge(tenantId, edge);
                }
                callback.onSuccess();
            }
        }
        catch (Exception e) {
            this.log.error("Error processing edge notification message {}", (Object)toEdgeNotificationMsg, (Object)e);
            callback.onFailure((Throwable)e);
        }
        if (this.statsEnabled) {
            this.stats.log((TransportProtos.ToEdgeNotificationMsg)msg.getValue());
        }
    }

    /**
     * Hands an edge notification to the processor registered for its event type and reports the
     * outcome through {@code callback}.
     *
     * <p>Both a failed future and a synchronously thrown exception are retried until
     * {@code retryCount} reaches {@code retryLimit}. Failure is reported to the callback only
     * once the retries are exhausted, so the callback is completed at most once per call chain.
     *
     * @param edgeNotificationMsg notification to push
     * @param retryCount          number of retries already performed (0 for the first attempt)
     * @param retryLimit          maximum number of retries
     * @param callback            callback completed on success or on final failure
     */
    private void pushNotificationToEdge(final TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, final int retryCount, final int retryLimit, final TbCallback callback) {
        final TenantId tenantId = TenantId.fromUUID((UUID)new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
        this.log.debug("[{}] Pushing notification to edge {}", (Object)tenantId, (Object)edgeNotificationMsg);
        try {
            EdgeEventType type = EdgeEventType.valueOf((String)edgeNotificationMsg.getType());
            ListenableFuture<Void> future = this.edgeCtx.getProcessor(type).processEntityNotification(tenantId, edgeNotificationMsg);
            Futures.addCallback(future, (FutureCallback)new FutureCallback<Void>(){

                @Override
                public void onSuccess(@Nullable Void unused) {
                    callback.onSuccess();
                }

                @Override
                public void onFailure(@NotNull Throwable throwable) {
                    if (retryCount < retryLimit) {
                        DefaultTbEdgeConsumerService.this.log.warn("[{}] Retry {} for message due to failure: {}", new Object[]{tenantId, retryCount + 1, throwable.getMessage()});
                        DefaultTbEdgeConsumerService.this.pushNotificationToEdge(edgeNotificationMsg, retryCount + 1, retryLimit, callback);
                    } else {
                        DefaultTbEdgeConsumerService.this.callBackFailure(tenantId, edgeNotificationMsg, callback, throwable);
                    }
                }
            }, (Executor)MoreExecutors.directExecutor());
        }
        catch (Exception e) {
            if (retryCount < retryLimit) {
                this.log.warn("[{}] Retry {} for message due to exception: {}", new Object[]{tenantId, retryCount + 1, e.getMessage()});
                this.pushNotificationToEdge(edgeNotificationMsg, retryCount + 1, retryLimit, callback);
            } else {
                // Report failure only when no retry is left. Falling through after a retry would
                // fail the callback even though the retry may still succeed, and would fail it
                // once per recursion level.
                this.callBackFailure(tenantId, edgeNotificationMsg, callback, e);
            }
        }
    }

    /**
     * Logs the final failure of pushing a notification and fails the callback.
     *
     * @param tenantId            tenant the notification belongs to
     * @param edgeNotificationMsg notification that could not be pushed
     * @param callback            callback to fail
     * @param throwable           cause of the failure
     */
    private void callBackFailure(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback, Throwable throwable) {
        this.log.error("[{}] Can't push to edge updates, edgeNotificationMsg [{}]", new Object[]{tenantId, edgeNotificationMsg, throwable});
        callback.onFailure(throwable);
    }

    /** Prints and then resets the consumer statistics, if statistics are enabled. */
    @Scheduled(fixedDelayString="${queue.edge.stats.print-interval-ms}")
    public void printStats() {
        if (this.statsEnabled) {
            this.stats.printStats();
            this.stats.reset();
        }
    }

    /** Stops the superclass consumers, then stops the main consumer and waits for it to finish. */
    @Override
    protected void stopConsumers() {
        super.stopConsumers();
        this.mainConsumer.stop();
        this.mainConsumer.awaitStop();
    }
}

