package org.thingsboard.server.service.housekeeper;

import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.common.data.notification.rule.trigger.TaskProcessingFailureTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor;
import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService;

/**
 * Consumes housekeeper messages from the queue and dispatches each task to the
 * {@link HousekeeperTaskProcessor} registered for its {@link HousekeeperTaskType}.
 *
 * <p>Messages are polled on a dedicated single-thread consumer executor. Every task is executed
 * on a separate single-thread task executor so that the consumer thread can enforce the
 * configured processing timeout. Failed tasks are handed to the
 * {@link HousekeeperReprocessingService} until the configured maximum number of attempts is
 * reached; after that a {@link TaskProcessingFailureTrigger} is sent to the
 * {@link NotificationRuleProcessor}.
 */
@TbCoreComponent
@Service
public class HousekeeperService {
    private static final Logger log = LoggerFactory.getLogger(HousekeeperService.class);

    /** Processors indexed by the task type they report via {@code getTaskType()}. */
    private final Map<HousekeeperTaskType, HousekeeperTaskProcessor<?>> taskProcessors;
    private final HousekeeperConfig config;
    private final HousekeeperReprocessingService reprocessingService;
    private final Optional<HousekeeperStatsService> statsService;
    private final NotificationRuleProcessor notificationRuleProcessor;
    private final QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer;
    private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-consumer"));
    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-task-processor"));

    /**
     * Wires the service and builds the queue consumer (the consumer is not started here;
     * see {@link #afterStartUp()}).
     *
     * @param housekeeperConfig              poll interval, timeout, disabled task types and retry limit
     * @param housekeeperReprocessingService receives tasks that failed and should be retried
     * @param tbCoreQueueFactory             creates the underlying housekeeper message consumer
     * @param optional                       statistics sink, may be empty
     * @param notificationRuleProcessor      receives a trigger when a task fails for the last time
     * @param list                           all available task processors, one per task type
     * @throws IllegalStateException if two processors report the same task type
     *                               (thrown by {@link Collectors#toMap})
     */
    public HousekeeperService(HousekeeperConfig housekeeperConfig, HousekeeperReprocessingService housekeeperReprocessingService, TbCoreQueueFactory tbCoreQueueFactory, Optional<HousekeeperStatsService> optional, NotificationRuleProcessor notificationRuleProcessor, @Lazy List<HousekeeperTaskProcessor<?>> list) {
        this.config = housekeeperConfig;
        this.reprocessingService = housekeeperReprocessingService;
        this.statsService = optional;
        this.notificationRuleProcessor = notificationRuleProcessor;
        Objects.requireNonNull(tbCoreQueueFactory);
        this.consumer = QueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
                .name("Housekeeper")
                .msgPackProcessor(this::processMsgs)
                .pollInterval(housekeeperConfig.getPollInterval())
                .consumerCreator(tbCoreQueueFactory::createHousekeeperMsgConsumer)
                .consumerExecutor(this.consumerExecutor)
                .build();
        this.taskProcessors = list.stream()
                .collect(Collectors.toMap(HousekeeperTaskProcessor::getTaskType, processor -> processor));
    }

    /** Subscribes the consumer and launches it. */
    @AfterStartUp(order = 11)
    public void afterStartUp() {
        this.consumer.subscribe();
        this.consumer.launch();
    }

    /**
     * Processes one polled pack of messages sequentially and commits the consumer afterwards.
     *
     * <p>Any unexpected error for a single message is logged and the message is submitted for
     * reprocessing, so one bad message does not block the rest of the pack. If the thread is
     * interrupted, processing stops immediately and the pack is <em>not</em> committed.
     *
     * @param list            messages of the current pack
     * @param tbQueueConsumer consumer to commit once the whole pack has been handled
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> tbQueueConsumer) {
        for (TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg> queueMsg : list) {
            log.trace("Processing task: {}", queueMsg);
            try {
                processTask(queueMsg.getValue());
            } catch (InterruptedException e) {
                // Restore the interrupt status so the code driving this thread can observe it.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                log.error("Unexpected error during message processing [{}]", queueMsg, th);
                this.reprocessingService.submitForReprocessing(queueMsg.getValue(), th);
            }
        }
        tbQueueConsumer.commit();
    }

    /**
     * Deserializes the task carried by the message and runs it on the task executor, waiting at
     * most {@code config.getTaskProcessingTimeout()} milliseconds for completion.
     *
     * <p>Tasks whose type is listed in {@code config.getDisabledTaskTypes()} are ignored. On
     * failure or timeout the task is either submitted for reprocessing (while the attempt number
     * is below {@code config.getMaxReprocessingAttempts()}) or reported through the notification
     * rule processor. In every case a still-running task is cancelled before this method returns.
     *
     * @param toHousekeeperServiceMsg message holding the JSON-serialized task and its attempt number
     * @param <T>                     concrete task type expected by the resolved processor
     * @throws IllegalArgumentException if no processor is registered for the task type
     * @throws InterruptedException     if the calling thread is interrupted while waiting for the task
     * @throws Exception                declared for compatibility with existing callers
     */
    @SuppressWarnings("unchecked")
    public <T extends HousekeeperTask> void processTask(TransportProtos.ToHousekeeperServiceMsg toHousekeeperServiceMsg) throws Exception {
        HousekeeperTask task = JacksonUtil.fromString(toHousekeeperServiceMsg.getTask().getValue(), HousekeeperTask.class);
        HousekeeperTaskType taskType = task.getTaskType();
        if (this.config.getDisabledTaskTypes().contains(taskType)) {
            log.debug("Task type {} is disabled, ignoring {}", taskType, task);
            return;
        }
        // Processors are looked up by the task's own type, so the processor is assumed to accept
        // this task's concrete class; hence the unchecked casts here and inside the lambda.
        HousekeeperTaskProcessor<T> taskProcessor = (HousekeeperTaskProcessor<T>) this.taskProcessors.get(taskType);
        if (taskProcessor == null) {
            throw new IllegalArgumentException("Unsupported task type " + taskType);
        }

        Future<?> future = null;
        try {
            long startTs = System.currentTimeMillis();
            future = this.taskExecutor.submit(() -> {
                taskProcessor.process((T) task);
                return null;
            });
            future.get(this.config.getTaskProcessingTimeout(), TimeUnit.MILLISECONDS);

            long elapsedMs = System.currentTimeMillis() - startTs;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Processed {} in {} ms (attempt {})", task.getTenantId(), task.getDescription(), elapsedMs, toHousekeeperServiceMsg.getTask().getAttempt());
            }
            this.statsService.ifPresent(stats -> stats.reportProcessed(taskType, toHousekeeperServiceMsg, elapsedMs));
        } catch (InterruptedException e) {
            // Propagate untouched: the caller decides how to stop.
            throw e;
        } catch (Throwable th) {
            Throwable error = th;
            if (th instanceof ExecutionException) {
                // Report the processor's own failure rather than the Future wrapper.
                error = th.getCause();
            } else if (th instanceof TimeoutException) {
                // Future.get's TimeoutException carries no message; build a descriptive one.
                error = new TimeoutException("Timeout after " + this.config.getTaskProcessingTimeout() + " ms");
            }

            if (toHousekeeperServiceMsg.getTask().getAttempt() < this.config.getMaxReprocessingAttempts()) {
                log.warn("[{}] Failed to process {} (attempt {}), submitting for reprocessing", task.getTenantId(), task.getDescription(), toHousekeeperServiceMsg.getTask().getAttempt(), error);
                this.reprocessingService.submitForReprocessing(toHousekeeperServiceMsg, error);
            } else {
                log.error("[{}] Failed to process task in {} attempts: {}", task.getTenantId(), toHousekeeperServiceMsg.getTask().getAttempt(), toHousekeeperServiceMsg, th);
                this.notificationRuleProcessor.process(TaskProcessingFailureTrigger.builder()
                        .task(task)
                        .error(error)
                        .attempt(toHousekeeperServiceMsg.getTask().getAttempt())
                        .build());
            }
            this.statsService.ifPresent(stats -> stats.reportFailure(taskType, toHousekeeperServiceMsg));
        } finally {
            // Interrupt a task that is still running (timeout, interruption) so the single
            // task-executor thread becomes available for the next task.
            if (future != null && !future.isDone()) {
                future.cancel(true);
            }
        }
    }

    /** Stops the consumer and shuts down both executors owned by this service. */
    @PreDestroy
    private void stop() {
        this.consumer.stop();
        this.consumerExecutor.shutdownNow();
        // The task executor is owned by this service too; without this its worker thread leaks.
        this.taskExecutor.shutdownNow();
        log.info("Stopped Housekeeper service");
    }
}
