/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.integration.rpc;

import com.google.common.io.Resources;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.integration.exception.IntegrationConnectionException;
import org.thingsboard.integration.rpc.IntegrationRpcClient;
import org.thingsboard.integration.storage.EventStorage;
import org.thingsboard.server.gen.integration.ConnectRequestMsg;
import org.thingsboard.server.gen.integration.ConnectResponseCode;
import org.thingsboard.server.gen.integration.ConnectResponseMsg;
import org.thingsboard.server.gen.integration.ConverterConfigurationProto;
import org.thingsboard.server.gen.integration.DeviceDownlinkDataProto;
import org.thingsboard.server.gen.integration.IntegrationConfigurationProto;
import org.thingsboard.server.gen.integration.IntegrationTransportGrpc;
import org.thingsboard.server.gen.integration.MessageType;
import org.thingsboard.server.gen.integration.RequestMsg;
import org.thingsboard.server.gen.integration.ResponseMsg;
import org.thingsboard.server.gen.integration.UplinkMsg;
import org.thingsboard.server.gen.integration.UplinkResponseMsg;

@Service
public class IntegrationGrpcClient
implements IntegrationRpcClient {
    private static final Logger log = LoggerFactory.getLogger(IntegrationGrpcClient.class);
    @Value(value="${rpc.host}")
    private String rpcHost;
    @Value(value="${rpc.port}")
    private int rpcPort;
    @Value(value="${rpc.timeout}")
    private int timeoutSecs;
    @Value(value="${rpc.keep_alive_time_sec:300}")
    private int keepAliveTimeSec;
    @Value(value="${rpc.ssl.enabled}")
    private boolean sslEnabled;
    @Value(value="${rpc.ssl.cert}")
    private String certResource;
    @Autowired
    private EventStorage eventStorage;
    private ManagedChannel channel;
    private StreamObserver<RequestMsg> inputStream;
    private CountDownLatch latch;

    @Override
    public void connect(String integrationKey, String integrationSecret, Consumer<IntegrationConfigurationProto> onIntegrationUpdate, Consumer<ConverterConfigurationProto> onConverterUpdate, Consumer<DeviceDownlinkDataProto> onDownlink, Consumer<Exception> onError) {
        NettyChannelBuilder builder = NettyChannelBuilder.forAddress((String)this.rpcHost, (int)this.rpcPort).keepAliveTime((long)this.keepAliveTimeSec, TimeUnit.SECONDS);
        if (this.sslEnabled) {
            try {
                builder.sslContext(GrpcSslContexts.forClient().trustManager(new File(Resources.getResource((String)this.certResource).toURI())).build());
            }
            catch (URISyntaxException | SSLException e) {
                log.error("Failed to initialize channel!", (Throwable)e);
                throw new RuntimeException(e);
            }
        } else {
            builder.usePlaintext();
        }
        this.channel = builder.build();
        IntegrationTransportGrpc.IntegrationTransportStub stub = IntegrationTransportGrpc.newStub((Channel)this.channel);
        log.info("[{}] Sending a connect request to the TB!", (Object)integrationKey);
        this.inputStream = stub.handleMsgs(this.initOutputStream(integrationKey, onIntegrationUpdate, onConverterUpdate, onDownlink, onError));
        this.inputStream.onNext((Object)RequestMsg.newBuilder().setMessageType(MessageType.CONNECT_RPC_MESSAGE).setConnectRequestMsg(ConnectRequestMsg.newBuilder().setIntegrationRoutingKey(integrationKey).setIntegrationSecret(integrationSecret).build()).build());
    }

    private StreamObserver<ResponseMsg> initOutputStream(final String integrationKey, final Consumer<IntegrationConfigurationProto> onIntegrationUpdate, final Consumer<ConverterConfigurationProto> onConverterUpdate, final Consumer<DeviceDownlinkDataProto> onDownlink, final Consumer<Exception> onError) {
        return new StreamObserver<ResponseMsg>(){

            public void onNext(ResponseMsg responseMsg) {
                if (responseMsg.hasConnectResponseMsg()) {
                    ConnectResponseMsg connectResponseMsg = responseMsg.getConnectResponseMsg();
                    if (connectResponseMsg.getResponseCode().equals((Object)ConnectResponseCode.ACCEPTED)) {
                        log.info("[{}] Configuration received: {}", (Object)integrationKey, (Object)connectResponseMsg.getConfiguration());
                        onIntegrationUpdate.accept(connectResponseMsg.getConfiguration());
                    } else {
                        log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", new Object[]{integrationKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg()});
                        try {
                            IntegrationGrpcClient.this.disconnect();
                        }
                        catch (InterruptedException e) {
                            log.error("[{}] Got interruption during disconnect!", (Object)integrationKey, (Object)e);
                        }
                        onError.accept(new IntegrationConnectionException("Failed to establish the connection! Response code: " + connectResponseMsg.getResponseCode().name()));
                    }
                } else if (responseMsg.hasUplinkResponseMsg()) {
                    UplinkResponseMsg msg = responseMsg.getUplinkResponseMsg();
                    if (msg.getSuccess()) {
                        log.debug("[{}] Msg has been processed successfully! {}", (Object)integrationKey, (Object)msg);
                    } else {
                        log.error("[{}] Msg processing failed! Error msg: {}", (Object)integrationKey, (Object)msg.getErrorMsg());
                    }
                    IntegrationGrpcClient.this.latch.countDown();
                } else if (responseMsg.hasIntegrationUpdateMsg()) {
                    log.info("[{}] Configuration updated: {}", (Object)integrationKey, (Object)responseMsg.getIntegrationUpdateMsg().getConfiguration());
                    onIntegrationUpdate.accept(responseMsg.getIntegrationUpdateMsg().getConfiguration());
                } else if (responseMsg.hasConverterUpdateMsg()) {
                    log.info("[{}] Converter configuration updated: {}", (Object)integrationKey, (Object)responseMsg.getConverterUpdateMsg().getConfiguration());
                    onConverterUpdate.accept(responseMsg.getConverterUpdateMsg().getConfiguration());
                } else if (responseMsg.hasDownlinkMsg()) {
                    log.debug("[{}] Downlink message received for device {}", (Object)integrationKey, (Object)responseMsg.getDownlinkMsg().getDeviceData().getDeviceName());
                    onDownlink.accept(responseMsg.getDownlinkMsg().getDeviceData());
                }
            }

            public void onError(Throwable t) {
                log.debug("[{}] The rpc session received an error!", (Object)integrationKey, (Object)t);
                onError.accept(new RuntimeException(t));
            }

            public void onCompleted() {
                log.debug("[{}] The rpc session was closed!", (Object)integrationKey);
            }
        };
    }

    @Override
    public void disconnect() throws InterruptedException {
        try {
            this.inputStream.onCompleted();
        }
        catch (Exception e) {
            log.error("Exception during onCompleted", (Throwable)e);
        }
        if (this.channel != null) {
            this.channel.shutdown();
            int attempt = 0;
            do {
                try {
                    this.channel.awaitTermination((long)this.timeoutSecs, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    log.error("Channel await termination was interrupted", (Throwable)e);
                }
                if (attempt > 5) {
                    log.warn("We had reached maximum of termination attempts. Force closing channel");
                    try {
                        this.channel.shutdownNow();
                    }
                    catch (Exception e) {
                        log.error("Exception during shutdownNow", (Throwable)e);
                    }
                    break;
                }
                ++attempt;
            } while (!this.channel.isTerminated());
        }
    }

    @Override
    public void handleMsgs() throws InterruptedException {
        List<UplinkMsg> uplinkMsgList = this.eventStorage.readCurrentBatch();
        this.latch = new CountDownLatch(uplinkMsgList.size());
        for (UplinkMsg msg : uplinkMsgList) {
            this.inputStream.onNext((Object)RequestMsg.newBuilder().setMessageType(MessageType.UPLINK_RPC_MESSAGE).setUplinkMsg(msg).build());
        }
        boolean success = this.latch.await(10L, TimeUnit.SECONDS);
        if (!success) {
            log.warn("Failed to deliver the batch: {}", uplinkMsgList);
        }
        if (success && !uplinkMsgList.isEmpty()) {
            this.eventStorage.discardCurrentBatch();
        } else {
            this.eventStorage.sleep();
        }
    }
}

