/*
 * Decompiled with CFR 0.152.
 */
package org.dflib.jjava.jupyter.channels;

import java.util.concurrent.atomic.AtomicInteger;
import org.dflib.jjava.jupyter.channels.JupyterConnection;
import org.dflib.jjava.jupyter.channels.JupyterSocket;
import org.dflib.jjava.jupyter.channels.Loop;
import org.dflib.jjava.jupyter.channels.ShellHandler;
import org.dflib.jjava.jupyter.channels.ShellReplyEnvironment;
import org.dflib.jjava.jupyter.kernel.KernelConnectionProperties;
import org.dflib.jjava.jupyter.messages.HMACGenerator;
import org.dflib.jjava.jupyter.messages.Message;
import org.dflib.jjava.shaded.org.slf4j.LoggerFactory;
import org.dflib.jjava.shaded.org.zeromq.SocketType;
import org.dflib.jjava.shaded.org.zeromq.ZMQ;

/**
 * A ROUTER socket that receives requests on either the shell port or the control
 * port of a kernel connection and dispatches them to the handlers registered on
 * the owning {@link JupyterConnection}.
 *
 * <p>After {@link #bind(KernelConnectionProperties)} the socket is polled from a
 * dedicated {@link Loop}. The loop reference is cleared by the loop's own close
 * callback, so every method here reads the {@code ioloop} field once into a local
 * before using it.
 */
public class ShellChannel
extends JupyterSocket {
    /** Loop sleep value (milliseconds, going by the constant's name) used when none is supplied. */
    private static final long SHELL_DEFAULT_LOOP_SLEEP_MS = 50L;
    /** Source of the numeric suffix used in loop thread names. */
    private static final AtomicInteger SHELL_ID = new AtomicInteger();
    /** The active loop, or {@code null} while unbound. Written by the loop's close callback. */
    private volatile Loop ioloop;
    /** {@code true} to bind to the control port, {@code false} for the shell port. */
    private final boolean isControl;
    /** Supplies message handlers and reply environments, and is closed on shutdown requests. */
    private final JupyterConnection connection;
    /** Sleep value handed to the {@link Loop} constructor. */
    private final long sleep;

    /**
     * Creates an unbound channel.
     *
     * @param context       ZMQ context the socket and its poller are created from
     * @param hmacGenerator passed through to {@link JupyterSocket}
     * @param isControl     selects the control port (and the "ControlChannel" logger name)
     *                      instead of the shell port
     * @param connection    connection used to look up handlers and build reply environments
     * @param sleep         sleep value handed to the polling {@link Loop}
     */
    public ShellChannel(ZMQ.Context context, HMACGenerator hmacGenerator, boolean isControl, JupyterConnection connection, long sleep) {
        super(context, SocketType.ROUTER, hmacGenerator, LoggerFactory.getLogger(isControl ? "ControlChannel" : "ShellChannel"));
        this.isControl = isControl;
        this.connection = connection;
        this.sleep = sleep;
    }

    /**
     * Creates an unbound channel that uses {@link #SHELL_DEFAULT_LOOP_SLEEP_MS} as the loop sleep.
     *
     * @param context       ZMQ context the socket and its poller are created from
     * @param hmacGenerator passed through to {@link JupyterSocket}
     * @param isControl     selects the control port instead of the shell port
     * @param connection    connection used to look up handlers and build reply environments
     */
    public ShellChannel(ZMQ.Context context, HMACGenerator hmacGenerator, boolean isControl, JupyterConnection connection) {
        this(context, hmacGenerator, isControl, connection, SHELL_DEFAULT_LOOP_SLEEP_MS);
    }

    /** Returns whether a polling loop is currently installed. */
    private boolean isBound() {
        return this.ioloop != null;
    }

    /**
     * Binds the socket to the shell or control address described by {@code connProps}
     * and starts a loop that polls for incoming messages and dispatches them.
     *
     * @param connProps transport, ip and port settings for the kernel connection
     * @throws IllegalStateException if a polling loop is already installed
     */
    @Override
    public void bind(KernelConnectionProperties connProps) {
        if (this.isBound()) {
            throw new IllegalStateException("Shell channel already bound");
        }
        String channelThreadName = "Shell-" + SHELL_ID.getAndIncrement();
        String address = JupyterSocket.formatAddress(connProps.getTransport(), connProps.getIp(), this.isControl ? connProps.getControlPort() : connProps.getShellPort());
        this.logger.debug("Binding {} to {}.", (Object)channelThreadName, (Object)address);
        super.bind(address);
        ZMQ.Poller poller = this.ctx.poller(1);
        poller.register(this, ZMQ.Poller.POLLIN);
        Loop loop = new Loop(channelThreadName, this.sleep, () -> {
            // Zero timeout: check for readiness without blocking inside the poller.
            int events = poller.poll(0L);
            if (events > 0) {
                Message<?> message = super.readMessage();
                ShellHandler<?> handler = this.connection.getHandler(message.getHeader().getType());
                if (handler != null) {
                    this.logger.debug("Handling message: {}", (Object)message.getHeader().getType().getName());
                    ShellReplyEnvironment env = this.connection.prepareReplyEnv(this, message);
                    try {
                        handler.handle(env, message);
                    }
                    catch (Exception e) {
                        // A failing handler must not kill the loop. The trailing throwable
                        // makes the logger record the stack trace as well as the summary.
                        this.logger.warn("Unhandled exception handling {}. {} - {}", message.getHeader().getType().getName(), e.getClass().getSimpleName(), e.getLocalizedMessage(), e);
                    }
                    finally {
                        // Deferred replies are resolved whether or not the handler succeeded.
                        env.resolveDeferrals();
                    }
                    if (env.isMarkedForShutdown()) {
                        this.logger.debug("{} shutting down connection as environment was marked for shutdown.", (Object)channelThreadName);
                        this.connection.close();
                    }
                } else {
                    this.logger.warn("Unhandled message: {}", (Object)message.getHeader().getType().getName());
                }
            }
        });
        loop.onClose(() -> {
            this.logger.debug("{} shutdown.", (Object)channelThreadName);
            this.ioloop = null;
        });
        // Publish the loop before starting it so the close callback's null write
        // can never be overwritten by this assignment.
        this.ioloop = loop;
        loop.start();
        this.logger.debug("Polling on {}", (Object)channelThreadName);
    }

    /** Requests shutdown of the polling loop, if one is installed, then closes the socket. */
    @Override
    public void close() {
        // Snapshot the field: the loop's close callback may null it between a check and a use.
        Loop loop = this.ioloop;
        if (loop != null) {
            loop.shutdown();
        }
        super.close();
    }

    /**
     * Joins the polling loop if one is installed. If the calling thread is
     * interrupted while waiting, the method returns early with the thread's
     * interrupt status restored.
     */
    @Override
    public void waitUntilClose() {
        // Snapshot the field: the loop's close callback may null it between a check and a use.
        Loop loop = this.ioloop;
        if (loop == null) {
            return;
        }
        try {
            loop.join();
        }
        catch (InterruptedException interrupted) {
            // The override cannot throw InterruptedException, so preserve the signal for the caller.
            Thread.currentThread().interrupt();
        }
    }
}

