package org.thingsboard.server.service.sync.vc;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbVersionControlComponent;

@TbVersionControlComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.class */
public class DefaultClusterVersionControlService extends TbApplicationEventListener<PartitionChangeEvent> implements ClusterVersionControlService {
    private static final Logger log = LoggerFactory.getLogger(DefaultClusterVersionControlService.class);

    // Collaborators; each one is assigned exactly once in the constructor.
    private final PartitionService partitionService;
    private final TbQueueProducerProvider producerProvider;
    private final TbVersionControlQueueFactory queueFactory;
    private final DataDecodingEncodingService encodingService;
    private final GitRepositoryService vcService;
    private final TopicService topicService;

    // Assigned in init(); read later from the consumer thread and the I/O threads.
    private volatile ExecutorService consumerExecutor;
    private volatile TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> consumer;
    private volatile TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> producer;

    // Passed to consumer.poll(...) and reused as the back-off sleep after a failed poll.
    @Value("${queue.vc.poll-interval:25}")
    private long pollDuration;

    // Upper bound, in milliseconds, for waiting on all tasks of one polled pack before committing.
    @Value("${queue.vc.pack-processing-timeout:180000}")
    private long packProcessingTimeout;

    // Number of single-threaded I/O executors created in init().
    @Value("${vc.git.io_pool_size:3}")
    private int ioPoolSize;

    // Maximum chunk size handed to StringUtils.split(...) when file content is sent back.
    @Value("${queue.vc.msg-chunk-size:500000}")
    private int msgChunkSize;

    // One fair lock per tenant; guards that tenant's repository and pending commit.
    private final ConcurrentMap<TenantId, Lock> tenantRepoLocks = new ConcurrentHashMap<>();

    // The per-tenant locks do not protect the map structure itself: different tenants are handled
    // on different I/O threads (and the partition-change handler runs on yet another thread), so
    // the map must be safe for concurrent access.
    private final Map<TenantId, PendingCommit> pendingCommitMap = new ConcurrentHashMap<>();

    // Set by stop(); checked by the consumer loop.
    private volatile boolean stopped = false;

    // Populated once in init(); a tenant is always routed to the same executor by hash.
    private final List<ListeningExecutorService> ioThreads = new ArrayList<>();

    /**
     * Creates the consumer executor, {@code ioPoolSize} single-threaded I/O executors, and obtains
     * the core-notification producer and the version-control consumer.
     */
    @PostConstruct
    public void init() {
        this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("vc-consumer"));

        // All I/O executors share one thread factory.
        ThingsBoardThreadFactory ioThreadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
        int created = 0;
        while (created < this.ioPoolSize) {
            ExecutorService worker = Executors.newSingleThreadExecutor(ioThreadFactory);
            this.ioThreads.add(MoreExecutors.listeningDecorator(worker));
            created++;
        }

