Created
August 12, 2024 13:20
-
-
Save darrarski/ef03a660b0096a27709ecaa2c8ff548f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ComposableArchitecture | |
import Foundation | |
/// A reducer that observes an `AsyncStream` and keeps the most recently
/// received element in its state.
///
/// Send `.observe` to start iterating the stream returned by `stream`, and
/// `.cancel` to stop the observation started by this state instance.
@Reducer
struct AsyncStreamReducer<Element>
where Element: Sendable,
      Element: Equatable
{
  struct State: Equatable {
    /// Identifier generated when the state value is created. It is used to
    /// build the cancellation ID of the observing effect. Because `Equatable`
    /// is synthesized, it also takes part in equality comparisons.
    let id = UUID()

    /// The last element received from the stream, or `nil` if none was
    /// received yet.
    var element: Element?
  }

  enum Action {
    /// Start observing the stream.
    case observe
    /// The stream produced a new element.
    case element(Element)
    /// Iteration over the stream ended.
    case finished
    /// Cancel the observing effect.
    case cancel
  }

  enum CancelID: Hashable {
    /// Identifies the observing effect started for the state with the given `id`.
    case observe(id: UUID)
  }

  /// Factory for the stream to observe; called each time `.observe` is handled.
  let stream: @Sendable () -> AsyncStream<Element>

  var body: some ReducerOf<Self> {
    Reduce { state, action in
      switch action {
      case .observe:
        return .run { send in
          // Forward every element to the reducer, then report that the
          // iteration ended.
          for await element in stream() {
            await send(.element(element))
          }
          await send(.finished)
        }
        .cancellable(
          id: CancelID.observe(id: state.id),
          // Handling `.observe` again replaces an observation that is still
          // in flight for the same ID.
          cancelInFlight: true
        )

      case .element(let element):
        state.element = element
        return .none

      case .finished:
        return .none

      case .cancel:
        return .cancel(
          id: CancelID.observe(id: state.id)
        )
      }
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ComposableArchitecture | |
import Foundation | |
/// A reducer that observes an `AsyncThrowingStream` and keeps the most
/// recently received element in its state.
///
/// Send `.observe` to start iterating the stream returned by `stream`, and
/// `.cancel` to stop the observation started by this state instance. When the
/// iteration ends, `.completion` is sent with `nil` on normal termination or
/// with the thrown failure otherwise.
@Reducer
struct AsyncThrowingStreamReducer<Element, Failure>
where Element: Sendable,
      Element: Equatable,
      Failure: Error
{
  struct State: Equatable {
    /// Identifier generated when the state value is created. It is used to
    /// build the cancellation ID of the observing effect. Because `Equatable`
    /// is synthesized, it also takes part in equality comparisons.
    let id = UUID()

    /// The last element received from the stream, or `nil` if none was
    /// received yet.
    var element: Element?
  }

  enum Action {
    /// Start observing the stream.
    case observe
    /// The stream produced a new element.
    case element(Element)
    /// Iteration ended; the payload is the failure, or `nil` when the stream
    /// finished without throwing.
    case completion(Failure? = nil)
    /// Cancel the observing effect.
    case cancel
  }

  enum CancelID: Hashable {
    /// Identifies the observing effect started for the state with the given `id`.
    case observe(id: UUID)
  }

  /// Factory for the stream to observe; called each time `.observe` is handled.
  let stream: @Sendable () -> AsyncThrowingStream<Element, Failure>

  var body: some ReducerOf<Self> {
    Reduce { state, action in
      switch action {
      case .observe:
        return .run { send in
          // Forward every element to the reducer, then report a normal
          // completion (no failure).
          for try await element in stream() {
            await send(.element(element))
          }
          await send(.completion())
        } catch: { error, send in
          // The handler receives an untyped `Error`. Cast conditionally
          // instead of force-casting so an unexpected error type cannot
          // crash release builds; debug builds still flag it loudly.
          guard let failure = error as? Failure else {
            assertionFailure(
              "Unexpected error type thrown by stream: \(type(of: error))"
            )
            return
          }
          await send(.completion(failure))
        }
        .cancellable(
          id: CancelID.observe(id: state.id),
          // Handling `.observe` again replaces an observation that is still
          // in flight for the same ID.
          cancelInFlight: true
        )

      case .element(let element):
        state.element = element
        return .none

      case .completion:
        return .none

      case .cancel:
        return .cancel(
          id: CancelID.observe(id: state.id)
        )
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment