/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.task;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.SetCache;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.queue.task.TaskProcessorExecutors;
import org.thingsboard.server.queue.task.TaskProcessorQueueFactory;

/**
 * Base class for processors of a single {@link JobType}.
 *
 * <p>On startup it builds a queue consumer for the job type returned by {@link #getJobType()}.
 * Every consumed message is deserialized into a task, executed on a dedicated executor with the
 * timeout returned by {@link #getProcessingTimeout(Task)}, retried while the task's attempt count
 * does not exceed its retry count, and the outcome is reported through {@link JobStatsService}.
 *
 * <p>Tasks whose key was marked as discarded or failed, or whose tenant was marked as deleted
 * (see {@link #onComponentLifecycle(ComponentLifecycleMsg)}), are skipped, and matching tasks that
 * are currently running are cancelled.
 *
 * @param <T> concrete task type handled by the subclass
 * @param <R> result type produced for {@code T}
 */
public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
    /** Logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());

    @Autowired
    private TaskProcessorQueueFactory queueFactory;
    @Autowired
    private JobStatsService statsService;
    @Autowired
    private TaskProcessorExecutors executors;
    @Autowired
    private TasksQueueConfig config;

    private QueueKey queueKey;
    private MainQueueConsumerManager<TbProtoQueueMsg<TransportProtos.TaskProto>, QueueConfig> taskConsumer;

    // NOTE: this initializer calls the abstract getJobType() before the subclass constructor has
    // run, so implementations must not depend on their own instance state in getJobType().
    private final ExecutorService taskExecutor = Executors.newCachedThreadPool(
            ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor"));

    /** Task (and its future) currently being executed, keyed by the consumer key that delivered it. */
    protected final Map<Object, Pair<Task<R>, Future<R>>> currentTasks = new ConcurrentHashMap<>();

    // The SetCache constructor argument is 60 minutes in milliseconds; presumably the retention
    // time of an entry - confirm against SetCache.
    /** Task keys of stopped jobs; their tasks are skipped and reported as discarded. */
    private final SetCache<String> discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
    /** Task keys of failed jobs; their tasks are skipped without reporting. */
    private final SetCache<String> failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
    /** Ids of deleted tenants; their tasks are skipped without reporting. */
    private final SetCache<UUID> deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60));

    /**
     * Creates the queue key for this job type and builds the consumer manager that feeds
     * {@link #processMsgs}.
     */
    @PostConstruct
    public void init() {
        queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name());
        // The explicit type witness is required: without it the builder's type parameters cannot
        // be inferred from the call chain and this::processMsgs does not match.
        taskConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TransportProtos.TaskProto>, QueueConfig>builder()
                .queueKey(queueKey)
                .config(QueueConfig.of(true, config.getPollInterval()))
                .msgPackProcessor(this::processMsgs)
                .consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType()))
                .consumerExecutor(executors.getConsumersExecutor())
                .scheduler(executors.getScheduler())
                .taskExecutor(executors.getMgmtExecutor())
                .build();
    }

    /**
     * Forwards the partitions registered for this processor's queue key to the consumer manager.
     * Events for other service types are ignored.
     *
     * @param event partition change notification
     */
    @EventListener
    public void onPartitionChangeEvent(PartitionChangeEvent event) {
        if (event.getServiceType() != ServiceType.TASK_PROCESSOR) {
            return;
        }
        // NOTE(review): the lookup yields null when the event carries no entry for this queue key;
        // the value is passed through unchanged - confirm that update() tolerates null.
        Set<TopicPartitionInfo> partitions = event.getNewPartitions().get(queueKey);
        taskConsumer.update(partitions);
    }

    /**
     * Reacts to job and tenant lifecycle events.
     *
     * <ul>
     *   <li>JOB + STOPPED: the job's tasks key is added to the discarded set and matching running
     *       tasks are cancelled.</li>
     *   <li>JOB + FAILED: the job's tasks key is added to the failed set and matching running
     *       tasks are cancelled.</li>
     *   <li>TENANT + DELETED: the tenant id is added to the deleted set and the tenant's running
     *       tasks are cancelled.</li>
     * </ul>
     * Any other combination is ignored.
     *
     * @param event lifecycle message; for JOB entities its info must contain a {@code tasksKey} field
     */
    @EventListener
    public void onComponentLifecycle(ComponentLifecycleMsg event) {
        EntityId entityId = event.getEntityId();
        switch (entityId.getEntityType()) {
            case JOB: {
                String tasksKey = event.getInfo().get("tasksKey").asText();
                if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
                    log.info("Adding job {} ({}) to discarded", entityId, tasksKey);
                    addToDiscarded(tasksKey);
                    cancelRunningTasks(tasksKey);
                } else if (event.getEvent() == ComponentLifecycleEvent.FAILED) {
                    log.info("Adding job {} ({}) to failed", entityId, tasksKey);
                    failed.add(tasksKey);
                    cancelRunningTasks(tasksKey);
                }
                break;
            }
            case TENANT: {
                if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
                    log.info("Adding tenant {} to deleted", entityId);
                    deletedTenants.add(entityId.getId());
                    cancelRunningTasks((TenantId) entityId);
                }
                break;
            }
            default:
                break;
        }
    }

    /**
     * Processes one polled batch and then commits the consumer.
     *
     * <p>Each message is deserialized into a task. Tasks of discarded jobs are reported as
     * discarded; tasks of failed jobs or deleted tenants are silently skipped; all others are
     * executed via {@link #processTask}. A failure on a single message is logged and does not stop
     * the batch. An {@link InterruptedException} aborts the batch before the commit.
     *
     * @param msgs        polled messages
     * @param consumer    consumer to commit once the batch is handled
     * @param consumerKey key under which the running task is registered in {@link #currentTasks}
     * @param queueConfig consumer configuration (unused here)
     * @throws Exception if the thread is interrupted or the commit fails
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.TaskProto>> msgs,
                             TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TaskProto>> consumer,
                             Object consumerKey, QueueConfig queueConfig) throws Exception {
        for (TbProtoQueueMsg<TransportProtos.TaskProto> msg : msgs) {
            try {
                // The payload is deserialized through the Task base type; the concrete type is
                // expected to match T because this consumer only reads this job type's queue.
                @SuppressWarnings("unchecked")
                T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class);
                if (discarded.contains(task.getKey())) {
                    log.debug("Skipping task for discarded job {}: {}", task.getJobId(), task);
                    reportTaskDiscarded(task);
                    continue;
                }
                if (failed.contains(task.getKey())) {
                    log.debug("Skipping task for failed job {}: {}", task.getJobId(), task);
                    continue;
                }
                if (deletedTenants.contains(task.getTenantId().getId())) {
                    log.debug("Skipping task for deleted tenant {}: {}", task.getTenantId(), task);
                    continue;
                }
                processTask(task, consumerKey);
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                log.error("Failed to process msg: {}", msg, e);
            }
        }
        consumer.commit();
    }

    /**
     * Executes a task with timeout and retries, and reports the outcome.
     *
     * <p>Each attempt increments the task's attempt counter, submits {@link #process(Task)} to the
     * task executor, registers the future in {@link #currentTasks} and waits up to
     * {@link #getProcessingTimeout(Task)} milliseconds. Outcomes:
     * <ul>
     *   <li>success: the result is reported;</li>
     *   <li>cancellation: the task is reported as discarded unless its key is in the failed set or
     *       its tenant is in the deleted set;</li>
     *   <li>any other error (including timeout): retried while {@code attempt <= retries},
     *       otherwise reported as a failure.</li>
     * </ul>
     *
     * <p>Retries are performed in a loop rather than recursively so that the previous attempt's
     * future is cancelled and deregistered before the next attempt starts; a timed-out attempt
     * therefore never keeps running next to its retry.
     *
     * @param task        task to execute
     * @param consumerKey key under which the running task is registered
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void processTask(T task, Object consumerKey) throws InterruptedException {
        while (true) {
            task.setAttempt(task.getAttempt() + 1);
            log.debug("Processing task: {}", task);
            Future<R> future = null;
            try {
                long startNs = System.nanoTime();
                long timeoutMs = getProcessingTimeout(task);
                future = taskExecutor.submit(() -> process(task));
                currentTasks.put(consumerKey, Pair.of(task, future));

                R result;
                try {
                    result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (ExecutionException e) {
                    // Unwrap so that the failure reported below is the task's own exception.
                    throw e.getCause();
                } catch (TimeoutException e) {
                    throw new TimeoutException("Timeout after " + timeoutMs + " ms");
                }

                long timingNs = System.nanoTime() - startNs;
                log.info("Processed task in {} ms: {}", timingNs / 1000000.0, task);
                reportTaskResult(task, result);
                return;
            } catch (InterruptedException e) {
                throw e;
            } catch (CancellationException e) {
                if (!failed.contains(task.getKey()) && !deletedTenants.contains(task.getTenantId().getId())) {
                    reportTaskDiscarded(task);
                }
                return;
            } catch (Throwable e) {
                log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e);
                if (task.getAttempt() > task.getRetries()) {
                    reportTaskFailure(task, e);
                    return;
                }
                // Retries remain: fall through to the next loop iteration (after the finally block).
            } finally {
                if (future != null && !future.isDone()) {
                    future.cancel(true);
                }
                currentTasks.remove(consumerKey);
            }
        }
    }

    /**
     * Performs the actual work for a task. Invoked on the task executor thread.
     *
     * @param var1 task to process
     * @return the task result
     * @throws Exception on any processing failure; triggers a retry or a failure report
     */
    public abstract R process(T var1) throws Exception;

    /** Cancels running tasks whose key equals {@code tasksKey}. */
    private void cancelRunningTasks(String tasksKey) {
        cancelRunningTasks((Task<R> task) -> task.getKey().equals(tasksKey));
    }

    /** Cancels running tasks that belong to {@code tenantId}. */
    private void cancelRunningTasks(TenantId tenantId) {
        cancelRunningTasks((Task<R> task) -> task.getTenantId().equals(tenantId));
    }

    /** Cancels (with interruption) the future of every registered task accepted by {@code filter}. */
    private void cancelRunningTasks(Predicate<Task<R>> filter) {
        for (Pair<Task<R>, Future<R>> running : currentTasks.values()) {
            Task<R> task = running.getKey();
            if (filter.test(task)) {
                log.debug("Cancelling running task {}", task);
                running.getValue().cancel(true);
            }
        }
    }

    /** Reports the failed result that the task builds from {@code error}. */
    private void reportTaskFailure(T task, Throwable error) {
        R taskResult = task.toFailed(error);
        reportTaskResult(task, taskResult);
    }

    /** Reports the discarded result built by the task. */
    private void reportTaskDiscarded(T task) {
        R taskResult = task.toDiscarded();
        reportTaskResult(task, taskResult);
    }

    /** Stamps the result with the task key and the current time, then hands it to the stats service. */
    private void reportTaskResult(T task, R result) {
        result.setKey(task.getKey());
        result.setFinishTs(System.currentTimeMillis());
        statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
    }

    /**
     * Marks a tasks key as discarded so that subsequently consumed tasks with this key are skipped.
     *
     * @param tasksKey key shared by the tasks to discard
     */
    public void addToDiscarded(String tasksKey) {
        discarded.add(tasksKey);
    }

    /**
     * Blocks until {@code future} completes and returns its value. If the waiting thread is
     * interrupted, the future is cancelled (with interruption) and the interruption is rethrown.
     *
     * @param future future to wait for
     * @param <V>    value type of the future
     * @return the completed value
     * @throws Exception whatever {@link Future#get()} throws
     */
    protected <V> V wait(Future<V> future) throws Exception {
        try {
            return future.get();
        } catch (InterruptedException e) {
            future.cancel(true);
            throw e;
        }
    }

    /**
     * Stops the consumer, waits for it to stop and shuts down the task executor. The executor is
     * shut down even if stopping the consumer throws, so its threads are not leaked.
     */
    @PreDestroy
    public void destroy() {
        try {
            taskConsumer.stop();
            taskConsumer.awaitStop();
        } finally {
            taskExecutor.shutdownNow();
        }
    }

    /**
     * Returns the maximum time a single attempt of the task may take.
     *
     * @param var1 task about to be executed
     * @return timeout in milliseconds
     */
    public abstract long getProcessingTimeout(T var1);

    /**
     * Returns the job type handled by this processor. Called during field initialization, so it
     * must not rely on subclass instance state.
     *
     * @return the job type
     */
    public abstract JobType getJobType();
}

