Dave Rusek mgodave

339a340,352
> }
>
> /**
> * Create a new instance.
> *
> * @param service a service to handle incoming requests
> */
> public ServiceFactory(final BlockingTesterService service) {
> this(new Builder().testBlocking(service).testBiDiStreamBlocking(service).testResponseStreamBlocking(service).testRequestStreamBlocking(service));
27a28,30
> import io.servicetalk.grpc.api.GrpcStatus;
> import io.servicetalk.grpc.api.GrpcStatusCode;
> import io.servicetalk.grpc.api.GrpcStatusException;
95c98
< * @throws io.servicetalk.grpc.api.GrpcStatusException if an expected application exception occurs. Its contents will be serialized and propagated to the peer.
---
> * @throws GrpcStatusException if an expected application exception occurs. Its contents will be serialized and propagated to the peer.
126c129
< * @throws io.servicetalk.grpc.api.GrpcStatusException if an expected application exception occurs. Its contents will be serialized and propagated to the peer.
5d4
< import io.servicetalk.concurrent.api.AsyncCloseable;
14d12
< import io.servicetalk.encoding.api.Identity;
26d23
< import io.servicetalk.grpc.api.GrpcPayloadWriter;
28d24
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
34d29
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
4d3
< import io.servicetalk.concurrent.api.AsyncCloseable;
12d10
< import io.servicetalk.encoding.api.Identity;
24d21
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
30d26
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
32d27
< import java.lang.Deprecated;
12d11
< import io.servicetalk.encoding.api.Identity;
15d13
< import io.servicetalk.grpc.api.DefaultGrpcClientMetadata;
20d17
< import io.servicetalk.grpc.api.GrpcClientMetadata;
24d20
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
30d25
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
/**
* Copyright [2020] David J. Rusek <dave.rusek@gmail.com>
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
  • Status: Proposal
  • Author: Dave Rusek - Streamlio
  • Pull Request: See Below
  • Mailing List discussion:

Motivation

Data flowing through a messaging system is typically untyped. Data flows from end-to-end as bytes and only the producers and consumers are aware of the type

Keybase proof

I hereby claim:

  • I am mgodave on github.
  • I am daverusek (https://keybase.io/daverusek) on keybase.
  • I have a public key whose fingerprint is 0632 699D 471F 7255 B581 2B71 2DE6 CBB6 229F 7DBC

To claim this, I am signing this object:

import com.twitter.concurrent.AsyncStream
import com.twitter.concurrent.AsyncStream._
import com.twitter.util.{Await, Future}

object MapConcurrentLeak extends App {
  def stream(i: Int, stop: Int): AsyncStream[Int] = {
    if (i >= stop) fromFuture(Future.never)
    else i +:: stream(i + 1, stop)
  }
import com.twitter.concurrent.AsyncStream
import com.twitter.util._

object Merge {
  def merge[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] = {
    def inner(next: Seq[Future[Option[(E, () => AsyncStream[E])]]]): AsyncStream[E] = {
      AsyncStream.fromFuture(Future.select(next)) flatMap {
        case (Return(Some((head, tail))), tails) =>