/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.rule.engine.mqtt;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.credentials.BasicCredentials;
import org.thingsboard.rule.engine.credentials.ClientCredentials;
import org.thingsboard.rule.engine.credentials.CredentialsType;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;

@RuleNode(type=ComponentType.EXTERNAL, name="mqtt", configClazz=TbMqttNodeConfiguration.class, nodeDescription="Publish messages to the MQTT broker", nodeDetails="Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.", uiResources={"static/rulenode/rulenode-core-config.js"}, configDirective="tbActionNodeMqttConfig", icon="call_split")
public class TbMqttNode
implements TbNode {
    private static final Logger log = LoggerFactory.getLogger(TbMqttNode.class);
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static final String ERROR = "error";
    protected TbMqttNodeConfiguration mqttNodeConfiguration;
    protected MqttClient mqttClient;

    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        try {
            this.mqttNodeConfiguration = (TbMqttNodeConfiguration)TbNodeUtils.convert((TbNodeConfiguration)configuration, TbMqttNodeConfiguration.class);
            this.mqttClient = this.initClient(ctx);
        }
        catch (Exception e) {
            throw new TbNodeException(e);
        }
    }

    public void onMsg(TbContext ctx, TbMsg msg) {
        String topic = TbNodeUtils.processPattern((String)this.mqttNodeConfiguration.getTopicPattern(), (TbMsg)msg);
        this.mqttClient.publish(topic, Unpooled.wrappedBuffer((byte[])msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE).addListener(future -> {
            if (future.isSuccess()) {
                ctx.tellSuccess(msg);
            } else {
                TbMsg next = this.processException(ctx, msg, future.cause());
                ctx.tellFailure(next, future.cause());
            }
        });
    }

    private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
        TbMsgMetaData metaData = origMsg.getMetaData().copy();
        metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
        return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
    }

    public void destroy() {
        if (this.mqttClient != null) {
            this.mqttClient.disconnect();
        }
    }

    protected MqttClient initClient(TbContext ctx) throws Exception {
        MqttConnectResult result;
        MqttClientConfig config = new MqttClientConfig(this.getSslContext());
        if (!StringUtils.isEmpty((Object)this.mqttNodeConfiguration.getClientId())) {
            config.setClientId(this.mqttNodeConfiguration.getClientId());
        }
        config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
        this.prepareMqttClientConfig(config);
        MqttClient client = MqttClient.create((MqttClientConfig)config, null);
        client.setEventLoop(ctx.getSharedEventLoop());
        Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
        try {
            result = (MqttConnectResult)connectFuture.get((long)this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
        }
        catch (TimeoutException ex) {
            connectFuture.cancel(true);
            client.disconnect();
            String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
            throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort));
        }
        if (!result.isSuccess()) {
            connectFuture.cancel(true);
            client.disconnect();
            String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
            throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode()));
        }
        return client;
    }

    protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
        ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials();
        if (credentials.getType() == CredentialsType.BASIC) {
            BasicCredentials basicCredentials = (BasicCredentials)credentials;
            config.setUsername(basicCredentials.getUsername());
            config.setPassword(basicCredentials.getPassword());
        }
    }

    private SslContext getSslContext() throws SSLException {
        return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
    }
}

