/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.telemetry;

import com.google.common.util.concurrent.FutureCallback;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;

/**
 * Rule node that persists the timeseries payload of {@code POST_TELEMETRY_REQUEST}
 * messages through the telemetry service exposed by the {@link TbContext}.
 *
 * <p>For each message the node:
 * <ol>
 *   <li>rejects it (via {@code tellFailure}) unless its type is {@code POST_TELEMETRY_REQUEST};</li>
 *   <li>parses the message body as JSON and converts it to timestamped key/value entries;</li>
 *   <li>resolves the TTL from the {@code TTL} metadata key, then the node configuration,
 *       then the tenant-profile default when the result is {@code 0};</li>
 *   <li>saves the entries, with or without updating "latest" values depending on
 *       {@code skipLatestPersistence}.</li>
 * </ol>
 */
@RuleNode(type=ComponentType.ACTION, name="save timeseries", configClazz=TbMsgTimeseriesNodeConfiguration.class, nodeDescription="Saves timeseries data", nodeDetails="Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n <br/>Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n<br/>In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeTimeseriesConfig", icon="file_upload")
public class TbMsgTimeseriesNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMsgTimeseriesNode.class);

    /** Node configuration, converted from the generic node configuration in {@link #init}. */
    private TbMsgTimeseriesNodeConfiguration config;

    /** Context captured in {@link #init}; stays {@code null} if initialization never completed. */
    private TbContext ctx;

    /** Tenant-profile default storage TTL in seconds (converted from days on every profile update). */
    private long tenantProfileDefaultStorageTtl;

    /**
     * Converts the node configuration, remembers the context, subscribes to tenant-profile
     * updates and applies the current tenant profile immediately.
     *
     * @param ctx           rule engine context this node runs in
     * @param configuration generic node configuration to convert
     * @throws TbNodeException declared by the node contract; raised here only by the
     *                         calls this method makes
     */
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        // Conversion runs first: if it throws, this.ctx is never assigned (see destroy()).
        this.config = (TbMsgTimeseriesNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbMsgTimeseriesNodeConfiguration.class);
        this.ctx = ctx;
        ctx.addTenantProfileListener(this::onTenantProfileUpdate);
        this.onTenantProfileUpdate(ctx.getTenantProfile());
    }

    /**
     * Refreshes the cached default storage TTL from the given tenant profile.
     *
     * @param tenantProfile profile whose configuration carries the default storage TTL in days;
     *                      its configuration is cast to {@link DefaultTenantProfileConfiguration}
     */
    void onTenantProfileUpdate(TenantProfile tenantProfile) {
        DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration)tenantProfile.getProfileData().getConfiguration();
        this.tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
    }

    /**
     * Validates the message, converts its JSON body to timeseries entries and hands them to
     * the telemetry service together with the resolved TTL.
     *
     * <p>Validation problems (wrong message type, empty payload, non-numeric {@code TTL}
     * metadata) are reported through {@code ctx.tellFailure} and end processing of the message.
     *
     * @param ctx rule engine context used to report failures and to reach the telemetry service
     * @param msg incoming message; its data is parsed as JSON telemetry
     */
    public void onMsg(TbContext ctx, TbMsg msg) {
        // Constant-first comparison: a null message type takes the failure path instead of throwing.
        if (!SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msg.getType())) {
            ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
            return;
        }
        long ts = TbMsgTimeseriesNode.computeTs(msg, this.config.isUseServerTs());
        String src = msg.getData();
        Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
        if (tsKvMap.isEmpty()) {
            ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
            return;
        }
        // Flatten the timestamp -> entries map into a single list of timestamped entries.
        // NOTE(review): element type kept as in the decompiled original; confirm it matches
        // the list type the telemetry service expects.
        ArrayList<BasicTsKvEntry> tsKvEntryList = new ArrayList<BasicTsKvEntry>();
        for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
            long entryTs = tsKvEntry.getKey();
            for (KvEntry kvEntry : tsKvEntry.getValue()) {
                tsKvEntryList.add(new BasicTsKvEntry(entryTs, kvEntry));
            }
        }
        // TTL precedence: message metadata, then node configuration, then tenant-profile default.
        String ttlValue = msg.getMetaData().getValue("TTL");
        long ttl;
        if (StringUtils.isEmpty(ttlValue)) {
            ttl = this.config.getDefaultTTL();
        } else {
            try {
                ttl = Long.parseLong(ttlValue);
            } catch (NumberFormatException e) {
                // Report malformed metadata the same way as the other validation failures above.
                ctx.tellFailure(msg, new IllegalArgumentException("Invalid TTL metadata value: " + ttlValue, e));
                return;
            }
        }
        if (ttl == 0L) {
            ttl = this.tenantProfileDefaultStorageTtl;
        }
        if (this.config.isSkipLatestPersistence()) {
            ctx.getTelemetryService().saveWithoutLatestAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
        } else {
            ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
        }
    }

    /**
     * Chooses the timestamp passed to the telemetry conversion for the message payload.
     *
     * @param msg              message whose metadata timestamp is used when not ignored
     * @param ignoreMetadataTs when {@code true}, the current system time is used and
     *                         {@code msg} is not consulted
     * @return {@link System#currentTimeMillis()} or {@code msg.getMetaDataTs()}
     */
    public static long computeTs(TbMsg msg, boolean ignoreMetadataTs) {
        return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
    }

    /**
     * Removes the listeners registered on the context. Does nothing when {@link #init}
     * never assigned the context (for example because configuration conversion failed).
     */
    public void destroy() {
        if (this.ctx != null) {
            this.ctx.removeListeners();
        }
    }
}

