/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.operator.impl;

import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.impl.ReduceFunction;
import io.ray.streaming.message.KeyRecord;
import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceOperator<K, T>
extends StreamOperator<ReduceFunction<T>>
implements OneInputOperator<T> {
    private Map<K, T> reduceState;

    public ReduceOperator(ReduceFunction<T> reduceFunction) {
        super(reduceFunction);
    }

    @Override
    public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
        super.open(collectorList, runtimeContext);
        this.reduceState = new HashMap<K, T>();
    }

    @Override
    public void processElement(Record<T> record) throws Exception {
        KeyRecord keyRecord = (KeyRecord)record;
        Object key = keyRecord.getKey();
        Object value = keyRecord.getValue();
        if (this.reduceState.containsKey(key)) {
            T oldValue = this.reduceState.get(key);
            T newValue = ((ReduceFunction)this.function).reduce(oldValue, value);
            this.reduceState.put(key, newValue);
            this.collect(new Record<T>(newValue));
        } else {
            this.reduceState.put(key, value);
            this.collect(record);
        }
    }
}

