/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.analytics.incoming;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.data.util.Pair;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.analytics.incoming.AggIntervalType;
import org.thingsboard.rule.engine.analytics.incoming.IntervalPersistPolicy;
import org.thingsboard.rule.engine.analytics.incoming.MathFunction;
import org.thingsboard.rule.engine.analytics.incoming.TbSimpleAggMsgNodeConfiguration;
import org.thingsboard.rule.engine.analytics.incoming.state.TbAvgIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbCountIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbCountUniqueIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbMaxIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbMinIntervalState;
import org.thingsboard.rule.engine.analytics.incoming.state.TbSumIntervalState;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;

/**
 * In-memory table of aggregation interval states, keyed by entity and by the
 * start timestamp (epoch millis) of the interval the state belongs to.
 *
 * <p>States missing from memory are loaded from the time-series service under
 * the key {@code "RuleNodeState_" + ctx.getSelfId()}; when nothing is stored a
 * fresh default state for the configured {@link MathFunction} is created.
 *
 * <p>For {@link AggIntervalType#CUSTOM} the intervals are fixed-size buckets of
 * {@code intervalDuration} millis. For the calendar types (hour, day, week,
 * month, year) interval boundaries are computed in the configured time zone;
 * {@code intervalDuration} is then only an approximation (30-day month,
 * 365-day year) that is used for the TTL and as a fallback step.
 */
class TbIntervalTable {
    /** Prefix of the time-series key under which interval states are stored. */
    private static final String STATE_KEY_PREFIX = "RuleNodeState_";

    private final TbContext ctx;
    private final JsonParser gsonParser;
    private final Gson gson = new Gson();
    private final AggIntervalType aggIntervalType;
    private final ZoneId tz;
    /** Effective interval length in millis; never less than one minute. */
    private final long intervalDuration;
    /** States whose interval start is older than {@code now - intervalTtl} are evicted. */
    private final long intervalTtl;
    private final MathFunction function;
    private final boolean autoCreateIntervals;
    private final ConcurrentMap<EntityId, ConcurrentMap<Long, TbIntervalState>> states = new ConcurrentHashMap<>();

    /**
     * Builds the table from the rule node configuration.
     *
     * @param ctx    rule engine context used for persistence and identity lookups
     * @param config node configuration; a {@code null} interval type is treated as {@code CUSTOM}
     * @param gson   JSON parser used to read persisted state values
     */
    TbIntervalTable(TbContext ctx, TbSimpleAggMsgNodeConfiguration config, JsonParser gson) {
        this.ctx = ctx;
        this.gsonParser = gson;
        this.aggIntervalType = config.getAggIntervalType() == null ? AggIntervalType.CUSTOM : config.getAggIntervalType();
        long configuredDuration;
        if (this.aggIntervalType == AggIntervalType.CUSTOM) {
            this.tz = ZoneId.systemDefault();
            configuredDuration = TimeUnit.valueOf(config.getAggIntervalTimeUnit()).toMillis(config.getAggIntervalValue());
        } else {
            this.tz = ZoneId.of(config.getTimeZoneId());
            configuredDuration = this.getDefaultIntervalDurationByAggType();
        }
        this.intervalDuration = Math.max(configuredDuration, TimeUnit.MINUTES.toMillis(1L));
        // The TTL is twice the EFFECTIVE interval length. Deriving it from the raw custom
        // unit/value (as before) ignored the one-minute clamp and, for calendar interval
        // types, could evict a state long before its interval had ended.
        this.intervalTtl = this.intervalDuration > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : this.intervalDuration * 2;
        this.function = MathFunction.valueOf(config.getMathFunction());
        this.autoCreateIntervals = config.isAutoCreateIntervals();
    }

    /**
     * Ensures a state for the current interval exists for each of the given entities,
     * loading or creating it as needed.
     *
     * @param ctx      context used to report failures
     * @param msg      message to fail when loading a state throws; when {@code null} the
     *                 failure is not reported
     * @param entities entities to register
     */
    void addEntities(TbContext ctx, TbMsg msg, List<EntityId> entities) {
        long ts = System.currentTimeMillis();
        for (EntityId entityId : entities) {
            try {
                this.getByEntityIdAndTs(entityId, ts);
            } catch (Exception e) {
                boolean interrupted = e instanceof InterruptedException;
                if (interrupted) {
                    // Do not lose the interruption: the catch-all would otherwise swallow it.
                    Thread.currentThread().interrupt();
                }
                if (msg != null) {
                    ctx.tellFailure(msg, e);
                }
                if (interrupted) {
                    // Further blocking loads would fail immediately on an interrupted thread.
                    return;
                }
            }
        }
    }

    /**
     * Drops all states of entities for which {@code ctx.getPeContext().isLocalEntity(..)}
     * returns {@code false}.
     */
    void cleanupEntities(TbContext ctx) {
        this.states.keySet().removeIf(entityId -> !ctx.getPeContext().isLocalEntity(entityId));
    }

    /**
     * Returns the state of the interval containing {@code ts} for the given entity,
     * loading it through the time-series service (and caching it) when it is not in memory.
     * The load blocks on the returned future.
     *
     * @return pair of (interval start timestamp, interval state)
     * @throws ExecutionException   if loading the persisted state failed
     * @throws InterruptedException if the thread was interrupted while waiting for the load
     */
    Pair<Long, TbIntervalState> getByEntityIdAndTs(EntityId entityId, long ts) throws ExecutionException, InterruptedException {
        long intervalStartTs = this.calculateIntervalStart(ts);
        ConcurrentMap<Long, TbIntervalState> tsStates = this.states.computeIfAbsent(entityId, k -> new ConcurrentHashMap<>());
        TbIntervalState state = tsStates.get(intervalStartTs);
        if (state == null) {
            state = this.fetchIntervalState(entityId, intervalStartTs).get();
            tsStates.put(intervalStartTs, state);
        }
        return Pair.of(intervalStartTs, state);
    }

    /**
     * Persists the given state as a string time-series entry stamped with the start of the
     * interval that contains {@code ts}.
     *
     * @return the future returned by the time-series service's {@code save} call
     */
    ListenableFuture<Integer> saveIntervalState(EntityId entityId, long ts, TbIntervalState state) {
        KvEntry kvEntry = new StringDataEntry(this.stateKey(), state.toStateJson(this.gson));
        TsKvEntry tsKvEntry = new BasicTsKvEntry(this.calculateIntervalStart(ts), kvEntry);
        return this.ctx.getTimeseriesService().save(this.ctx.getTenantId(), entityId, tsKvEntry);
    }

    /**
     * Collects the states that have changes to report and clears their "to report" flag.
     *
     * <p>When auto-creation is enabled, missing intervals between the latest known interval
     * of each entity and now are first filled with default states. With
     * {@link IntervalPersistPolicy#ON_EACH_CHECK_AFTER_INTERVAL_END} only intervals that have
     * already ended are returned.
     *
     * @return per-entity map of interval start to state; entities without matches are omitted
     */
    Map<EntityId, Map<Long, TbIntervalState>> getStatesToReport(IntervalPersistPolicy intervalPersistPolicy) {
        long now = System.currentTimeMillis();
        if (this.autoCreateIntervals) {
            this.states.forEach((entityId, intervals) -> this.createMissingIntervals(intervals, now));
        }
        boolean onlyEndedIntervals = intervalPersistPolicy == IntervalPersistPolicy.ON_EACH_CHECK_AFTER_INTERVAL_END;
        Map<EntityId, Map<Long, TbIntervalState>> updatedStates = new HashMap<>();
        this.states.forEach((entityId, intervals) -> {
            Map<Long, TbIntervalState> updatedIntervals = new HashMap<>();
            intervals.forEach((intervalStartTs, state) -> {
                if (!state.hasChangesToReport()) {
                    return;
                }
                // An interval has ended once the start of the following interval is in the past.
                if (onlyEndedIntervals && this.calculateNextIntervalStart(intervalStartTs) >= now) {
                    return;
                }
                updatedIntervals.put(intervalStartTs, state);
            });
            if (!updatedIntervals.isEmpty()) {
                updatedIntervals.values().forEach(TbIntervalState::clearChangesToReport);
                updatedStates.put(entityId, updatedIntervals);
            }
        });
        return updatedStates;
    }

    /**
     * Collects the states that have changes to persist and clears their "to persist" flag.
     *
     * @return per-entity map of interval start to state; entities without matches are omitted
     */
    Map<EntityId, Map<Long, TbIntervalState>> getStatesToPersist() {
        Map<EntityId, Map<Long, TbIntervalState>> updatedStates = new HashMap<>();
        this.states.forEach((entityId, intervals) -> {
            Map<Long, TbIntervalState> updatedIntervals = new HashMap<>();
            intervals.forEach((intervalStartTs, state) -> {
                if (state.hasChangesToPersist()) {
                    updatedIntervals.put(intervalStartTs, state);
                }
            });
            if (!updatedIntervals.isEmpty()) {
                updatedIntervals.values().forEach(TbIntervalState::clearChangesToPersist);
                updatedStates.put(entityId, updatedIntervals);
            }
        });
        return updatedStates;
    }

    /**
     * Evicts every state whose interval start is older than {@code now - intervalTtl}.
     * Entities are kept even when all their intervals were evicted, so that auto-creation
     * can still create the current interval for them.
     */
    void cleanupStatesUsingTTL() {
        long expTime = System.currentTimeMillis() - this.intervalTtl;
        this.states.values().forEach(intervals -> intervals.keySet().removeIf(intervalStartTs -> intervalStartTs < expTime));
    }

    /**
     * Fills the gap between the latest known interval and {@code now} with default states,
     * or creates the current interval when the entity has no intervals at all.
     * Existing states are never replaced.
     */
    private void createMissingIntervals(ConcurrentMap<Long, TbIntervalState> intervals, long now) {
        Optional<Long> latestStart = intervals.keySet().stream().max(Comparator.naturalOrder());
        if (latestStart.isPresent()) {
            // Walk real interval boundaries instead of adding a fixed duration: a fixed
            // 30-day/24-hour step can land in the same interval again or skip one entirely.
            long next = this.calculateNextIntervalStart(latestStart.get());
            while (next < now) {
                intervals.computeIfAbsent(next, k -> this.createDefaultTbIntervalState());
                next = this.calculateNextIntervalStart(next);
            }
        } else {
            intervals.computeIfAbsent(this.calculateIntervalStart(now), k -> this.createDefaultTbIntervalState());
        }
    }

    /** Returns the time-series key under which this rule node stores its interval states. */
    private String stateKey() {
        return STATE_KEY_PREFIX + this.ctx.getSelfId();
    }

    /**
     * Asynchronously loads the persisted state for the given interval; yields a default
     * state when no entry or an empty string value is found.
     */
    private ListenableFuture<TbIntervalState> fetchIntervalState(EntityId entityId, long intervalStartTs) {
        return Futures.transform(
                this.ctx.getTimeseriesService().findOne(this.ctx.getTenantId(), entityId, intervalStartTs, this.stateKey()),
                input -> {
                    String value = input == null ? null : input.getStrValue().orElse(null);
                    if (StringUtils.isEmpty(value)) {
                        return this.createDefaultTbIntervalState();
                    }
                    return this.readTbIntervalState(value);
                },
                MoreExecutors.directExecutor());
    }

    /**
     * Parses a persisted JSON value into the state type matching the configured function.
     *
     * @throws IllegalArgumentException if the configured function is not supported
     */
    private TbIntervalState readTbIntervalState(String value) {
        JsonElement stateJson = this.gsonParser.parse(value);
        switch (this.function) {
            case MIN:
                return new TbMinIntervalState(stateJson);
            case MAX:
                return new TbMaxIntervalState(stateJson);
            case SUM:
                return new TbSumIntervalState(stateJson);
            case AVG:
                return new TbAvgIntervalState(stateJson);
            case COUNT:
                return new TbCountIntervalState(stateJson);
            case COUNT_UNIQUE:
                return new TbCountUniqueIntervalState(stateJson);
            default:
                throw new IllegalArgumentException("Unsupported incoming function: " + this.function.name() + "!");
        }
    }

