Skip to content

Instantly share code, notes, and snippets.

@siracusa
Created April 7, 2025 13:50
Show Gist options
  • Save siracusa/fdc148f4746ff2e8b736b5d602e3c320 to your computer and use it in GitHub Desktop.
Save siracusa/fdc148f4746ff2e8b736b5d602e3c320 to your computer and use it in GitHub Desktop.
Adding a timeout to code like this is tricky…
// Assume Swift 6 language mode
import Network
import Foundation
print("Asking for the current time from an NTP server...")
do {
let now = try await currentTime()
print("Current time from NTP: \(now)")
}
catch {
print("Failed to get current time from NTP server: \(error)")
}
exit(0)
// To cause this code to hang, add this to the bottom of /etc/pf.conf
//
// # /etc/pf.conf
// # Define time.apple.com
// table <time_apple> { time.apple.com }
//
// # Block outgoing NTP traffic to time.apple.com
// block out proto udp from any to <time_apple> port 123
// block out proto tcp from any to <time_apple> port 123
//
// # Block incoming NTP traffic from time.apple.com
// block in proto udp from <time_apple> port 123 to any
// block in proto tcp from <time_apple> port 123 to any
//
// Then run this command to enable the new rules:
//
// sudo pfctl -f /etc/pf.conf && sudo pfctl -e
//
// To disable the rules temporarily, run:
//
// sudo pfctl -d
//
// To revert these changes, delete or comment out the lines added
// above and then run the first command again.
func currentTime() async throws -> Date {
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Date, Error>) in
// Some scenarios to consider:
//
// 1. connection.send() completes successfully, but no response ever
// comes. Will the code below hang forever waiting for that response?
// And when it's in this state, is there any way to check for Task
// cancellation or some other mechanism to honor a timeout imposed
// on this code by something outside it?
//
// 2. If the timeout is "imposed" by calling connection.cancel() when
// the timeout arrives, consider this race:
//
// 1. connection.send() is called from the stateUpdateHandler
// when the state is "ready".
// 2. The timeout arrives and the thing enforcing the timeout
// calls connection.cancel()
// 3. The stateUpdateHandler is called with state set to
// "cancelled" which calls continuation.resume(throwing: ...)
// 4. The completion handler for the connection.send() call in
// step 1 is called, and error is set in the call. It calls
// continuation.resume(), leading to a fatal double resume()
// error.
//
// Some Sendable shared state (protected by a queue or Mutex) to
// track whether the continuation has been resumed already or not
// can be used to avoid this, but there must be a better way...
let connection = NWConnection(host: NWEndpoint.Host("time.apple.com"), port: 123, using: .udp)
let ntpRequest = createNTPRequestPacket()
connection.stateUpdateHandler = { state in
switch state {
case.ready:
connection.send(content: ntpRequest, completion: .contentProcessed { error in
if let error {
continuation.resume(throwing: error)
}
})
case .cancelled:
continuation.resume(throwing: NTPClientError.cancelled)
case .waiting(let error), .failed(let error):
continuation.resume(throwing: error)
case .preparing, .setup:
break
@unknown default:
break
}
}
connection.receiveMessage { data, _, isComplete, error in
if let error {
continuation.resume(throwing: error)
}
else if let data, isComplete {
if let date = parseNTPResponse(data) {
continuation.resume(returning: date)
}
else {
continuation.resume(throwing: NTPClientError.parseError)
}
}
}
connection.start(queue: .global())
}
}
// XXX: Extremely janky, for demo purposes only
func createNTPRequestPacket() -> Data {
var packet = Data(count: 48)
packet[0] = 0b00100011
return packet
}
// XXX: Extremely janky, for demo purposes only
func parseNTPResponse(_ data: Data) -> Date? {
guard data.count >= 48 else { return nil }
let responseTimeOffset = 40
let secondsSince1900 =
TimeInterval(data.withUnsafeBytes {
$0.load(fromByteOffset: responseTimeOffset, as: UInt32.self).bigEndian
})
let ntpToUnixOffset: TimeInterval = 2208988800
return Date(timeIntervalSince1970: secondsSince1900 - ntpToUnixOffset)
}
enum NTPClientError : Error {
case cancelled
case parseError
}
@mattmassicotte
Copy link

