/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.edge;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.edge.AbstractTbMsgPushNode;
import org.thingsboard.rule.engine.edge.TbMsgPushToEdgeNodeConfiguration;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbMsg;

@RuleNode(type=ComponentType.ACTION, name="push to edge", configClazz=TbMsgPushToEdgeNodeConfiguration.class, nodeDescription="Push messages from cloud to edge", nodeDetails="Push messages from cloud to edge. Message originator must be assigned to particular edge or message originator is <b>EDGE</b> entity itself. This node used only on cloud instances to push messages from cloud to edge. Once message arrived into this node it\u2019s going to be converted into edge event and saved to the database. Node doesn't push messages directly to edge, but stores event(s) in the edge queue. <br>Supports next originator types:<br><code>DEVICE</code><br><code>ASSET</code><br><code>ENTITY_VIEW</code><br><code>DASHBOARD</code><br><code>TENANT</code><br><code>CUSTOMER</code><br><code>EDGE</code><br><br>As well node supports next message types:<br><code>POST_TELEMETRY_REQUEST</code><br><code>POST_ATTRIBUTES_REQUEST</code><br><code>ATTRIBUTES_UPDATED</code><br><code>ATTRIBUTES_DELETED</code><br><code>ALARM</code><br><br>Message will be routed via <b>Failure</b> route if node was not able to save edge event to database or unsupported originator type/message type arrived. In case successful storage edge event to database message will be routed via <b>Success</b> route.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodePushToEdgeConfig", icon="cloud_download", ruleChainTypes={RuleChainType.CORE})
public class TbMsgPushToEdgeNode
extends AbstractTbMsgPushNode<TbMsgPushToEdgeNodeConfiguration, EdgeEvent, EdgeEventType> {
    private static final Logger log = LoggerFactory.getLogger(TbMsgPushToEdgeNode.class);
    static final int DEFAULT_PAGE_SIZE = 100;

    @Override
    EdgeEvent buildEvent(TenantId tenantId, EdgeEventActionType eventAction, UUID entityId, EdgeEventType eventType, JsonNode entityBody) {
        EdgeEvent edgeEvent = new EdgeEvent();
        edgeEvent.setTenantId(tenantId);
        edgeEvent.setAction(eventAction);
        edgeEvent.setEntityId(entityId);
        edgeEvent.setType(eventType);
        edgeEvent.setBody(entityBody);
        return edgeEvent;
    }

    @Override
    EdgeEventType getEventTypeByEntityType(EntityType entityType) {
        return EdgeUtils.getEdgeEventTypeByEntityType((EntityType)entityType);
    }

    @Override
    EdgeEventType getAlarmEventType() {
        return EdgeEventType.ALARM;
    }

    @Override
    String getIgnoredMessageSource() {
        return "edge";
    }

    @Override
    protected Class<TbMsgPushToEdgeNodeConfiguration> getConfigClazz() {
        return TbMsgPushToEdgeNodeConfiguration.class;
    }

    @Override
    protected void processMsg(final TbContext ctx, final TbMsg msg) {
        try {
            if (EntityType.EDGE.equals((Object)msg.getOriginator().getEntityType())) {
                EdgeEvent edgeEvent = (EdgeEvent)this.buildEvent(msg, ctx);
                EdgeId edgeId = new EdgeId(msg.getOriginator().getId());
                ListenableFuture<Void> future = this.notifyEdge(ctx, edgeEvent, edgeId);
                FutureCallback<Void> futureCallback = new FutureCallback<Void>(){

                    public void onSuccess(@Nullable Void result) {
                        ctx.tellSuccess(msg);
                    }

                    public void onFailure(Throwable t) {
                        ctx.tellFailure(msg, t);
                    }
                };
                Futures.addCallback(future, (FutureCallback)futureCallback, (Executor)ctx.getDbCallbackExecutor());
            } else {
                PageData pageData;
                PageLink pageLink = new PageLink(100);
                ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
                do {
                    if ((pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink)) == null || pageData.getData() == null || pageData.getData().isEmpty()) continue;
                    for (EdgeId edgeId : pageData.getData()) {
                        EdgeEvent edgeEvent = (EdgeEvent)this.buildEvent(msg, ctx);
                        futures.add(this.notifyEdge(ctx, edgeEvent, edgeId));
                    }
                    if (!pageData.hasNext()) continue;
                    pageLink = pageLink.nextPageLink();
                } while (pageData != null && pageData.hasNext());
                if (futures.isEmpty()) {
                    ctx.ack(msg);
                } else {
                    Futures.addCallback((ListenableFuture)Futures.allAsList(futures), (FutureCallback)new FutureCallback<List<Void>>(){

                        public void onSuccess(@Nullable List<Void> voids) {
                            ctx.tellSuccess(msg);
                        }

                        public void onFailure(Throwable t) {
                            ctx.tellFailure(msg, t);
                        }
                    }, (Executor)ctx.getDbCallbackExecutor());
                }
            }
        }
        catch (Exception e) {
            log.error("Failed to build edge event", (Throwable)e);
            ctx.tellFailure(msg, (Throwable)e);
        }
    }

    private ListenableFuture<Void> notifyEdge(TbContext ctx, EdgeEvent edgeEvent, EdgeId edgeId) {
        edgeEvent.setEdgeId(edgeId);
        ListenableFuture future = ctx.getEdgeEventService().saveAsync(edgeEvent);
        return Futures.transform((ListenableFuture)future, result -> {
            ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
            return null;
        }, (Executor)ctx.getDbCallbackExecutor());
    }
}

