/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.edge.rpc;

import io.grpc.Channel;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.edge.exception.EdgeConnectionException;
import org.thingsboard.edge.rpc.EdgeRpcClient;
import org.thingsboard.server.common.data.ResourceUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.v1.ConnectResponseCode;
import org.thingsboard.server.gen.edge.v1.ConnectResponseMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.RequestMsgType;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.gen.edge.v1.SyncRequestMsg;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;

/**
 * gRPC implementation of {@link EdgeRpcClient}.
 *
 * <p>{@link #connect} builds a Netty-based channel from the {@code cloud.rpc.*} properties
 * (optionally with TLS and an HTTP CONNECT proxy), opens the bidirectional
 * {@code handleMsgs} stream and sends the initial connect request. The {@code send*}
 * methods write further requests to that stream, and {@link #disconnect} shuts the
 * channel down.
 */
@Service
public class EdgeGrpcClient implements EdgeRpcClient {
    private static final Logger log = LoggerFactory.getLogger(EdgeGrpcClient.class);

    /** Threshold on the await-termination attempt counter before the channel is force-closed. */
    private static final int MAX_TERMINATION_ATTEMPTS = 5;

    /** Serializes writes to the request stream made by the {@code send*} methods. */
    private static final ReentrantLock uplinkMsgLock = new ReentrantLock();

    @Value("${cloud.rpc.host}")
    private String rpcHost;
    @Value("${cloud.rpc.port}")
    private int rpcPort;
    /** Per-attempt wait, in seconds, used while awaiting channel termination. */
    @Value("${cloud.rpc.timeout}")
    private int timeoutSecs;
    @Value("${cloud.rpc.keep_alive_time_sec:10}")
    private int keepAliveTimeSec;
    @Value("${cloud.rpc.keep_alive_timeout_sec:5}")
    private int keepAliveTimeoutSec;
    @Value("${cloud.rpc.ssl.enabled}")
    private boolean sslEnabled;
    /** Optional trust certificate resource; when empty no custom trust manager is configured. */
    @Value("${cloud.rpc.ssl.cert:}")
    private String certResource;
    @Value("${cloud.rpc.max_inbound_message_size:4194304}")
    private int maxInboundMessageSize;
    @Value("${cloud.rpc.proxy.enabled}")
    private boolean proxyEnabled;
    @Value("${cloud.rpc.proxy.host:}")
    private String proxyHost;
    @Value("${cloud.rpc.proxy.port:0}")
    private int proxyPort;
    @Value("${cloud.rpc.proxy.username:}")
    private String proxyUsername;
    @Value("${cloud.rpc.proxy.password:}")
    private String proxyPassword;

    /** Max inbound message size reported by the server in an accepted connect response. */
    private int serverMaxInboundMessageSize;
    private ManagedChannel channel;
    /** Request side of the bidirectional stream; assigned by {@link #connect}. */
    private StreamObserver<RequestMsg> inputStream;

