/*
 * Decompiled with CFR 0.152.
 */
package org.dflib.jjava.jupyter.channels;

import java.util.concurrent.atomic.AtomicInteger;
import org.dflib.jjava.jupyter.channels.JupyterSocket;
import org.dflib.jjava.jupyter.channels.Loop;
import org.dflib.jjava.jupyter.kernel.KernelConnectionProperties;
import org.dflib.jjava.jupyter.messages.HMACGenerator;
import org.dflib.jjava.shaded.org.slf4j.LoggerFactory;
import org.dflib.jjava.shaded.org.zeromq.SocketType;
import org.dflib.jjava.shaded.org.zeromq.ZMQ;

/**
 * Heartbeat channel: a {@code REP} socket that echoes every message it
 * receives back to the sender.
 *
 * <p>Binding the channel starts a background {@link Loop} that polls the
 * socket and replies to incoming messages. The loop reference is kept in a
 * {@code volatile} field that the loop's on-close callback resets to
 * {@code null}, so methods that use it must read it once into a local.
 */
public class HeartbeatChannel
extends JupyterSocket {
    /** Sleep interval, in milliseconds, used when the caller does not supply one. */
    private static final long HB_DEFAULT_SLEEP_MS = 500L;
    /** Counter used to give each heartbeat loop a distinct name. */
    private static final AtomicInteger HEARTBEAT_ID = new AtomicInteger();
    /** Sleep value handed to the {@link Loop} created in {@link #bind}. */
    private final long sleep;
    /** The running heartbeat loop, or {@code null} when the channel is not bound. */
    private volatile Loop pulse;

    /**
     * Creates a heartbeat channel with an explicit loop sleep value.
     *
     * @param context       ZMQ context the socket is created in
     * @param hmacGenerator HMAC generator passed through to the base socket
     * @param sleep         sleep value passed to the polling {@link Loop}
     */
    public HeartbeatChannel(ZMQ.Context context, HMACGenerator hmacGenerator, long sleep) {
        super(context, SocketType.REP, hmacGenerator, LoggerFactory.getLogger("HeartbeatChannel"));
        this.sleep = sleep;
    }

    /**
     * Creates a heartbeat channel using the default sleep of
     * {@value #HB_DEFAULT_SLEEP_MS} ms.
     *
     * @param context       ZMQ context the socket is created in
     * @param hmacGenerator HMAC generator passed through to the base socket
     */
    public HeartbeatChannel(ZMQ.Context context, HMACGenerator hmacGenerator) {
        this(context, hmacGenerator, HB_DEFAULT_SLEEP_MS);
    }

    /** Returns {@code true} while a heartbeat loop is registered on this channel. */
    private boolean isBound() {
        return this.pulse != null;
    }

    /**
     * Binds the socket to the heartbeat address described by {@code connProps}
     * and starts the loop that echoes incoming messages.
     *
     * @param connProps connection properties supplying transport, IP and heartbeat port
     * @throws IllegalStateException if the channel is already bound
     */
    @Override
    public void bind(KernelConnectionProperties connProps) {
        if (this.isBound()) {
            throw new IllegalStateException("Heartbeat channel already bound");
        }
        String channelThreadName = "Heartbeat-" + HEARTBEAT_ID.getAndIncrement();
        String addr = HeartbeatChannel.formatAddress(connProps.getTransport(), connProps.getIp(), connProps.getHbPort());
        this.logger.debug("Binding {} to {}.", channelThreadName, addr);
        super.bind(addr);

        // NOTE(review): the poller is never explicitly released here; confirm
        // whether the context cleans it up or it should be closed on shutdown.
        ZMQ.Poller poller = this.ctx.poller(1);
        poller.register(this, ZMQ.Poller.POLLIN);

        // Work with a local reference; the volatile field can be nulled by the
        // on-close callback at any time once the loop has been started.
        Loop loop = new Loop(channelThreadName, this.sleep, () -> {
            // Zero timeout: check for a pending message without waiting in poll().
            int events = poller.poll(0L);
            if (events > 0) {
                byte[] msg = this.recv();
                if (msg == null) {
                    this.logger.warn("Poll returned 1 event but could not read the echo string");
                    return;
                }
                // Echo the received bytes back unchanged.
                if (!this.send(msg)) {
                    this.logger.warn("Could not send heartbeat reply");
                }
                this.logger.trace("Heartbeat pulse");
            }
        });
        this.pulse = loop;
        loop.onClose(() -> {
            this.logger.debug("{} shutdown.", channelThreadName);
            this.pulse = null;
        });
        loop.start();
        this.logger.debug("Polling on {}", channelThreadName);
    }

    /**
     * Shuts down the heartbeat loop, if one is registered, and then closes the
     * underlying socket.
     */
    @Override
    public void close() {
        // Single read of the volatile field: the on-close callback may null it
        // between a null-check and a second read.
        Loop current = this.pulse;
        if (current != null) {
            current.shutdown();
        }
        super.close();
    }

    /**
     * Waits for the heartbeat loop to finish by joining it. Returns immediately
     * if no loop is registered. If the calling thread is interrupted while
     * waiting, the method returns early with the interrupt status set.
     */
    @Override
    public void waitUntilClose() {
        // Single read of the volatile field, for the same reason as in close().
        Loop current = this.pulse;
        if (current == null) {
            return;
        }
        try {
            current.join();
        }
        catch (InterruptedException interrupted) {
            // Preserve the interrupt so callers can observe it.
            Thread.currentThread().interrupt();
        }
    }
}