mattmassicotte commented Apr 7, 2025

I spent a few minutes trying to come up with a few ideas. I don't know if they are good, but perhaps could be a starting point?

// I *think* this adequately models the system involved
struct ExternalSystem: Sendable {
    var completionHandler: (@Sendable (Result<Int, any Error>) -> Void)?
    var eventHandler: (@Sendable (Result<Int, any Error>) -> Void)?
    
    func start() {
    }
}

// This is what we want to build
extension ExternalSystem {
    mutating func doTheThing(timeout: TimeInterval) async throws -> Int {
        try await withThrowingTaskGroup() { group in
            // wait for completion
            group.addTask {
                try await withCheckedThrowingContinuation { continuation in
                    self.completionHandler = { result in
                        continuation.resume(with: result)
                    }
                }
            }
            
            // wait for events
            group.addTask {
                try await withCheckedThrowingContinuation { continuation in
                    self.eventHandler = { result in
                        continuation.resume(with: result)
                    }
                }
            }
            
            group.addTask {
                try await Task.sleep(for: .seconds(timeout))
                
                throw CancellationError()
            }
            
            // I'm not 100% sure this has guaranteed that both handlers are installed before beginning
            self.start()
            
            return try await group.next()!
        }
    }
    
    mutating func doTheThing2(timeout: TimeInterval) async throws -> Int {
        let (stream, continuation) = AsyncStream<Result<Int, Error>>.makeStream()
        
        self.completionHandler = { continuation.yield($0) }
        self.eventHandler = { continuation.yield($0) }
        
        Task {
            try await Task.sleep(for: .seconds(timeout))
            
            continuation.yield(.failure(CancellationError()))
        }
        
        start()
        
        for await value in stream {
            return try value.get()
        }
    }
}

// And here's where we use it, with a guarantee that it will never block
func runTest() async throws {
    var system = ExternalSystem()
    
    _ = try await system.doTheThing(timeout: 2.0)
}

await runTest()

@saagarjha
Copy link

Here's mine. I was lazy and aimed for a small diff, so this uses an AsyncStream.

// Assume Swift 6 language mode

import Foundation
import Network

print("Asking for the current time from an NTP server...")

do {
	let now = try await currentTime(timeout: .milliseconds(1))
	print("Current time from NTP: \(now)")
} catch {
	print("Failed to get current time from NTP server: \(error)")
}

exit(0)

// To cause this code to hang, add this to the bottom of /etc/pf.conf
//
//     # /etc/pf.conf
//     # Define time.apple.com
//     table <time_apple> { time.apple.com }
//
//     # Block outgoing NTP traffic to time.apple.com
//     block out proto udp from any to <time_apple> port 123
//     block out proto tcp from any to <time_apple> port 123
//
//     # Block incoming NTP traffic from time.apple.com
//     block in proto udp from <time_apple> port 123 to any
//     block in proto tcp from <time_apple> port 123 to any
//
// Then run this command to enable the new rules:
//
//     sudo pfctl -f /etc/pf.conf && sudo pfctl -e
//
// To disable the rules temporarily, run:
//
//     sudo pfctl -d
//
// To revert these changes, delete or comment out the lines added
// above and then run the first command again.