    /**
     * Creates an empty state of the type matching the configured function.
     *
     * @throws IllegalArgumentException if the configured function is not supported
     */
    private TbIntervalState createDefaultTbIntervalState() {
        switch (this.function) {
            case MIN:
                return new TbMinIntervalState();
            case MAX:
                return new TbMaxIntervalState();
            case SUM:
                return new TbSumIntervalState();
            case AVG:
                return new TbAvgIntervalState();
            case COUNT:
                return new TbCountIntervalState();
            case COUNT_UNIQUE:
                return new TbCountUniqueIntervalState();
            default:
                throw new IllegalArgumentException("Unsupported incoming function: " + this.function.name() + "!");
        }
    }

    /**
     * Returns the start (epoch millis) of the interval that contains {@code ts}.
     * Calendar interval types are resolved in the configured time zone; any other type
     * falls back to fixed-size buckets of {@code intervalDuration}.
     */
    private long calculateIntervalStart(long ts) {
        if (this.aggIntervalType == AggIntervalType.CUSTOM) {
            return ts / this.intervalDuration * this.intervalDuration;
        }
        ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts), this.tz);
        ZonedDateTime start;
        switch (this.aggIntervalType) {
            case HOUR:
                start = zdt.truncatedTo(ChronoUnit.HOURS);
                break;
            case DAY:
                start = zdt.truncatedTo(ChronoUnit.DAYS);
                break;
            case WEEK:
                start = zdt.truncatedTo(ChronoUnit.DAYS).with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
                break;
            case WEEK_SUN_SAT:
                // with(DayOfWeek.SUNDAY) would move FORWARD to the Sunday that ends the ISO
                // (Mon-Sun) week; a Sun-Sat week starts on the previous-or-same Sunday.
                start = zdt.truncatedTo(ChronoUnit.DAYS).with(TemporalAdjusters.previousOrSame(DayOfWeek.SUNDAY));
                break;
            case MONTH:
                start = zdt.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
                break;
            case YEAR:
                start = zdt.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
                break;
            default:
                return ts / this.intervalDuration * this.intervalDuration;
        }
        return start.toInstant().toEpochMilli();
    }

    /**
     * Returns the start of the interval following the one that starts at
     * {@code intervalStartTs}. The result is always strictly greater than the argument.
     */
    private long calculateNextIntervalStart(long intervalStartTs) {
        long fixedStep = intervalStartTs + this.intervalDuration;
        if (this.aggIntervalType == AggIntervalType.CUSTOM) {
            return fixedStep;
        }
        ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(intervalStartTs), this.tz);
        ZonedDateTime following;
        switch (this.aggIntervalType) {
            case HOUR:
                following = start.plusHours(1L);
                break;
            case DAY:
                following = start.plusDays(1L);
                break;
            case WEEK:
            case WEEK_SUN_SAT:
                following = start.plusWeeks(1L);
                break;
            case MONTH:
                following = start.plusMonths(1L);
                break;
            case YEAR:
                following = start.plusYears(1L);
                break;
            default:
                return fixedStep;
        }
        // Re-align to a real interval boundary (local-time arithmetic may be shifted by a
        // DST transition) and guarantee forward progress for callers that loop on this.
        long nextStart = this.calculateIntervalStart(following.toInstant().toEpochMilli());
        return nextStart > intervalStartTs ? nextStart : fixedStep;
    }

    /**
     * Returns the approximate length in millis of the configured calendar interval type:
     * one hour, one day, seven days, 30 days for a month and 365 days for a year
     * (also used for any other type).
     */
    private long getDefaultIntervalDurationByAggType() {
        switch (this.aggIntervalType) {
            case HOUR:
                return TimeUnit.HOURS.toMillis(1L);
            case DAY:
                return TimeUnit.DAYS.toMillis(1L);
            case WEEK:
            case WEEK_SUN_SAT:
                return TimeUnit.DAYS.toMillis(7L);
            case MONTH:
                return TimeUnit.DAYS.toMillis(30L);
            case YEAR:
            default:
                // Was HOURS.toMillis(365), i.e. roughly 15 days instead of a year.
                return TimeUnit.DAYS.toMillis(365L);
        }
    }
}

