/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.housekeeper;

import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.notification.rule.trigger.TaskProcessingFailureTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.housekeeper.HousekeeperReprocessingService;
import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor;
import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService;

@TbCoreComponent
@Service
public class HousekeeperService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(HousekeeperService.class);
    private final Map<HousekeeperTaskType, HousekeeperTaskProcessor<?>> taskProcessors;
    private final HousekeeperConfig config;
    private final HousekeeperReprocessingService reprocessingService;
    private final Optional<HousekeeperStatsService> statsService;
    private final NotificationRuleProcessor notificationRuleProcessor;
    private final QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer;
    private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor((ThreadFactory)ThingsBoardThreadFactory.forName((String)"housekeeper-consumer"));
    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor((ThreadFactory)ThingsBoardThreadFactory.forName((String)"housekeeper-task-processor"));

    public HousekeeperService(HousekeeperConfig config, HousekeeperReprocessingService reprocessingService, TbCoreQueueFactory queueFactory, Optional<HousekeeperStatsService> statsService, NotificationRuleProcessor notificationRuleProcessor, @Lazy List<HousekeeperTaskProcessor<?>> taskProcessors) {
        this.config = config;
        this.reprocessingService = reprocessingService;
        this.statsService = statsService;
        this.notificationRuleProcessor = notificationRuleProcessor;
        this.consumer = QueueConsumerManager.builder().name("Housekeeper").msgPackProcessor(this::processMsgs).pollInterval((long)config.getPollInterval()).consumerCreator(() -> ((TbCoreQueueFactory)queueFactory).createHousekeeperMsgConsumer()).consumerExecutor(this.consumerExecutor).build();
        this.taskProcessors = taskProcessors.stream().collect(Collectors.toMap(HousekeeperTaskProcessor::getTaskType, p -> p));
    }

    @AfterStartUp(order=11)
    public void afterStartUp() {
        this.consumer.subscribe();
        this.consumer.launch();
    }

    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer) {
        for (TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg> msg : msgs) {
            log.trace("Processing task: {}", msg);
            try {
                this.processTask((TransportProtos.ToHousekeeperServiceMsg)msg.getValue());
            }
            catch (InterruptedException e) {
                return;
            }
            catch (Throwable e) {
                log.error("Unexpected error during message processing [{}]", msg, (Object)e);
                this.reprocessingService.submitForReprocessing((TransportProtos.ToHousekeeperServiceMsg)msg.getValue(), e);
            }
        }
        consumer.commit();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected <T extends HousekeeperTask> void processTask(TransportProtos.ToHousekeeperServiceMsg msg) throws Exception {
        HousekeeperTask task = (HousekeeperTask)JacksonUtil.fromString((String)msg.getTask().getValue(), HousekeeperTask.class);
        HousekeeperTaskType taskType = task.getTaskType();
        if (this.config.getDisabledTaskTypes().contains(taskType)) {
            log.debug("Task type {} is disabled, ignoring {}", (Object)taskType, (Object)task);
            return;
        }
        HousekeeperTaskProcessor<?> taskProcessor = this.taskProcessors.get(taskType);
        if (taskProcessor == null) {
            throw new IllegalArgumentException("Unsupported task type " + String.valueOf(taskType));
        }
        Future<Object> future = null;
        try {
            long startTs = System.currentTimeMillis();
            future = this.taskExecutor.submit(() -> {
                taskProcessor.process(task);
                return null;
            });
            future.get(this.config.getTaskProcessingTimeout(), TimeUnit.MILLISECONDS);
            long timing = System.currentTimeMillis() - startTs;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Processed {} in {} ms (attempt {})", new Object[]{task.getTenantId(), task.getDescription(), timing, msg.getTask().getAttempt()});
            }
            this.statsService.ifPresent(statsService -> statsService.reportProcessed(taskType, msg, timing));
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (Throwable e) {
            Throwable error = e;
            if (e instanceof ExecutionException) {
                error = e.getCause();
            } else if (e instanceof TimeoutException) {
                error = new TimeoutException("Timeout after " + this.config.getTaskProcessingTimeout() + " ms");
            }
            if (msg.getTask().getAttempt() < this.config.getMaxReprocessingAttempts()) {
                log.warn("[{}] Failed to process {} (attempt {}), submitting for reprocessing", new Object[]{task.getTenantId(), task.getDescription(), msg.getTask().getAttempt(), error});
                this.reprocessingService.submitForReprocessing(msg, error);
            } else {
                log.error("[{}] Failed to process task in {} attempts: {}", new Object[]{task.getTenantId(), msg.getTask().getAttempt(), msg, e});
                this.notificationRuleProcessor.process((NotificationRuleTrigger)TaskProcessingFailureTrigger.builder().task(task).error(error).attempt(msg.getTask().getAttempt()).build());
            }
            this.statsService.ifPresent(statsService -> statsService.reportFailure(taskType, msg));
        }
        finally {
            if (future != null && !future.isDone()) {
                future.cancel(true);
            }
        }
    }

    @PreDestroy
    private void stop() {
        this.consumer.stop();
        this.consumerExecutor.shutdownNow();
        this.taskExecutor.shutdownNow();
        log.info("Stopped Housekeeper service");
    }
}