        this.producer = this.producerProvider.getTbCoreNotificationsMsgProducer();
        this.consumer = this.queueFactory.createToVersionControlMsgConsumer();
    }

    /**
     * Flags the service as stopped, unsubscribes the consumer (if created) and shuts down the
     * consumer executor and every I/O executor.
     */
    @PreDestroy
    public void stop() {
        // Raise the flag first so the consumer loop sees it when its thread is interrupted.
        this.stopped = true;
        if (this.consumer != null) {
            this.consumer.unsubscribe();
        }
        if (this.consumerExecutor != null) {
            this.consumerExecutor.shutdownNow();
        }
        for (ListeningExecutorService ioThread : this.ioThreads) {
            ioThread.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a partition change: for every tenant with an active repository that no longer
     * resolves to this node's partition, drops the pending commit and clears the local repository
     * under the tenant lock; then re-subscribes the consumer to the partitions of the event.
     *
     * @param partitionChangeEvent the event carrying the new partition set
     */
    public void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        for (TenantId tenantId : this.vcService.getActiveRepositoryTenants()) {
            boolean mine = this.partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition();
            if (mine) {
                continue;
            }
            Lock tenantLock = getRepoLock(tenantId);
            tenantLock.lock();
            try {
                this.pendingCommitMap.remove(tenantId);
                this.vcService.clearRepository(tenantId);
            } catch (Exception cleanupFailure) {
                // Best effort: a failed cleanup must not prevent the re-subscription below.
                log.warn("[{}] Failed to cleanup the tenant repository", tenantId, cleanupFailure);
            } finally {
                tenantLock.unlock();
            }
        }
        this.consumer.subscribe(partitionChangeEvent.getPartitions());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Accepts only partition change events whose service type is {@code TB_VC_EXECUTOR}.
     *
     * @param partitionChangeEvent the event to test
     * @return {@code true} when the event's service type equals {@code TB_VC_EXECUTOR}
     */
    public boolean filterTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        ServiceType eventServiceType = partitionChangeEvent.getServiceType();
        return ServiceType.TB_VC_EXECUTOR.equals(eventServiceType);
    }

    /**
     * Starts the consumer loop on the consumer executor once the application is ready.
     *
     * @param applicationReadyEvent the readiness event (not inspected)
     */
    @EventListener({ApplicationReadyEvent.class})
    @Order(2)
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        // The consumer field is read when the task runs, not when it is submitted.
        this.consumerExecutor.execute(() -> consumerLoop(this.consumer));
    }

    /**
     * Polls version control requests until the service or the consumer is stopped.
     *
     * <p>Each polled message is submitted to the I/O executor selected by the tenant id hash, so
     * all requests of one tenant run on the same thread. The loop waits up to
     * {@code packProcessingTimeout} milliseconds for the whole pack and then commits the consumer,
     * whether or not the wait timed out.
     *
     * <p>If the thread is interrupted during the back-off sleep after a failure, the interrupt
     * status is restored and the loop exits.
     *
     * @param tbQueueConsumer the consumer to poll and commit
     */
    void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> tbQueueConsumer) {
        while (!this.stopped && !tbQueueConsumer.isStopped()) {
            List<ListenableFuture<Void>> futures = new ArrayList<>();
            try {
                List<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> msgs = tbQueueConsumer.poll(this.pollDuration);
                if (msgs.isEmpty()) {
                    continue;
                }
                for (TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg> queueMsg : msgs) {
                    TransportProtos.ToVersionControlServiceMsg request = queueMsg.getValue();
                    // A clear-repository request is handled without settings.
                    RepositorySettings settings = request.hasClearRepositoryRequest() ? null : getEntitiesVersionControlSettings(request);
                    VersionControlRequestCtx ctx = new VersionControlRequestCtx(request, settings);
                    long startTs = System.currentTimeMillis();
                    log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), request);
                    // abs() of the remainder is always a valid index, even for negative hash codes.
                    int threadIdx = Math.abs(ctx.getTenantId().hashCode() % this.ioPoolSize);
                    ListenableFuture<Void> future = this.ioThreads.get(threadIdx).submit(() -> {
                        return processMessage(ctx, request);
                    });
                    logTaskExecution(ctx, future, startTs);
                    futures.add(future);
                }
                try {
                    Futures.allAsList(futures).get(this.packProcessingTimeout, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    log.info("Timeout for processing the version control tasks.", e);
                }
                tbQueueConsumer.commit();
            } catch (Exception e) {
                if (!this.stopped) {
                    log.warn("Failed to obtain version control requests from queue.", e);
                    try {
                        Thread.sleep(this.pollDuration);
                    } catch (InterruptedException interrupted) {
                        log.trace("Failed to wait until the server has capacity to handle new version control messages", interrupted);
                        // Keep the interrupt visible to the owning executor and stop polling
                        // instead of spinning with the interrupt flag set.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        log.info("TB Version Control request consumer stopped.");
    }

    /**
     * Executes one version control request while holding the tenant's repository lock.
     *
     * <p>Clear, test and init requests are dispatched directly. For every other request the
     * repository is re-initialised first when the settings in the request differ from the ones the
     * repository service currently holds. Any exception is reported to the requester through
     * {@code reply}.
     *
     * @param versionControlRequestCtx   the request context (tenant, request id, settings)
     * @param toVersionControlServiceMsg the request message
     * @return always {@code null}
     */
    private Void processMessage(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.ToVersionControlServiceMsg toVersionControlServiceMsg) {
        TenantId tenantId = versionControlRequestCtx.getTenantId();
        Lock tenantLock = getRepoLock(tenantId);
        tenantLock.lock();
        try {
            if (toVersionControlServiceMsg.hasClearRepositoryRequest()) {
                handleClearRepositoryCommand(versionControlRequestCtx);
            } else if (toVersionControlServiceMsg.hasTestRepositoryRequest()) {
                handleTestRepositoryCommand(versionControlRequestCtx);
            } else if (toVersionControlServiceMsg.hasInitRepositoryRequest()) {
                handleInitRepositoryCommand(versionControlRequestCtx);
            } else {
                boolean settingsUpToDate = versionControlRequestCtx.getSettings().equals(this.vcService.getRepositorySettings(tenantId));
                if (!settingsUpToDate) {
                    this.vcService.initRepository(tenantId, versionControlRequestCtx.getSettings());
                }
                if (toVersionControlServiceMsg.hasCommitRequest()) {
                    handleCommitRequest(versionControlRequestCtx, toVersionControlServiceMsg.getCommitRequest());
                } else if (toVersionControlServiceMsg.hasListBranchesRequest()) {
                    this.vcService.fetch(tenantId);
                    handleListBranches(versionControlRequestCtx, toVersionControlServiceMsg.getListBranchesRequest());
                } else if (toVersionControlServiceMsg.hasListEntitiesRequest()) {
                    handleListEntities(versionControlRequestCtx, toVersionControlServiceMsg.getListEntitiesRequest());
                } else if (toVersionControlServiceMsg.hasListVersionRequest()) {
                    this.vcService.fetch(tenantId);
                    handleListVersions(versionControlRequestCtx, toVersionControlServiceMsg.getListVersionRequest());
                } else if (toVersionControlServiceMsg.hasEntityContentRequest()) {
                    handleEntityContentRequest(versionControlRequestCtx, toVersionControlServiceMsg.getEntityContentRequest());
                } else if (toVersionControlServiceMsg.hasEntitiesContentRequest()) {
                    handleEntitiesContentRequest(versionControlRequestCtx, toVersionControlServiceMsg.getEntitiesContentRequest());
                } else if (toVersionControlServiceMsg.hasVersionsDiffRequest()) {
                    handleVersionsDiffRequest(versionControlRequestCtx, toVersionControlServiceMsg.getVersionsDiffRequest());
                }
            }
        } catch (Exception failure) {
            // Report the failure to the requester instead of failing the future.
            reply(versionControlRequestCtx, Optional.of(failure));
        } finally {
            tenantLock.unlock();
        }
        return null;
    }

    /**
     * Sends the content of several entities of one type at a given version.
     *
     * <p>When the request lists explicit ids, one item is sent per id. Otherwise the entities are
     * listed from the repository at that version, restricted by the request's offset and limit; an
     * empty listing produces a single response with an item count of zero.
     *
     * @param versionControlRequestCtx  the request context
     * @param entitiesContentRequestMsg the request
     * @throws Exception if listing entities or reading their content fails
     */
    private void handleEntitiesContentRequest(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.EntitiesContentRequestMsg entitiesContentRequestMsg) throws Exception {
        EntityType entityType = EntityType.valueOf(entitiesContentRequestMsg.getEntityType());

        List<TransportProtos.EntityIdProto> requestedIds = entitiesContentRequestMsg.getIdsList();
        if (!requestedIds.isEmpty()) {
            int total = entitiesContentRequestMsg.getIdsCount();
            int idx = 0;
            for (TransportProtos.EntityIdProto idProto : requestedIds) {
                String entityUuid = new UUID(idProto.getEntityIdMSB(), idProto.getEntityIdLSB()).toString();
                sendData(getRelativePath(entityType, entityUuid), entitiesContentRequestMsg, versionControlRequestCtx, idx, total);
                idx++;
            }
            return;
        }

        List<VersionedEntityInfo> page = this.vcService
                .listEntitiesAtVersion(versionControlRequestCtx.getTenantId(), entitiesContentRequestMsg.getVersionId(), entitiesContentRequestMsg.getPath(), entityType, entitiesContentRequestMsg.getGroups(), entitiesContentRequestMsg.getRecursive())
                .skip(entitiesContentRequestMsg.getOffset())
                .limit(entitiesContentRequestMsg.getLimit())
                .collect(Collectors.toList());
        if (page.isEmpty()) {
            reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setEntitiesContentResponse(TransportProtos.EntitiesContentResponseMsg.newBuilder().setItemsCount(0)));
            return;
        }
        int pageSize = page.size();
        int idx = 0;
        for (VersionedEntityInfo info : page) {
            sendData(info.getPath(), entitiesContentRequestMsg, versionControlRequestCtx, idx, pageSize);
            idx++;
        }
    }

    /**
     * Reads one file at the requested version and sends it back as a sequence of chunks of at most
     * {@code msgChunkSize} characters, each tagged with its chunk index and the total chunk count.
     *
     * @param str                       the file path; prefixed with the request path when that is non-empty
     * @param entitiesContentRequestMsg the originating request (supplies path prefix and version id)
     * @param versionControlRequestCtx  the request context used for the replies
     * @param i                         index of this item within the overall response
     * @param i2                        total number of items in the overall response
     * @throws IOException if the file content cannot be read
     */
    private void sendData(String str, TransportProtos.EntitiesContentRequestMsg entitiesContentRequestMsg, VersionControlRequestCtx versionControlRequestCtx, int i, int i2) throws IOException {
        String prefix = entitiesContentRequestMsg.getPath();
        String fullPath = StringUtils.isNotEmpty(prefix) ? prefix + str : str;
        String content = this.vcService.getFileContentAtCommit(versionControlRequestCtx.getTenantId(), fullPath, entitiesContentRequestMsg.getVersionId());
        Iterable<String> chunks = StringUtils.split(content, this.msgChunkSize);
        int chunksCount = Iterables.size(chunks);
        int chunkIndex = 0;
        for (String chunk : chunks) {
            TransportProtos.EntitiesContentResponseMsg.Builder item = TransportProtos.EntitiesContentResponseMsg.newBuilder()
                    .setItemsCount(i2)
                    .setItemIdx(i)
                    .setItem(TransportProtos.EntityContentResponseMsg.newBuilder()
                            .setData(chunk)
                            .setChunksCount(chunksCount)
                            .setChunkIndex(chunkIndex)
                            .build());
            reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setEntitiesContentResponse(item));
            chunkIndex++;
        }
    }

    /**
     * Sends the content of a single file at a given version, split into chunks of at most
     * {@code msgChunkSize} characters.
     *
     * <p>The file path is the request path (empty when absent) followed, if an entity type is
     * given, by the relative path derived from the entity type and id.
     *
     * @param versionControlRequestCtx the request context
     * @param entityContentRequestMsg  the request
     * @throws IOException if the file content cannot be read
     */
    private void handleEntityContentRequest(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.EntityContentRequestMsg entityContentRequestMsg) throws IOException {
        String filePath = "";
        if (StringUtils.isNotEmpty(entityContentRequestMsg.getPath())) {
            filePath = entityContentRequestMsg.getPath();
        }
        if (StringUtils.isNotEmpty(entityContentRequestMsg.getEntityType())) {
            EntityType entityType = EntityType.valueOf(entityContentRequestMsg.getEntityType());
            UUID entityUuid = new UUID(entityContentRequestMsg.getEntityIdMSB(), entityContentRequestMsg.getEntityIdLSB());
            filePath = filePath + getRelativePath(entityType, entityUuid.toString());
        }

        String content = this.vcService.getFileContentAtCommit(versionControlRequestCtx.getTenantId(), filePath, entityContentRequestMsg.getVersionId());
        Iterable<String> chunks = StringUtils.split(content, this.msgChunkSize);
        // Random id used only to correlate the trace lines of one transfer.
        String chunkedMsgId = UUID.randomUUID().toString();
        int chunksCount = Iterables.size(chunks);
        int nextIndex = 0;
        for (String chunk : chunks) {
            final int chunkIndex = nextIndex++;
            log.trace("[{}] sending chunk {} for 'getEntity'", chunkedMsgId, Integer.valueOf(chunkIndex));
            reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setEntityContentResponse(
                    TransportProtos.EntityContentResponseMsg.newBuilder().setData(chunk).setChunksCount(chunksCount).setChunkIndex(chunkIndex)));
        }
    }

    /**
     * Lists a page of versions of a branch, optionally restricted to an entity type or a single
     * entity, and replies with the page.
     *
     * <p>Path resolution: no entity type gives no entity path; a type with an all-zero id gives the
     * type folder; a type with an id gives that entity's file. A non-empty request path is used as
     * a prefix. When there is no entity path, the request path is used on its own (previously the
     * literal text {@code "null"} was appended to it).
     *
     * @param versionControlRequestCtx the request context
     * @param listVersionsRequestMsg   the request (branch, paging, sorting, optional entity filter)
     * @throws Exception if the repository service fails to list the versions
     */
    private void handleListVersions(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.ListVersionsRequestMsg listVersionsRequestMsg) throws Exception {
        String path = null;
        if (StringUtils.isNotEmpty(listVersionsRequestMsg.getEntityType())) {
            EntityType entityType = EntityType.valueOf(listVersionsRequestMsg.getEntityType());
            boolean noEntityId = listVersionsRequestMsg.getEntityIdLSB() == 0 && listVersionsRequestMsg.getEntityIdMSB() == 0;
            path = noEntityId
                    ? getRelativePath(entityType, null)
                    : getRelativePath(entityType, new UUID(listVersionsRequestMsg.getEntityIdMSB(), listVersionsRequestMsg.getEntityIdLSB()).toString());
        }

        SortOrder sortOrder = null;
        if (StringUtils.isNotEmpty(listVersionsRequestMsg.getSortProperty())) {
            // Descending unless the request names a direction.
            SortOrder.Direction direction = SortOrder.Direction.DESC;
            if (StringUtils.isNotEmpty(listVersionsRequestMsg.getSortDirection())) {
                direction = SortOrder.Direction.valueOf(listVersionsRequestMsg.getSortDirection());
            }
            sortOrder = new SortOrder(listVersionsRequestMsg.getSortProperty(), direction);
        }

        if (StringUtils.isNotEmpty(listVersionsRequestMsg.getPath())) {
            // Guard against string-concatenating a null entity path into "<prefix>null".
            path = path != null ? listVersionsRequestMsg.getPath() + path : listVersionsRequestMsg.getPath();
        }

        PageLink pageLink = new PageLink(listVersionsRequestMsg.getPageSize(), listVersionsRequestMsg.getPage(), listVersionsRequestMsg.getTextSearch(), sortOrder);
        PageData<EntityVersion> versionsPage = this.vcService.listVersions(versionControlRequestCtx.getTenantId(), listVersionsRequestMsg.getBranchName(), path, pageLink);
        List<TransportProtos.EntityVersionProto> versionProtos = versionsPage.getData().stream()
                .map(version -> TransportProtos.EntityVersionProto.newBuilder()
                        .setTs(version.getTimestamp())
                        .setId(version.getId())
                        .setName(version.getName())
                        .setAuthor(version.getAuthor())
                        .build())
                .collect(Collectors.toList());
        reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setListVersionsResponse(
                TransportProtos.ListVersionsResponseMsg.newBuilder()
                        .setTotalPages(versionsPage.getTotalPages())
                        .setTotalElements(versionsPage.getTotalElements())
                        .setHasNext(versionsPage.hasNext())
                        .addAllVersions(versionProtos)));
    }

    /**
     * Lists the entities stored at a given version, optionally restricted to one entity type, and
     * replies with their external ids.
     *
     * @param versionControlRequestCtx the request context
     * @param listEntitiesRequestMsg   the request (version id and optional entity type)
     * @throws Exception if the repository service fails to list the entities
     */
    private void handleListEntities(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.ListEntitiesRequestMsg listEntitiesRequestMsg) throws Exception {
        EntityType typeFilter = null;
        if (StringUtils.isNotEmpty(listEntitiesRequestMsg.getEntityType())) {
            typeFilter = EntityType.valueOf(listEntitiesRequestMsg.getEntityType());
        }
        Stream<VersionedEntityInfo> entities = this.vcService.listEntitiesAtVersion(versionControlRequestCtx.getTenantId(), listEntitiesRequestMsg.getVersionId(), "", typeFilter, false, false);
        // The stream is consumed when reply(...) applies the builder function.
        reply(versionControlRequestCtx, Optional.empty(), builder -> {
            List<TransportProtos.VersionedEntityInfoProto> protos = entities
                    .map(VersionedEntityInfo::getExternalId)
                    .map(externalId -> TransportProtos.VersionedEntityInfoProto.newBuilder()
                            .setEntityType(externalId.getEntityType().name())
                            .setEntityIdMSB(externalId.getId().getMostSignificantBits())
                            .setEntityIdLSB(externalId.getId().getLeastSignificantBits())
                            .build())
                    .collect(Collectors.toList());
            return builder.setListEntitiesResponse(TransportProtos.ListEntitiesResponseMsg.newBuilder().addAllEntities(protos));
        });
    }

    /**
     * Replies with the name and default flag of every branch the repository service reports for
     * the tenant.
     *
     * @param versionControlRequestCtx the request context
     * @param listBranchesRequestMsg   the request (not inspected)
     */
    private void handleListBranches(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.ListBranchesRequestMsg listBranchesRequestMsg) {
        List<TransportProtos.BranchInfoProto> branchProtos = this.vcService.listBranches(versionControlRequestCtx.getTenantId())
                .stream()
                .map(branch -> TransportProtos.BranchInfoProto.newBuilder()
                        .setName(branch.getName())
                        .setIsDefault(branch.isDefault())
                        .build())
                .collect(Collectors.toList());
        TransportProtos.ListBranchesResponseMsg.Builder response = TransportProtos.ListBranchesResponseMsg.newBuilder().addAllBranches(branchProtos);
        reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setListBranchesResponse(response));
    }

    /**
     * Replies with the per-entity differences between two versions. Each entry carries the entity
     * id parsed from the file path, the file content at both versions and the raw diff text.
     *
     * @param versionControlRequestCtx the request context
     * @param versionsDiffRequestMsg   the request (path and the two version ids)
     * @throws IOException if the repository service fails to compute the diff
     */
    private void handleVersionsDiffRequest(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.VersionsDiffRequestMsg versionsDiffRequestMsg) throws IOException {
        List<TransportProtos.EntityVersionsDiff> diffProtos = this.vcService
                .getVersionsDiffList(versionControlRequestCtx.getTenantId(), versionsDiffRequestMsg.getPath(), versionsDiffRequestMsg.getVersionId1(), versionsDiffRequestMsg.getVersionId2())
                .stream()
                .map(diff -> {
                    EntityId entityId = DefaultGitRepositoryService.fromRelativePath(diff.getFilePath());
                    return TransportProtos.EntityVersionsDiff.newBuilder()
                            .setEntityType(entityId.getEntityType().name())
                            .setEntityIdMSB(entityId.getId().getMostSignificantBits())
                            .setEntityIdLSB(entityId.getId().getLeastSignificantBits())
                            .setEntityDataAtVersion1(diff.getFileContentAtCommit1())
                            .setEntityDataAtVersion2(diff.getFileContentAtCommit2())
                            .setRawDiff(diff.getDiffStringValue())
                            .build();
                })
                .collect(Collectors.toList());
        reply(versionControlRequestCtx, builder -> builder.setVersionsDiffResponse(TransportProtos.VersionsDiffResponseMsg.newBuilder().addAllDiff(diffProtos)));
    }

    /**
     * Handles one step of a commit transaction.
     *
     * <ul>
     *   <li>prepare: fetches and registers a new pending commit;</li>
     *   <li>abort: aborts the pending commit if its transaction id matches, otherwise does nothing;</li>
     *   <li>add / delete / push: applied only to the pending commit with a matching transaction id;
     *       a successful push removes the pending commit and replies with the result.</li>
     * </ul>
     * If add, delete or push throws, the pending commit is aborted and the exception is rethrown.
     *
     * @param versionControlRequestCtx the request context
     * @param commitRequestMsg         the commit step to process
     * @throws Exception if the step fails
     */
    private void handleCommitRequest(VersionControlRequestCtx versionControlRequestCtx, TransportProtos.CommitRequestMsg commitRequestMsg) throws Exception {
        TenantId tenantId = versionControlRequestCtx.getTenantId();
        UUID txId = UUID.fromString(commitRequestMsg.getTxId());

        if (commitRequestMsg.hasPrepareMsg()) {
            this.vcService.fetch(tenantId);
            prepareCommit(versionControlRequestCtx, txId, commitRequestMsg.getPrepareMsg());
            return;
        }

        PendingCommit current = this.pendingCommitMap.get(tenantId);
        boolean sameTransaction = current != null && current.getTxId().equals(txId);

        if (commitRequestMsg.hasAbortMsg()) {
            if (sameTransaction) {
                doAbortCurrentCommit(tenantId, current);
            }
            return;
        }

        if (!sameTransaction) {
            log.debug("[{}] Ignore request due to stale commit: {}", txId, commitRequestMsg);
            return;
        }

        try {
            if (commitRequestMsg.hasAddMsg()) {
                addToCommit(versionControlRequestCtx, current, commitRequestMsg.getAddMsg());
            } else if (commitRequestMsg.hasDeleteMsg()) {
                deleteFromCommit(versionControlRequestCtx, current, commitRequestMsg.getDeleteMsg());
            } else if (commitRequestMsg.hasPushMsg()) {
                VersionCreationResult pushResult = this.vcService.push(current);
                this.pendingCommitMap.remove(tenantId);
                reply(versionControlRequestCtx, pushResult);
            }
        } catch (Exception stepFailure) {
            doAbortCurrentCommit(tenantId, current, stepFailure);
            throw stepFailure;
        }
    }

    /**
     * Registers a new pending commit for the tenant, aborting any commit that was already pending,
     * and asks the repository service to prepare it.
     *
     * @param versionControlRequestCtx the request context (tenant and node id)
     * @param uuid                     the transaction id of the new commit
     * @param prepareMsg               branch name, commit message and author details
     */
    private void prepareCommit(VersionControlRequestCtx versionControlRequestCtx, UUID uuid, TransportProtos.PrepareMsg prepareMsg) {
        TenantId tenantId = versionControlRequestCtx.getTenantId();
        PendingCommit fresh = new PendingCommit(
                tenantId,
                versionControlRequestCtx.getNodeId(),
                uuid,
                prepareMsg.getBranchName(),
                prepareMsg.getCommitMsg(),
                prepareMsg.getAuthorName(),
                prepareMsg.getAuthorEmail());

        PendingCommit previous = this.pendingCommitMap.get(tenantId);
        if (previous != null) {
            doAbortCurrentCommit(tenantId, previous);
        }

        this.pendingCommitMap.put(tenantId, fresh);
        this.vcService.prepareCommit(fresh);
    }

    /**
     * Asks the repository service to delete the content of the folder named in the message as
     * part of the pending commit.
     *
     * @param versionControlRequestCtx the request context (not used here)
     * @param pendingCommit            the commit being assembled
     * @param deleteMsg                the folder and the recursion flag
     * @throws IOException if the repository service fails to delete the content
     */
    private void deleteFromCommit(VersionControlRequestCtx versionControlRequestCtx, PendingCommit pendingCommit, TransportProtos.DeleteMsg deleteMsg) throws IOException {
        this.vcService.deleteFolderContent(
                pendingCommit,
                deleteMsg.getFolder(),
                deleteMsg.getRecursively());
    }

    /**
     * Stores one chunk of an entity's JSON for the pending commit. Once every chunk of the chunked
     * message has arrived, the chunks are joined in index order and added to the commit under the
     * message's relative path.
     *
     * @param versionControlRequestCtx the request context (not used here)
     * @param pendingCommit            the commit being assembled; holds the partially received messages
     * @param addMsg                   the chunk (chunked message id, index, total count, data, path)
     * @throws IOException if the repository service fails to add the file
     */
    private void addToCommit(VersionControlRequestCtx versionControlRequestCtx, PendingCommit pendingCommit, TransportProtos.AddMsg addMsg) throws IOException {
        String chunkedMsgId = addMsg.getChunkedMsgId();
        int chunkIndex = addMsg.getChunkIndex();
        log.trace("[{}] received chunk {} for 'addToCommit'", chunkedMsgId, Integer.valueOf(chunkIndex));

        Map<String, String[]> partialMsgs = pendingCommit.getChunkedMsgs();
        // The first chunk of a message allocates a slot array sized to the announced chunk count.
        String[] slots = partialMsgs.computeIfAbsent(chunkedMsgId, id -> new String[addMsg.getChunksCount()]);
        slots[chunkIndex] = addMsg.getEntityDataJsonChunk();

        boolean complete = CollectionsUtil.countNonNull(slots) == slots.length;
        if (!complete) {
            return;
        }
        log.trace("[{}] collected all chunks for 'addToCommit'", chunkedMsgId);
        String entityJson = String.join("", slots);
        partialMsgs.remove(chunkedMsgId);
        this.vcService.add(pendingCommit, addMsg.getRelativePath(), entityJson);
    }

    /**
     * Aborts the given pending commit without an associated cause.
     *
     * @param tenantId      the tenant owning the commit
     * @param pendingCommit the commit to abort
     */
    private void doAbortCurrentCommit(TenantId tenantId, PendingCommit pendingCommit) {
        final Exception noCause = null;
        doAbortCurrentCommit(tenantId, pendingCommit, noCause);
    }

    /**
     * Aborts the given pending commit in the repository service and unregisters it for the tenant.
     *
     * <p>The commit is unregistered even when the abort itself throws, so later requests carrying
     * the same transaction id are treated as stale rather than applied to a half-aborted commit.
     * An exception from the abort still propagates to the caller.
     *
     * @param tenantId      the tenant owning the commit
     * @param pendingCommit the commit to abort
     * @param exc           the failure that triggered the abort, or {@code null}; currently unused
     */
    private void doAbortCurrentCommit(TenantId tenantId, PendingCommit pendingCommit, Exception exc) {
        try {
            this.vcService.abort(pendingCommit);
        } finally {
            this.pendingCommitMap.remove(tenantId);
        }
    }

    /**
     * Clears the tenant's local repository and replies with a generic success, or with the
     * error if clearing (or sending the success reply) fails.
     *
     * @param versionControlRequestCtx the request context
     */
    private void handleClearRepositoryCommand(VersionControlRequestCtx versionControlRequestCtx) {
        try {
            this.vcService.clearRepository(versionControlRequestCtx.getTenantId());
            reply(versionControlRequestCtx, Optional.empty());
        } catch (Exception e) {
            // The message used to say "Failed to connect", copied from the init/test handlers.
            log.debug("[{}] Failed to clear the repository: ", versionControlRequestCtx, e);
            reply(versionControlRequestCtx, Optional.of(e));
        }
    }

    /**
     * Initialises the tenant's repository from the settings in the request context and replies
     * with a generic success, or with the error if that fails.
     *
     * @param versionControlRequestCtx the request context (tenant and repository settings)
     */
    private void handleInitRepositoryCommand(VersionControlRequestCtx versionControlRequestCtx) {
        try {
            this.vcService.initRepository(
                    versionControlRequestCtx.getTenantId(),
                    versionControlRequestCtx.getSettings());
            reply(versionControlRequestCtx, Optional.empty());
        } catch (Exception initFailure) {
            log.debug("[{}] Failed to connect to the repository: ", versionControlRequestCtx, initFailure);
            reply(versionControlRequestCtx, Optional.of(initFailure));
        }
    }

    /**
     * Tests the repository settings in the request context and replies with a generic success,
     * or with the error if the test fails.
     *
     * @param versionControlRequestCtx the request context (tenant and repository settings)
     */
    private void handleTestRepositoryCommand(VersionControlRequestCtx versionControlRequestCtx) {
        try {
            this.vcService.testRepository(
                    versionControlRequestCtx.getTenantId(),
                    versionControlRequestCtx.getSettings());
            reply(versionControlRequestCtx, Optional.empty());
        } catch (Exception testFailure) {
            log.debug("[{}] Failed to connect to the repository: ", versionControlRequestCtx, testFailure);
            reply(versionControlRequestCtx, Optional.of(testFailure));
        }
    }

    /**
     * Replies with the outcome of a push: the added/modified/removed counters and, when a version
     * was created, its timestamp, id, name and author.
     *
     * @param versionControlRequestCtx the request context
     * @param versionCreationResult    the push result to report
     */
    private void reply(VersionControlRequestCtx versionControlRequestCtx, VersionCreationResult versionCreationResult) {
        TransportProtos.CommitResponseMsg.Builder commitResponse = TransportProtos.CommitResponseMsg.newBuilder();
        commitResponse.setAdded(versionCreationResult.getAdded());
        commitResponse.setModified(versionCreationResult.getModified());
        commitResponse.setRemoved(versionCreationResult.getRemoved());

        if (versionCreationResult.getVersion() != null) {
            commitResponse.setTs(versionCreationResult.getVersion().getTimestamp());
            commitResponse.setCommitId(versionCreationResult.getVersion().getId());
            commitResponse.setName(versionCreationResult.getVersion().getName());
            commitResponse.setAuthor(versionCreationResult.getVersion().getAuthor());
        }

        reply(versionControlRequestCtx, Optional.empty(), builder -> builder.setCommitResponse(commitResponse));
    }

    /**
     * Replies with an error when {@code optional} holds an exception, otherwise with a generic
     * success response.
     *
     * @param versionControlRequestCtx the request context
     * @param optional                 the failure to report, or empty on success
     */
    private void reply(VersionControlRequestCtx versionControlRequestCtx, Optional<Exception> optional) {
        // No response customiser: the three-argument overload then sends the generic response.
        final Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> noCustomiser = null;
        reply(versionControlRequestCtx, optional, noCustomiser);
    }

    /**
     * Replies successfully with a response populated by {@code function}.
     *
     * @param versionControlRequestCtx the request context
     * @param function                 fills in the specific response on the response builder
     */
    private void reply(VersionControlRequestCtx versionControlRequestCtx, Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> function) {
        final Optional<Exception> noFailure = Optional.empty();
        reply(versionControlRequestCtx, noFailure, function);
    }

    /**
     * Builds a response for the request and sends it to the notifications topic of the core node
     * named in the request context.
     *
     * <p>If {@code optional} holds an exception, the response carries its message, or the
     * exception's simple class name when the message is {@code null}. Otherwise {@code function}
     * populates the response, or a generic response is set when {@code function} is {@code null}.
     *
     * @param versionControlRequestCtx the request context (tenant, request id, node id)
     * @param optional                 the failure to report, or empty on success
     * @param function                 fills in the specific response on success; may be {@code null}
     */
    private void reply(VersionControlRequestCtx versionControlRequestCtx, Optional<Exception> optional, Function<TransportProtos.VersionControlResponseMsg.Builder, TransportProtos.VersionControlResponseMsg.Builder> function) {
        TopicPartitionInfo targetTopic = this.topicService.getNotificationsTopic(ServiceType.TB_CORE, versionControlRequestCtx.getNodeId());
        TransportProtos.VersionControlResponseMsg.Builder response = TransportProtos.VersionControlResponseMsg.newBuilder()
                .setRequestIdMSB(versionControlRequestCtx.getRequestId().getMostSignificantBits())
                .setRequestIdLSB(versionControlRequestCtx.getRequestId().getLeastSignificantBits());

        if (!optional.isPresent()) {
            if (function == null) {
                response.setGenericResponse(TransportProtos.GenericRepositoryResponseMsg.newBuilder().build());
            } else {
                response = function.apply(response);
            }
            log.debug("[{}][{}] Processed task", versionControlRequestCtx.getTenantId(), versionControlRequestCtx.getRequestId());
        } else {
            Exception failure = optional.get();
            // The trailing Throwable argument is logged as the exception.
            log.debug("[{}][{}] Failed to process task", versionControlRequestCtx.getTenantId(), versionControlRequestCtx.getRequestId(), failure);
            String errorText = failure.getMessage();
            if (errorText == null) {
                errorText = failure.getClass().getSimpleName();
            }
            response.setError(errorText);
        }

        TransportProtos.ToCoreNotificationMsg notification = TransportProtos.ToCoreNotificationMsg.newBuilder().setVcResponseMsg(response).build();
        log.trace("[{}][{}] PUSHING reply: {} to: {}", versionControlRequestCtx.getTenantId(), versionControlRequestCtx.getRequestId(), notification, targetTopic);
        this.producer.send(targetTopic, new TbProtoQueueMsg<>(UUID.randomUUID(), notification), (TbQueueCallback) null);
    }

    /**
     * Decodes the repository settings embedded in a version control request.
     *
     * @param toVersionControlServiceMsg the request carrying the encoded settings
     * @return the decoded settings
     * @throws RuntimeException if the encoding service cannot decode the settings
     */
    private RepositorySettings getEntitiesVersionControlSettings(TransportProtos.ToVersionControlServiceMsg toVersionControlServiceMsg) {
        Optional<?> decoded = this.encodingService.decode(toVersionControlServiceMsg.getVcSettings().toByteArray());
        if (!decoded.isPresent()) {
            log.warn("Failed to parse VC settings: {}", toVersionControlServiceMsg.getVcSettings());
            throw new RuntimeException("Failed to parse vc settings!");
        }
        return (RepositorySettings) decoded.get();
    }

    /**
     * Builds a repository-relative path for an entity type or a single entity.
     *
     * <p>The type folder is the enum constant name lower-cased with {@code Locale.ROOT}, so the
     * result does not depend on the JVM default locale (under a Turkish locale the default
     * {@code toLowerCase()} maps {@code I} to a dotless {@code ı}).
     *
     * @param entityType the entity type; its lower-cased name is the folder
     * @param str        the entity id as text, or {@code null} to get just the type folder
     * @return {@code "<type>"} when {@code str} is {@code null}, otherwise {@code "<type>/<str>.json"}
     */
    private String getRelativePath(EntityType entityType, String str) {
        String typeFolder = entityType.name().toLowerCase(java.util.Locale.ROOT);
        if (str == null) {
            return typeFolder;
        }
        return typeFolder + "/" + str + ".json";
    }

    /**
     * Returns the repository lock of a tenant, creating a fair {@link ReentrantLock} on first use.
     *
     * @param tenantId the tenant whose lock is requested
     * @return the tenant's lock; the same instance on every call for that tenant
     */
    private Lock getRepoLock(TenantId tenantId) {
        return this.tenantRepoLocks.computeIfAbsent(tenantId, key -> new ReentrantLock(true));
    }

    /**
     * When trace logging is enabled, attaches a callback that logs how long the task took on
     * success, or the failure otherwise. Does nothing when trace logging is disabled.
     *
     * @param versionControlRequestCtx the request context (tenant and request id for the log line)
     * @param listenableFuture         the submitted task
     * @param j                        the task start time in epoch milliseconds
     */
    private void logTaskExecution(final VersionControlRequestCtx versionControlRequestCtx, ListenableFuture<Void> listenableFuture, final long j) {
        if (!log.isTraceEnabled()) {
            return;
        }
        FutureCallback<Object> traceCallback = new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                long elapsedMs = System.currentTimeMillis() - j;
                log.trace("[{}][{}] Task processing took: {}ms", versionControlRequestCtx.getTenantId(), versionControlRequestCtx.getRequestId(), Long.valueOf(elapsedMs));
            }

            @Override
            public void onFailure(Throwable failure) {
                log.trace("[{}][{}] Task failed: ", versionControlRequestCtx.getTenantId(), versionControlRequestCtx.getRequestId(), failure);
            }
        };
        // The callback runs on whichever thread completes the future.
        Futures.addCallback(listenableFuture, traceCallback, MoreExecutors.directExecutor());
    }

    /**
     * Creates the service from its collaborators. Executors, producer and consumer are created
     * later in {@code init()}.
     *
     * @param partitionService             resolves whether a tenant belongs to this node's partition
     * @param tbQueueProducerProvider      supplies the core-notification producer
     * @param tbVersionControlQueueFactory creates the version control request consumer
     * @param dataDecodingEncodingService  decodes the repository settings carried in requests
     * @param gitRepositoryService         performs the repository operations
     * @param topicService                 resolves the notification topic for replies
     */
    @ConstructorProperties({"partitionService", "producerProvider", "queueFactory", "encodingService", "vcService", "topicService"})
    public DefaultClusterVersionControlService(
            PartitionService partitionService,
            TbQueueProducerProvider tbQueueProducerProvider,
            TbVersionControlQueueFactory tbVersionControlQueueFactory,
            DataDecodingEncodingService dataDecodingEncodingService,
            GitRepositoryService gitRepositoryService,
            TopicService topicService) {
        // Queue plumbing.
        this.producerProvider = tbQueueProducerProvider;
        this.queueFactory = tbVersionControlQueueFactory;
        this.encodingService = dataDecodingEncodingService;
        // Cluster and repository services.
        this.partitionService = partitionService;
        this.topicService = topicService;
        this.vcService = gitRepositoryService;
    }
}