    /**
     * Builds the channel, opens the message stream and sends the connect request.
     *
     * @param edgeKey          edge routing key sent in the connect request (also used as log prefix)
     * @param edgeSecret       edge secret sent in the connect request
     * @param onUplinkResponse invoked for every uplink response received from the server
     * @param onEdgeUpdate     invoked with the edge configuration from an accepted connect
     *                         response and from every edge update message
     * @param onDownlink       invoked for every downlink message received from the server
     * @param onError          invoked when the connection is rejected or the stream fails
     * @throws RuntimeException if the TLS context cannot be initialized
     */
    @Override
    public void connect(String edgeKey, String edgeSecret, Consumer<UplinkResponseMsg> onUplinkResponse, Consumer<EdgeConfiguration> onEdgeUpdate, Consumer<DownlinkMsg> onDownlink, Consumer<Exception> onError) {
        NettyChannelBuilder builder = NettyChannelBuilder.forAddress(this.rpcHost, this.rpcPort)
                .maxInboundMessageSize(this.maxInboundMessageSize)
                .keepAliveTime(this.keepAliveTimeSec, TimeUnit.SECONDS)
                .keepAliveTimeout(this.keepAliveTimeoutSec, TimeUnit.SECONDS)
                .keepAliveWithoutCalls(true);
        if (this.sslEnabled) {
            try {
                this.configureTls(builder);
            } catch (IOException e) {
                // SSLException is an IOException; a failure to close the certificate stream lands here too.
                log.error("Failed to initialize channel!", e);
                throw new RuntimeException(e);
            }
        } else {
            builder.usePlaintext();
        }
        if (this.proxyEnabled && StringUtils.isNotEmpty(this.proxyHost) && this.proxyPort > 0) {
            InetSocketAddress proxyAddress = new InetSocketAddress(this.proxyHost, this.proxyPort);
            InetSocketAddress targetAddress = new InetSocketAddress(this.rpcHost, this.rpcPort);
            // NOTE(review): unset credentials default to empty strings and are passed through as-is;
            // confirm the proxy accepts that, or map blank values to null.
            builder.proxyDetector(socketAddress -> HttpConnectProxiedSocketAddress.newBuilder()
                    .setTargetAddress(targetAddress)
                    .setProxyAddress(proxyAddress)
                    .setUsername(this.proxyUsername)
                    .setPassword(this.proxyPassword)
                    .build());
        }
        this.channel = builder.build();
        EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(this.channel);
        log.info("[{}] Sending a connect request to the TB!", edgeKey);
        this.inputStream = stub.withCompression("gzip")
                .handleMsgs(this.initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError));
        ConnectRequestMsg connectRequest = ConnectRequestMsg.newBuilder()
                .setEdgeRoutingKey(edgeKey)
                .setEdgeSecret(edgeSecret)
                .setEdgeVersion(EdgeVersion.V_4_0_0)
                .setMaxInboundMessageSize(this.maxInboundMessageSize)
                .build();
        this.inputStream.onNext(RequestMsg.newBuilder()
                .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
                .setConnectRequestMsg(connectRequest)
                .build());
    }

    /**
     * Applies a client TLS context to the builder, trusting {@link #certResource} when it is set.
     * The certificate stream is closed once the context has been built.
     */
    private void configureTls(NettyChannelBuilder builder) throws IOException {
        SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
        if (StringUtils.isNotEmpty(this.certResource)) {
            try (InputStream certStream = ResourceUtils.getInputStream(this, this.certResource)) {
                sslContextBuilder.trustManager(certStream);
                builder.sslContext(sslContextBuilder.build());
            }
        } else {
            builder.sslContext(sslContextBuilder.build());
        }
    }

    /**
     * Creates the observer for server responses; each response is dispatched to the matching callback.
     */
    private StreamObserver<ResponseMsg> initOutputStream(final String edgeKey, final Consumer<UplinkResponseMsg> onUplinkResponse, final Consumer<EdgeConfiguration> onEdgeUpdate, final Consumer<DownlinkMsg> onDownlink, final Consumer<Exception> onError) {
        return new StreamObserver<ResponseMsg>() {

            @Override
            public void onNext(ResponseMsg responseMsg) {
                if (responseMsg.hasConnectResponseMsg()) {
                    ConnectResponseMsg connectResponseMsg = responseMsg.getConnectResponseMsg();
                    if (connectResponseMsg.getResponseCode() == ConnectResponseCode.ACCEPTED) {
                        if (connectResponseMsg.hasMaxInboundMessageSize()) {
                            log.debug("[{}] Server max inbound message size: {}", edgeKey, connectResponseMsg.getMaxInboundMessageSize());
                            EdgeGrpcClient.this.serverMaxInboundMessageSize = connectResponseMsg.getMaxInboundMessageSize();
                        }
                        log.info("[{}] Configuration received: {}", edgeKey, connectResponseMsg.getConfiguration());
                        onEdgeUpdate.accept(connectResponseMsg.getConfiguration());
                    } else {
                        log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg());
                        EdgeGrpcClient.this.disconnectAfterFailure(edgeKey);
                        onError.accept(new EdgeConnectionException("Failed to establish the connection! Response code: " + connectResponseMsg.getResponseCode().name()));
                    }
                } else if (responseMsg.hasEdgeUpdateMsg()) {
                    log.debug("[{}] Edge update message received {}", edgeKey, responseMsg.getEdgeUpdateMsg());
                    onEdgeUpdate.accept(responseMsg.getEdgeUpdateMsg().getConfiguration());
                } else if (responseMsg.hasUplinkResponseMsg()) {
                    log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg());
                    onUplinkResponse.accept(responseMsg.getUplinkResponseMsg());
                } else if (responseMsg.hasDownlinkMsg()) {
                    log.debug("[{}] Downlink message received {}", edgeKey, responseMsg.getDownlinkMsg());
                    onDownlink.accept(responseMsg.getDownlinkMsg());
                }
            }

            @Override
            public void onError(Throwable t) {
                log.warn("[{}] Stream was terminated due to error:", edgeKey, t);
                EdgeGrpcClient.this.disconnectAfterFailure(edgeKey);
                onError.accept(new RuntimeException(t));
            }

            @Override
            public void onCompleted() {
                log.info("[{}] Stream was closed and completed successfully!", edgeKey);
            }
        };
    }

    /** Runs {@code disconnect(true)} from a stream callback, logging instead of propagating an interruption. */
    private void disconnectAfterFailure(String edgeKey) {
        try {
            this.disconnect(true);
        } catch (InterruptedException e) {
            log.error("[{}] Got interruption during disconnect!", edgeKey, e);
            // Keep the interruption visible to whatever runs next on this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the channel down, waiting up to {@link #timeoutSecs} seconds per attempt and
     * force-closing it once the attempt counter exceeds {@link #MAX_TERMINATION_ATTEMPTS}.
     *
     * <p>If the waiting thread is interrupted, the channel is force-closed immediately and
     * the thread's interrupt flag is restored; the interruption is not rethrown.
     *
     * @param onError {@code true} when called because of a failure; the request stream is then
     *                not completed before the channel is shut down
     * @throws InterruptedException part of the method signature; this implementation restores
     *                              the interrupt flag instead of throwing
     */
    @Override
    public void disconnect(boolean onError) throws InterruptedException {
        if (!onError) {
            try {
                if (this.inputStream != null) {
                    this.inputStream.onCompleted();
                }
            } catch (Exception e) {
                log.error("Exception during onCompleted", e);
            }
        }
        if (this.channel == null) {
            return;
        }
        this.channel.shutdown();
        int attempt = 0;
        do {
            try {
                this.channel.awaitTermination(this.timeoutSecs, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("Channel await termination was interrupted", e);
                // Do not keep blocking an interrupted thread: close now and hand the interrupt back.
                this.forceShutdown();
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error("Channel await termination failed", e);
            }
            if (attempt > MAX_TERMINATION_ATTEMPTS) {
                log.warn("We had reached maximum of termination attempts. Force closing channel");
                this.forceShutdown();
                break;
            }
            ++attempt;
        } while (!this.channel.isTerminated());
    }

    /** Calls {@code shutdownNow()} on the channel, logging any exception it raises. */
    private void forceShutdown() {
        try {
            this.channel.shutdownNow();
        } catch (Exception e) {
            log.error("Exception during shutdownNow", e);
        }
    }

    /** Sends an uplink message to the server over the open request stream. */
    @Override
    public void sendUplinkMsg(UplinkMsg msg) {
        this.sendRequest(RequestMsg.newBuilder()
                .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
                .setUplinkMsg(msg)
                .build());
    }

    /**
     * Sends a sync request to the server.
     *
     * @param fullSyncRequired value of the request's {@code fullSync} flag
     */
    @Override
    public void sendSyncRequestMsg(boolean fullSyncRequired) {
        SyncRequestMsg syncRequestMsg = SyncRequestMsg.newBuilder().setFullSync(fullSyncRequired).build();
        this.sendRequest(RequestMsg.newBuilder()
                .setMsgType(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)
                .setSyncRequestMsg(syncRequestMsg)
                .build());
    }

    /** Sends a downlink response to the server; it is tagged with the uplink message type. */
    @Override
    public void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg) {
        this.sendRequest(RequestMsg.newBuilder()
                .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
                .setDownlinkResponseMsg(downlinkResponseMsg)
                .build());
    }

    /** Writes one request to the stream while holding {@link #uplinkMsgLock}. */
    private void sendRequest(RequestMsg requestMsg) {
        uplinkMsgLock.lock();
        try {
            this.inputStream.onNext(requestMsg);
        } finally {
            uplinkMsgLock.unlock();
        }
    }

    /**
     * @return the max inbound message size last reported by the server in an accepted
     *         connect response, or {@code 0} if none has been received
     */
    @Override
    public int getServerMaxInboundMessageSize() {
        return this.serverMaxInboundMessageSize;
    }
}

