/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.analytics.latest;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.analytics.latest.TbAbstractLatestNodeConfiguration;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;

/**
 * Base class for rule nodes that run an aggregation on a fixed period.
 *
 * <p>The node drives itself with "tick" messages that it sends to itself via
 * {@link TbContext#tellSelf}. On every recognised tick it:
 * <ol>
 *   <li>resolves the parent entities through the configured parent-entities query,</li>
 *   <li>asks the subclass for the per-parent aggregations ({@link #doParentAggregations}),</li>
 *   <li>enqueues every non-empty aggregation result as a {@code POST_TELEMETRY_REQUEST}
 *       message on the {@code SUCCESS} relation, and</li>
 *   <li>schedules the next tick.</li>
 * </ol>
 *
 * @param <C> concrete configuration type of the node
 */
public abstract class TbAbstractLatestNode<C extends TbAbstractLatestNodeConfiguration> implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbAbstractLatestNode.class);

    private final Gson gson = new Gson();

    /** Node configuration, populated by {@link #init}. */
    protected C config;

    /** Tick period in milliseconds, derived from the configured period value and time unit. */
    private long delay;

    /** Timestamp (epoch millis) at which the most recently scheduled tick is due; 0 until the first scheduling. */
    private long lastScheduledTs;

    /** Id of the most recently scheduled tick message; ticks with any other id are ignored by {@link #onMsg}. */
    private UUID nextTickId;

    /**
     * Loads the configuration, computes the tick period in milliseconds and schedules the first tick.
     *
     * @param ctx           rule-engine context of this node
     * @param configuration raw node configuration, converted by {@link #loadMapperNodeConfig}
     * @throws TbNodeException if the subclass fails to load the configuration
     */
    @Override
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = loadMapperNodeConfig(configuration);
        this.delay = this.config.getPeriodTimeUnit().toMillis(this.config.getPeriodValue());
        scheduleTickMsg(ctx);
    }

    /**
     * Handles the node's own tick messages.
     *
     * <p>A message is processed only when its type equals {@link #tickMessageType()} and its id
     * equals the id of the most recently scheduled tick; no action is taken here for any other
     * message. Once the aggregation finishes the next tick is scheduled, regardless of the outcome;
     * on failure the tick message is additionally reported through {@link TbContext#tellFailure}.
     *
     * @param ctx rule-engine context of this node
     * @param msg incoming message
     */
    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        if (msg.getType().equals(tickMessageType()) && msg.getId().equals(this.nextTickId)) {
            DonAsynchron.withCallback(aggregate(ctx), m -> scheduleTickMsg(ctx), t -> {
                ctx.tellFailure(msg, t);
                scheduleTickMsg(ctx);
            });
        }
    }

    /**
     * Creates the next tick message, remembers its id and sends it to this node with a delay.
     *
     * <p>The due time is advanced from the previously scheduled due time, not from "now", so the
     * time spent aggregating does not accumulate as drift. If the due time is already in the past
     * the delay is clamped to zero.
     */
    private void scheduleTickMsg(TbContext ctx) {
        long curTs = System.currentTimeMillis();
        if (this.lastScheduledTs == 0L) {
            // First scheduling: anchor the schedule at the current time.
            this.lastScheduledTs = curTs;
        }
        this.lastScheduledTs += this.delay;
        long curDelay = Math.max(0L, this.lastScheduledTs - curTs);
        TbMsg tickMsg = ctx.newMsg("Main", tickMessageType(), ctx.getSelfId(), new TbMsgMetaData(), "");
        this.nextTickId = tickMsg.getId();
        ctx.tellSelf(tickMsg, curDelay);
    }

    /**
     * Runs one aggregation round over all parent entities.
     *
     * <p>The returned list has one element per aggregation future; the element is {@code null}
     * when that aggregation produced no value or failed. All messages of a round share the same
     * {@code ts} metadata value, taken once when the parent entities have been resolved.
     *
     * @return future completing when every aggregation of the round has been processed
     */
    private ListenableFuture<List<TbMsg>> aggregate(TbContext ctx) {
        ListenableFuture<List<EntityId>> parentEntityIdsFuture = this.config.getParentEntitiesQuery().getParentEntitiesAsync(ctx);
        return Futures.transformAsync(parentEntityIdsFuture, parentEntityIds -> {
            List<ListenableFuture<TbMsg>> msgFutures = new ArrayList<>();
            String dataTs = Long.toString(System.currentTimeMillis());
            for (EntityId parentEntityId : parentEntityIds) {
                Map<EntityId, List<ListenableFuture<Optional<JsonObject>>>> aggregateFuturesMap = doParentAggregations(ctx, parentEntityId);
                aggregateFuturesMap.forEach((originatorId, aggregateFutures) -> {
                    for (ListenableFuture<Optional<JsonObject>> aggregateFuture : aggregateFutures) {
                        msgFutures.add(toTelemetryMsgFuture(ctx, originatorId, aggregateFuture, dataTs));
                    }
                });
            }
            return Futures.allAsList(msgFutures);
        }, ctx.getDbCallbackExecutor());
    }

    /**
     * Converts a single aggregation future into a future of the telemetry message enqueued for it.
     *
     * <p>A failed aggregation is reported through {@link TbContext#enqueueForTellFailure} with an
     * empty-bodied message for the same originator and is then treated as "no value", so one
     * failing aggregation does not fail the whole round.
     *
     * @return future holding the enqueued message, or {@code null} when there was no value
     */
    private ListenableFuture<TbMsg> toTelemetryMsgFuture(TbContext ctx, EntityId originatorId,
                                                         ListenableFuture<Optional<JsonObject>> aggregateFuture, String dataTs) {
        ListenableFuture<Optional<JsonObject>> aggregateFutureWithFallback = Futures.catching(aggregateFuture, Throwable.class, e -> {
            // Only the message text is forwarded below, so keep the full stack trace in the log.
            log.debug("[{}] Aggregation failed", originatorId, e);
            TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), originatorId, new TbMsgMetaData(), TbMsgDataType.JSON, "");
            ctx.enqueueForTellFailure(msg, e.getMessage());
            return Optional.empty();
        }, MoreExecutors.directExecutor());
        return Futures.transform(aggregateFutureWithFallback, element -> {
            if (!element.isPresent()) {
                return null;
            }
            TbMsgMetaData metaData = new TbMsgMetaData();
            metaData.putValue("ts", dataTs);
            JsonObject messageData = element.get();
            TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), originatorId, metaData, this.gson.toJson(messageData));
            ctx.enqueueForTellNext(msg, TbRelationTypes.SUCCESS);
            return msg;
        }, MoreExecutors.directExecutor());
    }

    /**
     * Converts the raw node configuration into the concrete configuration type.
     *
     * @param var1 raw node configuration passed to {@link #init}
     * @return the typed configuration
     * @throws TbNodeException if the configuration cannot be loaded
     */
    protected abstract C loadMapperNodeConfig(TbNodeConfiguration var1) throws TbNodeException;

    /**
     * @return the message type used for this node's self-addressed tick messages
     */
    protected abstract String tickMessageType();

    /**
     * Starts the aggregations for one parent entity.
     *
     * @param var1 rule-engine context of this node
     * @param var2 parent entity to aggregate for
     * @return aggregation futures grouped by the entity that becomes the originator of the
     *         resulting telemetry messages; an empty {@link Optional} means "nothing to emit"
     */
    protected abstract Map<EntityId, List<ListenableFuture<Optional<JsonObject>>>> doParentAggregations(TbContext var1, EntityId var2);
}