func currentTime() async throws -> Date {
	let connection = NWConnection(host: NWEndpoint.Host("time.apple.com"), port: 123, using: .udp)

	return try await withTaskCancellationHandler(
		operation: {
			let (stream, continuation) = AsyncThrowingStream.makeStream(of: Date.self)

			// Some scenarios to consider:
			//
			// 1. connection.send() completes successfully, but no response ever
			//    comes. Will the code below hang forever waiting for that response?
			//    And when it's in this state, is there any way to check for Task
			//    cancellation or some other mechanism to honor a timeout imposed
			//    on this code by something outside it?
			//
			// 2. If the timeout is "imposed" by calling connection.cancel() when
			//    the timeout arrives, consider this race:
			//
			//     1. connection.send() is called from the stateUpdateHandler
			//        when the state is "ready".
			//     2. The timeout arrives and the thing enforcing the timeout
			//        calls connection.cancel()
			//     3. The stateUpdateHandler is called with state set to
			//        "cancelled" which calls continuation.resume(throwing: ...)
			//     4. The completion handler for the connection.send() call in
			//        step 1 is called, and error is set in the call. It calls
			//        continuation.resume(), leading to a fatal double resume()
			//        error.
			//
			//     Some Sendable shared state (protected by a queue or Mutex) to
			//     track whether the continuation has been resumed already or not
			//     can be used to avoid this, but there must be a better way...

			let ntpRequest = createNTPRequestPacket()

			connection.stateUpdateHandler = { state in
				switch state {
					case .ready:
						connection.send(
							content: ntpRequest,
							completion: .contentProcessed { error in
								if let error {
									continuation.finish(throwing: error)
								}
							})
					case .cancelled:
						continuation.finish(throwing: NTPClientError.cancelled)
					case .waiting(let error), .failed(let error):
						continuation.finish(throwing: error)
					case .preparing, .setup:
						break
					@unknown default:
						break
				}
			}

			connection.receiveMessage { data, _, isComplete, error in
				if let error {
					continuation.finish(throwing: error)
				} else if let data, isComplete {
					if let date = parseNTPResponse(data) {
						continuation.yield(date)
					} else {
						continuation.finish(throwing: NTPClientError.parseError)
					}
				}
			}

			connection.start(queue: .global())

            // Wait for the first result to come back. Either this will be an
            // error, at which point we're done (and should rethrow it) or the
            // actual date.
            if let date = try await stream.first(where: { _ in true }) {
                return date
            } else {
                // We got cancelled before we even had a chance to send the
                // message. Throw an error in this case.
                throw CancellationError()
            }
		},
		onCancel: {
            // We got cancelled, and have no need for this anymore.
			connection.cancel()
		})
}

func currentTime(timeout: Duration) async throws -> Date? {
	try await withThrowingTaskGroup { group in
		group.addTask {
			Optional.some(try await currentTime())
		}

		group.addTask {
			try await Task.sleep(for: timeout)
			return nil
		}

        // Once we're done we should cancel the other task. This isn't too
        // important if we get a result first because the timeout task is cheap
        // but if the timeout expires we do want to cancel the connection early.
        defer {
            group.cancelAll()
        }

        // Wait for either the timeout to expire, or the time (or an error) to
        // be produced. Pick the first one (there will always be one).
		return try await group.first { _ in true }!
	}
}

// XXX: Extremely janky, for demo purposes only
func createNTPRequestPacket() -> Data {
	var packet = Data(count: 48)
	packet[0] = 0b00100011
	return packet
}

// XXX: Extremely janky, for demo purposes only
func parseNTPResponse(_ data: Data) -> Date? {
	guard data.count >= 48 else { return nil }

	let responseTimeOffset = 40

	let secondsSince1900 =
		TimeInterval(
			data.withUnsafeBytes {
				$0.load(fromByteOffset: responseTimeOffset, as: UInt32.self).bigEndian
			})

	let ntpToUnixOffset: TimeInterval = 2_208_988_800
	return Date(timeIntervalSince1970: secondsSince1900 - ntpToUnixOffset)
}

enum NTPClientError: Error {
	case cancelled
	case parseError
}

@siracusa
Copy link
Author

siracusa commented Apr 8, 2025

@saagarjha This approach helps a lot because AsyncThrowingStream.Continuation's finish(…) method is much kinder than CheckedContinuation's resume(…) methods when it comes to handling multiple calls! The async stream finish(…) method documentation says "Calling this function more than once has no effect" which is much nicer than the fatal error you get when calling CheckedContinuation's resume(…) more than once!

(Also, I think you're missing the (of: Date?.self) parameter to the withThrowingTaskGroup call.)

[Update: I'm using Swift 6 in Xcode 16.2, but it turns out that Swift 6.1 in Xcode 16.3 makes the (of: Date?.self) part unnecessary.]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment