Streams API Draft
/// Represents the host-owned write end of a stream.
pub trait StreamProducer<D>: Send + 'static {
type Item;
type Buffer: WriteBuffer<Self::Item> + Default;
/// Handle a host- or guest-initiated read by delivering zero or more items
/// to the specified destination.
///
/// This will be called whenever the reader starts a read.
///
/// If the implementation is able to produce one or more items immediately,
/// it should write them to `destination` and return either
/// `Poll::Ready(Ok(StreamState::Open))` if it expects to produce more
/// items, or `Poll::Ready(Ok(StreamState::Closed))` if it cannot produce
/// any more items.
///
/// If the implementation is unable to produce any items immediately, but
/// expects to do so later, and `finish` is _false_, it should store the
/// waker from `cx` for later and return `Poll::Pending` without writing
/// anything to `destination`. Later, it should alert the waker when either
/// the items arrive, the stream has ended, or an error occurs.
///
/// If the implementation is unable to produce any items immediately, but
/// expects to do so later, and `finish` is _true_, it should, if possible,
/// return `Poll::Ready(Ok(StreamState::Open))` immediately without writing
/// anything to `destination`. However, that might not be possible if an
/// earlier call to `poll_produce` kicked off an asynchronous operation
/// which needs to be completed (and possibly interrupted) gracefully, in
/// which case the implementation may return `Poll::Pending` and later alert
/// the waker as described above. In other words, when `finish` is true,
/// the implementation should prioritize returning a result to the reader
/// (even if no items can be produced) rather than wait indefinitely for at
/// least one item to arrive.
///
/// In all of the above cases, the implementation may alternatively choose
/// to return `Err(_)` to indicate an unrecoverable error. This will cause
/// the guest (if any) to trap and render the component instance (if any)
/// unusable. The implementation should report errors that _are_
/// recoverable by other means (e.g. by writing to a `future`) and then
/// return `Poll::Ready(Ok(StreamState::Closed))`.
///
/// Note that the implementation should never return `Poll::Pending` after
/// writing one or more items to `destination`; if it does, the caller will
/// trap as if `Err(_)` was returned. Likewise, it should only return
/// `Poll::Ready(Ok(StreamState::Open))` without having written any items to
/// `destination` if called with `finish` set to true. If it does so when
/// `finish` is false, the caller will trap.
///
/// If more items are written to `destination` than the reader has immediate
/// capacity to accept, they will be retained in memory by the caller and
/// used to satisfy future reads, in which case `poll_produce` will only be
/// called again once all those items have been delivered. This is
/// particularly important for zero-length reads, in which case the
/// implementation is expected to produce at least one item (if possible,
/// and if `finish` is false) so that it is ready to be delivered
/// immediately upon the next non-zero-length read.
fn poll_produce(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
destination: &mut Destination<Self::Item, Self::Buffer>,
finish: bool,
) -> Poll<Result<StreamState>>;
}
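The `poll_produce` contract described above can be illustrated with a minimal sketch that uses only the standard library. Nothing here is the draft's actual API: `QueueProducer`, `push`, `close`, and `noop_waker` are hypothetical names, a plain `Vec<u8>` stands in for `Destination`, the `store` parameter is omitted, and `Result<_, String>` replaces the draft's `Result`.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for the draft's `StreamState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    Open,
    Closed,
}

/// Hypothetical producer backed by an in-memory queue.
#[derive(Default)]
pub struct QueueProducer {
    pending: VecDeque<u8>,
    closed: bool,
    waker: Option<Waker>,
}

impl QueueProducer {
    /// Make items available and wake a parked reader, if any.
    pub fn push(&mut self, items: &[u8]) {
        self.pending.extend(items.iter().copied());
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Mark the stream as ended and wake a parked reader, if any.
    pub fn close(&mut self) {
        self.closed = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Simplified `poll_produce`: `destination` is a plain `Vec<u8>` here.
    pub fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        destination: &mut Vec<u8>,
        finish: bool,
    ) -> Poll<Result<StreamState, String>> {
        let this = self.get_mut();
        if !this.pending.is_empty() {
            // Items are ready: write them, then report the stream state.
            destination.extend(this.pending.drain(..));
            let state = if this.closed { StreamState::Closed } else { StreamState::Open };
            return Poll::Ready(Ok(state));
        }
        if this.closed {
            return Poll::Ready(Ok(StreamState::Closed));
        }
        if finish {
            // Nothing to deliver, but the reader wants an answer now.
            return Poll::Ready(Ok(StreamState::Open));
        }
        // Nothing written, so returning `Pending` is allowed; keep the waker.
        this.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that does nothing, for driving the sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

The sketch never returns `Poll::Pending` after writing to `destination`, and it returns `StreamState::Open` with nothing written only when `finish` is true, which are the two conditions the doc comment says would otherwise cause a trap.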
/// Represents the host-owned read end of a stream.
pub trait StreamConsumer<D>: Send + 'static {
type Item;
/// Handle a host- or guest-initiated write by accepting zero or more items
/// from the specified source.
///
/// This will be called whenever the writer starts a write.
///
/// If the implementation is able to consume one or more items immediately,
/// it should take them from `source` and return either
/// `Poll::Ready(Ok(StreamState::Open))` if it expects to be able to consume
/// more items, or `Poll::Ready(Ok(StreamState::Closed))` if it cannot
/// accept any more items. It _may_ return `Poll::Pending`, in which case
/// the caller will delay delivering a `COMPLETED` event to the writer until
/// a future call returns `Poll::Ready(_)`.
///
/// If the implementation cannot consume any items immediately and `finish`
/// is _false_, it should store the waker from `cx` for later and return
/// `Poll::Pending` without taking anything from `source`. Later, it
/// should alert the waker when either (1) it is able to consume items,
/// (2) the stream has ended, or (3) an error occurs.
///
/// If the implementation cannot consume any items immediately and `finish`
/// is _true_, it should, if possible, return
/// `Poll::Ready(Ok(StreamState::Open))` immediately without taking anything
/// from `source`. However, that might not be possible if an earlier call
/// to `poll_consume` kicked off an asynchronous operation which needs to be
/// completed (and possibly interrupted) gracefully, in which case the
/// implementation may return `Poll::Pending` and later alert the waker as
/// described above. In other words, when `finish` is true, the
/// implementation should prioritize returning a result to the writer (even
/// if no items can be consumed) rather than wait indefinitely for
/// capacity to free up.
///
/// In all of the above cases, the implementation may alternatively choose
/// to return `Err(_)` to indicate an unrecoverable error. This will cause
/// the guest (if any) to trap and render the component instance (if any)
/// unusable. Errors that _are_ recoverable should be reported by other
/// means (e.g. by writing to a `future`).
///
/// Note that the implementation should only return
/// `Poll::Ready(Ok(StreamState::Open))` without having taken any items from
/// `source` if called with `finish` set to true. If it does so when
/// `finish` is false, the caller will trap.
///
/// Note that any items which the implementation of this trait takes from
/// `source` become the responsibility of that implementation. For that
/// reason, an implementation which forwards items to an upstream sink
/// should reserve capacity in that sink before taking items out of
/// `source`, if possible. Alternatively, it might buffer items which can't
/// be forwarded immediately and send them once capacity is freed up.
fn poll_consume(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
source: &mut Source<'_, Self::Item>,
finish: bool,
) -> Poll<Result<StreamState>>;
}
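As a rough illustration of the `poll_consume` rules, including the advice to check for capacity before taking items out of `source`, here is a standard-library-only sketch. All names are assumptions made for the example: `BoundedConsumer`, `flush`, `buffered`, and `noop_waker` are hypothetical, a `VecDeque<u8>` stands in for `Source`, the `store` parameter is omitted, and `Result<_, String>` replaces the draft's `Result`.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for the draft's `StreamState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    Open,
    Closed,
}

/// Hypothetical consumer that buffers at most `capacity` bytes.
pub struct BoundedConsumer {
    capacity: usize,
    buffered: Vec<u8>,
    waker: Option<Waker>,
}

impl BoundedConsumer {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, buffered: Vec::new(), waker: None }
    }

    /// Bytes taken from the writer that have not been flushed yet.
    pub fn buffered(&self) -> &[u8] {
        &self.buffered
    }

    /// Simulate the upstream sink accepting everything buffered so far,
    /// then wake a parked writer because capacity has freed up.
    pub fn flush(&mut self) -> Vec<u8> {
        let out = std::mem::take(&mut self.buffered);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
        out
    }

    /// Simplified `poll_consume`: `source` is a plain `VecDeque<u8>` here.
    pub fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        source: &mut VecDeque<u8>,
        finish: bool,
    ) -> Poll<Result<StreamState, String>> {
        let this = self.get_mut();
        // Only take as many items as we can actually hold; anything taken
        // from `source` becomes this implementation's responsibility.
        let free = this.capacity - this.buffered.len();
        let count = free.min(source.len());
        if count > 0 {
            this.buffered.extend(source.drain(..count));
            return Poll::Ready(Ok(StreamState::Open));
        }
        if finish {
            // Nothing could be taken, but the writer wants an answer now.
            return Poll::Ready(Ok(StreamState::Open));
        }
        // Nothing taken and no rush: park until `flush` frees capacity.
        this.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that does nothing, for driving the sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Items that do not fit stay in `source`, so the writer keeps ownership of them until a later call has room.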
/// Represents the buffer for a host- or guest-initiated stream read.
pub struct Destination<'a, T, B> {
// ...
}
impl<'a, T, B> Destination<'a, T, B> {
/// Return a unique reference to the buffer in which items may be stored.
///
/// Any items added to this buffer will be delivered to the reader after the
/// `StreamProducer::poll_produce` call to which this `Destination` was
/// passed returns.
///
/// If items are added to this buffer _and_ written via a `DirectDestination`
/// view of `self`, then the items in the buffer will be delivered after the
/// ones written using `DirectDestination`.
pub fn buffer(&mut self) -> &mut B {
// ...
}
/// Return the remaining number of items the current read has capacity to
/// accept, if known.
///
/// This will return `Some(_)` if the reader is a guest; it will return
/// `None` if the reader is the host.
pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
// ...
}
}
impl<B> Destination<'_, u8, B> {
/// Return a `DirectDestination` view of `self` if the guest is reading, or
/// `None` if the host is reading.
pub fn as_direct_destination<'a, D>(
&'a mut self,
store: StoreContextMut<'a, D>,
) -> Option<DirectDestination<'a, D>> {
// ...
}
}
/// Represents a guest read from a `stream<u8>`, providing direct access to the
/// guest's buffer.
pub struct DirectDestination<'a, D: 'static> {
// ...
}
impl<D: 'static> DirectDestination<'_, D> {
/// Provide direct access to the guest's buffer.
pub fn remaining(&mut self) -> &mut [u8] {
// ...
}
/// Mark the specified number of bytes as written to the guest's buffer.
///
/// This will panic if the count is larger than the size of the
/// buffer returned by `Self::remaining`.
pub fn mark_written(&mut self, count: usize) {
// ...
}
}
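The `remaining` / `mark_written` pair suggests a "fill the slice, then commit the count" pattern. The sketch below models it with a stand-in type, since the real `DirectDestination` needs a store and a guest buffer: `DirectDestinationSketch`, its `new` constructor, and `write_direct` are hypothetical names, and the assumption that `remaining` shrinks after each `mark_written` call is mine rather than something the draft states.

```rust
/// Stand-in for `DirectDestination`, wrapping a caller-provided buffer.
pub struct DirectDestinationSketch<'a> {
    buffer: &'a mut [u8],
    written: usize,
}

impl<'a> DirectDestinationSketch<'a> {
    pub fn new(buffer: &'a mut [u8]) -> Self {
        Self { buffer, written: 0 }
    }

    /// The part of the buffer that has not been marked as written yet.
    pub fn remaining(&mut self) -> &mut [u8] {
        &mut self.buffer[self.written..]
    }

    /// Commit `count` bytes; panics if `count` exceeds the remaining space.
    pub fn mark_written(&mut self, count: usize) {
        assert!(
            count <= self.buffer.len() - self.written,
            "count exceeds remaining capacity"
        );
        self.written += count;
    }
}

/// Copy as much of `data` as fits and return the number of bytes written.
pub fn write_direct(dest: &mut DirectDestinationSketch<'_>, data: &[u8]) -> usize {
    let slot = dest.remaining();
    let count = slot.len().min(data.len());
    slot[..count].copy_from_slice(&data[..count]);
    dest.mark_written(count);
    count
}
```

Clamping the count to the slice length before calling `mark_written` is what keeps the caller clear of the documented panic.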
/// Represents the buffer for a host- or guest-initiated stream write.
pub struct Source<'a, T> {
// ...
}
impl<T> Source<'_, T> {
/// Accept zero or more items from the writer.
pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
where
T: func::Lift + 'static,
B: ReadBuffer<T>,
{
// ...
}
/// Return the number of items remaining to be read from the current write
/// operation.
pub fn remaining(&self, mut store: impl AsContextMut) -> usize
where
T: 'static,
{
// ...
}
}
impl Source<'_, u8> {
/// Return a `DirectSource` view of `self`.
pub fn as_direct_source<'a, D>(
&'a mut self,
store: StoreContextMut<'a, D>,
) -> DirectSource<'a, D> {
// ...
}
}
/// Represents a write to a `stream<u8>`, providing direct access to the
/// writer's buffer.
pub struct DirectSource<'a, D: 'static> {
// ...
}
impl<D: 'static> DirectSource<'_, D> {
/// Provide direct access to the writer's buffer.
pub fn remaining(&mut self) -> &[u8] {
// ...
}
/// Mark the specified number of bytes as read from the writer's buffer.
///
/// This will panic if the count is larger than the size of the buffer
/// returned by `Self::remaining`.
pub fn mark_read(&mut self, count: usize) {
// ...
}
}
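Partial consumption through `remaining` and `mark_read` might look like the following sketch. It uses a stand-in type rather than the real `DirectSource`: `DirectSourceSketch`, its `new` constructor, `read_direct`, and the `limit` parameter are hypothetical, and it assumes that bytes left unmarked stay visible through `remaining` on the next call.

```rust
/// Stand-in for `DirectSource`, wrapping the writer's bytes.
pub struct DirectSourceSketch<'a> {
    buffer: &'a [u8],
    read: usize,
}

impl<'a> DirectSourceSketch<'a> {
    pub fn new(buffer: &'a [u8]) -> Self {
        Self { buffer, read: 0 }
    }

    /// The bytes that have not been marked as read yet.
    pub fn remaining(&mut self) -> &[u8] {
        &self.buffer[self.read..]
    }

    /// Commit `count` bytes as read; panics if `count` exceeds what is left.
    pub fn mark_read(&mut self, count: usize) {
        assert!(
            count <= self.buffer.len() - self.read,
            "count exceeds remaining bytes"
        );
        self.read += count;
    }
}

/// Take at most `limit` bytes from `src`, append them to `out`, and return
/// the number of bytes taken.
pub fn read_direct(src: &mut DirectSourceSketch<'_>, limit: usize, out: &mut Vec<u8>) -> usize {
    let available = src.remaining();
    let count = available.len().min(limit);
    out.extend_from_slice(&available[..count]);
    src.mark_read(count);
    count
}
```

Marking only the bytes actually copied leaves the rest with the writer, which matches the note in `poll_consume` that taken items become the consumer's responsibility.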