/*
 * Decompiled with CFR 0.152.
 */
package weka.knowledgeflow.steps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import weka.core.Attribute;
import weka.core.DenseInstance;
import weka.core.Instance;
import weka.core.Instances;
import weka.core.Range;
import weka.core.SerializedObject;
import weka.core.WekaException;
import weka.knowledgeflow.Data;
import weka.knowledgeflow.StepManager;
import weka.knowledgeflow.steps.BaseStep;
import weka.knowledgeflow.steps.KFStep;
import weka.knowledgeflow.steps.Sorter;

@KFStep(name="Join", category="Flow", toolTipText="Performs an inner join on two incoming datasets/instance streams (IMPORTANT: assumes that both datasets are sorted in ascending order of the key fields). If data is not sorted then usea Sorter step to sort both into ascending order of the key fields. Does not handle the case wherekeys are not unique in one or both inputs.", iconPath="weka/gui/knowledgeflow/icons/Join.gif")
public class Join
extends BaseStep {
    /** Separator between the first-input and second-input halves of the key specification. */
    public static final String KEY_SPEC_SEPARATOR = "@@KS@@";
    private static final long serialVersionUID = -8248954818247532014L;
    /** Step manager of the first connected input; assigned by establishFirstAndSecondConnectedInputs(). */
    protected StepManager m_firstInput;
    /** Step manager of the second connected input; assigned by establishFirstAndSecondConnectedInputs(). */
    protected StepManager m_secondInput;
    /** Set to true once the end-of-stream signal has been received from the first input. */
    protected transient boolean m_firstFinished;
    /** Set to true once the end-of-stream signal has been received from the second input. */
    protected transient boolean m_secondFinished;
    /** Connection type (key in the incoming-connections map) under which the first input was found. */
    protected String m_firstInputConnectionType = "";
    /** Connection type (key in the incoming-connections map) under which the second input was found. */
    protected String m_secondInputConnectionType = "";
    /** Rows received from the first input that have not yet been joined or discarded. */
    protected transient Queue<Sorter.InstanceHolder> m_firstBuffer;
    /** Rows received from the second input that have not yet been joined or discarded. */
    protected transient Queue<Sorter.InstanceHolder> m_secondBuffer;
    /** Data object that is reused for every instance emitted in streaming mode. */
    protected Data m_streamingData;
    /** Empty copy of the first input's structure. */
    protected transient Instances m_headerOne;
    /** Empty copy of the second input's structure. */
    protected transient Instances m_headerTwo;
    /** Output structure: attributes of the first header followed by those of the second. */
    protected transient Instances m_mergedHeader;
    /** Copies of the merged header, built by generateMergedHeader() when string attributes are present. */
    protected transient List<Instances> m_headerPool;
    /** Counter used to pick the next entry from m_headerPool. */
    protected transient AtomicInteger m_count;
    /** True when the merged header contains at least one string attribute. */
    protected boolean m_stringAttsPresent;
    /**
     * Read by generateMergedInstance() to decide whether to use the header pool.
     * NOTE(review): no assignment to this flag is visible in this file - confirm where it is set.
     */
    protected boolean m_runningIncrementally;
    /** Key attribute indexes (relative to m_headerOne) for the first input. */
    protected int[] m_keyIndexesOne;
    /** Key attribute indexes (relative to m_headerTwo) for the second input. */
    protected int[] m_keyIndexesTwo;
    /** Key specification: first-input keys, KEY_SPEC_SEPARATOR, second-input keys. */
    protected String m_keySpec = "";
    /** Name to index of every string attribute in the first input's header (streaming mode). */
    protected Map<String, Integer> m_stringAttIndexesOne;
    /** Name to index of every string attribute in the second input's header (streaming mode). */
    protected Map<String, Integer> m_stringAttIndexesTwo;
    /** True while the thread delivering first-input rows is blocked in addToFirstBuffer(). */
    protected boolean m_firstIsWaiting;
    /** True while the thread delivering second-input rows is blocked in addToSecondBuffer(). */
    protected boolean m_secondIsWaiting;

    /**
     * Sets the key specification used to match rows of the two inputs.
     *
     * @param ks keys for the first input, {@link #KEY_SPEC_SEPARATOR}, then keys
     *           for the second input
     */
    public void setKeySpec(String ks) {
        m_keySpec = ks;
    }

    /**
     * Returns the key specification currently configured for this step.
     *
     * @return the raw (unresolved) key specification string
     */
    public String getKeySpec() {
        return m_keySpec;
    }

    /**
     * Re-resolves the two connected inputs and reports their names.
     *
     * @return a two-element list holding the name of the first and second input,
     *         with {@code null} in the slot of an input that is not connected
     */
    public List<String> getConnectedInputNames() {
        establishFirstAndSecondConnectedInputs();

        String firstName = null;
        if (m_firstInput != null) {
            firstName = m_firstInput.getName();
        }
        String secondName = null;
        if (m_secondInput != null) {
            secondName = m_secondInput.getName();
        }

        ArrayList<String> names = new ArrayList<String>();
        names.add(firstName);
        names.add(secondName);
        return names;
    }

    /**
     * Returns the structure delivered by the first input, resolving the input
     * first if that has not happened yet.
     *
     * @return the incoming structure, or {@code null} when no first input is connected
     * @throws WekaException if the structure cannot be obtained
     */
    public Instances getFirstInputStructure() throws WekaException {
        if (m_firstInput == null) {
            establishFirstAndSecondConnectedInputs();
            if (m_firstInput == null) {
                return null;
            }
        }
        return getStepManager().getIncomingStructureFromStep(m_firstInput, m_firstInputConnectionType);
    }

    /**
     * Returns the structure delivered by the second input, resolving the input
     * first if that has not happened yet.
     *
     * @return the incoming structure, or {@code null} when no second input is connected
     * @throws WekaException if the structure cannot be obtained
     */
    public Instances getSecondInputStructure() throws WekaException {
        if (m_secondInput == null) {
            establishFirstAndSecondConnectedInputs();
            if (m_secondInput == null) {
                return null;
            }
        }
        return getStepManager().getIncomingStructureFromStep(m_secondInput, m_secondInputConnectionType);
    }

    /**
     * Walks the incoming connections in iteration order and records the first
     * two connected steps (and the connection type each was found under) as the
     * first and second input. Both inputs are cleared before the scan.
     */
    protected void establishFirstAndSecondConnectedInputs() {
        m_firstInput = null;
        m_secondInput = null;

        for (Map.Entry<String, List<StepManager>> entry : getStepManager().getIncomingConnections().entrySet()) {
            for (StepManager candidate : entry.getValue()) {
                if (m_firstInput == null) {
                    m_firstInput = candidate;
                    m_firstInputConnectionType = entry.getKey();
                } else if (m_secondInput == null) {
                    m_secondInput = candidate;
                    m_secondInputConnectionType = entry.getKey();
                }
                // Nothing left to look for once both slots are filled.
                if (m_firstInput != null && m_secondInput != null) {
                    return;
                }
            }
        }
    }

    /**
     * Resets all per-run state and resolves the two connected inputs.
     *
     * @throws WekaException if fewer than two incoming connections are present
     */
    @Override
    public void stepInit() throws WekaException {
        m_firstBuffer = new LinkedList<Sorter.InstanceHolder>();
        m_secondBuffer = new LinkedList<Sorter.InstanceHolder>();
        m_streamingData = new Data("instance");
        m_firstInput = null;
        m_secondInput = null;
        m_headerOne = null;
        m_headerTwo = null;
        // processStreaming() only builds the merged header when it is null, so a
        // header left over from a previous run (possibly built from a different
        // key spec or different input structures) must be discarded here.
        m_mergedHeader = null;
        m_firstFinished = false;
        m_secondFinished = false;
        // A previous run that was stopped while a producer was blocked may have
        // left these set.
        m_firstIsWaiting = false;
        m_secondIsWaiting = false;
        if (getStepManager().numIncomingConnections() < 2) {
            throw new WekaException("Two incoming connections are required for the Join step");
        }
        establishFirstAndSecondConnectedInputs();
    }

    /**
     * Dispatches incoming data to streaming or batch processing depending on
     * its connection name, then reports an interruption if a stop was requested.
     *
     * @param data the incoming data
     * @throws WekaException if processing fails
     */
    @Override
    public void processIncoming(Data data) throws WekaException {
        boolean streaming = data.getConnectionName().equals("instance");
        if (streaming) {
            processStreaming(data);
        } else {
            processBatch(data);
        }
        if (isStopRequested()) {
            getStepManager().interrupted();
        }
    }

    /**
     * Handles one streaming data object: either an end-of-stream signal from one
     * of the inputs or a single instance that is buffered and, if possible,
     * joined with the head of the other buffer.
     *
     * @param data the incoming streaming data
     * @throws WekaException if the merged header cannot be generated or output fails
     */
    protected synchronized void processStreaming(Data data) throws WekaException {
        if (isStopRequested()) {
            return;
        }
        if (getStepManager().isStreamFinished(data)) {
            StepManager finished = data.getSourceStep().getStepManager();
            if (finished == m_firstInput) {
                m_firstFinished = true;
                getStepManager().logBasic("Finished receiving from " + m_firstInput.getName());
                // The second producer only blocks while the first is unfinished; if
                // it is blocked right now nobody else will ever wake it.
                if (m_secondIsWaiting) {
                    m_secondIsWaiting = false;
                    notifyAll();
                }
            } else if (finished == m_secondInput) {
                m_secondFinished = true;
                getStepManager().logBasic("Finished receiving from " + m_secondInput.getName());
                if (m_firstIsWaiting) {
                    m_firstIsWaiting = false;
                    notifyAll();
                }
            }
            if (m_firstFinished && m_secondFinished) {
                clearBuffers();
                m_streamingData.clearPayload();
                getStepManager().throughputFinished(m_streamingData);
            }
            return;
        }

        Instance inst = (Instance) data.getPrimaryPayload();
        StepManager source = data.getSourceStep().getStepManager();
        if (m_headerOne == null || m_headerTwo == null) {
            if (m_headerOne == null && source == m_firstInput) {
                m_headerOne = new Instances(inst.dataset(), 0);
                getStepManager().logBasic("Initializing buffer for " + m_firstInput.getName());
                m_stringAttIndexesOne = indexStringAttributes(m_headerOne);
            }
            if (m_headerTwo == null && source == m_secondInput) {
                m_headerTwo = new Instances(inst.dataset(), 0);
                getStepManager().logBasic("Initializing buffer for " + m_secondInput.getName());
                m_stringAttIndexesTwo = indexStringAttributes(m_headerTwo);
            }
            if (m_mergedHeader == null && m_headerOne != null && m_headerTwo != null
                    && m_keySpec != null && m_keySpec.length() > 0) {
                generateMergedHeader();
            }
        }

        // May block (releasing the monitor) when the buffer is over its limit.
        if (source == m_firstInput) {
            addToFirstBuffer(inst);
        } else {
            addToSecondBuffer(inst);
        }

        // NOTE(review): both branches test m_secondBuffer; by symmetry the second
        // branch may have been meant to test m_firstBuffer. Left unchanged because
        // the current form wakes the first producer eagerly and cannot stall it.
        if (source == m_firstInput && m_secondBuffer.size() <= 100 && m_secondIsWaiting) {
            m_secondIsWaiting = false;
            notifyAll();
        } else if (source == m_secondInput && m_secondBuffer.size() <= 100 && m_firstIsWaiting) {
            m_firstIsWaiting = false;
            notifyAll();
        }
        if (isStopRequested()) {
            return;
        }

        Instance outputI = processBuffers();
        if (outputI != null) {
            getStepManager().throughputUpdateStart();
            m_streamingData.setPayloadElement("instance", outputI);
            getStepManager().outputData(m_streamingData);
            getStepManager().throughputUpdateEnd();
        }
    }

    /** Builds a name-to-index map of all string attributes in the given header. */
    private static Map<String, Integer> indexStringAttributes(Instances header) {
        Map<String, Integer> indexes = new HashMap<String, Integer>();
        for (int i = 0; i < header.numAttributes(); ++i) {
            if (header.attribute(i).isString()) {
                indexes.put(header.attribute(i).name(), i);
            }
        }
        return indexes;
    }

    /**
     * Copies the current value of every listed string attribute out of the
     * holder's instance into the holder's own string-value map, creating that
     * map on first use.
     *
     * @param holder           the holder whose instance is read and whose map is filled
     * @param stringAttIndexes names (keys) of the string attributes to copy
     */
    private static void copyStringAttVals(Sorter.InstanceHolder holder, Map<String, Integer> stringAttIndexes) {
        for (String name : stringAttIndexes.keySet()) {
            Instance source = holder.m_instance;
            String current = source.stringValue(source.dataset().attribute(name));
            if (holder.m_stringVals == null) {
                holder.m_stringVals = new HashMap<String, String>();
            }
            holder.m_stringVals.put(name, current);
        }
    }

    /**
     * Appends an instance from the first input to its buffer. When the buffer
     * holds more than 100 rows and the second input has not finished, the
     * calling thread blocks until it is notified.
     *
     * @param inst the instance to buffer
     */
    protected synchronized void addToFirstBuffer(Instance inst) {
        if (isStopRequested()) {
            return;
        }
        Sorter.InstanceHolder newH = new Sorter.InstanceHolder();
        newH.m_instance = inst;
        Join.copyStringAttVals(newH, m_stringAttIndexesOne);
        m_firstBuffer.add(newH);
        if (m_firstBuffer.size() > 100 && !m_secondFinished) {
            m_firstIsWaiting = true;
            try {
                wait();
            } catch (InterruptedException ie) {
                // No longer waiting; keep the flag truthful and preserve the
                // interrupt so callers further up can react to it.
                m_firstIsWaiting = false;
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends an instance from the second input to its buffer. When the buffer
     * holds more than 100 rows and the first input has not finished, the
     * calling thread blocks until it is notified.
     *
     * @param inst the instance to buffer
     */
    protected synchronized void addToSecondBuffer(Instance inst) {
        if (isStopRequested()) {
            return;
        }
        Sorter.InstanceHolder newH = new Sorter.InstanceHolder();
        newH.m_instance = inst;
        Join.copyStringAttVals(newH, m_stringAttIndexesTwo);
        m_secondBuffer.add(newH);
        if (m_secondBuffer.size() > 100 && !m_firstFinished) {
            m_secondIsWaiting = true;
            try {
                wait();
            } catch (InterruptedException ie) {
                // No longer waiting; keep the flag truthful and preserve the
                // interrupt so callers further up can react to it.
                m_secondIsWaiting = false;
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Drains both buffers once both streams have finished, emitting every
     * joined instance that can still be produced. Stops early if a stop has
     * been requested.
     *
     * @throws WekaException if outputting data fails
     */
    protected synchronized void clearBuffers() throws WekaException {
        while (m_firstBuffer.size() > 0 && m_secondBuffer.size() > 0) {
            if (isStopRequested()) {
                return;
            }
            getStepManager().throughputUpdateStart();
            Instance newInst = processBuffers();
            getStepManager().throughputUpdateEnd();
            // processBuffers() returns null when it only discarded non-matching
            // rows; there is nothing to send downstream in that case.
            if (newInst != null) {
                m_streamingData.setPayloadElement("instance", newInst);
                getStepManager().outputData(m_streamingData);
            }
        }
    }

    /**
     * Buffers a complete batch from one of the two inputs. Once both buffers
     * contain rows, the merged header is generated, the buffers are joined and
     * the resulting dataset is sent out on every outgoing connection type.
     *
     * @param data the incoming batch
     * @throws WekaException if the batch comes from an unknown source or the
     *                       join cannot be set up
     */
    protected synchronized void processBatch(Data data) throws WekaException {
        Instances batch = (Instances)data.getPrimaryPayload();
        StepManager origin = data.getSourceStep().getStepManager();

        Queue<Sorter.InstanceHolder> target;
        if (origin == m_firstInput) {
            m_headerOne = new Instances(batch, 0);
            getStepManager().logDetailed("Receiving batch from " + m_firstInput.getName());
            target = m_firstBuffer;
        } else if (origin == m_secondInput) {
            m_headerTwo = new Instances(batch, 0);
            getStepManager().logDetailed("Receiving batch from " + m_secondInput.getName());
            target = m_secondBuffer;
        } else {
            throw new WekaException("This should never happen");
        }

        for (int row = 0; row < batch.numInstances() && !isStopRequested(); ++row) {
            Sorter.InstanceHolder holder = new Sorter.InstanceHolder();
            holder.m_instance = batch.instance(row);
            target.add(holder);
        }

        // Joining can only start once rows from both inputs are available.
        if (m_firstBuffer.size() == 0 || m_secondBuffer.size() == 0) {
            return;
        }

        getStepManager().processing();
        generateMergedHeader();
        Instances joined = new Instances(m_mergedHeader, 0);
        while (!isStopRequested() && m_firstBuffer.size() > 0 && m_secondBuffer.size() > 0) {
            Instance merged = processBuffers();
            if (merged != null) {
                joined.add(merged);
            }
        }

        for (String connectionType : getStepManager().getOutgoingConnections().keySet()) {
            if (isStopRequested()) {
                return;
            }
            Data result = new Data(connectionType, joined);
            result.setPayloadElement("aux_set_num", 1);
            result.setPayloadElement("aux_max_set_num", 1);
            getStepManager().outputData(result);
        }
        getStepManager().finished();
    }

    /**
     * Performs one join step on the heads of the two buffers. If the heads match
     * on all keys, both are removed and the merged instance is returned.
     * Otherwise rows are discarded from whichever buffer is behind until it
     * catches up (or runs empty) and {@code null} is returned; a match found
     * while catching up is picked up by the next call.
     *
     * @return the merged instance, or {@code null} if no match was produced
     */
    protected synchronized Instance processBuffers() {
        if (m_firstBuffer.size() > 0 && m_secondBuffer.size() > 0) {
            // The holders must be fetched before their instances are read.
            Sorter.InstanceHolder firstH = m_firstBuffer.peek();
            Sorter.InstanceHolder secondH = m_secondBuffer.peek();
            Instance first = firstH.m_instance;
            Instance second = secondH.m_instance;
            int cmp = compare(first, second, firstH, secondH);
            if (cmp == 0) {
                return generateMergedInstance(m_firstBuffer.remove(), m_secondBuffer.remove());
            }
            if (cmp < 0) {
                // First input is behind: drop its rows until it catches up.
                do {
                    m_firstBuffer.remove();
                    if (m_firstBuffer.size() > 0) {
                        firstH = m_firstBuffer.peek();
                        first = firstH.m_instance;
                        cmp = compare(first, second, firstH, secondH);
                    }
                } while (cmp < 0 && m_firstBuffer.size() > 0);
            } else {
                // Second input is behind: drop its rows until it catches up.
                do {
                    m_secondBuffer.remove();
                    if (m_secondBuffer.size() > 0) {
                        secondH = m_secondBuffer.peek();
                        second = secondH.m_instance;
                        cmp = compare(first, second, firstH, secondH);
                    }
                } while (cmp > 0 && m_secondBuffer.size() > 0);
            }
        }
        return null;
    }

    /**
     * Compares two rows on the configured key fields, in key order.
     * Two missing key values are treated as equal; a single missing value sorts
     * before a present one. Numeric keys are compared by value, nominal and
     * string keys by their string representation. Keys of any other type are
     * ignored.
     *
     * @param one  instance from the first input
     * @param two  instance from the second input
     * @param oneH holder of {@code one}; its cached string values are preferred when present
     * @param twoH holder of {@code two}; its cached string values are preferred when present
     * @return a negative value, zero or a positive value if {@code one} is
     *         less than, equal to or greater than {@code two}
     */
    protected int compare(Instance one, Instance two, Sorter.InstanceHolder oneH, Sorter.InstanceHolder twoH) {
        for (int i = 0; i < m_keyIndexesOne.length; ++i) {
            int idxOne = m_keyIndexesOne[i];
            int idxTwo = m_keyIndexesTwo[i];

            boolean missingOne = one.isMissing(idxOne);
            boolean missingTwo = two.isMissing(idxTwo);
            if (missingOne && missingTwo) {
                continue;
            }
            if (missingOne || missingTwo) {
                return missingOne ? -1 : 1;
            }

            // The merged header starts with the first input's attributes, so a
            // first-input index is valid in it; a second-input index is not.
            Attribute keyAtt = m_mergedHeader.attribute(idxOne);
            if (keyAtt.isNumeric()) {
                double v1 = one.value(idxOne);
                double v2 = two.value(idxTwo);
                if (v1 != v2) {
                    return v1 < v2 ? -1 : 1;
                }
            } else if (keyAtt.isNominal()) {
                int cmp = one.stringValue(idxOne).compareTo(two.stringValue(idxTwo));
                if (cmp != 0) {
                    return cmp;
                }
            } else if (keyAtt.isString()) {
                // Cached string values are keyed by the attribute name in the
                // input's OWN header, so the second name must come from
                // m_headerTwo (the merged header may also have renamed it).
                String attNameOne = keyAtt.name();
                String attNameTwo = m_headerTwo.attribute(idxTwo).name();
                String oneS = oneH.m_stringVals == null || oneH.m_stringVals.size() == 0
                        ? one.stringValue(idxOne) : oneH.m_stringVals.get(attNameOne);
                String twoS = twoH.m_stringVals == null || twoH.m_stringVals.size() == 0
                        ? two.stringValue(idxTwo) : twoH.m_stringVals.get(attNameTwo);
                int cmp = oneS.compareTo(twoS);
                if (cmp != 0) {
                    return cmp;
                }
            }
        }
        return 0;
    }

    /**
     * Builds the joined instance: all values of the first row followed by all
     * values of the second row. Cached string values are written into the
     * structure the new instance is attached to.
     *
     * @param one holder of the row from the first input
     * @param two holder of the row from the second input
     * @return the merged instance, attached to the merged header (or to one of
     *         the pooled header copies when running incrementally with string attributes)
     */
    protected synchronized Instance generateMergedInstance(Sorter.InstanceHolder one, Sorter.InstanceHolder two) {
        double[] vals = new double[m_mergedHeader.numAttributes()];
        Instances currentStructure = m_mergedHeader;
        if (m_runningIncrementally && m_stringAttsPresent) {
            // Mask the sign bit so the index stays valid after the counter overflows.
            int slot = (m_count.getAndIncrement() & Integer.MAX_VALUE) % 10;
            currentStructure = m_headerPool.get(slot);
        }

        int next = copyRowValues(one, m_headerOne.numAttributes(), 0, vals, currentStructure);
        copyRowValues(two, m_headerTwo.numAttributes(), next, vals, currentStructure);

        DenseInstance newInst = new DenseInstance(1.0, vals);
        newInst.setDataset(currentStructure);
        return newInst;
    }

    /**
     * Copies one input row into the merged value array starting at offset,
     * taking string values from the row's OWN cached string map; returns the
     * offset that follows the copied values.
     */
    private int copyRowValues(Sorter.InstanceHolder source, int numAtts, int offset, double[] vals,
            Instances structure) {
        boolean hasCachedStrings = source.m_stringVals != null && source.m_stringVals.size() > 0;
        for (int i = 0; i < numAtts; ++i) {
            int target = offset + i;
            vals[target] = source.m_instance.value(i);
            if (hasCachedStrings && m_mergedHeader.attribute(target).isString()) {
                String valToSetInHeader = source.m_stringVals.get(source.m_instance.attribute(i).name());
                structure.attribute(target).setStringValue(valToSetInHeader);
                vals[target] = 0.0;
            }
        }
        return offset + numAtts;
    }

    /**
     * Resolves the key specification against both input headers, validates that
     * the keys agree in number and type, and builds the merged output header
     * (first input's attributes followed by the second's, with "_2" appended to
     * second-input names that clash with a first-input name). When the merged
     * header contains string attributes a pool of header copies is created too.
     *
     * @throws WekaException if the key specification is missing or invalid, or
     *                       the keys of the two inputs are incompatible
     */
    protected void generateMergedHeader() throws WekaException {
        if (m_keySpec == null || m_keySpec.length() == 0) {
            throw new WekaException("Key fields are null!");
        }
        String resolvedKeySpec = environmentSubstitute(m_keySpec);
        String[] parts = resolvedKeySpec.split(KEY_SPEC_SEPARATOR);
        if (parts.length != 2) {
            throw new WekaException("Invalid key specification");
        }
        m_keyIndexesOne = resolveKeyIndexes(parts[0].trim(), m_headerOne);
        m_keyIndexesTwo = resolveKeyIndexes(parts[1].trim(), m_headerTwo);

        if (m_keyIndexesOne == null || m_keyIndexesTwo == null) {
            throw new WekaException("Key fields are null!");
        }
        if (m_keyIndexesOne.length != m_keyIndexesTwo.length) {
            throw new WekaException("Number of key fields are different for each input");
        }
        for (int i = 0; i < m_keyIndexesOne.length; ++i) {
            Attribute keyOne = m_headerOne.attribute(m_keyIndexesOne[i]);
            Attribute keyTwo = m_headerTwo.attribute(m_keyIndexesTwo[i]);
            if (keyOne.type() != keyTwo.type()) {
                throw new WekaException("Type of key corresponding to key fields differ: input 1 - " + Attribute.typeToStringShort(keyOne) + " input 2 - " + Attribute.typeToStringShort(keyTwo));
            }
        }

        ArrayList<Attribute> newAtts = new ArrayList<Attribute>();
        HashSet<String> nameLookup = new HashSet<String>();
        for (int i = 0; i < m_headerOne.numAttributes(); ++i) {
            newAtts.add((Attribute)m_headerOne.attribute(i).copy());
            nameLookup.add(m_headerOne.attribute(i).name());
        }
        for (int i = 0; i < m_headerTwo.numAttributes(); ++i) {
            String name = m_headerTwo.attribute(i).name();
            if (nameLookup.contains(name)) {
                name = name + "_2";
            }
            newAtts.add(m_headerTwo.attribute(i).copy(name));
        }
        m_mergedHeader = new Instances(m_headerOne.relationName() + "+" + m_headerTwo.relationName(), newAtts, 0);

        m_stringAttsPresent = false;
        if (m_mergedHeader.checkForStringAttributes()) {
            m_stringAttsPresent = true;
            m_headerPool = new ArrayList<Instances>();
            m_count = new AtomicInteger();
            for (int i = 0; i < 10; ++i) {
                try {
                    m_headerPool.add((Instances)new SerializedObject(m_mergedHeader).getObject());
                } catch (Exception e) {
                    // Best effort: a failed copy is reported and skipped.
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Resolves one half of the key specification to zero-based attribute
     * indexes: first as a Range expression, falling back to a comma-separated
     * list of attribute names if it is not a valid range for the header.
     */
    private static int[] resolveKeyIndexes(String spec, Instances header) throws WekaException {
        try {
            Range range = new Range();
            range.setRanges(spec);
            // The upper limit is the last valid zero-based index, not the count.
            range.setUpper(header.numAttributes() - 1);
            return range.getSelection();
        } catch (IllegalArgumentException notARange) {
            String[] names = spec.split(",");
            int[] indexes = new int[names.length];
            for (int j = 0; j < names.length; ++j) {
                Attribute anAtt = header.attribute(names[j].trim());
                if (anAtt == null) {
                    throw new WekaException("Invalid key attribute name");
                }
                indexes[j] = anAtt.index();
            }
            return indexes;
        }
    }

    /**
     * Reports which incoming connection types can currently be accepted.
     *
     * @return all four supported types when nothing is connected yet, the type
     *         of the existing connection when exactly one input is connected,
     *         and {@code null} otherwise
     */
    @Override
    public List<String> getIncomingConnectionTypes() {
        switch (getStepManager().numIncomingConnections()) {
            case 0:
                return Arrays.asList("instance", "dataSet", "trainingSet", "testSet");
            case 1:
                // The second input must use the same type as the first.
                ArrayList<String> sameAsExisting = new ArrayList<String>();
                sameAsExisting.addAll(getStepManager().getIncomingConnections().keySet());
                return sameAsExisting;
            default:
                return null;
        }
    }

    /**
     * Reports which outgoing connection types can currently be produced.
     *
     * @return the types of the existing incoming connections, or {@code null}
     *         when there are no incoming connections
     */
    @Override
    public List<String> getOutgoingConnectionTypes() {
        if (getStepManager().numIncomingConnections() <= 0) {
            return null;
        }
        ArrayList<String> mirrored = new ArrayList<String>();
        mirrored.addAll(getStepManager().getIncomingConnections().keySet());
        return mirrored;
    }

    /**
     * Names the custom editor dialog used to configure this step.
     *
     * @return the fully qualified class name of the editor dialog
     */
    @Override
    public String getCustomEditorForStep() {
        final String editorClassName = "weka.gui.knowledgeflow.steps.JoinStepEditorDialog";
        return editorClassName;
    }
}

