Created
October 6, 2016 18:05
-
-
Save vovasty/c1378f035bfd3a20c7dd2d6941150f59 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ZeroMQ | |
import CZeroMQ | |
import Foundation | |
import Venice | |
public struct ZeroMqError : Error, CustomStringConvertible { | |
public let description: String | |
static var lastError: Error { | |
let description = String(validatingUTF8: zmq_strerror(zmq_errno()))! | |
return ZeroMqError(description: description) | |
} | |
} | |
public final class VeniceContext { | |
let context: UnsafeMutableRawPointer? | |
public init() throws { | |
context = zmq_ctx_new() | |
if context == nil { | |
throw ZeroMqError.lastError | |
} | |
} | |
deinit { | |
zmq_ctx_term(context) | |
} | |
public func terminate() throws { | |
if zmq_ctx_term(context) == -1 { | |
throw ZeroMqError.lastError | |
} | |
} | |
func setOption(_ option: Int32, value: Int32) { | |
zmq_ctx_set(context, option, value) | |
} | |
func getOption(_ option: Int32) -> Int32 { | |
return zmq_ctx_get(context, option) | |
} | |
public func socket(_ type: SocketType) throws -> VeniceSocket { | |
guard let socket = zmq_socket(context, type.rawValue) else { | |
throw ZeroMqError.lastError | |
} | |
return VeniceSocket(socket: socket) | |
} | |
} | |
public class VeniceSocket { | |
let socket: UnsafeMutableRawPointer | |
init(socket: UnsafeMutableRawPointer) { | |
self.socket = socket | |
} | |
deinit { | |
zmq_close(socket) | |
} | |
func getOption(_ option: Int32, value: UnsafeMutableRawPointer, length: UnsafeMutablePointer<Int>) throws { | |
if zmq_getsockopt(socket, option, value, length) == -1 { | |
throw ZeroMqError.lastError | |
} | |
} | |
public func bind(_ endpoint: String) throws { | |
if zmq_bind(socket, endpoint) == -1 { | |
throw ZeroMqError.lastError | |
} | |
} | |
public func connect(_ endpoint: String) throws { | |
if zmq_connect(socket, endpoint) == -1 { | |
throw ZeroMqError.lastError | |
} | |
} | |
public func sendMessage(_ message: Message, mode: SendMode = []) throws { | |
var m = mode | |
m.update(with: .DontWait) | |
try wait { | |
return zmq_msg_send(&message.message, socket, Int32(mode.rawValue)) | |
} | |
} | |
func send(_ buffer: UnsafeMutableRawPointer, length: Int, mode: SendMode = []) throws { | |
var m = mode | |
m.update(with: .DontWait) | |
try wait { | |
return zmq_send(socket, buffer, length, Int32(mode.rawValue)) | |
} | |
} | |
public func send(_ data: Data, mode: SendMode = []) throws { | |
var data = data | |
let _ = try data.withUnsafeMutableBytes { bytes in | |
try self.send(bytes, length: data.count, mode: mode) | |
} | |
} | |
public func receiveMessage(_ mode: ReceiveMode = []) throws -> Message { | |
var m = mode | |
m.update(with: .DontWait) | |
let message = try Message() | |
try wait { | |
return zmq_msg_recv(&message.message, socket, Int32(m.rawValue)) | |
} | |
return message | |
} | |
public func receive(_ bufferSize: Int = 1024, mode: ReceiveMode = []) throws -> Data { | |
var m = mode | |
m.update(with: .DontWait) | |
var data = Data(count: bufferSize) | |
let result = try wait { | |
data.withUnsafeMutableBytes { bytes in | |
zmq_recv(socket, bytes, bufferSize, Int32(mode.rawValue)) | |
} | |
} | |
let bufferEnd = min(Int(result), bufferSize) | |
return Data(data[0 ..< bufferEnd]) | |
} | |
public func close() throws { | |
if zmq_close(socket) == -1 { | |
throw ZeroMqError.lastError | |
} | |
} | |
fileprivate func wait1<T>(_ closure: () throws -> (Int32, T?)) throws -> T? { | |
let result = try closure() | |
guard result.0 == -1 else { return result.1 } | |
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError } | |
var value: Int32 = 0 | |
var length = strideof(Int32.self) | |
try getOption(ZMQ_FD, value: &value, length: &length) | |
let fd = value | |
while true { | |
let events = try poll(fd, events: .read, deadline: -1) | |
guard events.contains(.read) else { | |
throw ZeroMqVeniceError(description: "Unable to poll") | |
} | |
let result = try closure() | |
guard result.0 == -1 else { return result.1 } | |
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError } | |
} | |
} | |
@discardableResult | |
fileprivate func wait(_ closure: () throws -> (Int32)) throws -> Int32 { | |
let result = try closure() | |
guard result == -1 else { return result } | |
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError } | |
var value: Int32 = 0 | |
var length = strideof(Int32.self) | |
try getOption(ZMQ_FD, value: &value, length: &length) | |
let fd = value | |
while true { | |
let events = try poll(fd, events: .read, deadline: -1) | |
guard events.contains(.read) else { | |
throw ZeroMqVeniceError(description: "Unable to poll") | |
} | |
let result = try closure() | |
guard result == -1 else { return result } | |
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment