Skip to content

[kqueue] hits unreachable code with high completion loads #122

Open
@bhansconnect

Description

@bhansconnect

I totally might be using the library incorrectly. I am just starting out working with it and trying to set it up to function as an http server. Right now I just have the basics hardcoded. If I benchmark it with wrk and a high number of connections (512) I end up seeing: error(libxev_kqueue): invalid state in submission queue state=backend.kqueue.Completion.State.active and an eventual crash with the following stacktrace:

thread 7376789 panic: reached unreachable code
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/debug.zig:412:14: 0x100ff6c63 in assert (platform)
    if (!ok) unreachable; // assertion failure
             ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/queue.zig:24:19: 0x100ff515f in push (platform)
            assert(v.next == null);
                  ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:225:38: 0x100ff2763 in submit (platform)
                self.completions.push(c);
                                     ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:333:24: 0x100ff6dd7 in tick (platform)
        try self.submit();
                       ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:264:62: 0x100ff8b47 in run (platform)
            .until_done => while (!self.done()) try self.tick(1),
                                                             ^
/Users/bren077s/Projects/roc-coro/platform/src/main.zig:63:22: 0x100ff8fcb in main (platform)
    try root_loop.run(.until_done);
                     ^
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/start.zig:524:37: 0x100ff96af in main (platform)
            const result = root.main() catch |err| {
                                    ^
???:?:?: 0x18e714273 in ??? (???)
???:?:?: 0x0 in ??? (???)
run
└─ run platform failure
error: the following command terminated unexpectedly:
/Users/bren077s/Projects/roc-coro/platform/zig-out/bin/platform 

I don't think this is the same as #111, but I might be wrong.
Any general advise would be greatly appreciated (even if the advice is that libxev is not ready and I should really switch to something else). I'm not trying to build anything production ready, just working on a prototype of a coroutine based webserver to use with roc (which is a new programming language).

The full repo at the current state is here: https://github.com/bhansconnect/roc-coro-webserver/tree/9b07612a95d1b937b0b8e7ec90a14c1a310f5b2b/platform

Here is a slightly reduced version of the main file that is leading to the errors:

const std = @import("std");
const xev = @import("xev");
const log = std.log.scoped(.platform);
const Allocator = std.mem.Allocator;

pub const std_options: std.Options = .{
    .log_level = .info,
};

var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
var allocator: Allocator = gpa.allocator();

pub fn main() !void {
    var root_loop = try xev.Loop.init(.{});
    defer root_loop.deinit();

    var address = try std.net.Address.parseIp4("127.0.0.1", 8000);
    const server = try xev.TCP.init(address);

    // Bind and listen
    try server.bind(address);
    try server.listen(128);

    // Is this needed? I think it is getting the port.
    // But we specify a specific port...
    const fd = if (xev.backend == .iocp) @as(std.os.windows.ws2_32.SOCKET, @ptrCast(server.fd)) else server.fd;
    var sock_len = address.getOsSockLen();
    try std.posix.getsockname(fd, &address.any, &sock_len);
    log.info("Starting server at: http://{}", .{address});

    // Setup accepting connections
    var c_accept: xev.Completion = undefined;
    server.accept(&root_loop, &c_accept, void, null, (struct {
        fn callback(
            _: ?*void,
            loop: *xev.Loop,
            _: *xev.Completion,
            accept_result: xev.AcceptError!xev.TCP,
        ) xev.CallbackAction {
            const socket = accept_result catch |err| {
                log.err("Failed to accept connection: {}", .{err});
                return .rearm;
            };
            log.debug("Accepting new TCP connection", .{});
            var handler = allocator.create(Handler) catch unreachable;
            socket.read(loop, &handler.completion, .{ .slice = &handler.buffer }, Handler, handler, Handler.read_callback);
            return .rearm;
        }
    }).callback);

    try root_loop.run(.until_done);
}

const Handler = struct {
    const Self = @This();

    completion: xev.Completion,
    buffer: [4096]u8,

    fn read_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        rb: xev.ReadBuffer,
        len_result: xev.ReadError!usize,
    ) xev.CallbackAction {
        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to read from tcp connection: {}", .{err});
            }
            self.?.close(loop, socket);
            return .disarm;
        };
        // TODO: handle partial reads.
        // I'm a bit suprised that reading 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Request: \n{s}\n", .{rb.slice[0..len]});

        // TODO: This is where we should parse the header, make sure it is valid.
        // Check the full lengh and keep polling if more is to come.
        // Also should check keep alive.

        const response =
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 13\r\n\r\nHello, World!";
        const slice = std.fmt.bufPrint(&self.?.buffer, response, .{}) catch |err| {
            log.warn("Failed to write to io buffer: {}", .{err});
            self.?.close(loop, socket);
            return .disarm;
        };
        socket.write(loop, &self.?.completion, .{ .slice = slice }, Self, self, Self.write_callback);
        return .disarm;
    }

    fn write_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        wb: xev.WriteBuffer,
        len_result: xev.WriteError!usize,
    ) xev.CallbackAction {
        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to write to tcp connection: {}", .{err});
            }
            self.?.close(loop, socket);
            return .disarm;
        };
        // TODO: handle partial writes.
        // I'm a bit suprised that writing 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Response: \n{s}\n", .{wb.slice[0..len]});

        // Send back to reading. Just assuming keep alive for now.
        socket.read(loop, &self.?.completion, .{ .slice = &self.?.buffer }, Self, self, Self.read_callback);
        return .disarm;
    }

    fn close(self: *Self, loop: *xev.Loop, socket: xev.TCP) void {
        socket.close(loop, &self.completion, Self, self, close_callback);
    }

    fn close_callback(
        self: ?*Self,
        _: *xev.Loop,
        _: *xev.Completion,
        _: xev.TCP,
        _: xev.ShutdownError!void,
    ) xev.CallbackAction {
        // If shutdowns fails, should this retry?
        allocator.destroy(self.?);
        return .disarm;
    }
};

Thanks for creating libxev and any help. Initial perf tests of this libxev are looking really good.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions