Skip to content

Instantly share code, notes, and snippets.

@insipx
Created October 16, 2025 22:34
Show Gist options
  • Save insipx/dd53015de34854b64f4b600fc07a19a7 to your computer and use it in GitHub Desktop.
Save insipx/dd53015de34854b64f4b600fc07a19a7 to your computer and use it in GitHub Desktop.
// Publishes every client envelope in `request.messages`, then reads from a
// subscription on the same topics until each published payload has been seen
// again (or the stream ends / a read times out).

let topics = request.messages.topics()?;

// Fingerprint each extracted payload so it can be ticked off once it is read
// back from the stream. `DefaultHasher::new()` always starts from the same
// state, so digests computed here and in the read loop below are comparable.
// `request.messages` is cloned because it is used again for `client_envelopes()`.
let mut data_hashes = request
    .messages
    .clone()
    .consume::<MlsDataExtractor>()?
    .into_iter()
    .map_ok(|data| {
        let mut hasher = DefaultHasher::new();
        data.hash(&mut hasher);
        hasher.finish()
    })
    .collect::<Result<HashSet<_>, _>>()?;
let envelopes = request.messages.client_envelopes()?;

// Start the subscription from the lowest cursor shared by all topics.
let last_seen = self
    .cursor_store
    .load()
    .lowest_common_cursor(&topics.iter().collect::<Vec<_>>())?;

// The subscription is opened BEFORE publishing -- presumably so that none of
// the envelopes published below can be missed by the stream.
let s = SubscribeEnvelopes::builder()
    .last_seen(last_seen)
    .topics(topics)
    .build()?
    .subscribe(&self.message_client)
    .await?;
let s = stream::try_extractor::<_, WelcomeMessageExtractor>(s);

// TODO:d14n revert to a single batched publish once
// [batch publishes](https://github.com/xmtp/xmtpd/issues/262) is resolved
// (presumably: once batch publishing works); until then, one request per envelope.
for e in envelopes {
    PublishClientEnvelopes::builder()
        .envelope(e)
        .build()?
        .ignore_response()
        .query(&self.gateway_client)
        .await?;
}

futures::pin_mut!(s);
// Upper bound on how long a single read from the stream may take.
let duration = std::time::Duration::from_secs(5);

// Only wait while something is still outstanding. Checking up front means an
// empty set of payloads returns immediately instead of blocking on the stream
// and surfacing a spurious timeout error.
while !data_hashes.is_empty() {
    // First `?` propagates the timeout error, second `?` the stream error.
    let Some(item) = timeout(duration, s.try_next()).await?? else {
        // Stream ended before every payload was observed.
        break;
    };
    // NOTE(review): assumes `item.data` hashes identically to the payload
    // produced by `MlsDataExtractor` above -- confirm.
    let mut hasher = DefaultHasher::new();
    item.data.hash(&mut hasher);
    let hash = hasher.finish();
    tracing::info!(
        "read welcome for {hash} @cursor {} after publish",
        item.cursor
    );
    data_hashes.remove(&hash);
}
Ok(())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment