/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.state.strategy;

import com.google.common.primitives.Longs;
import io.ray.streaming.state.StateException;
import io.ray.streaming.state.StorageRecord;
import io.ray.streaming.state.store.KeyValueStore;
import io.ray.streaming.state.strategy.AbstractStateStoreManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State store manager that moves values through three tiers on their way to the
 * backing {@link KeyValueStore}:
 *
 * <ul>
 *   <li>{@code frontStore} - live {@link StorageRecord}s by key (inherited field; this class
 *       only reads and clears it, it is populated elsewhere);</li>
 *   <li>{@code middleStore} - one map of serialized records per checkpoint id, created by
 *       {@link #finish(long)} and dropped by {@link #ackCommit(long)};</li>
 *   <li>{@code kvStore} - the backing store, written by {@link #commit(long)}.</li>
 * </ul>
 *
 * <p>For every key the backing store holds a {@code Map<Long, byte[]>} in which this class keeps
 * up to two versions of the value: the most recently committed one and the one it replaced. Each
 * version is paired with a marker entry (negative map key) holding the checkpoint id it was
 * committed under.
 *
 * @param <V> type of the values wrapped by the stored {@link StorageRecord}s
 */
public class DualStateStoreManager<V> extends AbstractStateStoreManager<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DualStateStoreManager.class);

    /** Remote-map slot holding the serialized record written by the latest commit. */
    private static final long CURRENT_VALUE_KEY = 2L;

    /** Remote-map slot holding the checkpoint id (8 bytes) of {@link #CURRENT_VALUE_KEY}. */
    private static final long CURRENT_CHECKPOINT_KEY = -2L;

    /** Remote-map slot holding the serialized record that the latest commit replaced. */
    private static final long PREVIOUS_VALUE_KEY = 1L;

    /** Remote-map slot holding the checkpoint id (8 bytes) of {@link #PREVIOUS_VALUE_KEY}. */
    private static final long PREVIOUS_CHECKPOINT_KEY = -1L;

    /**
     * Creates a manager on top of the given backing store.
     *
     * @param backStore backing store, handed unchanged to the superclass constructor
     */
    public DualStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore) {
        super(backStore);
    }

    /**
     * Serializes every record currently in {@code frontStore}, stores the result in
     * {@code middleStore} under {@code checkpointId}, and then empties {@code frontStore}.
     *
     * @param checkpointId id under which the serialized snapshot is registered
     */
    @Override
    public void finish(long checkpointId) {
        LOG.info("do finish checkpointId:{}", checkpointId);
        Map<String, byte[]> cpStore = new HashMap<String, byte[]>();
        for (Map.Entry<String, StorageRecord<V>> entry : this.frontStore.entrySet()) {
            cpStore.put(entry.getKey(), this.toBytes(entry.getValue()));
        }
        this.middleStore.put(checkpointId, cpStore);
        this.frontStore.clear();
    }

    /**
     * Writes the snapshot registered for {@code checkpointId} to the backing store and flushes it.
     *
     * <p>For each key, the existing remote map is fetched. If it is non-empty and its current
     * version was committed under an older checkpoint id, that version is first copied into the
     * "previous" slots. The "current" slots are then overwritten with the new value and
     * {@code checkpointId}. The snapshot itself stays in {@code middleStore}.
     *
     * @param checkpointId id of the snapshot to write
     * @throws StateException if no snapshot is registered for {@code checkpointId} (thrown as-is),
     *     or wrapping any other exception raised while writing to the backing store
     */
    @Override
    public void commit(long checkpointId) {
        try {
            LOG.info("do commit checkpointId:{}", checkpointId);
            Map<String, byte[]> cpStore = this.middleStore.get(checkpointId);
            if (cpStore == null) {
                throw new StateException("why cp store is null");
            }
            for (Map.Entry<String, byte[]> entry : cpStore.entrySet()) {
                String key = entry.getKey();
                Map<Long, byte[]> remoteData = this.kvStore.get(key);
                if (remoteData == null || remoteData.isEmpty()) {
                    remoteData = new HashMap<Long, byte[]>();
                } else {
                    long oldBatchId = Longs.fromByteArray(remoteData.get(CURRENT_CHECKPOINT_KEY));
                    // Only rotate when committing a newer checkpoint; re-committing the same
                    // (or an older) id must not overwrite the previous version.
                    if (oldBatchId < checkpointId) {
                        remoteData.put(PREVIOUS_VALUE_KEY, remoteData.get(CURRENT_VALUE_KEY));
                        remoteData.put(PREVIOUS_CHECKPOINT_KEY, remoteData.get(CURRENT_CHECKPOINT_KEY));
                    }
                }
                remoteData.put(CURRENT_VALUE_KEY, entry.getValue());
                remoteData.put(CURRENT_CHECKPOINT_KEY, Longs.toByteArray(checkpointId));
                this.kvStore.put(key, remoteData);
            }
            this.kvStore.flush();
        }
        catch (StateException e) {
            // Already the exception type callers expect: log it, but do not wrap it in itself.
            LOG.error(e.getMessage(), e);
            throw e;
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new StateException(e);
        }
    }

    /**
     * Discards all locally staged state: clears {@code frontStore} and {@code middleStore} and
     * asks the backing store to clear its cache.
     *
     * @param checkpointId only logged; every staged checkpoint is discarded regardless of its id
     */
    @Override
    public void rollBack(long checkpointId) {
        LOG.info("do rollBack checkpointId:{}", checkpointId);
        this.frontStore.clear();
        this.middleStore.clear();
        this.kvStore.clearCache();
    }

    /**
     * Looks up the value for {@code key}, consulting the tiers from newest to oldest.
     *
     * <ol>
     *   <li>{@code frontStore}: returned directly if a record exists.</li>
     *   <li>{@code middleStore}: snapshots are scanned from the highest checkpoint id down and
     *       the first one containing {@code key} wins.</li>
     *   <li>Backing store: among the value slots (positive map keys), the record with the
     *       greatest checkpoint id strictly below {@code checkpointId} is chosen.</li>
     * </ol>
     *
     * @param checkpointId upper bound (exclusive) applied to records read from the backing store;
     *     it is not applied to the first two tiers
     * @param key state key to look up
     * @return the stored value, or {@code null} if no tier yields a matching record
     * @throws StateException wrapping any exception raised while reading the backing store
     */
    @Override
    public V get(long checkpointId, String key) {
        StorageRecord<V> liveRecord = this.frontStore.get(key);
        if (liveRecord != null) {
            return liveRecord.getValue();
        }

        // Work on a snapshot of the ids, newest first.
        ArrayList<Long> pendingIds = new ArrayList<Long>(this.middleStore.keySet());
        Collections.sort(pendingIds, Collections.reverseOrder());
        for (Long pendingId : pendingIds) {
            Map<String, byte[]> cpStore = this.middleStore.get(pendingId);
            if (cpStore == null || !cpStore.containsKey(key)) {
                continue;
            }
            StorageRecord<V> pendingRecord = this.toStorageRecord(cpStore.get(key));
            return pendingRecord.getValue();
        }

        try {
            Map<Long, byte[]> remoteData = this.kvStore.get(key);
            if (remoteData == null) {
                return null;
            }
            StorageRecord<V> best = null;
            for (Map.Entry<Long, byte[]> entry : remoteData.entrySet()) {
                // Non-positive keys are checkpoint-id markers, not serialized records.
                if (entry.getKey() <= 0L) {
                    continue;
                }
                StorageRecord<V> candidate = this.toStorageRecord(entry.getValue());
                if (candidate.getCheckpointId() >= checkpointId) {
                    continue;
                }
                if (best == null || best.getCheckpointId() < candidate.getCheckpointId()) {
                    best = candidate;
                }
            }
            return best == null ? null : best.getValue();
        }
        catch (Exception e) {
            LOG.error("get checkpointId:" + checkpointId + " key:" + key, e);
            throw new StateException(e);
        }
    }

    /**
     * Drops the snapshot registered for {@code checkpointId} from {@code middleStore}.
     *
     * @param checkpointId id of the snapshot to remove
     */
    @Override
    public void ackCommit(long checkpointId) {
        LOG.info("do ackCommit checkpointId:{}", checkpointId);
        this.middleStore.remove(checkpointId);
    }
}

