/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.sync.vc;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.GeneratedMessageV3;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.jgit.errors.LargeObjectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbVersionControlComponent;
import org.thingsboard.server.service.sync.vc.ClusterVersionControlService;
import org.thingsboard.server.service.sync.vc.DefaultGitRepositoryService;
import org.thingsboard.server.service.sync.vc.GitRepositoryService;
import org.thingsboard.server.service.sync.vc.PendingCommit;
import org.thingsboard.server.service.sync.vc.VersionControlRequestCtx;

@TbVersionControlComponent
@Service
public class DefaultClusterVersionControlService
extends TbApplicationEventListener<PartitionChangeEvent>
implements ClusterVersionControlService {
    private static final Logger log = LoggerFactory.getLogger(DefaultClusterVersionControlService.class);

    // Collaborators supplied through the constructor.
    private final PartitionService partitionService;
    private final TbQueueProducerProvider producerProvider;
    private final TbVersionControlQueueFactory queueFactory;
    private final GitRepositoryService vcService;
    private final TopicService topicService;

    // Per-tenant repository locks, created on demand by getRepoLock.
    private final ConcurrentMap<TenantId, Lock> tenantRepoLocks = new ConcurrentHashMap<>();

    // Commit currently being assembled for each tenant. Requests of different tenants run on
    // different I/O threads and each holds only its own tenant lock, while partition change
    // events remove entries from yet another thread; the map itself therefore has to be
    // thread-safe (a plain HashMap can be corrupted by concurrent put/remove on different keys).
    private final Map<TenantId, PendingCommit> pendingCommitMap = new ConcurrentHashMap<>();

    // Assigned in init().
    private volatile ExecutorService consumerExecutor;
    private volatile QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> consumer;
    private volatile TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> producer;

    // Poll interval handed to the queue consumer manager.
    @Value(value="${queue.vc.poll-interval:25}")
    private long pollDuration;
    // Maximum time, in milliseconds, to wait for one polled pack of messages to be processed.
    @Value(value="${queue.vc.pack-processing-timeout:180000}")
    private long packProcessingTimeout;
    // Number of single-thread I/O executors created in init().
    @Value(value="${vc.git.io_pool_size:3}")
    private int ioPoolSize;
    // Maximum size of one chunk when entity content is split into several response messages.
    @Value(value="${queue.vc.msg-chunk-size:250000}")
    private int msgChunkSize;

    // One single-thread executor per slot; a tenant is always routed to the same slot.
    private final List<ListeningExecutorService> ioThreads = new ArrayList<>();

    /**
     * Creates the consumer executor and the pool of single-thread I/O executors, resolves the
     * core notifications producer and builds the queue consumer manager. The consumer is only
     * built here; it is launched separately by {@link #afterStartUp()}.
     */
    @PostConstruct
    public void init() {
        consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("vc-consumer"));
        ThingsBoardThreadFactory ioThreadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
        for (int i = 0; i < ioPoolSize; i++) {
            ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ioThreadFactory)));
        }
        producer = producerProvider.getTbCoreNotificationsMsgProducer();
        // The explicit type argument is required: without it the builder chain is inferred with
        // the bound type and neither this::processMsgs nor the assignment type-checks.
        consumer = QueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>>builder()
                .name("TB Version Control")
                .msgPackProcessor(this::processMsgs)
                .pollInterval(pollDuration)
                .consumerCreator(() -> queueFactory.createToVersionControlMsgConsumer())
                .consumerExecutor(consumerExecutor)
                .build();
    }

    /**
     * Stops the queue consumer and shuts down the consumer executor (when they were created)
     * and then every I/O executor, using {@code shutdownNow()}.
     */
    @PreDestroy
    public void stop() {
        if (consumer != null) {
            consumer.stop();
        }
        if (consumerExecutor != null) {
            consumerExecutor.shutdownNow();
        }
        for (ListeningExecutorService ioThread : ioThreads) {
            ioThread.shutdownNow();
        }
    }

    /**
     * Handles a partition change. For every tenant with an active repository whose partition is
     * not served by this node, the pending commit is dropped and the local repository is cleared
     * while holding that tenant's repository lock; cleanup failures are logged and do not stop
     * the loop. Afterwards the consumer is subscribed to one of the partition sets carried by
     * the event, or to an empty set when the event carries none.
     *
     * @param event the partition change event
     */
    protected void onTbApplicationEvent(PartitionChangeEvent event) {
        for (TenantId tenantId : vcService.getActiveRepositoryTenants()) {
            boolean servedByThisNode = partitionService.isMyPartition(ServiceType.TB_VC_EXECUTOR, tenantId, (EntityId) tenantId);
            if (!servedByThisNode) {
                Lock repoLock = getRepoLock(tenantId);
                repoLock.lock();
                try {
                    pendingCommitMap.remove(tenantId);
                    vcService.clearRepository(tenantId);
                } catch (Exception cleanupFailure) {
                    log.warn("[{}] Failed to cleanup the tenant repository", tenantId, cleanupFailure);
                } finally {
                    repoLock.unlock();
                }
            }
        }
        consumer.subscribe(event.getPartitionsMap().values().stream().findAny().orElse(Collections.emptySet()));
    }

    /**
     * Accepts only partition change events whose service type is {@code TB_VC_EXECUTOR}.
     *
     * @param event the partition change event to inspect
     * @return {@code true} when the event's service type equals {@code TB_VC_EXECUTOR}
     */
    protected boolean filterTbApplicationEvent(PartitionChangeEvent event) {
        return ServiceType.TB_VC_EXECUTOR.equals(event.getServiceType());
    }

    /**
     * Launches the queue consumer that was built in {@link #init()}.
     */
    @AfterStartUp(order=2)
    public void afterStartUp() {
        consumer.launch();
    }

    /**
     * Processes one polled pack of version control messages.
     * <p>
     * Each message is submitted to the I/O executor selected by the tenant's hash code, so all
     * requests of a tenant run on the same single-thread executor. The method waits up to
     * {@code packProcessingTimeout} milliseconds for the whole pack and then commits the
     * consumer; a timeout is logged and does not prevent the commit.
     *
     * @param msgs     the polled messages
     * @param consumer the queue consumer to commit once the pack has been handled
     * @throws Exception if waiting for the submitted tasks fails for a reason other than a timeout
     */
    void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> consumer) throws Exception {
        List<ListenableFuture<Void>> futures = new ArrayList<>(msgs.size());
        for (TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg> msgWrapper : msgs) {
            // Resolve the payload first: the context must be built from the actual message.
            TransportProtos.ToVersionControlServiceMsg msg = msgWrapper.getValue();
            // Clear-repository requests are built without repository settings.
            VersionControlRequestCtx ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings()));
            long startTs = System.currentTimeMillis();
            log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), msg);
            int threadIdx = Math.abs(ctx.getTenantId().hashCode() % ioPoolSize);
            ListenableFuture<Void> future = ioThreads.get(threadIdx).submit(() -> processMessage(ctx, msg));
            logTaskExecution(ctx, future, startTs);
            futures.add(future);
        }
        try {
            Futures.allAsList(futures).get(packProcessingTimeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.error("Timeout for processing the version control tasks.", e);
        }
        consumer.commit();
    }

    /**
     * Executes a single version control request while holding the tenant's repository lock.
     * Clear, test and init requests are handled directly; every other request goes through
     * {@code dispatchRepositoryRequest}. Any exception is mapped by {@code handleError} and sent
     * back to the requester as an error reply.
     *
     * @param ctx the request context
     * @param msg the request message
     * @return always {@code null}
     */
    private Void processMessage(VersionControlRequestCtx ctx, TransportProtos.ToVersionControlServiceMsg msg) {
        Lock repoLock = getRepoLock(ctx.getTenantId());
        repoLock.lock();
        try {
            if (msg.hasClearRepositoryRequest()) {
                handleClearRepositoryCommand(ctx);
            } else if (msg.hasTestRepositoryRequest()) {
                handleTestRepositoryCommand(ctx);
            } else if (msg.hasInitRepositoryRequest()) {
                handleInitRepositoryCommand(ctx);
            } else {
                dispatchRepositoryRequest(ctx, msg);
            }
        } catch (Exception failure) {
            reply(ctx, Optional.of(handleError(failure)));
        } finally {
            repoLock.unlock();
        }
        return null;
    }

    /**
     * Re-initializes the tenant repository when the settings carried by the request differ from
     * the ones currently known to the repository service, then routes the request to the handler
     * matching its payload. A message with none of the known payloads is ignored.
     */
    private void dispatchRepositoryRequest(VersionControlRequestCtx ctx, TransportProtos.ToVersionControlServiceMsg msg) throws Exception {
        RepositorySettings activeSettings = vcService.getRepositorySettings(ctx.getTenantId());
        RepositorySettings requestedSettings = ctx.getSettings();
        boolean settingsUnchanged = requestedSettings.equals(activeSettings);
        if (!settingsUnchanged) {
            vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false);
        }

        if (msg.hasCommitRequest()) {
            handleCommitRequest(ctx, msg.getCommitRequest());
            return;
        }
        if (msg.hasListBranchesRequest()) {
            // Fetch before listing branches.
            vcService.fetch(ctx.getTenantId());
            handleListBranches(ctx, msg.getListBranchesRequest());
            return;
        }
        if (msg.hasListEntitiesRequest()) {
            handleListEntities(ctx, msg.getListEntitiesRequest());
            return;
        }
        if (msg.hasListVersionRequest()) {
            // Fetch before listing versions.
            vcService.fetch(ctx.getTenantId());
            handleListVersions(ctx, msg.getListVersionRequest());
            return;
        }
        if (msg.hasEntityContentRequest()) {
            handleEntityContentRequest(ctx, msg.getEntityContentRequest());
            return;
        }
        if (msg.hasEntitiesContentRequest()) {
            handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest());
            return;
        }
        if (msg.hasVersionsDiffRequest()) {
            handleVersionsDiffRequest(ctx, msg.getVersionsDiffRequest());
        }
    }

    /**
     * Sends the content of several entities of one type at a given version.
     * <p>
     * When the request names explicit ids, the content of each of them is sent. Otherwise the
     * entities found at the version are listed, windowed by the request's offset and limit, and
     * each listed entity is sent; an empty listing is answered with a response whose items count
     * is zero.
     *
     * @param ctx     the request context
     * @param request the entities content request
     * @throws Exception if the entity type is unknown or the repository cannot be read
     */
    private void handleEntitiesContentRequest(VersionControlRequestCtx ctx, TransportProtos.EntitiesContentRequestMsg request) throws Exception {
        EntityType entityType = EntityType.valueOf(request.getEntityType());

        if (!request.getIdsList().isEmpty()) {
            int itemIdx = 0;
            for (TransportProtos.EntityIdProto idProto : request.getIdsList()) {
                UUID entityUuid = new UUID(idProto.getEntityIdMSB(), idProto.getEntityIdLSB());
                String entityPath = getRelativePath(entityType, entityUuid.toString());
                sendData(entityPath, request, ctx, itemIdx++, request.getIdsCount());
            }
            return;
        }

        List<VersionedEntityInfo> entities = vcService
                .listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), request.getPath(), entityType, request.getGroups(), request.getRecursive())
                .skip(request.getOffset())
                .limit(request.getLimit())
                .collect(Collectors.toList());
        if (entities.isEmpty()) {
            reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(TransportProtos.EntitiesContentResponseMsg.newBuilder().setItemsCount(0)));
            return;
        }
        int totalItems = entities.size();
        for (int idx = 0; idx < totalItems; idx++) {
            sendData(entities.get(idx).getPath(), request, ctx, idx, totalItems);
        }
    }

    /**
     * Reads one entity file at the requested version and sends it to the requester as a series
     * of chunked {@code EntitiesContentResponseMsg} replies.
     *
     * @param entityPath      path of the entity file; prefixed with the request path when that is not empty
     * @param request         the originating entities content request (supplies path and version id)
     * @param ctx             the request context
     * @param itemIdx         index of this entity within the overall response
     * @param totalItemsCount total number of entities in the overall response
     * @throws IOException if the file content cannot be read
     */
    private void sendData(String entityPath, TransportProtos.EntitiesContentRequestMsg request, VersionControlRequestCtx ctx, int itemIdx, int totalItemsCount) throws IOException {
        String fullPath = StringUtils.isNotEmpty(request.getPath()) ? request.getPath() + entityPath : entityPath;
        String data = vcService.getFileContentAtCommit(ctx.getTenantId(), fullPath, request.getVersionId());
        Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
        // The chunk count is needed up front because every chunk message carries it.
        int chunksCount = Iterables.size(dataChunks);
        AtomicInteger chunkIndex = new AtomicInteger();
        dataChunks.forEach(chunk -> {
            TransportProtos.EntityContentResponseMsg item = TransportProtos.EntityContentResponseMsg.newBuilder()
                    .setData(chunk)
                    .setChunksCount(chunksCount)
                    .setChunkIndex(chunkIndex.getAndIncrement())
                    .build();
            TransportProtos.EntitiesContentResponseMsg.Builder response = TransportProtos.EntitiesContentResponseMsg.newBuilder()
                    .setItemsCount(totalItemsCount)
                    .setItemIdx(itemIdx)
                    .setItem(item);
            reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response));
        });
    }

    /**
     * Sends the content of a single file at the requested version as a series of chunked
     * {@code EntityContentResponseMsg} replies. The file path is the request path (if any)
     * followed, when an entity type is given, by the relative path of that entity.
     *
     * @param ctx     the request context
     * @param request the entity content request
     * @throws IOException if the file content cannot be read
     */
    private void handleEntityContentRequest(VersionControlRequestCtx ctx, TransportProtos.EntityContentRequestMsg request) throws IOException {
        String path = StringUtils.isNotEmpty(request.getPath()) ? request.getPath() : "";
        if (StringUtils.isNotEmpty(request.getEntityType())) {
            EntityType entityType = EntityType.valueOf(request.getEntityType());
            UUID entityUuid = new UUID(request.getEntityIdMSB(), request.getEntityIdLSB());
            path += getRelativePath(entityType, entityUuid.toString());
        }
        log.debug("Executing handleEntityContentRequest [{}][{}]", ctx.getTenantId(), path);

        String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId());
        Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
        // Identifier used only to correlate the trace lines of one chunked transfer.
        String chunkedMsgId = UUID.randomUUID().toString();
        int chunksCount = Iterables.size(dataChunks);

        int nextChunkIdx = 0;
        for (String chunk : dataChunks) {
            int chunkIdx = nextChunkIdx++;
            log.trace("[{}] sending chunk {} for 'getEntity'", chunkedMsgId, chunkIdx);
            reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(
                    TransportProtos.EntityContentResponseMsg.newBuilder().setData(chunk).setChunksCount(chunksCount).setChunkIndex(chunkIdx)));
        }
    }

    /**
     * Lists versions of a branch, optionally restricted to an entity type folder or to a single
     * entity file, and replies with one page of results.
     * <p>
     * The path filter is built from the optional request path followed by the optional entity
     * path. When no entity type is given, the request path alone is used; when neither is given,
     * no path filter is applied.
     *
     * @param ctx     the request context
     * @param request the list versions request
     * @throws Exception if the entity type or sort direction is unknown, or the listing fails
     */
    private void handleListVersions(VersionControlRequestCtx ctx, TransportProtos.ListVersionsRequestMsg request) throws Exception {
        String path = null;
        if (StringUtils.isNotEmpty(request.getEntityType())) {
            EntityType entityType = EntityType.valueOf(request.getEntityType());
            // An all-zero id means "no particular entity": list versions for the whole type folder.
            boolean hasEntityId = request.getEntityIdLSB() != 0L || request.getEntityIdMSB() != 0L;
            path = hasEntityId
                    ? getRelativePath(entityType, new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString())
                    : getRelativePath(entityType, null);
        }

        SortOrder sortOrder = null;
        if (StringUtils.isNotEmpty(request.getSortProperty())) {
            SortOrder.Direction direction = SortOrder.Direction.DESC;
            if (StringUtils.isNotEmpty(request.getSortDirection())) {
                direction = SortOrder.Direction.valueOf(request.getSortDirection());
            }
            sortOrder = new SortOrder(request.getSortProperty(), direction);
        }

        if (StringUtils.isNotEmpty(request.getPath())) {
            // Do not concatenate a null entity path: that would produce "<prefix>null".
            path = path != null ? request.getPath() + path : request.getPath();
        }

        PageLink pageLink = new PageLink(request.getPageSize(), request.getPage(), request.getTextSearch(), sortOrder);
        PageData<EntityVersion> data = vcService.listVersions(ctx.getTenantId(), request.getBranchName(), path, pageLink);
        List<TransportProtos.EntityVersionProto> versions = data.getData().stream()
                .map(v -> TransportProtos.EntityVersionProto.newBuilder()
                        .setTs(v.getTimestamp())
                        .setId(v.getId())
                        .setName(v.getName())
                        .setAuthor(v.getAuthor())
                        .build())
                .collect(Collectors.toList());
        reply(ctx, Optional.empty(), builder -> builder.setListVersionsResponse(TransportProtos.ListVersionsResponseMsg.newBuilder()
                .setTotalPages(data.getTotalPages())
                .setTotalElements(data.getTotalElements())
                .setHasNext(data.hasNext())
                .addAllVersions(versions)));
    }

    /**
     * Replies with the external ids of the entities found at the requested version, optionally
     * restricted to one entity type.
     *
     * @param ctx     the request context
     * @param request the list entities request
     * @throws Exception if the entity type is unknown or the listing fails
     */
    private void handleListEntities(VersionControlRequestCtx ctx, TransportProtos.ListEntitiesRequestMsg request) throws Exception {
        EntityType entityType = null;
        if (StringUtils.isNotEmpty(request.getEntityType())) {
            entityType = EntityType.valueOf(request.getEntityType());
        }
        Stream<VersionedEntityInfo> data = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), "", entityType, false, false);
        // The stream is consumed while the response is being built.
        reply(ctx, Optional.empty(), builder -> {
            List<TransportProtos.VersionedEntityInfoProto> entityProtos = data
                    .map(VersionedEntityInfo::getExternalId)
                    .map(externalId -> TransportProtos.VersionedEntityInfoProto.newBuilder()
                            .setEntityType(externalId.getEntityType().name())
                            .setEntityIdMSB(externalId.getId().getMostSignificantBits())
                            .setEntityIdLSB(externalId.getId().getLeastSignificantBits())
                            .build())
                    .collect(Collectors.toList());
            return builder.setListEntitiesResponse(TransportProtos.ListEntitiesResponseMsg.newBuilder().addAllEntities(entityProtos));
        });
    }

    /**
     * Replies with the name and default flag of every branch reported by the repository service
     * for the tenant.
     *
     * @param ctx     the request context
     * @param request the list branches request (not inspected)
     */
    private void handleListBranches(VersionControlRequestCtx ctx, TransportProtos.ListBranchesRequestMsg request) {
        List<TransportProtos.BranchInfoProto> branchProtos = vcService.listBranches(ctx.getTenantId()).stream()
                .map(branch -> TransportProtos.BranchInfoProto.newBuilder()
                        .setName(branch.getName())
                        .setIsDefault(branch.isDefault())
                        .build())
                .collect(Collectors.toList());
        TransportProtos.ListBranchesResponseMsg.Builder response = TransportProtos.ListBranchesResponseMsg.newBuilder();
        reply(ctx, Optional.empty(), builder -> builder.setListBranchesResponse(response.addAllBranches(branchProtos)));
    }

    /**
     * Replies with the per-entity differences between two versions under the requested path.
     * Each entry carries the entity id derived from the changed file's path, the file content at
     * both versions and the raw diff text.
     *
     * @param ctx     the request context
     * @param request the versions diff request
     * @throws IOException if the diff cannot be computed
     */
    private void handleVersionsDiffRequest(VersionControlRequestCtx ctx, TransportProtos.VersionsDiffRequestMsg request) throws IOException {
        List<TransportProtos.EntityVersionsDiff> diffProtos = vcService
                .getVersionsDiffList(ctx.getTenantId(), request.getPath(), request.getVersionId1(), request.getVersionId2())
                .stream()
                .map(diff -> {
                    EntityId entityId = DefaultGitRepositoryService.fromRelativePath(diff.getFilePath());
                    TransportProtos.EntityVersionsDiff.Builder diffProto = TransportProtos.EntityVersionsDiff.newBuilder();
                    diffProto.setEntityType(entityId.getEntityType().name());
                    diffProto.setEntityIdMSB(entityId.getId().getMostSignificantBits());
                    diffProto.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
                    diffProto.setEntityDataAtVersion1(diff.getFileContentAtCommit1());
                    diffProto.setEntityDataAtVersion2(diff.getFileContentAtCommit2());
                    diffProto.setRawDiff(diff.getDiffStringValue());
                    return diffProto.build();
                })
                .collect(Collectors.toList());
        reply(ctx, Optional.empty(), builder -> builder.setVersionsDiffResponse(TransportProtos.VersionsDiffResponseMsg.newBuilder().addAllDiff(diffProtos)));
    }

    /**
     * Handles one step of a commit transaction.
     * <ul>
     *   <li>Prepare: fetches the repository and starts a new pending commit.</li>
     *   <li>Abort: aborts the pending commit if it belongs to the same transaction.</li>
     *   <li>Add / delete / push: applied to the pending commit of the same transaction; a
     *       successful push removes the pending commit and replies with the result. If any of
     *       these steps fails, the pending commit is aborted and the exception is rethrown.</li>
     * </ul>
     * Add, delete and push steps for a transaction that is not the pending one are logged and
     * ignored.
     *
     * @param ctx     the request context
     * @param request the commit request
     * @throws Exception if the transaction id is malformed or a commit step fails
     */
    private void handleCommitRequest(VersionControlRequestCtx ctx, TransportProtos.CommitRequestMsg request) throws Exception {
        log.debug("Executing handleCommitRequest [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
        TenantId tenantId = ctx.getTenantId();
        UUID txId = UUID.fromString(request.getTxId());

        if (request.hasPrepareMsg()) {
            vcService.fetch(ctx.getTenantId());
            prepareCommit(ctx, txId, request.getPrepareMsg());
            return;
        }

        PendingCommit current = pendingCommitMap.get(tenantId);
        boolean belongsToPendingTx = current != null && current.getTxId().equals(txId);

        if (request.hasAbortMsg()) {
            if (belongsToPendingTx) {
                doAbortCurrentCommit(tenantId, current);
            }
            return;
        }

        if (!belongsToPendingTx) {
            log.debug("[{}] Ignore request due to stale commit: {}", txId, request);
            return;
        }

        try {
            if (request.hasAddMsg()) {
                addToCommit(ctx, current, request.getAddMsg());
            } else if (request.hasDeleteMsg()) {
                deleteFromCommit(ctx, current, request.getDeleteMsg());
            } else if (request.hasPushMsg()) {
                VersionCreationResult result = vcService.push(current);
                pendingCommitMap.remove(ctx.getTenantId());
                reply(ctx, result);
            }
        } catch (Exception stepFailure) {
            doAbortCurrentCommit(tenantId, current, stepFailure);
            throw stepFailure;
        }
    }

    /**
     * Starts a new pending commit for the tenant. A commit that is already pending for the
     * tenant is aborted first; the new commit is then registered and handed to the repository
     * service for preparation.
     *
     * @param ctx        the request context (supplies tenant and node id)
     * @param txId       the transaction id of the new commit
     * @param prepareMsg branch, commit message and author details
     */
    private void prepareCommit(VersionControlRequestCtx ctx, UUID txId, TransportProtos.PrepareMsg prepareMsg) {
        TenantId tenantId = ctx.getTenantId();
        PendingCommit newCommit = new PendingCommit(
                tenantId,
                ctx.getNodeId(),
                txId,
                prepareMsg.getBranchName(),
                prepareMsg.getCommitMsg(),
                prepareMsg.getAuthorName(),
                prepareMsg.getAuthorEmail());
        PendingCommit previous = pendingCommitMap.get(tenantId);
        if (previous != null) {
            doAbortCurrentCommit(tenantId, previous);
        }
        pendingCommitMap.put(tenantId, newCommit);
        vcService.prepareCommit(newCommit);
    }

    /**
     * Removes the content of a folder from the pending commit.
     *
     * @param ctx       the request context (not inspected)
     * @param commit    the pending commit to modify
     * @param deleteMsg names the folder and whether it is deleted recursively
     * @throws IOException if the repository service fails to delete the content
     */
    private void deleteFromCommit(VersionControlRequestCtx ctx, PendingCommit commit, TransportProtos.DeleteMsg deleteMsg) throws IOException {
        String folder = deleteMsg.getFolder();
        vcService.deleteFolderContent(commit, folder, deleteMsg.getRecursively());
    }

    /**
     * Stores one chunk of an entity's JSON in the pending commit. Chunks are collected per
     * chunked message id; once every slot of the message is filled, the chunks are joined in
     * index order, the buffer is dropped and the entity is added to the commit.
     *
     * @param ctx    the request context
     * @param commit the pending commit that buffers the chunks
     * @param addMsg the chunk together with its message id, index, total count and target path
     * @throws IOException if adding the assembled entity to the commit fails
     */
    private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, TransportProtos.AddMsg addMsg) throws IOException {
        log.debug("Executing addToCommit [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
        log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex());
        Map<String, String[]> chunkedMsgs = commit.getChunkedMsgs();
        String[] chunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), msgId -> new String[addMsg.getChunksCount()]);
        chunks[addMsg.getChunkIndex()] = addMsg.getEntityDataJsonChunk();

        boolean stillIncomplete = CollectionsUtil.countNonNull((Object[]) chunks) != chunks.length;
        if (stillIncomplete) {
            return;
        }
        log.trace("[{}] collected all chunks for 'addToCommit'", addMsg.getChunkedMsgId());
        String entityDataJson = String.join("", chunks);
        chunkedMsgs.remove(addMsg.getChunkedMsgId());
        vcService.add(commit, addMsg.getRelativePath(), entityDataJson);
    }

    /**
     * Aborts the tenant's pending commit when there is no triggering exception to pass along.
     *
     * @param tenantId the tenant owning the commit
     * @param current  the pending commit to abort
     */
    private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current) {
        Exception noCause = null;
        doAbortCurrentCommit(tenantId, current, noCause);
    }

    /**
     * Aborts the given pending commit in the repository service and forgets it.
     * <p>
     * The map entry is removed even when the abort itself throws, so a failed abort cannot leave
     * a stale pending commit registered for the tenant; the abort failure still propagates to
     * the caller.
     *
     * @param tenantId the tenant owning the commit
     * @param current  the pending commit to abort
     * @param e        the failure that triggered the abort, or {@code null}; currently not used
     */
    private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current, Exception e) {
        try {
            vcService.abort(current);
        } finally {
            pendingCommitMap.remove(tenantId);
        }
    }

    /**
     * Clears the tenant's repository and replies with a generic success response, or with the
     * failure when clearing throws.
     *
     * @param ctx the request context
     */
    private void handleClearRepositoryCommand(VersionControlRequestCtx ctx) {
        try {
            vcService.clearRepository(ctx.getTenantId());
            reply(ctx, Optional.empty());
        } catch (Exception e) {
            // Message names the operation that actually failed (it was copied from the connect handlers).
            log.debug("[{}] Failed to clear the repository: ", ctx, e);
            reply(ctx, Optional.of(e));
        }
    }

    /**
     * Initializes the tenant's repository with the settings carried by the request and replies
     * with a generic success response, or with the failure when initialization throws.
     *
     * @param ctx the request context
     */
    private void handleInitRepositoryCommand(VersionControlRequestCtx ctx) {
        try {
            vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false);
            reply(ctx, Optional.empty());
        } catch (Exception initFailure) {
            log.debug("[{}] Failed to connect to the repository: ", ctx, initFailure);
            reply(ctx, Optional.of(initFailure));
        }
    }

    /**
     * Tests the repository settings carried by the request and replies with a generic success
     * response, or with the failure when the test throws.
     *
     * @param ctx the request context
     */
    private void handleTestRepositoryCommand(VersionControlRequestCtx ctx) {
        try {
            vcService.testRepository(ctx.getTenantId(), ctx.getSettings());
            reply(ctx, Optional.empty());
        } catch (Exception testFailure) {
            log.debug("[{}] Failed to connect to the repository: ", ctx, testFailure);
            reply(ctx, Optional.of(testFailure));
        }
    }

    /**
     * Maps a processing failure to the exception reported to the requester. A
     * {@code LargeObjectException} is replaced by a runtime exception with a fixed message; the
     * original exception is kept as its cause so the stack trace is not lost when logged. Every
     * other exception is returned unchanged.
     *
     * @param e the failure raised while processing a request
     * @return the exception to report
     */
    private Exception handleError(Exception e) {
        if (e instanceof LargeObjectException) {
            return new RuntimeException("Version is too big", e);
        }
        return e;
    }

    /**
     * Replies with a commit response built from a version creation result: the added, modified
     * and removed counters always, and the version's timestamp, id, name and author when the
     * result carries a version.
     *
     * @param ctx    the request context
     * @param result the outcome of the push
     */
    private void reply(VersionControlRequestCtx ctx, VersionCreationResult result) {
        TransportProtos.CommitResponseMsg.Builder commitResponse = TransportProtos.CommitResponseMsg.newBuilder();
        commitResponse.setAdded(result.getAdded());
        commitResponse.setModified(result.getModified());
        commitResponse.setRemoved(result.getRemoved());
        if (result.getVersion() != null) {
            commitResponse.setTs(result.getVersion().getTimestamp());
            commitResponse.setCommitId(result.getVersion().getId());
            commitResponse.setName(result.getVersion().getName());
            commitResponse.setAuthor(result.getVersion().getAuthor());
        }
        reply(ctx, Optional.empty(), builder -> builder.setCommitResponse(commitResponse));
    }

    /**
     * Replies with an error when {@code e} is present, otherwise with a generic success response.
     *
     * @param ctx the request context
     * @param e   the failure to report, or empty on success
     */
    private void reply(VersionControlRequestCtx ctx, Optional<Exception> e) {
        Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> noEnrichment = null;
        reply(ctx, e, noEnrichment);
    }

    /**
     * Replies successfully with a response populated by the given function.
     *
     * @param ctx            the request context
     * @param enrichFunction fills the response builder with the request-specific payload
     */
    private void reply(VersionControlRequestCtx ctx, Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> enrichFunction) {
        Optional<Exception> noFailure = Optional.empty();
        reply(ctx, noFailure, enrichFunction);
    }

    /**
     * Builds the response for a request and sends it to the core notifications topic of the node
     * named in the request context.
     * <p>
     * When {@code e} is present the response carries its message (or the simple class name when
     * the message is {@code null}) as the error. Otherwise the response is populated by
     * {@code enrichFunction}, or with a generic repository response when no function is given.
     *
     * @param ctx            the request context (supplies tenant, request and node ids)
     * @param e              the failure to report, or empty on success
     * @param enrichFunction fills the success payload; may be {@code null}
     */
    private void reply(VersionControlRequestCtx ctx, Optional<Exception> e, Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> enrichFunction) {
        TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, ctx.getNodeId());
        TransportProtos.VersionControlResponseMsg.Builder builder = TransportProtos.VersionControlResponseMsg.newBuilder()
                .setRequestIdMSB(ctx.getRequestId().getMostSignificantBits())
                .setRequestIdLSB(ctx.getRequestId().getLeastSignificantBits());

        if (!e.isPresent()) {
            if (enrichFunction == null) {
                builder.setGenericResponse(TransportProtos.GenericRepositoryResponseMsg.newBuilder().build());
            } else {
                builder = enrichFunction.apply(builder);
            }
            log.debug("[{}][{}] Processed task", ctx.getTenantId(), ctx.getRequestId());
        } else {
            log.debug("[{}][{}] Failed to process task", ctx.getTenantId(), ctx.getRequestId(), e.get());
            String message = e.get().getMessage();
            if (message == null) {
                message = e.get().getClass().getSimpleName();
            }
            builder.setError(message);
        }

        TransportProtos.ToCoreNotificationMsg msg = TransportProtos.ToCoreNotificationMsg.newBuilder().setVcResponseMsg(builder).build();
        log.trace("[{}][{}] PUSHING reply: {} to: {}", ctx.getTenantId(), ctx.getRequestId(), msg, tpi);
        producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
    }

    /**
     * Builds the repository-relative path for an entity type or a single entity.
     *
     * @param entityType the entity type; its lower-cased name is the folder
     * @param entityId   the entity id as a string, or {@code null} to address the type folder only
     * @return {@code "<type>"} when {@code entityId} is {@code null}, otherwise
     *         {@code "<type>/<entityId>.json"}
     */
    private String getRelativePath(EntityType entityType, String entityId) {
        String path = entityType.name().toLowerCase();
        if (entityId != null) {
            path += "/" + entityId + ".json";
        }
        return path;
    }

    /**
     * Returns the repository lock of a tenant, creating a fair {@code ReentrantLock} the first
     * time the tenant is seen.
     *
     * @param tenantId the tenant whose lock is requested
     * @return the tenant's lock
     */
    private Lock getRepoLock(TenantId tenantId) {
        Function<TenantId, Lock> newFairLock = id -> new ReentrantLock(true);
        return tenantRepoLocks.computeIfAbsent(tenantId, newFairLock);
    }

    /**
     * When trace logging is enabled, attaches a callback to the task's future that logs how long
     * the task took on success, or the failure otherwise. Does nothing when trace logging is
     * disabled.
     *
     * @param ctx     the request context (supplies the ids written to the log)
     * @param future  the future of the submitted task
     * @param startTs the time, in epoch milliseconds, at which the task was received
     */
    private void logTaskExecution(final VersionControlRequestCtx ctx, ListenableFuture<Void> future, final long startTs) {
        if (!log.isTraceEnabled()) {
            return;
        }
        FutureCallback<Void> traceCallback = new FutureCallback<Void>() {

            @Override
            public void onSuccess(@Nullable Void result) {
                log.trace("[{}][{}] Task processing took: {}ms", ctx.getTenantId(), ctx.getRequestId(), System.currentTimeMillis() - startTs);
            }

            @Override
            public void onFailure(Throwable t) {
                log.trace("[{}][{}] Task failed: ", ctx.getTenantId(), ctx.getRequestId(), t);
            }
        };
        Futures.addCallback(future, traceCallback, MoreExecutors.directExecutor());
    }

    /**
     * Creates the service with its collaborators. Executors, producer and consumer are set up
     * later in {@link #init()}.
     *
     * @param partitionService used to check whether a tenant's partition is served by this node
     * @param producerProvider supplies the core notifications producer
     * @param queueFactory     creates the version control message consumer
     * @param vcService        the Git repository service that performs the repository operations
     * @param topicService     resolves the notifications topic of the replying node
     */
    @ConstructorProperties(value={"partitionService", "producerProvider", "queueFactory", "vcService", "topicService"})
    public DefaultClusterVersionControlService(PartitionService partitionService, TbQueueProducerProvider producerProvider, TbVersionControlQueueFactory queueFactory, GitRepositoryService vcService, TopicService topicService) {
        this.vcService = vcService;
        this.topicService = topicService;
        this.partitionService = partitionService;
        this.queueFactory = queueFactory;
        this.producerProvider = producerProvider;
    }
}

