/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.transport.lwm2m.server.store;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.observe.Observation;
import org.eclipse.californium.core.observe.ObservationStoreException;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable;
import org.eclipse.leshan.core.californium.ObserveUtil;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.request.Identity;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.core.util.Validate;
import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore;
import org.eclipse.leshan.server.redis.RedisRegistrationStore;
import org.eclipse.leshan.server.redis.serialization.IdentitySerDes;
import org.eclipse.leshan.server.redis.serialization.ObservationSerDes;
import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes;
import org.eclipse.leshan.server.registration.Deregistration;
import org.eclipse.leshan.server.registration.ExpirationListener;
import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationUpdate;
import org.eclipse.leshan.server.registration.UpdatedRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.integration.redis.util.RedisLockRegistry;

public class TbLwM2mRedisRegistrationStore
implements CaliforniumRegistrationStore,
Startable,
Stoppable,
Destroyable {
    /** Default delay, in seconds, before and between runs of the expired-registration cleaner. */
    public static final long DEFAULT_CLEAN_PERIOD = 60L;
    /** Default maximum number of expired endpoints fetched by one cleaner run. */
    public static final int DEFAULT_CLEAN_LIMIT = 500;
    /** Default grace period added to a registration lifetime (presumably seconds - confirm against Leshan). */
    public static final long DEFAULT_GRACE_PERIOD = 0L;
    // Log under this class' own name rather than Leshan's RedisRegistrationStore.
    private static final Logger LOG = LoggerFactory.getLogger(TbLwM2mRedisRegistrationStore.class);
    /** Key prefix: endpoint name -> serialized registration. */
    private static final String REG_EP = "REG:EP:";
    /** Key prefix of the secondary index: registration id -> endpoint name. */
    private static final String REG_EP_REGID_IDX = "EP:REGID:";
    /** Key prefix of the secondary index: socket address -> endpoint name. */
    private static final String REG_EP_ADDR_IDX = "EP:ADDR:";
    /** Key prefix of the secondary index: serialized identity -> endpoint name. */
    private static final String REG_EP_IDENTITY = "EP:IDENTITY:";
    /** Prefix of the per-endpoint lock names handed to the lock registry. */
    private static final String LOCK_EP = "LOCK:EP:";
    /** Key prefix: observation token -> serialized CoAP observation. */
    private static final byte[] OBS_TKN = "OBS:TKN:".getBytes(StandardCharsets.UTF_8);
    /** Key prefix of the list of observation tokens belonging to one registration id. */
    private static final String OBS_TKNS_REGID_IDX = "TKNS:REGID:";
    /** Key of the sorted set holding endpoint names scored by their expiration timestamp. */
    private static final byte[] EXP_EP = "EXP:EP".getBytes(StandardCharsets.UTF_8);
    private final RedisConnectionFactory connectionFactory;
    /** Notified by the cleaner when a registration has been removed because it expired. */
    private ExpirationListener expirationListener;
    private final ScheduledExecutorService schedExecutor;
    /** Handle of the scheduled cleaner; {@code null} while the store is stopped. */
    private ScheduledFuture<?> cleanerTask;
    private boolean started = false;
    /** Cleaner period in seconds. */
    private final long cleanPeriod;
    /** Maximum number of expired endpoints handled per cleaner run. */
    private final int cleanLimit;
    /** Grace period passed to the registration's liveness/expiration computations. */
    private final long gracePeriod;
    /** Registry providing the per-endpoint locks guarding multi-key updates. */
    private final RedisLockRegistry redisLock;

    /**
     * Creates a store using the default clean period, grace period and clean limit.
     *
     * @param connectionFactory factory used to obtain Redis connections
     */
    public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory) {
        // Refer to the named defaults instead of repeating their literal values.
        this(connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT);
    }

    /**
     * Creates a store that runs its cleaner on a dedicated single-thread scheduler.
     *
     * @param connectionFactory        factory used to obtain Redis connections
     * @param cleanPeriodInSec         delay, in seconds, between two cleaner runs
     * @param lifetimeGracePeriodInSec grace period applied when checking registration expiration
     * @param cleanLimit               maximum number of expired endpoints handled per cleaner run
     */
    public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) {
        this(
                connectionFactory,
                Executors.newScheduledThreadPool(
                        1,
                        new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))),
                cleanPeriodInSec,
                lifetimeGracePeriodInSec,
                cleanLimit);
    }

    /**
     * Creates a store with an externally supplied scheduler for the cleaner task.
     *
     * @param connectionFactory        factory used to obtain Redis connections and to back the lock registry
     * @param schedExecutor            scheduler on which the cleaner is run
     * @param cleanPeriodInSec         delay, in seconds, between two cleaner runs
     * @param lifetimeGracePeriodInSec grace period applied when checking registration expiration
     * @param cleanLimit               maximum number of expired endpoints handled per cleaner run
     */
    public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) {
        // Redis access and locking.
        this.connectionFactory = connectionFactory;
        this.redisLock = new RedisLockRegistry(connectionFactory, "Registration");

        // Cleaner configuration.
        this.schedExecutor = schedExecutor;
        this.cleanPeriod = cleanPeriodInSec;
        this.gracePeriod = lifetimeGracePeriodInSec;
        this.cleanLimit = cleanLimit;
    }

    /**
     * Concatenates a binary prefix and a binary key.
     *
     * @param prefix leading bytes of the resulting key
     * @param key    trailing bytes of the resulting key
     * @return a new array containing {@code prefix} followed by {@code key}
     */
    private byte[] toKey(byte[] prefix, byte[] key) {
        // copyOf yields the prefix followed by zero padding that is then overwritten by the key.
        byte[] joined = Arrays.copyOf(prefix, prefix.length + key.length);
        System.arraycopy(key, 0, joined, prefix.length, key.length);
        return joined;
    }

    /**
     * Builds a Redis key from a textual prefix and suffix.
     *
     * @param prefix         key prefix
     * @param registrationID suffix appended to the prefix (despite its name, callers also pass
     *                       endpoint names and address/identity strings)
     * @return the UTF-8 encoding of {@code prefix + registrationID}
     */
    private byte[] toKey(String prefix, String registrationID) {
        // Encode explicitly as UTF-8: the byte[]-based helpers of this class do the same, and the
        // platform default charset would make keys differ between JVMs.
        return (prefix + registrationID).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Builds the lock name for an endpoint given as text.
     *
     * @param endpoint endpoint name
     * @return {@code "LOCK:EP:"} followed by the endpoint name
     */
    private String toLockKey(String endpoint) {
        // Plain concatenation avoids an encode/decode round trip through the platform default
        // charset, which could alter the name and no longer match the byte[]-based overload.
        return LOCK_EP + endpoint;
    }

    /**
     * Builds the lock name for an endpoint given as the raw bytes read from an index key.
     *
     * @param endpoint endpoint name bytes (written as UTF-8 by this class)
     * @return {@code "LOCK:EP:"} followed by the decoded endpoint name
     */
    private String toLockKey(byte[] endpoint) {
        // Decode with the same charset the bytes were encoded with, not the platform default.
        return new String(this.toKey(LOCK_EP.getBytes(StandardCharsets.UTF_8), endpoint), StandardCharsets.UTF_8);
    }

    /**
     * Stores a registration under its endpoint and refreshes the registration-id, address and
     * identity indexes as well as the expiration entry, all under the endpoint lock.
     *
     * <p>If a registration already existed for the endpoint, its outdated index entries and all
     * its observations are removed.
     *
     * @param registration the registration to store
     * @return the replaced registration together with its removed observations, or {@code null}
     *         if the endpoint had no previous registration
     */
    public Deregistration addRegistration(Registration registration) {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            Lock lock = this.redisLock.obtain(this.toLockKey(registration.getEndpoint()));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                byte[] endpointValue = registration.getEndpoint().getBytes(StandardCharsets.UTF_8);

                // Swap in the new registration and keep the previous one, if any.
                byte[] old = connection.getSet(this.toEndpointKey(registration.getEndpoint()), this.serializeReg(registration));

                // Secondary indexes all point back to the endpoint name.
                connection.set(this.toRegIdKey(registration.getId()), endpointValue);
                connection.set(this.toRegAddrKey(registration.getSocketAddress()), endpointValue);
                connection.set(this.toRegIdentityKey(registration.getIdentity()), endpointValue);

                this.addOrUpdateExpiration(connection, registration);

                if (old == null) {
                    return null;
                }

                // Clean up whatever the previous registration left behind.
                Registration oldRegistration = this.deserializeReg(old);
                if (!registration.getId().equals(oldRegistration.getId())) {
                    connection.del(this.toRegIdKey(oldRegistration.getId()));
                }
                if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) {
                    this.removeAddrIndex(connection, oldRegistration);
                }
                if (!oldRegistration.getIdentity().equals(registration.getIdentity())) {
                    this.removeIdentityIndex(connection, oldRegistration);
                }
                Collection<org.eclipse.leshan.core.observation.Observation> obsRemoved = this.unsafeRemoveAllObservations(connection, oldRegistration.getId());
                return new Deregistration(oldRegistration, obsRemoved);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Applies an update to the registration identified by the update's registration id.
     *
     * <p>The updated registration is written back, its expiration entry refreshed, and the
     * address and identity indexes are pointed at it; index entries of the previous address or
     * identity are removed when they changed.
     *
     * @param update the update to apply
     * @return the previous and updated registrations, or {@code null} if the registration id is
     *         unknown or no registration data exists for its endpoint
     */
    public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            // Resolve the endpoint through the registration-id index.
            byte[] ep = connection.get(this.toRegIdKey(update.getRegistrationId()));
            if (ep == null) {
                return null;
            }

            Lock lock = this.redisLock.obtain(this.toLockKey(ep));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                byte[] data = connection.get(this.toEndpointKey(ep));
                if (data == null) {
                    return null;
                }

                Registration r = this.deserializeReg(data);
                Registration updatedRegistration = update.update(r);
                byte[] endpointValue = updatedRegistration.getEndpoint().getBytes(StandardCharsets.UTF_8);

                connection.set(this.toEndpointKey(updatedRegistration.getEndpoint()), this.serializeReg(updatedRegistration));
                this.addOrUpdateExpiration(connection, updatedRegistration);

                // Address index: always point the current address at this endpoint, then drop the
                // stale entry when the address changed.
                connection.set(this.toRegAddrKey(updatedRegistration.getSocketAddress()), endpointValue);
                if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) {
                    this.removeAddrIndex(connection, r);
                }

                // Identity index: write the current identity as addRegistration does; without this
                // a changed identity would lose its index entry entirely.
                connection.set(this.toRegIdentityKey(updatedRegistration.getIdentity()), endpointValue);
                if (!r.getIdentity().equals(updatedRegistration.getIdentity())) {
                    this.removeIdentityIndex(connection, r);
                }

                return new UpdatedRegistration(r, updatedRegistration);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Looks up a registration by its registration id.
     *
     * @param registrationId the registration id
     * @return the registration, or {@code null} if none is stored for this id
     */
    public Registration getRegistration(String registrationId) {
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            return this.getRegistration(redis, registrationId);
        }
    }

    /**
     * Looks up a registration by endpoint name.
     *
     * @param endpoint the endpoint name, must not be {@code null}
     * @return the registration, or {@code null} if the endpoint is not registered
     */
    public Registration getRegistrationByEndpoint(String endpoint) {
        Validate.notNull(endpoint);
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            byte[] serialized = redis.get(this.toEndpointKey(endpoint));
            return serialized == null ? null : this.deserializeReg(serialized);
        }
    }

    /**
     * Looks up a registration through the socket-address index.
     *
     * @param address the peer address, must not be {@code null}
     * @return the registration, or {@code null} if the address is not indexed or the indexed
     *         endpoint has no registration data
     */
    public Registration getRegistrationByAdress(InetSocketAddress address) {
        Validate.notNull(address);
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            // First hop: address -> endpoint name.
            byte[] endpointName = redis.get(this.toRegAddrKey(address));
            if (endpointName != null) {
                // Second hop: endpoint name -> serialized registration.
                byte[] serialized = redis.get(this.toEndpointKey(endpointName));
                if (serialized != null) {
                    return this.deserializeReg(serialized);
                }
            }
            return null;
        }
    }

    /**
     * Looks up a registration through the identity index.
     *
     * @param identity the peer identity, must not be {@code null}
     * @return the registration, or {@code null} if the identity is not indexed or the indexed
     *         endpoint has no registration data
     */
    public Registration getRegistrationByIdentity(Identity identity) {
        Validate.notNull(identity);
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            // First hop: identity -> endpoint name.
            byte[] endpointName = redis.get(this.toRegIdentityKey(identity));
            if (endpointName != null) {
                // Second hop: endpoint name -> serialized registration.
                byte[] serialized = redis.get(this.toEndpointKey(endpointName));
                if (serialized != null) {
                    return this.deserializeReg(serialized);
                }
            }
            return null;
        }
    }

    /**
     * Collects every stored registration by scanning the keys matching {@code "REG:EP:*"}.
     *
     * <p>On a cluster connection each node reported by the cluster is scanned. The result is a
     * snapshot held in memory; it is not a live view of Redis.
     *
     * @return an iterator over the registrations found
     */
    public Iterator<Registration> getAllRegistrations() {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            List<Registration> registrations = new ArrayList<>();
            ScanOptions scanOptions = ScanOptions.scanOptions().count(100L).match(REG_EP + "*").build();

            List<Cursor<byte[]>> scans = new ArrayList<>();
            if (connection instanceof RedisClusterConnection) {
                RedisClusterConnection cluster = (RedisClusterConnection) connection;
                cluster.clusterGetNodes().forEach(node -> scans.add(cluster.scan(node, scanOptions)));
            } else {
                scans.add(connection.scan(scanOptions));
            }

            for (Cursor<byte[]> scan : scans) {
                while (scan.hasNext()) {
                    byte[] data = connection.get(scan.next());
                    // The key may have been deleted between SCAN and GET; skip it rather than
                    // handing null to the deserializer.
                    if (data != null) {
                        registrations.add(this.deserializeReg(data));
                    }
                }
            }
            return registrations.iterator();
        }
    }

    /**
     * Removes the registration with the given id regardless of whether it is still alive.
     *
     * @param registrationId the registration id
     * @return the removed registration with its observations, or {@code null} if nothing was removed
     */
    public Deregistration removeRegistration(String registrationId) {
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            return this.removeRegistration(redis, registrationId, false);
        }
    }

    /**
     * Removes a registration and everything attached to it (endpoint data, observations, address
     * and identity indexes, expiration entry) under the endpoint lock.
     *
     * @param connection           the Redis connection to use
     * @param registrationId       id of the registration to remove
     * @param removeOnlyIfNotAlive when {@code true}, a registration that is still alive (grace
     *                             period included) is left untouched
     * @return the removed registration with its observations, or {@code null} if the id is
     *         unknown, the registration data is missing, the registration is still alive and
     *         {@code removeOnlyIfNotAlive} is set, or the registration-id index entry was already gone
     */
    private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) {
        byte[] ep = connection.get(this.toRegIdKey(registrationId));
        if (ep == null) {
            return null;
        }

        Lock lock = this.redisLock.obtain(this.toLockKey(ep));
        // Acquire outside the try block so unlock() only runs when the lock is actually held;
        // otherwise a failed lock() would be masked by the exception thrown from unlock().
        lock.lock();
        try {
            byte[] data = connection.get(this.toEndpointKey(ep));
            if (data == null) {
                return null;
            }

            Registration r = this.deserializeReg(data);
            if (removeOnlyIfNotAlive && r.isAlive(this.gracePeriod)) {
                return null;
            }

            // Deleting the id index first tells us whether this registration was still indexed;
            // if nothing was deleted there is nothing left to clean up for this id.
            Long nbRemoved = connection.del(this.toRegIdKey(r.getId()));
            if (nbRemoved == null || nbRemoved <= 0L) {
                return null;
            }

            connection.del(this.toEndpointKey(r.getEndpoint()));
            Collection<org.eclipse.leshan.core.observation.Observation> obsRemoved = this.unsafeRemoveAllObservations(connection, r.getId());
            this.removeAddrIndex(connection, r);
            this.removeIdentityIndex(connection, r);
            this.removeExpiration(connection, r);
            return new Deregistration(r, obsRemoved);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Deletes the address index entry of a registration if it still points to that
     * registration's endpoint.
     */
    private void removeAddrIndex(RedisConnection connection, Registration r) {
        byte[] addressKey = toRegAddrKey(r.getSocketAddress());
        removeSecondaryIndex(connection, addressKey, r.getEndpoint());
    }

    /**
     * Deletes the identity index entry of a registration if it still points to that
     * registration's endpoint.
     */
    private void removeIdentityIndex(RedisConnection connection, Registration r) {
        byte[] identityKey = toRegIdentityKey(r.getIdentity());
        removeSecondaryIndex(connection, identityKey, r.getEndpoint());
    }

    /**
     * Deletes an index key only when its current value equals the given endpoint name, so an
     * entry that has since been re-pointed to another endpoint is left alone.
     *
     * @param connection   the Redis connection to use
     * @param indexKey     the index key to check and delete
     * @param endpointName the endpoint the index entry is expected to reference
     */
    private void removeSecondaryIndex(RedisConnection connection, byte[] indexKey, String endpointName) {
        byte[] expected = endpointName.getBytes(StandardCharsets.UTF_8);
        byte[] indexed = connection.get(indexKey);
        if (!Arrays.equals(indexed, expected)) {
            return;
        }
        connection.del(indexKey);
    }

    /**
     * Adds or refreshes the endpoint's entry in the expiration sorted set, scored by the
     * registration's expiration timestamp (grace period included).
     */
    private void addOrUpdateExpiration(RedisConnection connection, Registration registration) {
        double expiresAt = (double) registration.getExpirationTimeStamp(gracePeriod);
        byte[] member = registration.getEndpoint().getBytes(StandardCharsets.UTF_8);
        connection.zAdd(EXP_EP, expiresAt, member);
    }

    /** Removes the endpoint's entry from the expiration sorted set. */
    private void removeExpiration(RedisConnection connection, Registration registration) {
        byte[] member = registration.getEndpoint().getBytes(StandardCharsets.UTF_8);
        connection.zRem(EXP_EP, member);
    }

    /** Returns the key of the registration-id index entry ({@code "EP:REGID:" + registrationId}). */
    private byte[] toRegIdKey(String registrationId) {
        return toKey(REG_EP_REGID_IDX, registrationId);
    }

    /**
     * Returns the key of the address index entry: {@code "EP:ADDR:"} followed by the textual
     * form of the IP address, a colon and the port.
     */
    private byte[] toRegAddrKey(InetSocketAddress addr) {
        String hostAndPort = addr.getAddress().toString() + ":" + addr.getPort();
        return toKey(REG_EP_ADDR_IDX, hostAndPort);
    }

    /**
     * Returns the key of the identity index entry: {@code "EP:IDENTITY:"} followed by the string
     * form of the serialized identity.
     */
    private byte[] toRegIdentityKey(Identity identity) {
        String serializedIdentity = IdentitySerDes.serialize(identity).toString();
        return toKey(REG_EP_IDENTITY, serializedIdentity);
    }

    /** Returns the key holding the registration of an endpoint given by name. */
    private byte[] toEndpointKey(String endpoint) {
        return toKey(REG_EP, endpoint);
    }

    /** Returns the key holding the registration of an endpoint given as raw name bytes. */
    private byte[] toEndpointKey(byte[] endpoint) {
        byte[] prefix = REG_EP.getBytes(StandardCharsets.UTF_8);
        return toKey(prefix, endpoint);
    }

    /** Serializes a registration to bytes using {@link RegistrationSerDes}. */
    private byte[] serializeReg(Registration registration) {
        return RegistrationSerDes.bSerialize(registration);
    }

    /** Rebuilds a registration from bytes using {@link RegistrationSerDes}. */
    private Registration deserializeReg(byte[] data) {
        return RegistrationSerDes.deserialize(data);
    }

    /**
     * Removes the observations of a registration that target the same path as the given
     * observation but carry a different id, so that the given observation supersedes them.
     *
     * <p>This method does not store {@code observation} itself.
     *
     * @param registrationId id of the registration the observation belongs to
     * @param observation    the new observation; cast to {@link SingleObservation} when compared
     * @return the observations that were removed (possibly empty), or {@code null} if the
     *         registration id is unknown
     */
    public Collection<org.eclipse.leshan.core.observation.Observation> addObservation(String registrationId, org.eclipse.leshan.core.observation.Observation observation) {
        List<org.eclipse.leshan.core.observation.Observation> removed = new ArrayList<>();
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            byte[] ep = connection.get(this.toRegIdKey(registrationId));
            if (ep == null) {
                return null;
            }

            Lock lock = this.redisLock.obtain(this.toLockKey(ep));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                for (org.eclipse.leshan.core.observation.Observation obs : this.getObservations(connection, registrationId)) {
                    boolean samePath = ((SingleObservation) observation).getPath().equals(((SingleObservation) obs).getPath());
                    if (samePath && !Arrays.equals(observation.getId(), obs.getId())) {
                        removed.add(obs);
                        this.unsafeRemoveObservation(connection, registrationId, obs.getId());
                    }
                }
            } finally {
                lock.unlock();
            }
        }
        return removed;
    }

    /**
     * Removes a single observation, provided it belongs to the given registration.
     *
     * @param registrationId id of the registration expected to own the observation
     * @param observationId  id (token bytes) of the observation
     * @return the removed observation, or {@code null} if the registration id is unknown, the
     *         observation does not exist, or it belongs to another registration
     */
    public org.eclipse.leshan.core.observation.Observation removeObservation(String registrationId, byte[] observationId) {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            byte[] ep = connection.get(this.toRegIdKey(registrationId));
            if (ep == null) {
                return null;
            }

            Lock lock = this.redisLock.obtain(this.toLockKey(ep));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                org.eclipse.leshan.core.observation.Observation observation = this.build(this.get(new Token(observationId)));
                if (observation == null || !registrationId.equals(observation.getRegistrationId())) {
                    return null;
                }
                this.unsafeRemoveObservation(connection, registrationId, observationId);
                return observation;
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Looks up an observation by its id.
     *
     * @param registrationId not used by this implementation
     * @param observationId  id (token bytes) of the observation
     * @return the observation, or {@code null} if no observation is stored for this id
     */
    public org.eclipse.leshan.core.observation.Observation getObservation(String registrationId, byte[] observationId) {
        Token token = new Token(observationId);
        Observation coapObservation = get(token);
        return build(coapObservation);
    }

    /**
     * Returns the observations recorded for a registration.
     *
     * @param registrationId the registration id
     * @return the observations found (possibly empty)
     */
    public Collection<org.eclipse.leshan.core.observation.Observation> getObservations(String registrationId) {
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            return this.getObservations(redis, registrationId);
        }
    }

    /**
     * Reads the token list of a registration and loads the observation stored for each token;
     * tokens without stored observation data are skipped.
     */
    private Collection<org.eclipse.leshan.core.observation.Observation> getObservations(RedisConnection connection, String registrationId) {
        byte[] tokenListKey = toKey(OBS_TKNS_REGID_IDX, registrationId);
        ArrayList<org.eclipse.leshan.core.observation.Observation> found = new ArrayList<org.eclipse.leshan.core.observation.Observation>();
        for (byte[] token : connection.lRange(tokenListKey, 0L, -1L)) {
            byte[] serialized = connection.get(toKey(OBS_TKN, token));
            if (serialized != null) {
                found.add(build(deserializeObs(serialized)));
            }
        }
        return found;
    }

    /**
     * Removes every observation of a registration under the endpoint lock.
     *
     * @param registrationId the registration id
     * @return the removed observations; an empty list if the registration does not exist
     */
    public Collection<org.eclipse.leshan.core.observation.Observation> removeObservations(String registrationId) {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            Registration registration = this.getRegistration(connection, registrationId);
            if (registration == null) {
                return Collections.emptyList();
            }

            Lock lock = this.redisLock.obtain(this.toLockKey(registration.getEndpoint()));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                return this.unsafeRemoveAllObservations(connection, registrationId);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Stores a CoAP observation unless one already exists for its token.
     *
     * @param token not used directly; the token is taken from the observation's request
     * @param obs   the observation to store
     * @return the already stored observation if there is one, otherwise {@code null}
     * @throws ObservationStoreException if the observation's registration does not exist
     */
    public Observation putIfAbsent(Token token, Observation obs) throws ObservationStoreException {
        final boolean onlyIfAbsent = true;
        return add(obs, onlyIfAbsent);
    }

    /**
     * Stores a CoAP observation, replacing any observation already stored for its token.
     *
     * @param token not used directly; the token is taken from the observation's request
     * @param obs   the observation to store
     * @return the replaced observation, or {@code null} if there was none
     * @throws ObservationStoreException if the observation's registration does not exist
     */
    public Observation put(Token token, Observation obs) throws ObservationStoreException {
        final boolean onlyIfAbsent = false;
        return add(obs, onlyIfAbsent);
    }

    /**
     * Stores a CoAP observation under its token and records the token in the token list of the
     * owning registration, all under the endpoint lock.
     *
     * @param obs      the observation to store
     * @param ifAbsent when {@code true}, an observation already stored for the token is kept and
     *                 returned and nothing is written; when {@code false}, it is replaced
     * @return the observation previously stored for the token, or {@code null} if there was none
     * @throws ObservationStoreException if no registration exists for the observation's registration id
     */
    private Observation add(Observation obs, boolean ifAbsent) throws ObservationStoreException {
        String endpoint = ObserveUtil.validateCoapObservation(obs);
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            Lock lock = this.redisLock.obtain(this.toLockKey(endpoint));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                String registrationId = ObserveUtil.extractRegistrationId(obs);
                if (!connection.exists(this.toRegIdKey(registrationId))) {
                    throw new ObservationStoreException("no registration for this Id");
                }

                byte[] token = obs.getRequest().getToken().getBytes();
                byte[] key = this.toKey(OBS_TKN, token);
                byte[] serializeObs = this.serializeObs(obs);

                byte[] previousValue;
                if (ifAbsent) {
                    previousValue = connection.get(key);
                    if (previousValue != null && previousValue.length != 0) {
                        // Already present: keep it and report it to the caller.
                        return this.deserializeObs(previousValue);
                    }
                    connection.set(key, serializeObs);
                } else {
                    previousValue = connection.getSet(key, serializeObs);
                }

                // Track the token in the per-registration token list.
                connection.lPush(this.toKey(OBS_TKNS_REGID_IDX, registrationId), token);

                if (previousValue == null || previousValue.length == 0) {
                    return null;
                }
                Observation previousObservation = this.deserializeObs(previousValue);
                LOG.warn("Token collision ? observation from request [{}] will be replaced by observation from request [{}] ", previousObservation.getRequest(), obs.getRequest());
                return previousObservation;
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Removes the CoAP observation stored for a token, if any.
     *
     * <p>Nothing happens when no observation is stored for the token. When the observation's
     * registration no longer exists, a warning is logged and the observation is left in place.
     *
     * @param token token of the observation to remove
     */
    public void remove(Token token) {
        try (RedisConnection connection = this.connectionFactory.getConnection()) {
            byte[] serializedObs = connection.get(this.toKey(OBS_TKN, token.getBytes()));
            if (serializedObs == null) {
                return;
            }

            Observation obs = this.deserializeObs(serializedObs);
            String registrationId = ObserveUtil.extractRegistrationId(obs);
            Registration registration = this.getRegistration(connection, registrationId);
            if (registration == null) {
                LOG.warn("Unable to remove observation {}, registration {} does not exist anymore", obs.getRequest(), registrationId);
                return;
            }

            Lock lock = this.redisLock.obtain(this.toLockKey(registration.getEndpoint()));
            // Acquire outside the try block so unlock() only runs when the lock is actually held;
            // otherwise a failed lock() would be masked by the exception thrown from unlock().
            lock.lock();
            try {
                this.unsafeRemoveObservation(connection, registrationId, token.getBytes());
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Loads the CoAP observation stored for a token.
     *
     * @param token the observation token
     * @return the observation, or {@code null} if none is stored for this token
     */
    public Observation get(Token token) {
        try (RedisConnection redis = this.connectionFactory.getConnection()) {
            byte[] serialized = redis.get(this.toKey(OBS_TKN, token.getBytes()));
            return serialized == null ? null : this.deserializeObs(serialized);
        }
    }

    /**
     * Resolves a registration id to its endpoint through the id index and loads the
     * registration stored for that endpoint.
     *
     * @return the registration, or {@code null} if the id is not indexed or the endpoint has no data
     */
    private Registration getRegistration(RedisConnection connection, String registrationId) {
        byte[] endpointName = connection.get(toRegIdKey(registrationId));
        if (endpointName != null) {
            byte[] serialized = connection.get(toEndpointKey(endpointName));
            if (serialized != null) {
                return deserializeReg(serialized);
            }
        }
        return null;
    }

    /**
     * Deletes the observation stored for a token and, if something was deleted, removes the
     * token from the registration's token list. Takes no lock itself.
     */
    private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) {
        Long deleted = connection.del(toKey(OBS_TKN, observationId));
        if (deleted > 0L) {
            byte[] tokenListKey = toKey(OBS_TKNS_REGID_IDX, registrationId);
            connection.lRem(tokenListKey, 0L, observationId);
        }
    }

    /**
     * Deletes every observation listed in a registration's token list and then the list itself.
     * Takes no lock itself.
     *
     * @return the observations that had stored data before deletion
     */
    private Collection<org.eclipse.leshan.core.observation.Observation> unsafeRemoveAllObservations(RedisConnection connection, String registrationId) {
        byte[] tokenListKey = toKey(OBS_TKNS_REGID_IDX, registrationId);
        ArrayList<org.eclipse.leshan.core.observation.Observation> deletedObservations = new ArrayList<org.eclipse.leshan.core.observation.Observation>();
        for (byte[] token : connection.lRange(tokenListKey, 0L, -1L)) {
            byte[] serialized = connection.get(toKey(OBS_TKN, token));
            if (serialized != null) {
                deletedObservations.add(build(deserializeObs(serialized)));
            }
            // The token key is deleted whether or not data was found for it.
            connection.del(toKey(OBS_TKN, token));
        }
        connection.del(tokenListKey);
        return deletedObservations;
    }

    /**
     * Intentionally does nothing: this store ignores the endpoint context passed for a token.
     *
     * @param token              the observation token (ignored)
     * @param correlationContext the endpoint context (ignored)
     */
    public void setContext(Token token, EndpointContext correlationContext) {
    }

    /** Serializes a CoAP observation to bytes using {@link ObservationSerDes}. */
    private byte[] serializeObs(Observation obs) {
        return ObservationSerDes.serialize(obs);
    }

    /** Rebuilds a CoAP observation from bytes using {@link ObservationSerDes}. */
    private Observation deserializeObs(byte[] data) {
        return ObservationSerDes.deserialize(data);
    }

    /**
     * Converts a CoAP observation into the LwM2M observation built from its request.
     *
     * @param cfObs the CoAP observation, may be {@code null}
     * @return the LwM2M observation, or {@code null} if {@code cfObs} is {@code null}
     */
    private org.eclipse.leshan.core.observation.Observation build(Observation cfObs) {
        return cfObs == null ? null : ObserveUtil.createLwM2mObservation(cfObs.getRequest());
    }

    /**
     * Starts the periodic cleaner; does nothing if the store is already started.
     * The first run happens one clean period after the call.
     */
    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        Runnable cleaner = new Cleaner();
        this.cleanerTask = this.schedExecutor.scheduleAtFixedRate(cleaner, this.cleanPeriod, this.cleanPeriod, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic cleaner without interrupting a run in progress; does nothing if the
     * store is not started.
     */
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        ScheduledFuture<?> task = this.cleanerTask;
        if (task != null) {
            task.cancel(false);
            this.cleanerTask = null;
        }
    }

    /**
     * Marks the store as stopped, shuts the scheduler down immediately and waits up to five
     * seconds for it to terminate.
     */
    public synchronized void destroy() {
        this.started = false;
        this.schedExecutor.shutdownNow();
        try {
            this.schedExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Destroying RedisRegistrationStore was interrupted.", e);
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets the listener that the cleaner notifies when it removes an expired registration.
     *
     * @param listener the listener to notify
     */
    public void setExpirationListener(ExpirationListener listener) {
        expirationListener = listener;
    }

    /**
     * Intentionally does nothing: the scheduler supplied at construction time is the only one
     * this store uses.
     *
     * @param executor ignored
     */
    public void setExecutor(ScheduledExecutorService executor) {
    }

    /**
     * Periodic task that removes registrations whose expiration timestamp has passed and
     * reports each removal to the expiration listener.
     */
    private class Cleaner implements Runnable {
        private Cleaner() {
        }

        /**
         * Fetches up to {@code cleanLimit} endpoints whose expiration score is not later than the
         * current time, removes those that are no longer alive, and notifies the listener.
         * Any exception is logged and ends the current run only.
         */
        @Override
        public void run() {
            try (RedisConnection connection = connectionFactory.getConnection()) {
                Set<byte[]> endpointsExpired = connection.zRangeByScore(EXP_EP, Double.NEGATIVE_INFINITY, (double) System.currentTimeMillis(), 0L, (long) cleanLimit);
                for (byte[] endpoint : endpointsExpired) {
                    byte[] data = connection.get(toEndpointKey(endpoint));
                    if (data == null) {
                        // The registration vanished since the sorted set was read (e.g. removed
                        // concurrently); skip it instead of aborting the whole run.
                        continue;
                    }
                    Registration r = deserializeReg(data);
                    if (r.isAlive(gracePeriod)) {
                        continue;
                    }
                    Deregistration dereg = removeRegistration(connection, r.getId(), true);
                    if (dereg == null) {
                        continue;
                    }
                    // Read the listener once; it may not have been set yet.
                    ExpirationListener listener = expirationListener;
                    if (listener != null) {
                        listener.registrationExpired(dereg.getRegistration(), dereg.getObservations());
                    }
                }
            } catch (Exception e) {
                LOG.warn("Unexpected Exception while registration cleaning", e);
            }
        }
    }
}

