Last active
April 4, 2025 01:23
-
-
Save mlugg/ba402a8b1c863ebbba0996481dade3b3 to your computer and use it in GitHub Desktop.
Theoretical implementation of parts of `std.Io` in Zig based on #23446
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Allocator used for every `Coroutine` and its backing buffer.
gpa: Allocator,
/// Stand-in for an io_uring binding. The author does not claim this matches the
/// real io_uring API; the field is never initialised, and the rest of this file
/// assumes the following made-up interface:
/// ```
/// io_uring.pushSqe(.{
///     .op = .sleep,
///     .arg0 = clockid,
///     .arg1 = std.time.ns_per_s,
///     .user = ptr,
/// });
/// // later...
/// // blocks until there's a cqe
/// const cqe = io_uring.popCqe();
/// _ = cqe.user; // the user pointer
/// _ = cqe.res0; // a result value
/// ```
io_uring: std.DefinitelyRealIoUring,
/// Per-call bookkeeping for one `async` invocation managed by the event loop.
const Coroutine = struct {
    /// One allocation holding, in order: the result buffer, the context buffer,
    /// and the async frame buffer. Only the result buffer is read back later
    /// (by `takeResult`). Depending on `buf.ptr`, the allocation starts with
    /// zero or more padding bytes so that the result is suitably aligned.
    buf: []u8,
    /// The frame which most recently suspended. This may be the coroutine's
    /// root frame or a frame nested inside it; it is the frame to resume once
    /// this coroutine is ready to make progress again.
    suspended_frame: *std.builtin.AsyncFrame,
    /// The coroutine waiting on this one, if any, together with the
    /// destination its result must be copied into on completion.
    awaiter: ?struct {
        c: *Coroutine,
        result: []u8,
        result_alignment: std.mem.Alignment,
    },
    /// Whether this coroutine has run to completion.
    completed: bool,

    /// Copies the result stored at the (aligned) start of `coro.buf` into `dest`.
    /// Asserts that `coro.completed` is true. `dest.len` bytes are copied, and
    /// `result_alignment` must be the alignment the buffer was allocated with.
    fn takeResult(coro: *Coroutine, dest: []u8, result_alignment: std.mem.Alignment) void {
        assert(coro.completed);
        // Skip the leading padding the same way the allocation code does.
        const result_addr = result_alignment.forward(@intFromPtr(coro.buf.ptr));
        const result_ptr: [*]const u8 = @ptrFromInt(result_addr);
        @memcpy(dest, result_ptr[0..dest.len]);
    }
};
/// The message a suspending frame hands to the event loop through `@asyncSuspend`.
const SuspendInfo = struct {
    /// The frame that suspended; it is resumed once its result is available.
    frame: *std.builtin.AsyncFrame,
    /// Why `frame` suspended. This determines what value the frame expects to
    /// get back from its `@asyncSuspend` call (i.e. what the event loop must
    /// pass to `@asyncResume`).
    reason: Reason,

    const Reason = union(enum) {
        /// Wait for another coroutine to finish and receive its result in `result`.
        /// The value returned from `@asyncSuspend` is ignored and may be `undefined`.
        @"await": struct {
            other: *Coroutine,
            result: []u8,
            result_alignment: std.mem.Alignment,
        },
        /// Sleep on `clockid` as described by `deadline`.
        /// The value returned from `@asyncSuspend` is ignored and may be `undefined`.
        sleep: struct {
            clockid: std.posix.clockid_t,
            deadline: Io.Deadline,
        },
        /// Ask for the current time of the given clock.
        /// The value returned from `@asyncSuspend` should be a pointer to an `Io.Timestamp`.
        now: std.posix.clockid_t,
    };
};
/// Runs `mainFn` to completion inside the event loop.
///
/// Everything this event loop does must happen nested inside `@asyncResume`
/// calls made from here, so instead of exposing an `io` accessor, the caller
/// supplies an entry function which is handed the `Io`.
///
/// `mainFn` is called with `main_args ++ .{io}` and must return `void`.
fn loop(sel: *StacklessEventLoop, mainFn: anytype, main_args: anytype) void {
    const io: Io = .{
        .userdata = sel,
        .vtable = .{
            // (not filled in)
        },
    };

    // Spawn the root function.
    // We must never directly `@"await"` this future; doing so would make `loop` itself async!
    const root_future = io.@"async"(mainFn, main_args ++ .{io});

    // Control is back here either because `mainFn` hit its first suspend point or
    // because it already finished. Drain completions: each cqe identifies the
    // coroutine to resume. Once nothing is pending, every coroutine, including
    // the root one, must have finished.
    while (sel.io_uring.anyPending()) {
        const cqe = sel.io_uring.popCqe();
        const coro: *Coroutine = @ptrCast(@alignCast(cqe.user));
        // If the cqe did not echo the op back, it could be stored on `coro` instead.
        // The resume argument must match what the suspender expects for the
        // `SuspendInfo.reason` it submitted.
        switch (cqe.op) {
            .sleep => sel.ready(coro, undefined),
            .now => {
                // `ready` takes `*anyopaque`, so this must be a mutable local.
                // It only needs to outlive the `ready` call: the resumed frame
                // reads it before suspending again.
                var time: Io.Timestamp = .fromSomething(cqe.res0);
                sel.ready(coro, &time);
            },
        }
    }

    // The vtable `@"async"` returns null when the call finished without ever
    // suspending; in that case the coroutine has already been freed.
    // NOTE(review): assumes `root_future.any_future` is `?*AnyFuture` — confirm against `Io.Future`.
    const any_future = root_future.any_future orelse return;

    // We would like to await the root future, but that would make this function
    // async. Instead, inspect the coroutine directly and release it.
    const root_coro: *Coroutine = @ptrCast(@alignCast(any_future));
    assert(root_coro.completed);
    sel.freeCoroutine(root_coro);
}
/// Allocates a `Coroutine` together with its backing `buf`.
///
/// Returns the `*Coroutine` plus the three sub-slices of `buf`: the result
/// buffer (aligned to `result_alignment`), the context buffer (aligned to
/// `context_alignment`), and the frame buffer, in that order.
/// The returned coroutine starts with no awaiter and `completed == false`;
/// `suspended_frame` is left undefined until the first suspension is recorded.
/// Fails with `error.OutOfMemory` if either allocation fails, leaking nothing.
fn allocCoroutine(
    sel: *StacklessEventLoop,
    result_len: usize,
    result_alignment: std.mem.Alignment,
    context_len: usize,
    context_alignment: std.mem.Alignment,
    frame_len: usize,
) Allocator.Error!struct {
    c: *Coroutine,
    result_buf: []u8,
    context_buf: []u8,
    frame_buf: []u8,
} {
    const gpa = sel.gpa;

    // Reserve worst-case padding (alignment - 1 bytes) in front of both the
    // result and the context so each can be aligned within the allocation.
    // NOTE(review): no padding is reserved for the frame buffer; if async
    // frames need more than byte alignment this has to account for it.
    const buf_size = frame_len +
        result_len + result_alignment.toByteUnits() - 1 +
        context_len + context_alignment.toByteUnits() - 1;
    const buf = try gpa.alloc(u8, buf_size);
    errdefer gpa.free(buf);

    const coro = try gpa.create(Coroutine);
    errdefer gpa.destroy(coro);
    coro.* = .{
        .buf = buf,
        .suspended_frame = undefined,
        .awaiter = null,
        .completed = false,
    };

    const result_ptr: [*]u8 = @ptrFromInt(result_alignment.forward(@intFromPtr(buf.ptr)));
    const result_buf: []u8 = result_ptr[0..result_len];

    // The context follows the result and must honour its *own* alignment.
    const context_ptr: [*]u8 = @ptrFromInt(context_alignment.forward(@intFromPtr(result_ptr + result_len)));
    const context_buf: []u8 = context_ptr[0..context_len];

    const frame_buf: []u8 = context_ptr[context_len..][0..frame_len];

    return .{
        .c = coro,
        .result_buf = result_buf,
        .context_buf = context_buf,
        .frame_buf = frame_buf,
    };
}
/// Releases a coroutine: first its backing buffer, then the `Coroutine` itself.
/// `coro` must not be used after this returns.
fn freeCoroutine(sel: *StacklessEventLoop, coro: *Coroutine) void {
    const backing = coro.buf;
    sel.gpa.free(backing);
    sel.gpa.destroy(coro);
}
/// Resumes `coro`, passing `suspension_result` as the resume argument.
///
/// If `coro` suspends again, the new suspension is registered via `handleSuspension`.
/// If `coro` finishes and something is awaiting it, its result is copied to the
/// awaiter, `coro` is freed, and the awaiter is resumed in turn.
/// If `coro` finishes with no awaiter, it is only marked completed; it stays
/// allocated until something awaits it (and leaks if nothing ever does).
///
/// When this function returns, every coroutine it touched is either finished
/// or suspended waiting on something.
fn ready(sel: *StacklessEventLoop, coro: *Coroutine, suspension_result: *anyopaque) void {
    if (@asyncResume(coro.suspended_frame, suspension_result)) |raw_suspend_info| {
        // Still running: record what it is waiting for.
        return sel.handleSuspension(coro, @ptrCast(@alignCast(raw_suspend_info)));
    }

    // The coroutine has completed. Mark it before anything else: `takeResult`
    // asserts this flag, and a later `@"await"` relies on it.
    coro.completed = true;

    // No awaiter yet: leave the result in place for a future `@"await"`.
    const awaiter = coro.awaiter orelse return;

    // `awaiter` is a copy, so it stays valid after `coro` is freed below.
    coro.takeResult(awaiter.result, awaiter.result_alignment);
    sel.freeCoroutine(coro);

    // The awaiter's result buffer is populated, so it can run again. It expects
    // nothing meaningful back from `@asyncSuspend`, hence `undefined`.
    return sel.ready(awaiter.c, undefined);
}
/// Registers a suspension reported by `@asyncResume`.
///
/// `coro` is the coroutine that was running; `suspend_info` is the (already cast)
/// value `@asyncResume` returned. The suspender is expected to have supplied a
/// valid pointer to a `SuspendInfo`; see e.g. `sleep`.
fn handleSuspension(sel: *StacklessEventLoop, coro: *Coroutine, suspend_info: *const SuspendInfo) void {
    // Remember which frame to resume when the request is fulfilled.
    coro.suspended_frame = suspend_info.frame;

    switch (suspend_info.reason) {
        .@"await" => |request| {
            const target = request.other;
            // The event loop is single-threaded and `@"await"` only suspends after
            // seeing that the target had not completed, so it cannot have completed since.
            assert(!target.completed);
            target.awaiter = .{
                .c = coro,
                .result = request.result,
                .result_alignment = request.result_alignment,
            };
        },
        .sleep => |request| sel.io_uring.pushSqe(.{
            .op = .sleep,
            .arg0 = request.clockid,
            .arg1 = request.deadline,
            .user = coro,
        }),
        .now => |clock| sel.io_uring.pushSqe(.{
            .op = .now,
            .arg0 = clock,
            .user = coro,
        }),
    }
}
/// `Io` vtable entry: starts `start(context, result)` as a new coroutine.
///
/// `context` is copied into the coroutine's own buffer before `start` runs.
/// Returns `null` if the call ran to completion without suspending, in which
/// case `result` has already been filled in and nothing remains allocated.
/// Otherwise returns an opaque handle (really a `*Coroutine`) to pass to `@"await"`.
/// Fails with `error.OutOfMemory` if the coroutine cannot be allocated.
fn @"async"(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Allocator.Error!?*AnyFuture {
    const sel: *StacklessEventLoop = @ptrCast(@alignCast(userdata));

    // NOTE(review): the frame is sized for `start`, but the function placed in it
    // is `OuterCtx.startAsync` — confirm which one `@asyncFrameSize` should take.
    const coro = try sel.allocCoroutine(
        result.len,
        result_alignment,
        context.len,
        context_alignment,
        @asyncFrameSize(start),
    );
    @memcpy(coro.context_buf, context);
    coro.c.awaiter = null;
    coro.c.completed = false;

    // A coroutine entry point takes a single `*anyopaque` argument, but we need
    // `start`, `context` and `result`. Bundle them into a struct.
    const OuterCtx = struct {
        start: *const fn (context: *const anyopaque, result: *anyopaque) void,
        context: *const anyopaque,
        result: *anyopaque,

        fn startAsync(octx: *const OuterCtx) void {
            return octx.start(octx.context, octx.result);
        }
    };

    const init_frame = @asyncInit(coro.frame_buf, &OuterCtx.startAsync);
    // `octx` lives on this stack frame; `startAsync` reads all of its fields
    // before calling `start`, i.e. before the first possible suspension.
    const octx: OuterCtx = .{
        .start = start,
        .context = coro.context_buf.ptr,
        .result = coro.result_buf.ptr,
    };

    const raw_suspend_info = @asyncResume(init_frame, @constCast(&octx)) orelse {
        // The call completed without suspending: hand the result straight back.
        @memcpy(result, coro.result_buf);
        sel.freeCoroutine(coro.c);
        return null;
    };

    // The call has suspended; register what it is waiting for.
    sel.handleSuspension(coro.c, @ptrCast(@alignCast(raw_suspend_info)));
    return @ptrCast(coro.c);
}
/// `Io` vtable entry: waits for `any_future` to finish and copies its result into `result`.
///
/// `any_future` must be a handle returned by `@"async"` that nothing else is
/// awaiting (asserted). The awaited coroutine is freed once its result has been
/// delivered, so the handle must not be used again afterwards.
fn @"await"(
    userdata: ?*anyopaque,
    any_future: *AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    const sel: *StacklessEventLoop = @ptrCast(@alignCast(userdata));
    const await_coro: *Coroutine = @ptrCast(@alignCast(any_future));
    assert(await_coro.awaiter == null);

    // Fast path: if the target already completed, take its result and destroy
    // it ourselves without suspending.
    if (await_coro.completed) {
        await_coro.takeResult(result, result_alignment);
        sel.freeCoroutine(await_coro);
        return;
    }

    // Otherwise suspend and let the event loop wake us when the target finishes.
    const suspend_info: SuspendInfo = .{
        .frame = @asyncFrame(),
        .reason = .{ .@"await" = .{
            .other = await_coro,
            .result = result,
            .result_alignment = result_alignment,
        } },
    };
    // The request must be handed to the event loop via `@asyncSuspend`.
    // The value returned for this `reason` is undefined, so discard it.
    _ = @asyncSuspend(&suspend_info);
    // The event loop has populated `result`.
}
/// `Io` vtable entry: returns the current time of `clockid`.
/// Suspends the calling frame and lets the event loop supply the timestamp.
fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
    // The event loop is reached through `@asyncSuspend`, not through `userdata`.
    _ = userdata;

    const request: SuspendInfo = .{
        .frame = @asyncFrame(),
        .reason = .{ .now = clockid },
    };
    // For the `.now` reason, the resume value is a pointer to an `Io.Timestamp`.
    const raw_timestamp = @asyncSuspend(&request);
    const timestamp: *const Io.Timestamp = @ptrCast(@alignCast(raw_timestamp));
    return timestamp.*;
}
/// `Io` vtable entry: suspends the calling frame with a sleep request for
/// `clockid` / `deadline`, returning once the event loop resumes it.
fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
    // The event loop is reached through `@asyncSuspend`, not through `userdata`.
    _ = userdata;

    const request: SuspendInfo = .{
        .frame = @asyncFrame(),
        .reason = .{ .sleep = .{ .clockid = clockid, .deadline = deadline } },
    };
    // For the `.sleep` reason, the resume value is undefined; discard it.
    _ = @asyncSuspend(&request);
}
test StacklessEventLoop {
    var sel: StacklessEventLoop = .init(std.testing.allocator); // unimplemented
    defer sel.deinit(); // unimplemented; only frees state held on `sel` itself

    // Nothing may touch `sel` from outside async-land, and a test body is not
    // async-land. `loop` is the way in: it runs `testMain` inside the event loop.
    sel.loop(testMain, .{std.testing.allocator});

    // By the time `loop` returns, `testMain` has exited and everything is done.
}
/// Entry function for the test above; runs inside the event loop.
fn testMain(gpa: Allocator, io: Io) void {
    // this is where we can go ham with io :cool_sunglasses:
    _ = .{ gpa, io };
}
const std = @import("std"); | |
const Io = std.Io; | |
const AnyFuture = Io.AnyFuture; | |
const assert = std.debug.assert; | |
const Allocator = std.mem.Allocator; | |
const StacklessEventLoop = @This(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment