package org.thingsboard.server.service.transaction;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;

/**
 * Serializes rule-chain transactions per originator entity.
 *
 * <p>Every originator has its own bounded FIFO queue of {@link TbTransactionTask}s. Only the task
 * at the head of a queue is considered started (its {@code onStart} callback is dispatched); the
 * remaining tasks wait until the head is ended or expires. A single background worker walks
 * {@link #timeoutQueue} and fails tasks whose expiration time (creation time plus
 * {@link #duration} milliseconds) has passed.
 *
 * <p>All mutations of the per-originator queues happen while holding {@link #transactionLock}.
 * Callbacks are never invoked inline; they are handed to {@link #callbackExecutor}.
 */
@Service
/* loaded from: input_file:org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.class */
public class BaseRuleChainTransactionService implements RuleChainTransactionService {
    private static final Logger log = LoggerFactory.getLogger(BaseRuleChainTransactionService.class);

    @Autowired
    private ClusterRoutingService routingService;

    @Autowired
    private ClusterRpcService clusterRpcService;

    @Autowired
    private DbCallbackExecutorService callbackExecutor;

    /** Maximum number of tasks (running + waiting) allowed per originator queue. */
    @Value("${actors.rule.transaction.queue_size}")
    private int finalQueueSize;

    /** Lifetime of a task in milliseconds, counted from the moment it is enqueued. */
    @Value("${actors.rule.transaction.duration}")
    private long duration;

    /** Guards every read-modify-write sequence on the per-originator queues. */
    private final Lock transactionLock = new ReentrantLock();

    /** Per-originator FIFO of tasks; the head is the currently started transaction. */
    private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();

    /** All enqueued tasks in insertion order; consumed only by the timeout worker. */
    private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();

    private ExecutorService timeoutExecutor;

    /** Creates the single-thread timeout executor and starts the expiration loop on it. */
    @PostConstruct
    public void init() {
        this.timeoutExecutor = Executors.newSingleThreadExecutor();
        executeOnTimeout();
    }

    /** Stops the timeout worker by interrupting it, if it was ever started. */
    @PreDestroy
    public void destroy() {
        if (this.timeoutExecutor != null) {
            this.timeoutExecutor.shutdownNow();
        }
    }

    /**
     * Enqueues a new transaction for the originator of {@code tbMsg}.
     *
     * <p>If the originator's queue was empty, the task's start callback is dispatched immediately;
     * otherwise the task waits behind the tasks already queued. If the queue already holds
     * {@code finalQueueSize} tasks, the task is not enqueued and the failure callback is
     * dispatched with the message {@code "Queue has no space!"}.
     *
     * @param tbMsg     message whose transaction data supplies the originator id
     * @param consumer  second constructor argument of {@link TbTransactionTask}
     *                  (presumably the start callback - confirm against TbTransactionTask)
     * @param consumer2 third constructor argument of {@link TbTransactionTask}
     *                  (presumably the end callback - confirm against TbTransactionTask)
     * @param consumer3 fourth constructor argument of {@link TbTransactionTask}; the only
     *                  {@code Consumer<Throwable>}, used as the failure callback
     */
    public void beginTransaction(TbMsg tbMsg, Consumer<TbMsg> consumer, Consumer<TbMsg> consumer2, Consumer<Throwable> consumer3) {
        this.transactionLock.lock();
        try {
            BlockingQueue<TbTransactionTask> queue = this.transactionMap.computeIfAbsent(
                    tbMsg.getTransactionData().getOriginatorId(),
                    id -> new LinkedBlockingQueue<>(this.finalQueueSize));
            TbTransactionTask task = new TbTransactionTask(tbMsg, consumer, consumer2, consumer3,
                    System.currentTimeMillis() + this.duration);

            int sizeBefore = queue.size();
            if (sizeBefore >= this.finalQueueSize) {
                executeOnFailure(task.getOnFailure(), "Queue has no space!");
                return;
            }

            addMsgToQueues(queue, task);
            if (sizeBefore == 0) {
                // Nothing was ahead of this task, so it becomes the active transaction right away.
                executeOnSuccess(task.getOnStart(), task.getMsg());
            } else {
                log.trace("Msg [{}][{}] is waiting to start transaction!", tbMsg.getId(), tbMsg.getType());
            }
        } finally {
            this.transactionLock.unlock();
        }
    }

    /**
     * Ends the transaction identified by the transaction data of {@code tbMsg}.
     *
     * <p>When the routing service resolves the originator to another server, an end-of-transaction
     * message is sent there and {@code consumer} is dispatched with {@code tbMsg} without waiting
     * for the remote outcome. Otherwise the transaction is ended in the local queues.
     *
     * @param tbMsg     message carrying the originator id and transaction id
     * @param consumer  success callback
     * @param consumer2 failure callback (used only on the local path)
     */
    public void endTransaction(TbMsg tbMsg, Consumer<TbMsg> consumer, Consumer<Throwable> consumer2) {
        EntityId originatorId = tbMsg.getTransactionData().getOriginatorId();
        UUID transactionId = tbMsg.getTransactionData().getTransactionId();
        Optional<ServerAddress> remoteServer = this.routingService.resolveById(originatorId);
        if (remoteServer.isPresent()) {
            sendTransactionEventToRemoteServer(originatorId, transactionId, remoteServer.get());
            executeOnSuccess(consumer, tbMsg);
        } else {
            endLocalTransaction(transactionId, originatorId, consumer, consumer2);
        }
    }

    /**
     * Handles an end-of-transaction message received from another server by ending the matching
     * local transaction. The caller-side callbacks are no-ops; the outcome is only trace-logged
     * by {@link #endLocalTransaction}.
     *
     * @param serverAddress address of the sender (not used by this implementation)
     * @param bArr          serialized {@code TransactionEndServiceMsgProto}
     * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} if {@code bArr}
     *                          cannot be parsed
     */
    public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bArr) {
        try {
            ClusterAPIProtos.TransactionEndServiceMsgProto proto =
                    ClusterAPIProtos.TransactionEndServiceMsgProto.parseFrom(bArr);
            UUID transactionId = new UUID(proto.getTransactionIdMSB(), proto.getTransactionIdLSB());
            EntityId originatorId = EntityIdFactory.getByTypeAndUuid(
                    proto.getEntityType(),
                    new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
            endLocalTransaction(transactionId, originatorId, msg -> {
            }, error -> {
            });
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }

    /** Appends the task to both its originator queue and the global timeout queue. */
    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task) {
        queue.offer(task);
        this.timeoutQueue.offer(task);
        log.trace("Added msg to queue, size: [{}]", queue.size());
    }

    /**
     * Ends the active transaction of {@code originatorId} if its transaction id equals
     * {@code transactionId}, then starts the next queued task, if any.
     *
     * <p>If the originator has no queued task, or the head task belongs to a different
     * transaction, {@code onFailure} is dispatched instead.
     */
    private void endLocalTransaction(UUID transactionId, EntityId originatorId,
                                     Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
        this.transactionLock.lock();
        try {
            // Plain lookup: a missing queue is equivalent to an empty one, and creating a queue
            // here would keep an entry for every originator id ever seen (including remote ones).
            BlockingQueue<TbTransactionTask> queue = this.transactionMap.get(originatorId);
            TbTransactionTask head = queue == null ? null : queue.peek();
            if (head == null) {
                log.trace("Queue is empty, previous task has expired!");
                executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
                return;
            }
            if (!head.getMsg().getTransactionData().getTransactionId().equals(transactionId)) {
                log.trace("Task has expired!");
                executeOnFailure(onFailure, "Task has expired!");
                return;
            }

            // Mark first so the timeout worker discards the task instead of failing it.
            head.setCompleted(true);
            queue.poll();
            log.trace("Removed msg from queue, size [{}]", queue.size());
            executeOnSuccess(head.getOnEnd(), head.getMsg());
            executeOnSuccess(onSuccess, head.getMsg());

            TbTransactionTask next = queue.peek();
            if (next != null) {
                executeOnSuccess(next.getOnStart(), next.getMsg());
            }
        } finally {
            this.transactionLock.unlock();
        }
    }

    /**
     * Submits the expiration loop to the timeout executor. The loop inspects the head of
     * {@link #timeoutQueue}, lets {@link #expireOrGetDelay} handle it under the lock, and sleeps
     * only after the lock has been released. It terminates when the worker thread is interrupted.
     */
    private void executeOnTimeout() {
        this.timeoutExecutor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                TbTransactionTask task = this.timeoutQueue.peek();
                try {
                    if (task == null) {
                        log.trace("Queue is empty, waiting for tasks!");
                        TimeUnit.SECONDS.sleep(1L);
                        continue;
                    }
                    long sleepMillis = expireOrGetDelay(task);
                    if (sleepMillis > 0) {
                        log.trace("Task has not expired! Continue executing...[{}][{}]",
                                task.getMsg().getId(), task.getMsg().getType());
                        // Sleeping outside the lock keeps begin/end transaction calls unblocked.
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so the executor sees the interruption, then stop the loop.
                    Thread.currentThread().interrupt();
                    log.debug("Transaction timeout worker interrupted, stopping");
                    return;
                }
            }
        });
    }

    /**
     * Processes the head of the timeout queue while holding {@link #transactionLock}.
     *
     * <ul>
     *   <li>Completed task: removed from the timeout queue.</li>
     *   <li>Expired task: removed from the timeout queue, its failure callback is dispatched,
     *       the head of its originator queue is dropped and the next task there is started.</li>
     *   <li>Otherwise nothing is changed.</li>
     * </ul>
     *
     * @return milliseconds the caller should wait before re-checking (at most {@link #duration}),
     *         or {@code 0} if the queue head was handled and the loop should continue immediately
     */
    private long expireOrGetDelay(TbTransactionTask task) {
        this.transactionLock.lock();
        try {
            if (task.isCompleted()) {
                this.timeoutQueue.poll();
                return 0L;
            }

            long expiresIn = task.getExpirationTime() - System.currentTimeMillis();
            if (expiresIn >= 0) {
                return Math.min(expiresIn, this.duration);
            }

            log.trace("Task has expired! Deleting it...[{}][{}]", task.getMsg().getId(), task.getMsg().getType());
            this.timeoutQueue.poll();
            executeOnFailure(task.getOnFailure(), "Task has expired!");

            BlockingQueue<TbTransactionTask> queue =
                    this.transactionMap.get(task.getMsg().getTransactionData().getOriginatorId());
            if (queue != null) {
                queue.poll();
                TbTransactionTask next = queue.peek();
                if (next != null) {
                    executeOnSuccess(next.getOnStart(), next.getMsg());
                }
            }
            return 0L;
        } finally {
            this.transactionLock.unlock();
        }
    }

    /** Dispatches {@code onFailure} asynchronously with a {@link RuntimeException} carrying {@code message}. */
    private void executeOnFailure(Consumer<Throwable> onFailure, String message) {
        executeCallback(() -> {
            onFailure.accept(new RuntimeException(message));
            return null;
        });
    }

    /** Dispatches {@code onSuccess} asynchronously with {@code msg}. */
    private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg msg) {
        executeCallback(() -> {
            onSuccess.accept(msg);
            return null;
        });
    }

    /** Hands the callback to the callback executor so it never runs on the calling thread. */
    private void executeCallback(Callable<Void> task) {
        this.callbackExecutor.executeAsync(task);
    }

    /**
     * Builds a {@code TransactionEndServiceMsgProto} from the originator and transaction ids and
     * sends it to {@code serverAddress} through the cluster RPC service.
     */
    private void sendTransactionEventToRemoteServer(EntityId originatorId, UUID transactionId, ServerAddress serverAddress) {
        log.trace("[{}][{}] Originator is monitored on other server: {}", originatorId, transactionId, serverAddress);
        ClusterAPIProtos.TransactionEndServiceMsgProto.Builder builder =
                ClusterAPIProtos.TransactionEndServiceMsgProto.newBuilder();
        builder.setEntityType(originatorId.getEntityType().name());
        builder.setOriginatorIdMSB(originatorId.getId().getMostSignificantBits());
        builder.setOriginatorIdLSB(originatorId.getId().getLeastSignificantBits());
        builder.setTransactionIdMSB(transactionId.getMostSignificantBits());
        builder.setTransactionIdLSB(transactionId.getLeastSignificantBits());
        this.clusterRpcService.tell(serverAddress,
                ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE,
                builder.build().toByteArray());
    }
}
