Description
I totally might be using the library incorrectly. I am just starting out with it and trying to set it up to function as an HTTP server. Right now I just have the basics hardcoded. If I benchmark it with wrk
and a high number of connections (512), I end up seeing: error(libxev_kqueue): invalid state in submission queue state=backend.kqueue.Completion.State.active
and an eventual crash with the following stacktrace:
thread 7376789 panic: reached unreachable code
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/debug.zig:412:14: 0x100ff6c63 in assert (platform)
if (!ok) unreachable; // assertion failure
^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/queue.zig:24:19: 0x100ff515f in push (platform)
assert(v.next == null);
^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:225:38: 0x100ff2763 in submit (platform)
self.completions.push(c);
^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:333:24: 0x100ff6dd7 in tick (platform)
try self.submit();
^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:264:62: 0x100ff8b47 in run (platform)
.until_done => while (!self.done()) try self.tick(1),
^
/Users/bren077s/Projects/roc-coro/platform/src/main.zig:63:22: 0x100ff8fcb in main (platform)
try root_loop.run(.until_done);
^
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/start.zig:524:37: 0x100ff96af in main (platform)
const result = root.main() catch |err| {
^
???:?:?: 0x18e714273 in ??? (???)
???:?:?: 0x0 in ??? (???)
run
└─ run platform failure
error: the following command terminated unexpectedly:
/Users/bren077s/Projects/roc-coro/platform/zig-out/bin/platform
I don't think this is the same as #111, but I might be wrong.
Any general advice would be greatly appreciated (even if the advice is that libxev is not ready and I should really switch to something else). I'm not trying to build anything production-ready, just working on a prototype of a coroutine-based webserver to use with Roc (which is a new programming language).
The full repo at the current state is here: https://github.com/bhansconnect/roc-coro-webserver/tree/9b07612a95d1b937b0b8e7ec90a14c1a310f5b2b/platform
Here is a slightly reduced version of the main file that is leading to the errors:
const std = @import("std");
const xev = @import("xev");
const log = std.log.scoped(.platform);
const Allocator = std.mem.Allocator;
/// Standard-library options for this program; sets the log level to `.info`.
pub const std_options: std.Options = .{ .log_level = .info };

/// Backing general-purpose allocator instance, created with default settings.
var gpa = std.heap.GeneralPurposeAllocator(.{}){};

/// Allocator interface over `gpa`; used to create and destroy `Handler`s.
var allocator: Allocator = gpa.allocator();
/// Program entry point: listens on 127.0.0.1:8000 and runs the libxev loop
/// until it reports that it is done.
///
/// For every accepted connection a `Handler` is heap-allocated and a read is
/// queued on it; the `Handler` callbacks drive the connection from there.
/// Returns any error from loop setup, bind/listen, `getsockname`, or the loop
/// run itself.
pub fn main() !void {
    var root_loop = try xev.Loop.init(.{});
    defer root_loop.deinit();

    var address = try std.net.Address.parseIp4("127.0.0.1", 8000);
    const server = try xev.TCP.init(address);

    // Bind and listen
    try server.bind(address);
    try server.listen(128);

    // Read the bound address back from the socket so the log line below
    // prints what the OS reports. With a fixed port this should simply echo
    // the address parsed above.
    const fd = if (xev.backend == .iocp) @as(std.os.windows.ws2_32.SOCKET, @ptrCast(server.fd)) else server.fd;
    var sock_len = address.getOsSockLen();
    try std.posix.getsockname(fd, &address.any, &sock_len);
    log.info("Starting server at: http://{}", .{address});

    // Setup accepting connections. `c_accept` lives on this stack frame,
    // which stays alive for as long as `root_loop.run` below is executing.
    var c_accept: xev.Completion = undefined;
    server.accept(&root_loop, &c_accept, void, null, (struct {
        fn callback(
            _: ?*void,
            loop: *xev.Loop,
            _: *xev.Completion,
            accept_result: xev.AcceptError!xev.TCP,
        ) xev.CallbackAction {
            const socket = accept_result catch |err| {
                log.err("Failed to accept connection: {}", .{err});
                return .rearm;
            };
            log.debug("Accepting new TCP connection", .{});

            // Allocation can genuinely fail under load, so it must not be
            // treated as unreachable.
            const handler = allocator.create(Handler) catch |err| {
                log.err("Failed to allocate connection handler: {}", .{err});
                // Without a Handler there is no completion available to
                // queue an asynchronous close on, so release the socket
                // synchronously instead of leaking the descriptor.
                if (xev.backend == .iocp) {
                    std.os.windows.closesocket(@as(std.os.windows.ws2_32.SOCKET, @ptrCast(socket.fd))) catch |close_err| {
                        log.err("Failed to close rejected connection: {}", .{close_err});
                    };
                } else {
                    std.posix.close(socket.fd);
                }
                return .rearm;
            };

            socket.read(loop, &handler.completion, .{ .slice = &handler.buffer }, Handler, handler, Handler.read_callback);
            return .rearm;
        }
    }).callback);

    try root_loop.run(.until_done);
}
/// Per-connection state for the hardcoded HTTP responder.
///
/// One `Handler` is allocated per accepted socket. Its single `completion`
/// is reused for each read, write and close in turn; every callback that
/// queues a new operation on it returns `.disarm`. The `Handler` is destroyed
/// in `close_callback`.
const Handler = struct {
    const Self = @This();

    /// Completion shared by all operations queued for this connection.
    completion: xev.Completion,
    /// Scratch space: receives the request, then holds the outgoing response.
    buffer: [4096]u8,

    /// Called when a read finishes. On success, copies the fixed response
    /// into `buffer` and queues a write; on a fatal error, queues a close.
    fn read_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        rb: xev.ReadBuffer,
        len_result: xev.ReadError!usize,
    ) xev.CallbackAction {
        const handler = self.?;

        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            // Resets and EOF are ordinary ways for a peer to go away; only
            // other failures are worth a warning.
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to read from tcp connection: {}", .{err});
            }
            handler.close(loop, socket);
            return .disarm;
        };

        // TODO: handle partial reads.
        // I'm a bit surprised that reading 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Request: \n{s}\n", .{rb.slice[0..len]});

        // TODO: This is where we should parse the header, make sure it is valid.
        // Check the full length and keep polling if more is to come.
        // Also should check keep alive.
        const response =
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 13\r\n\r\nHello, World!";

        // Plain copy rather than a formatter: the response is fixed text and
        // must not be interpreted as a format string. The comptime-known
        // bounds make an oversized response a compile error.
        const out = handler.buffer[0..response.len];
        @memcpy(out, response);

        socket.write(loop, &handler.completion, .{ .slice = out }, Self, handler, Self.write_callback);
        return .disarm;
    }

    /// Called when a write finishes. Re-queues whatever part of the response
    /// was not written yet; once everything is out, goes back to reading.
    fn write_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        wb: xev.WriteBuffer,
        len_result: xev.WriteError!usize,
    ) xev.CallbackAction {
        const handler = self.?;

        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to write to tcp connection: {}", .{err});
            }
            handler.close(loop, socket);
            return .disarm;
        };

        // I'm a bit surprised that writing 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Response: \n{s}\n", .{wb.slice[0..len]});

        // Short write: only `len` bytes went out, so send the tail before
        // switching back to reading. Otherwise the client would receive a
        // truncated response.
        const remaining = wb.slice[len..];
        if (remaining.len > 0) {
            socket.write(loop, &handler.completion, .{ .slice = remaining }, Self, handler, Self.write_callback);
            return .disarm;
        }

        // Send back to reading. Just assuming keep alive for now.
        socket.read(loop, &handler.completion, .{ .slice = &handler.buffer }, Self, handler, Self.read_callback);
        return .disarm;
    }

    /// Queues an asynchronous close of `socket` on this handler's completion.
    /// The handler is freed in `close_callback`.
    fn close(self: *Self, loop: *xev.Loop, socket: xev.TCP) void {
        socket.close(loop, &self.completion, Self, self, close_callback);
    }

    /// Called when the close finishes. Logs a failed close and frees the
    /// handler either way.
    // NOTE(review): the result is typed as `xev.ShutdownError`; libxev may
    // expect `xev.CloseError` for close callbacks. Confirm against the
    // libxev version in use.
    fn close_callback(
        self: ?*Self,
        _: *xev.Loop,
        _: *xev.Completion,
        _: xev.TCP,
        result: xev.ShutdownError!void,
    ) xev.CallbackAction {
        // If the close fails, should this retry? For now just report it so
        // the failure is not silently dropped.
        result catch |err| {
            log.warn("Failed to close tcp connection: {}", .{err});
        };
        allocator.destroy(self.?);
        return .disarm;
    }
};
Thanks for creating libxev, and for any help. Initial perf tests of libxev are looking really good.