/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.state.strategy;

import io.ray.streaming.state.StateException;
import io.ray.streaming.state.StateStoreManager;
import io.ray.streaming.state.StorageRecord;
import io.ray.streaming.state.serialization.Serializer;
import io.ray.streaming.state.store.KeyValueStore;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base class for state store managers that keep recently written values in an
 * in-memory map and hold a reference to a backing {@link KeyValueStore}.
 *
 * <p>This class supplies the shared fields, record (de)serialization helpers, the
 * default {@link #put(long, String, Object)} and the shutdown logic. Subclasses
 * define how values are read ({@link #get(long, String)}) and what happens when a
 * checkpoint is acknowledged ({@link #ackCommit(long)}).
 *
 * @param <V> type of the values wrapped in {@link StorageRecord}s
 */
public abstract class AbstractStateStoreManager<V>
implements StateStoreManager {
    /** In-memory map from key to its latest record; written by {@link #put(long, String, Object)}. */
    protected Map<String, StorageRecord<V>> frontStore = new ConcurrentHashMap<String, StorageRecord<V>>();
    /** Backing store supplied at construction time; may be {@code null}. */
    protected KeyValueStore<String, Map<Long, byte[]>> kvStore;
    /**
     * Serialized entries grouped under a {@code Long} key.
     * NOTE(review): presumably the outer key is a checkpoint id and this map is
     * populated by subclasses; nothing in this class writes to it - confirm.
     */
    protected Map<Long, Map<String, byte[]>> middleStore = new ConcurrentHashMap<Long, Map<String, byte[]>>();
    /** Key group index; stays at {@code -1} until {@link #setKeyGroupIndex(int)} is called. */
    protected int keyGroupIndex = -1;

    /**
     * Creates a manager bound to the given backing store.
     *
     * @param backStore the backing key-value store; stored as-is without validation
     */
    public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore) {
        this.kvStore = backStore;
    }

    /**
     * Serializes a storage record via {@link Serializer#object2Bytes}.
     *
     * @param storageRecord the record to serialize
     * @return the serialized form of {@code storageRecord}
     */
    public byte[] toBytes(StorageRecord<?> storageRecord) {
        return Serializer.object2Bytes(storageRecord);
    }

    /**
     * Deserializes a storage record via {@link Serializer#bytes2Object}.
     *
     * @param data bytes expected to hold a serialized {@code StorageRecord<V>}
     * @return the deserialized record
     */
    // The element type cannot be verified at runtime because of erasure; the bytes
    // are trusted to hold a StorageRecord<V>.
    @SuppressWarnings("unchecked")
    public StorageRecord<V> toStorageRecord(byte[] data) {
        return (StorageRecord<V>) Serializer.bytes2Object(data);
    }

    /**
     * Returns the value associated with {@code key}. The lookup strategy is
     * defined entirely by the subclass.
     *
     * @param checkpointId the checkpoint id the lookup is performed for
     * @param key the state key
     * @return the value found by the subclass-specific lookup
     */
    public abstract V get(long checkpointId, String key);

    /**
     * Stores {@code v} under {@code k} in {@link #frontStore}, wrapped in a
     * {@link StorageRecord} that also carries {@code checkpointId}. Any record
     * previously stored under the same key is replaced.
     *
     * @param checkpointId the checkpoint id recorded together with the value
     * @param k the state key
     * @param v the value to store
     */
    public void put(long checkpointId, String k, V v) {
        this.frontStore.put(k, new StorageRecord<V>(checkpointId, v));
    }

    /**
     * Acknowledges a checkpoint by delegating to {@link #ackCommit(long)}.
     *
     * @param checkpointId the acknowledged checkpoint id
     * @param timeStamp ignored by this implementation
     */
    @Override
    public void ackCommit(long checkpointId, long timeStamp) {
        this.ackCommit(checkpointId);
    }

    /**
     * Subclass hook invoked when a checkpoint is acknowledged.
     *
     * @param checkpointId the acknowledged checkpoint id
     */
    public abstract void ackCommit(long checkpointId);

    /**
     * Sets the key group index used by this manager.
     *
     * @param keyGroupIndex the new key group index
     */
    public void setKeyGroupIndex(int keyGroupIndex) {
        this.keyGroupIndex = keyGroupIndex;
    }

    /**
     * Clears the in-memory maps and, when a backing store is present, clears its
     * cache and closes it.
     *
     * <p>The backing store is closed even if clearing its cache fails, so that a
     * failure in {@code clearCache()} does not leak the store.
     *
     * @throws StateException if closing the backing store raises an {@link IOException}
     */
    public void close() {
        this.frontStore.clear();
        this.middleStore.clear();
        if (this.kvStore == null) {
            return;
        }
        try {
            this.kvStore.clearCache();
        }
        finally {
            // Always attempt to close, even when clearCache() threw. If close()
            // fails as well, the StateException below is the one that propagates.
            try {
                this.kvStore.close();
            }
            catch (IOException e) {
                throw new StateException(e);
            }
        }
    }
}

