/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.state.strategy;

import io.ray.streaming.state.StateException;
import io.ray.streaming.state.StorageRecord;
import io.ray.streaming.state.store.KeyValueStore;
import io.ray.streaming.state.strategy.AbstractStateStoreManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MVStateStoreManager<V>
extends AbstractStateStoreManager<V> {
    public MVStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> kvStore) {
        super(kvStore);
    }

    @Override
    public void finish(long checkpointId) {
        HashMap<String, byte[]> currentStateRecords = new HashMap<String, byte[]>();
        for (Map.Entry entry : this.frontStore.entrySet()) {
            currentStateRecords.put((String)entry.getKey(), this.toBytes((StorageRecord)entry.getValue()));
        }
        this.middleStore.put(checkpointId, currentStateRecords);
        this.frontStore.clear();
    }

    @Override
    public void commit(long checkpointId) {
        ArrayList checkpointIds = new ArrayList(this.middleStore.keySet());
        Collections.sort(checkpointIds);
        for (int i = checkpointIds.size() - 1; i >= 0; --i) {
            long commitBatchId = (Long)checkpointIds.get(i);
            if (commitBatchId > checkpointId) continue;
            Map commitRecords = (Map)this.middleStore.get(commitBatchId);
            try {
                for (Map.Entry entry : commitRecords.entrySet()) {
                    HashMap<Long, byte[]> remoteData = (HashMap<Long, byte[]>)this.kvStore.get((String)entry.getKey());
                    if (remoteData == null) {
                        remoteData = new HashMap<Long, byte[]>();
                    }
                    remoteData.put(commitBatchId, (byte[])entry.getValue());
                    this.kvStore.put((String)entry.getKey(), remoteData);
                }
                this.kvStore.flush();
                continue;
            }
            catch (Exception e) {
                throw new StateException(e);
            }
        }
    }

    @Override
    public void rollBack(long checkpointId) {
        this.frontStore.clear();
        this.middleStore.clear();
        this.kvStore.clearCache();
    }

    @Override
    public V get(long checkpointId, String key) {
        StorageRecord valueArray = (StorageRecord)this.frontStore.get(key);
        if (valueArray != null) {
            return (V)valueArray.getValue();
        }
        ArrayList checkpointIds = new ArrayList(this.middleStore.keySet());
        Collections.sort(checkpointIds);
        for (int i = checkpointIds.size() - 1; i >= 0; --i) {
            Map records;
            if ((Long)checkpointIds.get(i) > checkpointId || (records = (Map)this.middleStore.get(checkpointIds.get(i))) == null || !records.containsKey(key)) continue;
            byte[] bytes = (byte[])records.get(key);
            return this.toStorageRecord(bytes).getValue();
        }
        try {
            Map remoteData = (Map)this.kvStore.get(key);
            if (remoteData != null) {
                checkpointIds = new ArrayList(remoteData.keySet());
                Collections.sort(checkpointIds);
                for (int i = checkpointIds.size() - 1; i >= 0; --i) {
                    if ((Long)checkpointIds.get(i) > checkpointId) continue;
                    byte[] bytes = (byte[])remoteData.get(checkpointIds.get(i));
                    return this.toStorageRecord(bytes).getValue();
                }
            }
        }
        catch (Exception e) {
            throw new StateException(e);
        }
        return null;
    }

    @Override
    public void put(long checkpointId, String k, V v) {
        this.frontStore.put(k, new StorageRecord<V>(checkpointId, v));
    }

    @Override
    public void ackCommit(long checkpointId) {
        ArrayList checkpointIds = new ArrayList(this.middleStore.keySet());
        Collections.sort(checkpointIds);
        for (int i = checkpointIds.size() - 1; i >= 0; --i) {
            long commitBatchId = (Long)checkpointIds.get(i);
            if (commitBatchId > checkpointId) continue;
            this.middleStore.remove(commitBatchId);
        }
    }
}

