Skip to content

Instantly share code, notes, and snippets.

@corporatepiyush
Last active August 29, 2025 06:59
Show Gist options
  • Save corporatepiyush/68d65cdb8e69b77970acd58faa3b407e to your computer and use it in GitHub Desktop.
Save corporatepiyush/68d65cdb8e69b77970acd58faa3b407e to your computer and use it in GitHub Desktop.
Optimized Web Socket Server for Millions of connections with minimum memory allocation
import 'dart:convert';
import 'dart:typed_data';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
// Message types, same as in the JS version.
/// Discriminator for the binary protocol's message kinds.
///
/// [id] is the numeric tag the message classes write into byte 0 of their
/// encoded form.
enum MessageType {
  statusUpdate(0x01),
  sensorData(0x02),
  telemetryData(0x03),
  userProfile(0x04),
  sensorConfig(0x05),
  unknown(0x00);

  const MessageType(this.id);

  /// Numeric tag associated with this message kind.
  final int id;
}
/// Cursor-style read helpers for [ByteData].
///
/// Each method takes the byte `offset` to read at (relative to the start of
/// this [ByteData]) and returns a record `(value, nextOffset)`, where
/// `nextOffset` is `offset` advanced by the width of the value just read.
/// Multi-byte values are read little-endian. Being an extension, no separate
/// reader object has to be created.
extension ByteDataReader on ByteData {
  /// Reads an unsigned 8-bit integer.
  (int, int) readUInt8(int offset) => (getUint8(offset), offset + 1);

  /// Reads an unsigned 16-bit little-endian integer.
  (int, int) readUInt16LE(int offset) =>
      (getUint16(offset, Endian.little), offset + 2);

  /// Reads an unsigned 32-bit little-endian integer.
  (int, int) readUInt32LE(int offset) =>
      (getUint32(offset, Endian.little), offset + 4);

  /// Reads an unsigned 64-bit little-endian integer.
  (int, int) readBigUInt64LE(int offset) =>
      (getUint64(offset, Endian.little), offset + 8);

  /// Reads a 32-bit little-endian float.
  (double, int) readFloatLE(int offset) =>
      (getFloat32(offset, Endian.little), offset + 4);

  /// Reads a 64-bit little-endian float.
  (double, int) readDoubleLE(int offset) =>
      (getFloat64(offset, Endian.little), offset + 8);

  /// Reads one byte and interprets any non-zero value as `true`.
  (bool, int) readBoolean(int offset) => (getUint8(offset) != 0, offset + 1);

  /// Decodes [length] bytes starting at [offset] as UTF-8 and trims
  /// surrounding whitespace.
  ///
  /// Throws a [RangeError] if the requested range does not lie inside this
  /// [ByteData], and a [FormatException] if the bytes are not valid UTF-8.
  (String, int) readString(int offset, int length) {
    // Bound the read to this view; the backing buffer may be larger.
    final end =
        RangeError.checkValidRange(offset, offset + length, lengthInBytes);
    // `buffer` is the whole backing store, so the position has to be rebased
    // by `offsetInBytes` when this ByteData is a view into a larger buffer.
    final bytes = buffer.asUint8List(offsetInBytes + offset, length);
    return (utf8.decode(bytes).trim(), end);
  }
}
/// Common base for every protocol message.
///
/// A message is a type tag plus the [ByteData] that holds its encoded bytes.
abstract class Message {
  Message._(this.messageTypeId, this._buffer);

  /// Tag identifying the concrete message kind.
  final MessageType messageTypeId;

  /// Encoded bytes backing this message.
  final ByteData _buffer;

  /// Returns the backing [ByteData] as-is; no copy is made.
  ByteData serialize() {
    return _buffer;
  }
}
/// Device status report backed directly by its encoded bytes.
///
/// Layout (little-endian, [FIXED_SIZE] bytes): byte 0 type tag, bytes 1-2
/// `deviceId` (uint16), byte 3 `status` (uint8), byte 4 `batteryLevel`
/// (uint8), bytes 5-12 `timestamp` (uint64).
class StatusUpdate extends Message {
  /// Encoded size in bytes.
  static const int FIXED_SIZE = 1 + 2 + 1 + 1 + 8;

  StatusUpdate._(ByteData buffer) : super._(MessageType.statusUpdate, buffer);

  /// Wraps [byteData] without copying it.
  ///
  /// Throws a [RangeError] when [byteData] holds fewer than [FIXED_SIZE]
  /// bytes, so a truncated frame is rejected here instead of failing later
  /// inside a field getter.
  factory StatusUpdate.deserialize(ByteData byteData) {
    if (byteData.lengthInBytes < FIXED_SIZE) {
      throw RangeError.range(
        byteData.lengthInBytes,
        FIXED_SIZE,
        null,
        'byteData.lengthInBytes',
        'StatusUpdate frame is too short',
      );
    }
    return StatusUpdate._(byteData);
  }

  /// Encodes the given field values into a fresh [FIXED_SIZE]-byte buffer.
  static StatusUpdate create({
    required int deviceId,
    required int status,
    required int batteryLevel,
    required int timestamp,
  }) {
    final buffer = ByteData(FIXED_SIZE)
      ..setUint8(0, MessageType.statusUpdate.id)
      ..setUint16(1, deviceId, Endian.little)
      ..setUint8(3, status)
      ..setUint8(4, batteryLevel)
      ..setUint64(5, timestamp, Endian.little);
    return StatusUpdate._(buffer);
  }

  int get deviceId => _buffer.getUint16(1, Endian.little);
  int get status => _buffer.getUint8(3);
  int get batteryLevel => _buffer.getUint8(4);
  int get timestamp => _buffer.getUint64(5, Endian.little);

  @override
  String toString() {
    return 'StatusUpdate(deviceId: $deviceId, status: $status, '
        'batteryLevel: $batteryLevel, timestamp: $timestamp)';
  }
}
/// Single sensor reading backed directly by its encoded bytes.
///
/// Layout (little-endian, [FIXED_SIZE] bytes): byte 0 type tag, bytes 1-2
/// `sensorId` (uint16), bytes 3-10 `timestamp` (uint64), bytes 11-14 `value`
/// (float32, so the stored value has single precision).
class SensorData extends Message {
  /// Encoded size in bytes.
  static const int FIXED_SIZE = 1 + 2 + 8 + 4;

  SensorData._(ByteData buffer) : super._(MessageType.sensorData, buffer);

  /// Wraps [byteData] without copying it.
  ///
  /// Throws a [RangeError] when [byteData] holds fewer than [FIXED_SIZE]
  /// bytes, so a truncated frame is rejected here instead of failing later
  /// inside a field getter.
  factory SensorData.deserialize(ByteData byteData) {
    if (byteData.lengthInBytes < FIXED_SIZE) {
      throw RangeError.range(
        byteData.lengthInBytes,
        FIXED_SIZE,
        null,
        'byteData.lengthInBytes',
        'SensorData frame is too short',
      );
    }
    return SensorData._(byteData);
  }

  /// Encodes the given field values into a fresh [FIXED_SIZE]-byte buffer.
  static SensorData create({
    required int sensorId,
    required int timestamp,
    required double value,
  }) {
    final buffer = ByteData(FIXED_SIZE)
      ..setUint8(0, MessageType.sensorData.id)
      ..setUint16(1, sensorId, Endian.little)
      ..setUint64(3, timestamp, Endian.little)
      ..setFloat32(11, value, Endian.little);
    return SensorData._(buffer);
  }

  int get sensorId => _buffer.getUint16(1, Endian.little);
  int get timestamp => _buffer.getUint64(3, Endian.little);
  double get value => _buffer.getFloat32(11, Endian.little);

  @override
  String toString() {
    return 'SensorData(sensorId: $sensorId, timestamp: $timestamp, '
        'value: $value)';
  }
}
/// Telemetry sample backed directly by its encoded bytes.
///
/// Layout (little-endian, [FIXED_SIZE] bytes): byte 0 type tag, 1-8
/// `timestamp` (uint64), 9-16 `latitude`, 17-24 `longitude`, 25-32
/// `altitude`, 33-40 `speed` (all float64), 41-44 `heading` (uint32), 45
/// `fuelLevel` (uint8), 46-53 `engineTemp` (float64), 54-57 `tirePressure`
/// (float32), 58-65 `batteryVoltage` (float64).
class TelemetryData extends Message {
  /// Encoded size in bytes.
  static const int FIXED_SIZE = 1 + 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8;

  TelemetryData._(ByteData buffer)
      : super._(MessageType.telemetryData, buffer);

  /// Wraps [byteData] without copying it.
  ///
  /// Throws a [RangeError] when [byteData] holds fewer than [FIXED_SIZE]
  /// bytes, so a truncated frame is rejected here instead of failing later
  /// inside a field getter.
  factory TelemetryData.deserialize(ByteData byteData) {
    if (byteData.lengthInBytes < FIXED_SIZE) {
      throw RangeError.range(
        byteData.lengthInBytes,
        FIXED_SIZE,
        null,
        'byteData.lengthInBytes',
        'TelemetryData frame is too short',
      );
    }
    return TelemetryData._(byteData);
  }

  /// Encodes the given field values into a fresh [FIXED_SIZE]-byte buffer.
  static TelemetryData create({
    required int timestamp,
    required double latitude,
    required double longitude,
    required double altitude,
    required double speed,
    required int heading,
    required int fuelLevel,
    required double engineTemp,
    required double tirePressure,
    required double batteryVoltage,
  }) {
    final buffer = ByteData(FIXED_SIZE)
      ..setUint8(0, MessageType.telemetryData.id)
      ..setUint64(1, timestamp, Endian.little)
      ..setFloat64(9, latitude, Endian.little)
      ..setFloat64(17, longitude, Endian.little)
      ..setFloat64(25, altitude, Endian.little)
      ..setFloat64(33, speed, Endian.little)
      ..setUint32(41, heading, Endian.little)
      ..setUint8(45, fuelLevel)
      ..setFloat64(46, engineTemp, Endian.little)
      ..setFloat32(54, tirePressure, Endian.little)
      ..setFloat64(58, batteryVoltage, Endian.little);
    return TelemetryData._(buffer);
  }

  int get timestamp => _buffer.getUint64(1, Endian.little);
  double get latitude => _buffer.getFloat64(9, Endian.little);
  double get longitude => _buffer.getFloat64(17, Endian.little);
  double get altitude => _buffer.getFloat64(25, Endian.little);
  double get speed => _buffer.getFloat64(33, Endian.little);
  int get heading => _buffer.getUint32(41, Endian.little);
  int get fuelLevel => _buffer.getUint8(45);
  double get engineTemp => _buffer.getFloat64(46, Endian.little);
  double get tirePressure => _buffer.getFloat32(54, Endian.little);
  double get batteryVoltage => _buffer.getFloat64(58, Endian.little);

  @override
  String toString() {
    return 'TelemetryData(timestamp: $timestamp, latitude: $latitude, '
        'longitude: $longitude, altitude: $altitude, speed: $speed, '
        'heading: $heading, fuelLevel: $fuelLevel, engineTemp: $engineTemp, '
        'tirePressure: $tirePressure, batteryVoltage: $batteryVoltage)';
  }
}
/// User profile record backed directly by its encoded bytes.
///
/// Layout (little-endian, [FIXED_SIZE] bytes): byte 0 type tag, 1-4 `userId`
/// (uint32), 5-24 `username` (20-byte string), 25-32 `registrationDate`
/// (uint64), 33-40 `lastLoginDate` (uint64), 41 `isVerified`, 42 `isAdmin`,
/// 43-46 `avatarId` (uint32), 47-50 `sessionCount` (uint32), 51-82
/// `emailHash` (32-byte string), 83-92 `preferredTheme` (10-byte string).
///
/// String fields are fixed-width UTF-8: [create] pads them with spaces and
/// drops any encoded bytes beyond the field width.
class UserProfile extends Message {
  /// Encoded size in bytes.
  static const int FIXED_SIZE = 1 + 4 + 20 + 8 + 8 + 1 + 1 + 4 + 4 + 32 + 10;

  static const int _usernameOffset = 5;
  static const int _usernameSize = 20;
  static const int _emailHashOffset = 51;
  static const int _emailHashSize = 32;
  static const int _themeOffset = 83;
  static const int _themeSize = 10;

  UserProfile._(ByteData buffer) : super._(MessageType.userProfile, buffer);

  /// Wraps [byteData] without copying it.
  ///
  /// Throws a [RangeError] when [byteData] holds fewer than [FIXED_SIZE]
  /// bytes, so a truncated frame is rejected here instead of failing later
  /// inside a field getter.
  factory UserProfile.deserialize(ByteData byteData) {
    if (byteData.lengthInBytes < FIXED_SIZE) {
      throw RangeError.range(
        byteData.lengthInBytes,
        FIXED_SIZE,
        null,
        'byteData.lengthInBytes',
        'UserProfile frame is too short',
      );
    }
    return UserProfile._(byteData);
  }

  /// Encodes the given field values into a fresh [FIXED_SIZE]-byte buffer.
  static UserProfile create({
    required int userId,
    required String username,
    required int registrationDate,
    required int lastLoginDate,
    required bool isVerified,
    required bool isAdmin,
    required int avatarId,
    required int sessionCount,
    required String emailHash,
    required String preferredTheme,
  }) {
    final buffer = ByteData(FIXED_SIZE)
      ..setUint8(0, MessageType.userProfile.id)
      ..setUint32(1, userId, Endian.little)
      ..setUint64(25, registrationDate, Endian.little)
      ..setUint64(33, lastLoginDate, Endian.little)
      ..setUint8(41, isVerified ? 1 : 0)
      ..setUint8(42, isAdmin ? 1 : 0)
      ..setUint32(43, avatarId, Endian.little)
      ..setUint32(47, sessionCount, Endian.little);
    // The buffer was just allocated, so this view starts at byte 0 of it.
    final bytes = buffer.buffer.asUint8List();
    _writeFixedString(bytes, _usernameOffset, _usernameSize, username);
    _writeFixedString(bytes, _emailHashOffset, _emailHashSize, emailHash);
    _writeFixedString(bytes, _themeOffset, _themeSize, preferredTheme);
    return UserProfile._(buffer);
  }

  /// Writes [value], space-padded to [size], into `bytes[offset..offset+size)`.
  ///
  /// `setRange` copies exactly [size] bytes, so longer encodings are cut off.
  static void _writeFixedString(
    Uint8List bytes,
    int offset,
    int size,
    String value,
  ) {
    bytes.setRange(offset, offset + size, utf8.encode(value.padRight(size)));
  }

  /// Decodes the fixed-width string field at [offset].
  String _readFixedString(int offset, int size) {
    // `_buffer.buffer` is the whole backing store; rebase by `offsetInBytes`
    // so the read is correct when `_buffer` is a view into a larger buffer.
    final bytes =
        _buffer.buffer.asUint8List(_buffer.offsetInBytes + offset, size);
    // allowMalformed: a value cut off in the middle of a multi-byte character
    // decodes to U+FFFD instead of throwing. NUL bytes are removed as well so
    // that fields padded with '\0' (as the JavaScript implementation of this
    // protocol does) read back cleanly.
    return utf8
        .decode(bytes, allowMalformed: true)
        .replaceAll('\x00', '')
        .trim();
  }

  int get userId => _buffer.getUint32(1, Endian.little);
  String get username => _readFixedString(_usernameOffset, _usernameSize);
  int get registrationDate => _buffer.getUint64(25, Endian.little);
  int get lastLoginDate => _buffer.getUint64(33, Endian.little);
  bool get isVerified => _buffer.getUint8(41) != 0;
  bool get isAdmin => _buffer.getUint8(42) != 0;
  int get avatarId => _buffer.getUint32(43, Endian.little);
  int get sessionCount => _buffer.getUint32(47, Endian.little);
  String get emailHash => _readFixedString(_emailHashOffset, _emailHashSize);
  String get preferredTheme => _readFixedString(_themeOffset, _themeSize);

  @override
  String toString() {
    return 'UserProfile(userId: $userId, username: $username, '
        'registrationDate: $registrationDate, lastLoginDate: $lastLoginDate, '
        'isVerified: $isVerified, isAdmin: $isAdmin, avatarId: $avatarId, '
        'sessionCount: $sessionCount, emailHash: $emailHash, '
        'preferredTheme: $preferredTheme)';
  }
}
/// Sensor configuration record backed directly by its encoded bytes.
///
/// Layout (little-endian, [FIXED_SIZE] bytes): byte 0 type tag, 1-2
/// `sensorId` (uint16), 3-32 `sensorName` (30-byte string), 33 `isActive`,
/// 34-41 `minThreshold` (float64), 42-49 `maxThreshold` (float64), 50-53
/// `sampleRate` (uint32), 54-63 `units` (10-byte string), 64-191
/// `description` (128-byte string), 192-199 `calibrationDate` (uint64), 200
/// `requiresRestart`.
///
/// String fields are fixed-width UTF-8: [create] pads them with spaces and
/// drops any encoded bytes beyond the field width.
class SensorConfig extends Message {
  /// Encoded size in bytes.
  static const int FIXED_SIZE = 1 + 2 + 30 + 1 + 8 + 8 + 4 + 10 + 128 + 8 + 1;

  static const int _sensorNameOffset = 3;
  static const int _sensorNameSize = 30;
  static const int _unitsOffset = 54;
  static const int _unitsSize = 10;
  static const int _descriptionOffset = 64;
  static const int _descriptionSize = 128;

  SensorConfig._(ByteData buffer) : super._(MessageType.sensorConfig, buffer);

  /// Wraps [byteData] without copying it.
  ///
  /// Throws a [RangeError] when [byteData] holds fewer than [FIXED_SIZE]
  /// bytes, so a truncated frame is rejected here instead of failing later
  /// inside a field getter.
  factory SensorConfig.deserialize(ByteData byteData) {
    if (byteData.lengthInBytes < FIXED_SIZE) {
      throw RangeError.range(
        byteData.lengthInBytes,
        FIXED_SIZE,
        null,
        'byteData.lengthInBytes',
        'SensorConfig frame is too short',
      );
    }
    return SensorConfig._(byteData);
  }

  /// Encodes the given field values into a fresh [FIXED_SIZE]-byte buffer.
  static SensorConfig create({
    required int sensorId,
    required String sensorName,
    required bool isActive,
    required double minThreshold,
    required double maxThreshold,
    required int sampleRate,
    required String units,
    required String description,
    required int calibrationDate,
    required bool requiresRestart,
  }) {
    final buffer = ByteData(FIXED_SIZE)
      ..setUint8(0, MessageType.sensorConfig.id)
      ..setUint16(1, sensorId, Endian.little)
      ..setUint8(33, isActive ? 1 : 0)
      ..setFloat64(34, minThreshold, Endian.little)
      ..setFloat64(42, maxThreshold, Endian.little)
      ..setUint32(50, sampleRate, Endian.little)
      ..setUint64(192, calibrationDate, Endian.little)
      ..setUint8(200, requiresRestart ? 1 : 0);
    // The buffer was just allocated, so this view starts at byte 0 of it.
    final bytes = buffer.buffer.asUint8List();
    _writeFixedString(bytes, _sensorNameOffset, _sensorNameSize, sensorName);
    _writeFixedString(bytes, _unitsOffset, _unitsSize, units);
    _writeFixedString(
        bytes, _descriptionOffset, _descriptionSize, description);
    return SensorConfig._(buffer);
  }

  /// Writes [value], space-padded to [size], into `bytes[offset..offset+size)`.
  ///
  /// `setRange` copies exactly [size] bytes, so longer encodings are cut off.
  static void _writeFixedString(
    Uint8List bytes,
    int offset,
    int size,
    String value,
  ) {
    bytes.setRange(offset, offset + size, utf8.encode(value.padRight(size)));
  }

  /// Decodes the fixed-width string field at [offset].
  String _readFixedString(int offset, int size) {
    // `_buffer.buffer` is the whole backing store; rebase by `offsetInBytes`
    // so the read is correct when `_buffer` is a view into a larger buffer.
    final bytes =
        _buffer.buffer.asUint8List(_buffer.offsetInBytes + offset, size);
    // allowMalformed: a value cut off in the middle of a multi-byte character
    // decodes to U+FFFD instead of throwing. NUL bytes are removed as well so
    // that fields padded with '\0' (as the JavaScript implementation of this
    // protocol does) read back cleanly.
    return utf8
        .decode(bytes, allowMalformed: true)
        .replaceAll('\x00', '')
        .trim();
  }

  int get sensorId => _buffer.getUint16(1, Endian.little);
  String get sensorName =>
      _readFixedString(_sensorNameOffset, _sensorNameSize);
  bool get isActive => _buffer.getUint8(33) != 0;
  double get minThreshold => _buffer.getFloat64(34, Endian.little);
  double get maxThreshold => _buffer.getFloat64(42, Endian.little);
  int get sampleRate => _buffer.getUint32(50, Endian.little);
  String get units => _readFixedString(_unitsOffset, _unitsSize);
  String get description =>
      _readFixedString(_descriptionOffset, _descriptionSize);
  int get calibrationDate => _buffer.getUint64(192, Endian.little);
  bool get requiresRestart => _buffer.getUint8(200) != 0;

  @override
  String toString() {
    return 'SensorConfig(sensorId: $sensorId, sensorName: $sensorName, '
        'isActive: $isActive, minThreshold: $minThreshold, '
        'maxThreshold: $maxThreshold, sampleRate: $sampleRate, '
        'units: $units, description: $description, '
        'calibrationDate: $calibrationDate, '
        'requiresRestart: $requiresRestart)';
  }
}
/// Converts between raw frames and [Message] objects.
///
/// The first byte of a frame selects the message class; the remaining bytes
/// are interpreted by that class.
class BinaryProtocolHandler {
  /// Message factories keyed by the type tag found in byte 0 of a frame.
  final Map<int, Message Function(ByteData)> _deserializers = {
    MessageType.statusUpdate.id: StatusUpdate.deserialize,
    MessageType.sensorData.id: SensorData.deserialize,
    MessageType.telemetryData.id: TelemetryData.deserialize,
    MessageType.userProfile.id: UserProfile.deserialize,
    MessageType.sensorConfig.id: SensorConfig.deserialize,
  };

  /// Wraps [data] in the message class named by its first byte.
  ///
  /// Returns `null` (after logging) when [data] is empty, carries an unknown
  /// type tag, or a [RangeError] is raised while building the message.
  Message? deserialize(Uint8List data) {
    if (data.isEmpty) {
      print('Received empty message.');
      return null;
    }
    // `data` may be a view into a larger buffer. `sublistView` keeps its
    // offset and length, whereas `data.buffer.asByteData()` would expose the
    // whole backing store starting at byte 0.
    final byteData = ByteData.sublistView(data);
    try {
      final messageType = byteData.getUint8(0);
      final deserializer = _deserializers[messageType];
      if (deserializer == null) {
        print('Received unknown message type: $messageType');
        return null;
      }
      return deserializer(byteData);
    } on RangeError catch (e) {
      print('Security Alert: Malformed message received. $e');
      return null;
    }
  }

  /// Returns the bytes of [message] as a [Uint8List] view (no copy).
  Uint8List serialize(Message message) {
    final byteData = message.serialize();
    // Restrict the view to the message's own window of the backing buffer.
    return byteData.buffer
        .asUint8List(byteData.offsetInBytes, byteData.lengthInBytes);
  }
}
/// Shared handler used by the server, the client and [sendMessage].
final BinaryProtocolHandler protocolHandler = BinaryProtocolHandler();
/// Serializes [message] and adds it to the sink of [channel].
///
/// Anything thrown while serializing or adding is logged and swallowed.
void sendMessage(WebSocketChannel channel, Message message) {
  try {
    final payload = protocolHandler.serialize(message);
    channel.sink.add(payload);
  } catch (e) {
    print('Serialization or send error: $e');
  }
}
/// Starts the echo server on `localhost:8081`.
///
/// Every binary frame is decoded, logged and sent back to the sender; any
/// other frame is logged as a text message. Bind failures and malformed
/// frames are logged rather than left as unhandled asynchronous errors.
void startWebSocketServer() {
  final handler = webSocketHandler((WebSocketChannel webSocket) {
    print('A new client connected.');
    webSocket.stream.listen(
      (message) {
        if (message is! Uint8List) {
          print('Server received text message: $message');
          return;
        }
        try {
          final deserializedMessage = protocolHandler.deserialize(message);
          if (deserializedMessage != null) {
            // Formatting reads every field, so a truncated or badly encoded
            // frame can still throw at this point.
            print('Server received message: $deserializedMessage');
            // Echo the message back to the client
            sendMessage(webSocket, deserializedMessage);
          }
        } on RangeError catch (e) {
          print('Security Alert: Malformed message received. $e');
        } on FormatException catch (e) {
          print('Security Alert: Malformed message received. $e');
        }
      },
      onDone: () => print('Client disconnected.'),
      onError: (e) => print('Server-side WebSocket error: $e'),
    );
  });
  io.serve(handler, 'localhost', 8081).then(
    (server) {
      print(
        'WebSocket server is listening at '
        'ws://${server.address.host}:${server.port}',
      );
    },
    // Without this, a failed bind (for example, port already in use) would
    // surface as an unhandled asynchronous error.
    onError: (Object error) {
      print('Failed to start WebSocket server: $error');
    },
  );
}
/// Connects to the local server and sends one sample message of every type.
///
/// Incoming binary frames are decoded and logged; other frames are logged as
/// text. Any exception raised inside the `try` is reported as a connection
/// failure.
void startWebSocketClient() async {
  final serverUrl = 'ws://localhost:8081';
  print('Connecting to $serverUrl...');
  try {
    final channel = WebSocketChannel.connect(Uri.parse(serverUrl));
    await channel.ready;
    print('Connected to WebSocket server.');

    // Logs one incoming frame.
    void onFrame(dynamic frame) {
      if (frame is! Uint8List) {
        print('Client received text message: $frame');
        return;
      }
      final decoded = protocolHandler.deserialize(frame);
      if (decoded != null) {
        print('Client received message: $decoded');
      }
    }

    channel.stream.listen(
      onFrame,
      onDone: () => print('Connection closed.'),
      onError: (e) => print('Client-side WebSocket error: $e'),
    );

    sendMessage(
      channel,
      TelemetryData.create(
        timestamp: DateTime.now().millisecondsSinceEpoch,
        latitude: 34.0191,
        longitude: -118.4912,
        altitude: 250.5,
        speed: 95.7,
        heading: 180,
        fuelLevel: 75,
        engineTemp: 90.2,
        tirePressure: 32.5,
        batteryVoltage: 12.8,
      ),
    );

    sendMessage(
      channel,
      StatusUpdate.create(
        deviceId: 101,
        status: 1,
        batteryLevel: 95,
        timestamp: DateTime.now().millisecondsSinceEpoch,
      ),
    );

    sendMessage(
      channel,
      SensorData.create(
        sensorId: 501,
        timestamp: DateTime.now().millisecondsSinceEpoch,
        value: 42.7,
      ),
    );

    sendMessage(
      channel,
      UserProfile.create(
        userId: 12345,
        username: 'john_doe',
        registrationDate: 1672531200000,
        lastLoginDate: 1672617600000,
        isVerified: true,
        isAdmin: false,
        avatarId: 5,
        sessionCount: 150,
        emailHash: 'abcdef123456',
        preferredTheme: 'dark',
      ),
    );

    sendMessage(
      channel,
      SensorConfig.create(
        sensorId: 102,
        sensorName: 'temperature_sensor',
        isActive: true,
        minThreshold: -10.0,
        maxThreshold: 50.0,
        sampleRate: 100,
        units: 'Celsius',
        description: 'Main temperature sensor in the engine bay.',
        calibrationDate: 1672531200000,
        requiresRestart: false,
      ),
    );
  } catch (e) {
    print('Could not connect to WebSocket server: $e');
  }
}
/// Entry point: starts the server, waits one second, then starts the client.
void main() async {
  startWebSocketServer();
  // Give the server a moment to bind before the client connects.
  const startupDelay = Duration(seconds: 1);
  await Future.delayed(startupDelay);
  startWebSocketClient();
}
// Use a WebSocket library for the server side.
const WebSocket = require("ws");
// require faker for generating random data
const { faker } = require("@faker-js/faker");
// Type tags of the binary protocol (same values as the Dart version).
// The message classes write the tag into byte 0 of each encoded message.
const MessageType = {
  statusUpdate: 0x01, // StatusUpdate
  sensorData: 0x02, // SensorData
  telemetryData: 0x03, // TelemetryData
  userProfile: 0x04, // UserProfile
  sensorConfig: 0x05, // SensorConfig
  unknown: 0x00, // no message class is registered for this tag
};
// Shared UTF-8 codec instances used for the fixed-width string fields.
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder("utf-8");
// Base class for all messages: a type tag plus the DataView holding the
// encoded bytes.
class Message {
  /**
   * @param {number} messageTypeId Type tag of the concrete message.
   * @param {DataView} buffer View over the encoded message bytes.
   */
  constructor(messageTypeId, buffer) {
    this.messageTypeId = messageTypeId;
    this.buffer = buffer;
  }

  /**
   * Returns the encoded message as an ArrayBuffer.
   *
   * When the DataView spans its whole backing ArrayBuffer, that buffer is
   * returned as-is (no copy). When the view is only a window into a larger
   * buffer, just that window is copied out, so unrelated bytes are never
   * included in the result.
   *
   * @returns {ArrayBuffer}
   */
  serialize() {
    const { buffer, byteOffset, byteLength } = this.buffer;
    if (byteOffset === 0 && byteLength === buffer.byteLength) {
      return buffer;
    }
    return buffer.slice(byteOffset, byteOffset + byteLength);
  }
}
/**
 * Device status report backed directly by its encoded bytes.
 *
 * Layout (little-endian, FIXED_SIZE bytes): byte 0 type tag, bytes 1-2
 * deviceId (uint16), byte 3 status (uint8), byte 4 batteryLevel (uint8),
 * bytes 5-12 timestamp (uint64).
 */
class StatusUpdate extends Message {
  static FIXED_SIZE = 1 + 2 + 1 + 1 + 8;

  /**
   * @param {DataView} buffer View over an encoded StatusUpdate.
   * @throws {RangeError} If the view is shorter than FIXED_SIZE, so that a
   *   truncated frame is rejected here instead of inside a getter.
   */
  constructor(buffer) {
    super(MessageType.statusUpdate, buffer);
    if (buffer.byteLength < StatusUpdate.FIXED_SIZE) {
      throw new RangeError(
        `StatusUpdate frame is too short: ${buffer.byteLength} < ${StatusUpdate.FIXED_SIZE} bytes`,
      );
    }
  }

  /** Encodes the given field values into a fresh FIXED_SIZE-byte buffer. */
  static create(deviceId, status, batteryLevel, timestamp) {
    const buffer = new DataView(new ArrayBuffer(this.FIXED_SIZE));
    buffer.setUint8(0, MessageType.statusUpdate);
    buffer.setUint16(1, deviceId, true); // true for little-endian
    buffer.setUint8(3, status);
    buffer.setUint8(4, batteryLevel);
    buffer.setBigUint64(5, BigInt(timestamp), true);
    return new StatusUpdate(buffer);
  }

  get deviceId() {
    return this.buffer.getUint16(1, true);
  }

  get status() {
    return this.buffer.getUint8(3);
  }

  get batteryLevel() {
    return this.buffer.getUint8(4);
  }

  // Converted from BigInt to Number; values above 2^53 - 1 lose precision.
  get timestamp() {
    return Number(this.buffer.getBigUint64(5, true));
  }

  toString() {
    return `StatusUpdate(deviceId: ${this.deviceId}, status: ${this.status}, batteryLevel: ${this.batteryLevel}, timestamp: ${this.timestamp})`;
  }
}
/**
 * Single sensor reading backed directly by its encoded bytes.
 *
 * Layout (little-endian, FIXED_SIZE bytes): byte 0 type tag, bytes 1-2
 * sensorId (uint16), bytes 3-10 timestamp (uint64), bytes 11-14 value
 * (float32, so the stored value has single precision).
 */
class SensorData extends Message {
  static FIXED_SIZE = 1 + 2 + 8 + 4;

  /**
   * @param {DataView} buffer View over an encoded SensorData.
   * @throws {RangeError} If the view is shorter than FIXED_SIZE, so that a
   *   truncated frame is rejected here instead of inside a getter.
   */
  constructor(buffer) {
    super(MessageType.sensorData, buffer);
    if (buffer.byteLength < SensorData.FIXED_SIZE) {
      throw new RangeError(
        `SensorData frame is too short: ${buffer.byteLength} < ${SensorData.FIXED_SIZE} bytes`,
      );
    }
  }

  /** Encodes the given field values into a fresh FIXED_SIZE-byte buffer. */
  static create(sensorId, timestamp, value) {
    const buffer = new DataView(new ArrayBuffer(this.FIXED_SIZE));
    buffer.setUint8(0, MessageType.sensorData);
    buffer.setUint16(1, sensorId, true);
    buffer.setBigUint64(3, BigInt(timestamp), true);
    buffer.setFloat32(11, value, true);
    return new SensorData(buffer);
  }

  get sensorId() {
    return this.buffer.getUint16(1, true);
  }

  // Converted from BigInt to Number; values above 2^53 - 1 lose precision.
  get timestamp() {
    return Number(this.buffer.getBigUint64(3, true));
  }

  get value() {
    return this.buffer.getFloat32(11, true);
  }

  toString() {
    return `SensorData(sensorId: ${this.sensorId}, timestamp: ${this.timestamp}, value: ${this.value})`;
  }
}
/**
 * Telemetry sample backed directly by its encoded bytes.
 *
 * Layout (little-endian, FIXED_SIZE bytes): byte 0 type tag, 1-8 timestamp
 * (uint64), 9-16 latitude, 17-24 longitude, 25-32 altitude, 33-40 speed (all
 * float64), 41-44 heading (uint32), 45 fuelLevel (uint8), 46-53 engineTemp
 * (float64), 54-57 tirePressure (float32), 58-65 batteryVoltage (float64).
 */
class TelemetryData extends Message {
  static FIXED_SIZE = 1 + 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8;

  /**
   * @param {DataView} buffer View over an encoded TelemetryData.
   * @throws {RangeError} If the view is shorter than FIXED_SIZE, so that a
   *   truncated frame is rejected here instead of inside a getter.
   */
  constructor(buffer) {
    super(MessageType.telemetryData, buffer);
    if (buffer.byteLength < TelemetryData.FIXED_SIZE) {
      throw new RangeError(
        `TelemetryData frame is too short: ${buffer.byteLength} < ${TelemetryData.FIXED_SIZE} bytes`,
      );
    }
  }

  /** Encodes the given field values into a fresh FIXED_SIZE-byte buffer. */
  static create(
    timestamp,
    latitude,
    longitude,
    altitude,
    speed,
    heading,
    fuelLevel,
    engineTemp,
    tirePressure,
    batteryVoltage,
  ) {
    const buffer = new DataView(new ArrayBuffer(this.FIXED_SIZE));
    buffer.setUint8(0, MessageType.telemetryData);
    buffer.setBigUint64(1, BigInt(timestamp), true);
    buffer.setFloat64(9, latitude, true);
    buffer.setFloat64(17, longitude, true);
    buffer.setFloat64(25, altitude, true);
    buffer.setFloat64(33, speed, true);
    buffer.setUint32(41, heading, true);
    buffer.setUint8(45, fuelLevel);
    buffer.setFloat64(46, engineTemp, true);
    buffer.setFloat32(54, tirePressure, true);
    buffer.setFloat64(58, batteryVoltage, true);
    return new TelemetryData(buffer);
  }

  // Converted from BigInt to Number; values above 2^53 - 1 lose precision.
  get timestamp() {
    return Number(this.buffer.getBigUint64(1, true));
  }

  get latitude() {
    return this.buffer.getFloat64(9, true);
  }

  get longitude() {
    return this.buffer.getFloat64(17, true);
  }

  get altitude() {
    return this.buffer.getFloat64(25, true);
  }

  get speed() {
    return this.buffer.getFloat64(33, true);
  }

  get heading() {
    return this.buffer.getUint32(41, true);
  }

  get fuelLevel() {
    return this.buffer.getUint8(45);
  }

  get engineTemp() {
    return this.buffer.getFloat64(46, true);
  }

  get tirePressure() {
    return this.buffer.getFloat32(54, true);
  }

  get batteryVoltage() {
    return this.buffer.getFloat64(58, true);
  }

  toString() {
    return `TelemetryData(timestamp: ${this.timestamp}, latitude: ${this.latitude}, longitude: ${this.longitude}, altitude: ${this.altitude}, speed: ${this.speed}, heading: ${this.heading}, fuelLevel: ${this.fuelLevel}, engineTemp: ${this.engineTemp}, tirePressure: ${this.tirePressure}, batteryVoltage: ${this.batteryVoltage})`;
  }
}
/**
 * User profile record backed directly by its encoded bytes.
 *
 * Layout (little-endian, FIXED_SIZE bytes): byte 0 type tag, 1-4 userId
 * (uint32), 5-24 username (20-byte string), 25-32 registrationDate (uint64),
 * 33-40 lastLoginDate (uint64), 41 isVerified, 42 isAdmin, 43-46 avatarId
 * (uint32), 47-50 sessionCount (uint32), 51-82 emailHash (32-byte string),
 * 83-92 preferredTheme (10-byte string).
 *
 * String fields are fixed-width UTF-8, padded with NUL bytes by create().
 */
class UserProfile extends Message {
  static FIXED_SIZE = 1 + 4 + 20 + 8 + 8 + 1 + 1 + 4 + 4 + 32 + 10;
  static USERNAME_OFFSET = 5;
  static USERNAME_SIZE = 20;
  static EMAIL_HASH_OFFSET = 51;
  static EMAIL_HASH_SIZE = 32;
  static THEME_OFFSET = 83;
  static THEME_SIZE = 10;

  /**
   * @param {DataView} buffer View over an encoded UserProfile.
   * @throws {RangeError} If the view is shorter than FIXED_SIZE, so that a
   *   truncated frame is rejected here instead of inside a getter.
   */
  constructor(buffer) {
    super(MessageType.userProfile, buffer);
    if (buffer.byteLength < UserProfile.FIXED_SIZE) {
      throw new RangeError(
        `UserProfile frame is too short: ${buffer.byteLength} < ${UserProfile.FIXED_SIZE} bytes`,
      );
    }
  }

  /**
   * Writes `text`, NUL-padded to `size`, into the `size` bytes at `offset`.
   * Encoded bytes beyond `size` are dropped so a long value can neither spill
   * into the next field nor run past the end of the buffer.
   */
  static _writeFixedString(view, offset, size, text) {
    const encoded = textEncoder.encode(text.padEnd(size, "\0"));
    new Uint8Array(view.buffer, view.byteOffset + offset, size).set(
      encoded.subarray(0, size),
    );
  }

  /** Encodes the given field values into a fresh FIXED_SIZE-byte buffer. */
  static create(
    userId,
    username,
    registrationDate,
    lastLoginDate,
    isVerified,
    isAdmin,
    avatarId,
    sessionCount,
    emailHash,
    preferredTheme,
  ) {
    const buffer = new DataView(new ArrayBuffer(UserProfile.FIXED_SIZE));
    buffer.setUint8(0, MessageType.userProfile);
    buffer.setUint32(1, userId, true);
    UserProfile._writeFixedString(
      buffer,
      UserProfile.USERNAME_OFFSET,
      UserProfile.USERNAME_SIZE,
      username,
    );
    buffer.setBigUint64(25, BigInt(registrationDate), true);
    buffer.setBigUint64(33, BigInt(lastLoginDate), true);
    buffer.setUint8(41, isVerified ? 1 : 0);
    buffer.setUint8(42, isAdmin ? 1 : 0);
    buffer.setUint32(43, avatarId, true);
    buffer.setUint32(47, sessionCount, true);
    UserProfile._writeFixedString(
      buffer,
      UserProfile.EMAIL_HASH_OFFSET,
      UserProfile.EMAIL_HASH_SIZE,
      emailHash,
    );
    UserProfile._writeFixedString(
      buffer,
      UserProfile.THEME_OFFSET,
      UserProfile.THEME_SIZE,
      preferredTheme,
    );
    return new UserProfile(buffer);
  }

  /**
   * Decodes the fixed-width string field at `offset`.
   * NUL padding is removed; surrounding whitespace is trimmed too, so
   * space-padded fields (as written by the Dart implementation of this
   * protocol) read back cleanly.
   */
  _readFixedString(offset, size) {
    // Rebase by byteOffset: the DataView may be a window into a larger buffer.
    const bytes = new Uint8Array(
      this.buffer.buffer,
      this.buffer.byteOffset + offset,
      size,
    );
    return textDecoder.decode(bytes).replace(/\0/g, "").trim();
  }

  get userId() {
    return this.buffer.getUint32(1, true);
  }

  // The layout constants are static, so they must be read from the class:
  // `this.USERNAME_OFFSET` on an instance is undefined.
  get username() {
    return this._readFixedString(
      UserProfile.USERNAME_OFFSET,
      UserProfile.USERNAME_SIZE,
    );
  }

  // Converted from BigInt to Number; values above 2^53 - 1 lose precision.
  get registrationDate() {
    return Number(this.buffer.getBigUint64(25, true));
  }

  get lastLoginDate() {
    return Number(this.buffer.getBigUint64(33, true));
  }

  get isVerified() {
    return this.buffer.getUint8(41) !== 0;
  }

  get isAdmin() {
    return this.buffer.getUint8(42) !== 0;
  }

  get avatarId() {
    return this.buffer.getUint32(43, true);
  }

  get sessionCount() {
    return this.buffer.getUint32(47, true);
  }

  get emailHash() {
    return this._readFixedString(
      UserProfile.EMAIL_HASH_OFFSET,
      UserProfile.EMAIL_HASH_SIZE,
    );
  }

  get preferredTheme() {
    return this._readFixedString(
      UserProfile.THEME_OFFSET,
      UserProfile.THEME_SIZE,
    );
  }

  toString() {
    return `UserProfile(userId: ${this.userId}, username: ${this.username}, registrationDate: ${this.registrationDate}, lastLoginDate: ${this.lastLoginDate}, isVerified: ${this.isVerified}, isAdmin: ${this.isAdmin}, avatarId: ${this.avatarId}, sessionCount: ${this.sessionCount}, emailHash: ${this.emailHash}, preferredTheme: ${this.preferredTheme})`;
  }
}
/**
 * Sensor configuration record backed directly by its encoded bytes.
 *
 * Layout (little-endian, FIXED_SIZE bytes): byte 0 type tag, 1-2 sensorId
 * (uint16), 3-32 sensorName (30-byte string), 33 isActive, 34-41 minThreshold
 * (float64), 42-49 maxThreshold (float64), 50-53 sampleRate (uint32), 54-63
 * units (10-byte string), 64-191 description (128-byte string), 192-199
 * calibrationDate (uint64), 200 requiresRestart.
 *
 * String fields are fixed-width UTF-8, padded with NUL bytes by create().
 */
class SensorConfig extends Message {
  static FIXED_SIZE = 1 + 2 + 30 + 1 + 8 + 8 + 4 + 10 + 128 + 8 + 1;
  static SENSOR_NAME_OFFSET = 3;
  static SENSOR_NAME_SIZE = 30;
  static UNITS_OFFSET = 54;
  static UNITS_SIZE = 10;
  static DESCRIPTION_OFFSET = 64;
  static DESCRIPTION_SIZE = 128;

  /**
   * @param {DataView} buffer View over an encoded SensorConfig.
   * @throws {RangeError} If the view is shorter than FIXED_SIZE, so that a
   *   truncated frame is rejected here instead of inside a getter.
   */
  constructor(buffer) {
    super(MessageType.sensorConfig, buffer);
    if (buffer.byteLength < SensorConfig.FIXED_SIZE) {
      throw new RangeError(
        `SensorConfig frame is too short: ${buffer.byteLength} < ${SensorConfig.FIXED_SIZE} bytes`,
      );
    }
  }

  /**
   * Writes `text`, NUL-padded to `size`, into the `size` bytes at `offset`.
   * Encoded bytes beyond `size` are dropped so a long value can neither spill
   * into the next field nor run past the end of the buffer.
   */
  static _writeFixedString(view, offset, size, text) {
    const encoded = textEncoder.encode(text.padEnd(size, "\0"));
    new Uint8Array(view.buffer, view.byteOffset + offset, size).set(
      encoded.subarray(0, size),
    );
  }

  /** Encodes the given field values into a fresh FIXED_SIZE-byte buffer. */
  static create(
    sensorId,
    sensorName,
    isActive,
    minThreshold,
    maxThreshold,
    sampleRate,
    units,
    description,
    calibrationDate,
    requiresRestart,
  ) {
    const buffer = new DataView(new ArrayBuffer(SensorConfig.FIXED_SIZE));
    buffer.setUint8(0, MessageType.sensorConfig);
    buffer.setUint16(1, sensorId, true);
    SensorConfig._writeFixedString(
      buffer,
      SensorConfig.SENSOR_NAME_OFFSET,
      SensorConfig.SENSOR_NAME_SIZE,
      sensorName,
    );
    buffer.setUint8(33, isActive ? 1 : 0);
    buffer.setFloat64(34, minThreshold, true);
    buffer.setFloat64(42, maxThreshold, true);
    buffer.setUint32(50, sampleRate, true);
    SensorConfig._writeFixedString(
      buffer,
      SensorConfig.UNITS_OFFSET,
      SensorConfig.UNITS_SIZE,
      units,
    );
    SensorConfig._writeFixedString(
      buffer,
      SensorConfig.DESCRIPTION_OFFSET,
      SensorConfig.DESCRIPTION_SIZE,
      description,
    );
    buffer.setBigUint64(192, BigInt(calibrationDate), true);
    buffer.setUint8(200, requiresRestart ? 1 : 0);
    return new SensorConfig(buffer);
  }

  /**
   * Decodes the fixed-width string field at `offset`.
   * NUL padding is removed; surrounding whitespace is trimmed too, so
   * space-padded fields (as written by the Dart implementation of this
   * protocol) read back cleanly.
   */
  _readFixedString(offset, size) {
    // Rebase by byteOffset: the DataView may be a window into a larger buffer.
    const bytes = new Uint8Array(
      this.buffer.buffer,
      this.buffer.byteOffset + offset,
      size,
    );
    return textDecoder.decode(bytes).replace(/\0/g, "").trim();
  }

  get sensorId() {
    return this.buffer.getUint16(1, true);
  }

  // The layout constants are static, so they must be read from the class:
  // `this.SENSOR_NAME_OFFSET` on an instance is undefined.
  get sensorName() {
    return this._readFixedString(
      SensorConfig.SENSOR_NAME_OFFSET,
      SensorConfig.SENSOR_NAME_SIZE,
    );
  }

  get isActive() {
    return this.buffer.getUint8(33) !== 0;
  }

  get minThreshold() {
    return this.buffer.getFloat64(34, true);
  }

  get maxThreshold() {
    return this.buffer.getFloat64(42, true);
  }

  get sampleRate() {
    return this.buffer.getUint32(50, true);
  }

  get units() {
    return this._readFixedString(
      SensorConfig.UNITS_OFFSET,
      SensorConfig.UNITS_SIZE,
    );
  }

  get description() {
    return this._readFixedString(
      SensorConfig.DESCRIPTION_OFFSET,
      SensorConfig.DESCRIPTION_SIZE,
    );
  }

  // Converted from BigInt to Number; values above 2^53 - 1 lose precision.
  get calibrationDate() {
    return Number(this.buffer.getBigUint64(192, true));
  }

  get requiresRestart() {
    return this.buffer.getUint8(200) !== 0;
  }

  toString() {
    return `SensorConfig(sensorId: ${this.sensorId}, sensorName: ${this.sensorName}, isActive: ${this.isActive}, minThreshold: ${this.minThreshold}, maxThreshold: ${this.maxThreshold}, sampleRate: ${this.sampleRate}, units: ${this.units}, description: ${this.description}, calibrationDate: ${this.calibrationDate}, requiresRestart: ${this.requiresRestart})`;
  }
}
/**
 * Converts between raw frames and Message objects. The first byte of a frame
 * selects the message class.
 */
class BinaryProtocolHandler {
  constructor() {
    // Message factories keyed by the type tag found in byte 0 of a frame.
    this.deserializers = {
      [MessageType.statusUpdate]: (buffer) => new StatusUpdate(buffer),
      [MessageType.sensorData]: (buffer) => new SensorData(buffer),
      [MessageType.telemetryData]: (buffer) => new TelemetryData(buffer),
      [MessageType.userProfile]: (buffer) => new UserProfile(buffer),
      [MessageType.sensorConfig]: (buffer) => new SensorConfig(buffer),
    };
  }

  /**
   * Wraps `data` in the message class named by its first byte.
   *
   * @param {ArrayBuffer|ArrayBufferView} data Raw frame. Typed arrays and
   *   Node.js Buffers are accepted as well as plain ArrayBuffers.
   * @returns {Message|null} The message, or null (after logging) when the
   *   frame is empty, has an unknown type tag, or cannot be wrapped.
   */
  deserialize(data) {
    if (!data || data.byteLength === 0) {
      console.log("Received empty message.");
      return null;
    }
    try {
      // Built inside the try so an unsupported `data` type is reported as a
      // malformed message instead of escaping as an uncaught TypeError. A
      // view keeps its own window into the backing buffer.
      const buffer = ArrayBuffer.isView(data)
        ? new DataView(data.buffer, data.byteOffset, data.byteLength)
        : new DataView(data);
      const messageType = buffer.getUint8(0);
      const deserializer = this.deserializers[messageType];
      if (!deserializer) {
        console.log(`Received unknown message type: ${messageType}`);
        return null;
      }
      return deserializer(buffer);
    } catch (e) {
      console.log(`Security Alert: Malformed message received. ${e.message}`);
      return null;
    }
  }

  /** Returns whatever `message.serialize()` produces. */
  serialize(message) {
    return message.serialize();
  }
}
// Shared handler instance used by the send helpers, the WebSocket server and
// client, and the binary benchmark.
const protocolHandler = new BinaryProtocolHandler();
// Serializes `message` and sends it over the given WebSocket. Anything thrown
// while serializing or sending is logged and swallowed.
function sendMessage(ws, message) {
  try {
    const payload = protocolHandler.serialize(message);
    ws.send(payload);
  } catch (e) {
    console.log(`Serialization or send error: ${e.message}`);
  }
}
// Utility to send a message via HTTP REST API with application/octet-stream.
// The response body is decoded with the same binary protocol and logged.
async function sendBinaryPost(url, message) {
  try {
    const buffer = protocolHandler.serialize(message);
    console.log(
      `Sending binary POST request to ${url} with message of type ${message.constructor.name}`,
    );
    const response = await fetch(url, {
      method: "POST",
      headers: {
        "Content-Type": "application/octet-stream",
      },
      body: buffer,
    });
    if (response.ok) {
      console.log(`HTTP POST successful! Status: ${response.status}`);
      // In a real scenario, you'd handle the binary response here.
      const responseData = await response.arrayBuffer();
      const deserializedResponse = protocolHandler.deserialize(responseData);
      // deserialize() returns null for an empty or unrecognised body; report
      // that explicitly instead of failing on `null.toString()` and logging a
      // misleading "Error sending HTTP request".
      if (deserializedResponse) {
        console.log(
          "Received binary response:",
          deserializedResponse.toString(),
        );
      } else {
        console.error(
          "HTTP response body could not be decoded as a protocol message.",
        );
      }
    } else {
      console.error(`HTTP POST failed with status: ${response.status}`);
    }
  } catch (e) {
    console.error(`Error sending HTTP request: ${e.message}`);
    console.log(
      "Note: This will fail unless a mock REST endpoint is running on the specified URL.",
    );
  }
}
// WebSocket Server: echoes every decodable binary message back to its sender.
function startWebSocketServer() {
  const wss = new WebSocket.Server({ port: 8081 });
  // Log only once the port is actually bound.
  wss.on("listening", () =>
    console.log("WebSocket server is listening at ws://localhost:8081"),
  );
  // Without a listener, a bind failure (e.g. port already in use) would be
  // thrown as an unhandled 'error' event.
  wss.on("error", (e) => console.log(`WebSocket server error: ${e.message}`));
  wss.on("connection", (ws) => {
    console.log("A new client connected.");
    // `ws` hands binary messages over as Node.js Buffers unless told
    // otherwise; ask for ArrayBuffers so the instanceof check below matches.
    ws.binaryType = "arraybuffer";
    ws.on("message", (message) => {
      if (message instanceof ArrayBuffer) {
        try {
          const deserializedMessage = protocolHandler.deserialize(message);
          if (deserializedMessage) {
            // toString() reads every field, so a truncated frame can still
            // throw at this point.
            console.log(
              "Server received message:",
              deserializedMessage.toString(),
            );
            // Echo the message back to the client
            sendMessage(ws, deserializedMessage);
          }
        } catch (e) {
          console.log(
            `Security Alert: Malformed message received. ${e.message}`,
          );
        }
      } else {
        console.log("Server received text message:", message.toString());
      }
    });
    ws.on("close", () => console.log("Client disconnected."));
    ws.on("error", (e) =>
      console.log(`Server-side WebSocket error: ${e.message}`),
    );
  });
}
// WebSocket Client: sends one sample message of every type and logs whatever
// the server sends back.
function startWebSocketClient() {
  const ws = new WebSocket("ws://localhost:8081");
  // `ws` hands binary messages over as Node.js Buffers unless told otherwise;
  // ask for ArrayBuffers so the instanceof check in onmessage matches.
  ws.binaryType = "arraybuffer";
  ws.onopen = () => {
    console.log("Connected to WebSocket server.");
    const telemetryData = TelemetryData.create(
      Date.now(),
      34.0191,
      -118.4912,
      250.5,
      95.7,
      180,
      75,
      90.2,
      32.5,
      12.8,
    );
    sendMessage(ws, telemetryData);
    const statusUpdate = StatusUpdate.create(101, 1, 95, Date.now());
    sendMessage(ws, statusUpdate);
    const sensorData = SensorData.create(501, Date.now(), 42.7);
    sendMessage(ws, sensorData);
    const userProfile = UserProfile.create(
      12345,
      "john_doe",
      1672531200000,
      1672617600000,
      true,
      false,
      5,
      150,
      "abcdef123456",
      "dark",
    );
    sendMessage(ws, userProfile);
    const sensorConfig = SensorConfig.create(
      102,
      "temperature_sensor",
      true,
      -10.0,
      50.0,
      100,
      "Celsius",
      "Main temperature sensor in the engine bay.",
      1672531200000,
      false,
    );
    sendMessage(ws, sensorConfig);
  };
  ws.onmessage = (event) => {
    if (event.data instanceof ArrayBuffer) {
      const deserializedMessage = protocolHandler.deserialize(event.data);
      if (deserializedMessage) {
        console.log("Client received message:", deserializedMessage.toString());
      }
    } else {
      console.log("Client received text message:", event.data.toString());
    }
  };
  ws.onclose = () => console.log("Connection closed.");
  ws.onerror = (e) => console.log(`Client-side WebSocket error: ${e.message}`);
}
// Number of telemetry samples used by both benchmarks.
const NUM_MESSAGES = 100000;
// Raw samples, generated once up front so that data generation is not part
// of what the benchmarks time.
const rawTestData = [];

// Appends NUM_MESSAGES random telemetry samples to rawTestData.
function generateTestData() {
  console.log("Generating all test data before starting benchmarks...");
  // Random float in [min, max], requested with a precision of 0.1.
  const randomFloat = (min, max) =>
    faker.number.float({ min, max, precision: 0.1 });
  let remaining = NUM_MESSAGES;
  while (remaining-- > 0) {
    rawTestData.push({
      timestamp: Date.now(),
      latitude: faker.location.latitude(),
      longitude: faker.location.longitude(),
      altitude: randomFloat(0, 1000),
      speed: randomFloat(0, 200),
      heading: faker.number.int({ min: 0, max: 360 }),
      fuelLevel: faker.number.int({ min: 0, max: 100 }),
      engineTemp: randomFloat(-40, 150),
      tirePressure: randomFloat(20, 50),
      batteryVoltage: randomFloat(10, 15),
    });
  }
  console.log("Test data generation complete.");
}
// Benchmarks JSON creation, serialization and deserialization of every sample
// in rawTestData and logs timings and sizes.
function runJsonBenchmark() {
  console.log("\n--- JSON Benchmark ---");
  const serializedMessages = [];

  // Serialization benchmark: only object creation and JSON.stringify are
  // inside the timed region.
  const jsonStart = performance.now();
  for (const data of rawTestData) {
    // Step 1: Create the JSON object literal
    const jsonData = {
      messageType: MessageType.telemetryData,
      ...data,
    };
    // Step 2: Serialize it
    serializedMessages.push(JSON.stringify(jsonData));
  }
  const jsonSerializationTime = performance.now() - jsonStart;

  // Measure size in encoded UTF-8 bytes (string.length counts UTF-16 code
  // units), and do so outside the timed region.
  let jsonTotalSize = 0;
  for (const serialized of serializedMessages) {
    jsonTotalSize += Buffer.byteLength(serialized, "utf8");
  }

  console.log("JSON Serialization Stats:");
  console.log(
    `- Time to create and serialize ${NUM_MESSAGES} messages: ${jsonSerializationTime.toFixed(2)} ms`,
  );
  console.log(
    `- Total serialized size: ${(jsonTotalSize / 1024).toFixed(2)} KB`,
  );
  console.log(
    `- Average message size: ${(jsonTotalSize / NUM_MESSAGES).toFixed(2)} bytes`,
  );

  const jsonDeserializationStart = performance.now();
  for (const data of serializedMessages) {
    JSON.parse(data);
  }
  const jsonDeserializationTime = performance.now() - jsonDeserializationStart;
  console.log(
    `- Time to deserialize ${NUM_MESSAGES} messages: ${jsonDeserializationTime.toFixed(2)} ms`,
  );
}
// Benchmarks binary creation, serialization and deserialization of every
// sample in rawTestData and logs timings and sizes.
function runBinaryBenchmark() {
  console.log("\n--- Binary Benchmark ---");
  const serializedMessages = [];
  let binaryTotalSize = 0;

  // Serialization benchmark
  const binaryStart = performance.now();
  for (const data of rawTestData) {
    // Step 1: Create the TelemetryData object
    const msg = TelemetryData.create(
      data.timestamp,
      data.latitude,
      data.longitude,
      data.altitude,
      data.speed,
      data.heading,
      data.fuelLevel,
      data.engineTemp,
      data.tirePressure,
      data.batteryVoltage,
    );
    // Step 2: Serialize it
    const serialized = protocolHandler.serialize(msg);
    serializedMessages.push(serialized);
    binaryTotalSize += serialized.byteLength;
  }
  const binarySerializationTime = performance.now() - binaryStart;

  console.log("Binary Serialization Stats:");
  console.log(
    `- Time to create and serialize ${NUM_MESSAGES} messages: ${binarySerializationTime.toFixed(2)} ms`,
  );
  console.log(
    `- Total serialized size: ${(binaryTotalSize / 1024).toFixed(2)} KB`,
  );
  console.log(
    `- Average message size: ${(binaryTotalSize / NUM_MESSAGES).toFixed(2)} bytes`,
  );

  const binaryDeserializationStart = performance.now();
  let decodedCount = 0;
  for (const data of serializedMessages) {
    const msg = protocolHandler.deserialize(data);
    // Read one field so the wrapped bytes are actually touched; deserialize()
    // itself only wraps the buffer. It returns null for a frame it rejects.
    if (msg !== null && Number.isFinite(msg.latitude)) {
      decodedCount++;
    }
  }
  const binaryDeserializationTime =
    performance.now() - binaryDeserializationStart;
  console.log(
    `- Time to deserialize ${NUM_MESSAGES} messages: ${binaryDeserializationTime.toFixed(2)} ms`,
  );
  if (decodedCount !== serializedMessages.length) {
    console.warn(
      `- Only ${decodedCount} of ${serializedMessages.length} messages decoded to a finite latitude`,
    );
  }
}
// Script entry point: generate the shared test data, start the WebSocket
// server and client, then run the JSON and binary benchmarks one after the
// other on timers.
function main() {
  // Generate data once so both benchmarks work on identical input.
  generateTestData();

  startWebSocketServer();
  // Give the server one second to come up before the client connects.
  setTimeout(startWebSocketClient, 1000);

  // Two seconds after start-up run the JSON benchmark, then pause three
  // seconds before the binary benchmark.
  const runBenchmarksInSequence = () => {
    runJsonBenchmark();
    setTimeout(() => runBinaryBenchmark(), 3000);
  };
  setTimeout(runBenchmarksInSequence, 2000);
}

main();
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
/// The kinds of message understood by the custom binary protocol.
///
/// Each value carries the numeric [id] that is compared against the first
/// byte of an incoming message; [unknown] is the fallback for any other byte.
enum MessageType {
  statusUpdate(0x01),
  sensorData(0x02),
  telemetryData(0x03),
  userProfile(0x04),
  sensorConfig(0x05),
  unknown(0x00);

  const MessageType(this.id);

  /// The numeric identifier associated with this message type.
  final int id;
}
/// A forward-only cursor over a [ByteData] buffer with bounds checking.
///
/// Every read first verifies that the requested number of bytes is available
/// and throws a [RangeError] otherwise, so a truncated or hostile message can
/// never cause a read past the end of the buffer. Multi-byte values are read
/// as little-endian.
class BinaryReader {
  /// Largest string, in bytes, that [readString] will accept.
  ///
  /// Caps the allocation an attacker can request through a forged length
  /// field.
  static const int maxStringLength = 1024; // 1 KB

  final ByteData _buffer;
  int _offset = 0;

  /// Creates a reader positioned at the start of [_buffer].
  ///
  /// The parameter is non-nullable, so no runtime null check is needed.
  BinaryReader(this._buffer);

  /// Throws a [RangeError] unless [byteCount] bytes can be consumed from the
  /// current position.
  ///
  /// Negative counts are rejected as well: they would otherwise pass the
  /// upper-bound test and move the cursor backwards.
  void _checkBounds(int byteCount) {
    if (byteCount < 0) {
      throw RangeError(
          'Cannot read a negative number of bytes: $byteCount.');
    }
    if (_offset + byteCount > _buffer.lengthInBytes) {
      throw RangeError(
          'Cannot read $byteCount bytes, out of bounds. Current offset: $_offset, Buffer length: ${_buffer.lengthInBytes}');
    }
  }

  /// Advances the cursor by [count] bytes without reading them.
  void skipBytes(int count) {
    _checkBounds(count);
    _offset += count;
  }

  /// Reads an unsigned 8-bit integer.
  int readUInt8() {
    _checkBounds(1);
    final value = _buffer.getUint8(_offset);
    _offset += 1;
    return value;
  }

  /// Reads a little-endian unsigned 16-bit integer.
  int readUInt16LE() {
    _checkBounds(2);
    final value = _buffer.getUint16(_offset, Endian.little);
    _offset += 2;
    return value;
  }

  /// Reads a little-endian unsigned 32-bit integer.
  int readUInt32LE() {
    _checkBounds(4);
    final value = _buffer.getUint32(_offset, Endian.little);
    _offset += 4;
    return value;
  }

  /// Reads a little-endian unsigned 64-bit integer.
  int readBigUInt64LE() {
    _checkBounds(8);
    final value = _buffer.getUint64(_offset, Endian.little);
    _offset += 8;
    return value;
  }

  /// Reads a little-endian 32-bit float.
  double readFloatLE() {
    _checkBounds(4);
    final value = _buffer.getFloat32(_offset, Endian.little);
    _offset += 4;
    return value;
  }

  /// Reads a little-endian 64-bit float.
  double readDoubleLE() {
    _checkBounds(8);
    final value = _buffer.getFloat64(_offset, Endian.little);
    _offset += 8;
    return value;
  }

  /// Reads one byte and returns `true` when it is non-zero.
  bool readBoolean() {
    _checkBounds(1);
    final value = _buffer.getUint8(_offset);
    _offset += 1;
    return value != 0;
  }

  /// Reads [length] bytes and decodes them as UTF-8.
  ///
  /// The Node.js peer in this listing decodes string fields as UTF-8; treating
  /// each byte as a character code would garble any non-ASCII text.
  ///
  /// Throws a [RangeError] when [length] is negative, exceeds
  /// [maxStringLength], or runs past the end of the buffer, and a
  /// [FormatException] when the bytes are not valid UTF-8.
  String readString(int length) {
    if (length > maxStringLength) {
      throw RangeError(
          'String length of $length exceeds maximum allowed size of $maxStringLength.');
    }
    _checkBounds(length);
    // sublistView honours the ByteData's own offset into its backing buffer.
    final bytes = Uint8List.sublistView(_buffer, _offset, _offset + length);
    final value = utf8.decode(bytes);
    _offset += length;
    return value;
  }
}
/// A status-update message.
///
/// Serialized form, little-endian: 1-byte type tag, `deviceId` (uint16),
/// `status` (uint8), `batteryLevel` (uint8), `timestamp` (uint64).
class StatusUpdate {
  /// Type tag written as the first byte by [serialize].
  static const messageTypeId = 1;

  /// Serialized size in bytes, including the type tag.
  static const fixedSize = 1 + 2 + 1 + 1 + 8;

  final int deviceId;
  final int status;
  final int batteryLevel;
  final int timestamp;

  /// Creates a status update.
  ///
  /// Throws an [ArgumentError] when [deviceId] is outside 0..65535 or
  /// [batteryLevel] is outside 0..100.
  StatusUpdate(this.deviceId, this.status, this.batteryLevel, this.timestamp) {
    if (deviceId < 0 || deviceId > 65535) {
      throw ArgumentError('Invalid deviceId');
    }
    if (batteryLevel < 0 || batteryLevel > 100) {
      throw ArgumentError('Invalid batteryLevel');
    }
  }

  /// Decodes the fields from [reader].
  ///
  /// No type tag is read here; the fields start at the reader's current
  /// position.
  static StatusUpdate deserialize(BinaryReader reader) {
    final deviceId = reader.readUInt16LE();
    final status = reader.readUInt8();
    final batteryLevel = reader.readUInt8();
    final timestamp = reader.readBigUInt64LE();
    return StatusUpdate(deviceId, status, batteryLevel, timestamp);
  }

  /// Writes the type tag and all fields into [buffer] starting at [offset].
  ///
  /// Returns the offset just past the last byte written.
  int serialize(ByteData buffer, int offset) {
    buffer.setUint8(offset++, messageTypeId);
    buffer.setUint16(offset, deviceId, Endian.little);
    offset += 2;
    buffer.setUint8(offset++, status);
    buffer.setUint8(offset++, batteryLevel);
    buffer.setUint64(offset, timestamp, Endian.little);
    offset += 8;
    return offset;
  }

  /// Returns a field-by-field description.
  ///
  /// Without this override, interpolating an instance into a log line prints
  /// only `Instance of 'StatusUpdate'`.
  @override
  String toString() => 'StatusUpdate(deviceId: $deviceId, status: $status, '
      'batteryLevel: $batteryLevel, timestamp: $timestamp)';
}
/// A single sensor reading.
///
/// Serialized form, little-endian: 1-byte type tag, `sensorId` (uint16),
/// `timestamp` (uint64), `value` (float32).
class SensorData {
  /// Type tag written as the first byte by [serialize].
  static const messageTypeId = 2;

  /// Serialized size in bytes, including the type tag.
  static const fixedSize = 1 + 2 + 8 + 4;

  final int sensorId;
  final int timestamp;
  final double value;

  /// Creates a sensor reading.
  ///
  /// Throws an [ArgumentError] when [sensorId] is outside 0..65535.
  SensorData(this.sensorId, this.timestamp, this.value) {
    if (sensorId < 0 || sensorId > 65535) {
      throw ArgumentError('Invalid sensorId');
    }
  }

  /// Decodes the fields from [reader].
  ///
  /// No type tag is read here; the fields start at the reader's current
  /// position.
  static SensorData deserialize(BinaryReader reader) {
    final sensorId = reader.readUInt16LE();
    final timestamp = reader.readBigUInt64LE();
    final value = reader.readFloatLE();
    return SensorData(sensorId, timestamp, value);
  }

  /// Writes the type tag and all fields into [buffer] starting at [offset].
  ///
  /// Returns the offset just past the last byte written. [value] is stored
  /// as a 32-bit float, so it is rounded to single precision.
  int serialize(ByteData buffer, int offset) {
    buffer.setUint8(offset++, messageTypeId);
    buffer.setUint16(offset, sensorId, Endian.little);
    offset += 2;
    buffer.setUint64(offset, timestamp, Endian.little);
    offset += 8;
    buffer.setFloat32(offset, value, Endian.little);
    offset += 4;
    return offset;
  }

  /// Returns a field-by-field description.
  ///
  /// Without this override, interpolating an instance into a log line prints
  /// only `Instance of 'SensorData'`.
  @override
  String toString() =>
      'SensorData(sensorId: $sensorId, timestamp: $timestamp, value: $value)';
}
/// A telemetry message with ten fields.
///
/// Serialized form, little-endian: 1-byte type tag, `timestamp` (uint64),
/// `latitude`, `longitude`, `altitude`, `speed` (float64 each), `heading`
/// (uint32), `fuelLevel` (uint8), `engineTemp` (float64), `tirePressure`
/// (float32), `batteryVoltage` (float64).
class TelemetryData {
  /// Type tag written as the first byte by [serialize].
  static const messageTypeId = 3;

  /// Serialized size in bytes, including the type tag.
  static const fixedSize = 1 + 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8;

  final int timestamp;
  final double latitude;
  final double longitude;
  final double altitude;
  final double speed;
  final int heading;
  final int fuelLevel;
  final double engineTemp;
  final double tirePressure;
  final double batteryVoltage;

  /// Creates a telemetry message; no field validation is performed.
  TelemetryData(
      this.timestamp,
      this.latitude,
      this.longitude,
      this.altitude,
      this.speed,
      this.heading,
      this.fuelLevel,
      this.engineTemp,
      this.tirePressure,
      this.batteryVoltage);

  /// Decodes the fields from [reader] in wire order.
  ///
  /// No type tag is read here; the fields start at the reader's current
  /// position.
  static TelemetryData deserialize(BinaryReader reader) {
    final timestamp = reader.readBigUInt64LE();
    final latitude = reader.readDoubleLE();
    final longitude = reader.readDoubleLE();
    final altitude = reader.readDoubleLE();
    final speed = reader.readDoubleLE();
    final heading = reader.readUInt32LE();
    final fuelLevel = reader.readUInt8();
    final engineTemp = reader.readDoubleLE();
    final tirePressure = reader.readFloatLE();
    final batteryVoltage = reader.readDoubleLE();
    return TelemetryData(timestamp, latitude, longitude, altitude, speed,
        heading, fuelLevel, engineTemp, tirePressure, batteryVoltage);
  }

  /// Writes the type tag and all fields into [buffer] starting at [offset].
  ///
  /// Returns the offset just past the last byte written. [tirePressure] is
  /// stored as a 32-bit float, so it is rounded to single precision.
  int serialize(ByteData buffer, int offset) {
    buffer.setUint8(offset++, messageTypeId);
    buffer.setUint64(offset, timestamp, Endian.little);
    offset += 8;
    buffer.setFloat64(offset, latitude, Endian.little);
    offset += 8;
    buffer.setFloat64(offset, longitude, Endian.little);
    offset += 8;
    buffer.setFloat64(offset, altitude, Endian.little);
    offset += 8;
    buffer.setFloat64(offset, speed, Endian.little);
    offset += 8;
    buffer.setUint32(offset, heading, Endian.little);
    offset += 4;
    buffer.setUint8(offset++, fuelLevel);
    buffer.setFloat64(offset, engineTemp, Endian.little);
    offset += 8;
    buffer.setFloat32(offset, tirePressure, Endian.little);
    offset += 4;
    buffer.setFloat64(offset, batteryVoltage, Endian.little);
    offset += 8;
    return offset;
  }

  /// Returns a field-by-field description.
  ///
  /// Without this override, interpolating an instance into a log line prints
  /// only `Instance of 'TelemetryData'`.
  @override
  String toString() => 'TelemetryData(timestamp: $timestamp, '
      'latitude: $latitude, longitude: $longitude, altitude: $altitude, '
      'speed: $speed, heading: $heading, fuelLevel: $fuelLevel, '
      'engineTemp: $engineTemp, tirePressure: $tirePressure, '
      'batteryVoltage: $batteryVoltage)';
}
/// A user-profile message.
///
/// Serialized form, little-endian: 1-byte type tag, `userId` (uint32),
/// `username` (uint32 byte length + UTF-8 bytes), `registrationDate` (uint64),
/// `lastLoginDate` (uint64), `isVerified` (1 byte), `isAdmin` (1 byte),
/// `avatarId` (uint32), `sessionCount` (uint32), `emailHash` and
/// `preferredTheme` (each uint32 byte length + UTF-8 bytes).
class UserProfile {
  /// Type tag written as the first byte by [serialize].
  static const messageTypeId = 4;

  final int userId;
  final String username;
  final int registrationDate;
  final int lastLoginDate;
  final bool isVerified;
  final bool isAdmin;
  final int avatarId;
  final int sessionCount;
  final String emailHash;
  final String preferredTheme;

  /// Creates a user profile.
  ///
  /// Throws an [ArgumentError] when [userId] is negative.
  UserProfile(
      this.userId,
      this.username,
      this.registrationDate,
      this.lastLoginDate,
      this.isVerified,
      this.isAdmin,
      this.avatarId,
      this.sessionCount,
      this.emailHash,
      this.preferredTheme) {
    if (userId < 0) throw ArgumentError('Invalid userId');
  }

  /// Decodes the fields from [reader] in wire order.
  ///
  /// No type tag is read here; the fields start at the reader's current
  /// position.
  static UserProfile deserialize(BinaryReader reader) {
    final userId = reader.readUInt32LE();
    final usernameLength = reader.readUInt32LE();
    final username = reader.readString(usernameLength);
    final registrationDate = reader.readBigUInt64LE();
    final lastLoginDate = reader.readBigUInt64LE();
    final isVerified = reader.readBoolean();
    final isAdmin = reader.readBoolean();
    final avatarId = reader.readUInt32LE();
    final sessionCount = reader.readUInt32LE();
    final emailHashLength = reader.readUInt32LE();
    final emailHash = reader.readString(emailHashLength);
    final themeLength = reader.readUInt32LE();
    final preferredTheme = reader.readString(themeLength);
    return UserProfile(
        userId,
        username,
        registrationDate,
        lastLoginDate,
        isVerified,
        isAdmin,
        avatarId,
        sessionCount,
        emailHash,
        preferredTheme);
  }

  /// Writes [value] at [offset] as a uint32 byte length followed by its
  /// UTF-8 bytes, and returns the offset just past the string.
  ///
  /// UTF-8 is used because copying `codeUnits` into a [Uint8List] truncates
  /// every code unit above 255, corrupting non-ASCII text. The Node.js peer
  /// in this listing also decodes string fields as UTF-8.
  static int _writeString(ByteData buffer, int offset, String value) {
    final bytes = utf8.encode(value);
    buffer.setUint32(offset, bytes.length, Endian.little);
    offset += 4;
    // sublistView honours the ByteData's own offset into its backing buffer;
    // setRange throws a RangeError if the bytes do not fit.
    Uint8List.sublistView(buffer)
        .setRange(offset, offset + bytes.length, bytes);
    return offset + bytes.length;
  }

  /// Writes the type tag and all fields into [buffer] starting at [offset].
  ///
  /// Returns the offset just past the last byte written. Throws a
  /// [RangeError] when [buffer] is too small for the encoded message.
  int serialize(ByteData buffer, int offset) {
    buffer.setUint8(offset++, messageTypeId);
    buffer.setUint32(offset, userId, Endian.little);
    offset += 4;
    offset = _writeString(buffer, offset, username);
    buffer.setUint64(offset, registrationDate, Endian.little);
    offset += 8;
    buffer.setUint64(offset, lastLoginDate, Endian.little);
    offset += 8;
    buffer.setUint8(offset++, isVerified ? 1 : 0);
    buffer.setUint8(offset++, isAdmin ? 1 : 0);
    buffer.setUint32(offset, avatarId, Endian.little);
    offset += 4;
    buffer.setUint32(offset, sessionCount, Endian.little);
    offset += 4;
    offset = _writeString(buffer, offset, emailHash);
    offset = _writeString(buffer, offset, preferredTheme);
    return offset;
  }

  /// Returns a field-by-field description.
  ///
  /// Without this override, interpolating an instance into a log line prints
  /// only `Instance of 'UserProfile'`.
  @override
  String toString() => 'UserProfile(userId: $userId, username: $username, '
      'registrationDate: $registrationDate, lastLoginDate: $lastLoginDate, '
      'isVerified: $isVerified, isAdmin: $isAdmin, avatarId: $avatarId, '
      'sessionCount: $sessionCount, emailHash: $emailHash, '
      'preferredTheme: $preferredTheme)';
}
/// A sensor-configuration message.
///
/// Serialized form, little-endian: 1-byte type tag, `sensorId` (uint16),
/// `sensorName` (uint32 byte length + UTF-8 bytes), `isActive` (1 byte),
/// `minThreshold` and `maxThreshold` (float64 each), `sampleRate` (uint32),
/// `units` and `description` (each uint32 byte length + UTF-8 bytes),
/// `calibrationDate` (uint64), `requiresRestart` (1 byte).
class SensorConfig {
  /// Type tag written as the first byte by [serialize].
  static const messageTypeId = 5;

  final int sensorId;
  final String sensorName;
  final bool isActive;
  final double minThreshold;
  final double maxThreshold;
  final int sampleRate;
  final String units;
  final String description;
  final int calibrationDate;
  final bool requiresRestart;

  /// Creates a sensor configuration.
  ///
  /// Throws an [ArgumentError] when [sensorId] is outside 0..65535.
  SensorConfig(
      this.sensorId,
      this.sensorName,
      this.isActive,
      this.minThreshold,
      this.maxThreshold,
      this.sampleRate,
      this.units,
      this.description,
      this.calibrationDate,
      this.requiresRestart) {
    if (sensorId < 0 || sensorId > 65535) {
      throw ArgumentError('Invalid sensorId');
    }
  }

  /// Decodes the fields from [reader] in wire order.
  ///
  /// No type tag is read here; the fields start at the reader's current
  /// position.
  static SensorConfig deserialize(BinaryReader reader) {
    final sensorId = reader.readUInt16LE();
    final nameLength = reader.readUInt32LE();
    final sensorName = reader.readString(nameLength);
    final isActive = reader.readBoolean();
    final minThreshold = reader.readDoubleLE();
    final maxThreshold = reader.readDoubleLE();
    final sampleRate = reader.readUInt32LE();
    final unitsLength = reader.readUInt32LE();
    final units = reader.readString(unitsLength);
    final descLength = reader.readUInt32LE();
    final description = reader.readString(descLength);
    final calibrationDate = reader.readBigUInt64LE();
    final requiresRestart = reader.readBoolean();
    return SensorConfig(
        sensorId,
        sensorName,
        isActive,
        minThreshold,
        maxThreshold,
        sampleRate,
        units,
        description,
        calibrationDate,
        requiresRestart);
  }

  /// Writes [value] at [offset] as a uint32 byte length followed by its
  /// UTF-8 bytes, and returns the offset just past the string.
  ///
  /// UTF-8 is used because copying `codeUnits` into a [Uint8List] truncates
  /// every code unit above 255, corrupting non-ASCII text. The Node.js peer
  /// in this listing also decodes string fields as UTF-8.
  static int _writeString(ByteData buffer, int offset, String value) {
    final bytes = utf8.encode(value);
    buffer.setUint32(offset, bytes.length, Endian.little);
    offset += 4;
    // sublistView honours the ByteData's own offset into its backing buffer;
    // setRange throws a RangeError if the bytes do not fit.
    Uint8List.sublistView(buffer)
        .setRange(offset, offset + bytes.length, bytes);
    return offset + bytes.length;
  }

  /// Writes the type tag and all fields into [buffer] starting at [offset].
  ///
  /// Returns the offset just past the last byte written. Throws a
  /// [RangeError] when [buffer] is too small for the encoded message.
  int serialize(ByteData buffer, int offset) {
    buffer.setUint8(offset++, messageTypeId);
    buffer.setUint16(offset, sensorId, Endian.little);
    offset += 2;
    offset = _writeString(buffer, offset, sensorName);
    buffer.setUint8(offset++, isActive ? 1 : 0);
    buffer.setFloat64(offset, minThreshold, Endian.little);
    offset += 8;
    buffer.setFloat64(offset, maxThreshold, Endian.little);
    offset += 8;
    buffer.setUint32(offset, sampleRate, Endian.little);
    offset += 4;
    offset = _writeString(buffer, offset, units);
    offset = _writeString(buffer, offset, description);
    buffer.setUint64(offset, calibrationDate, Endian.little);
    offset += 8;
    buffer.setUint8(offset++, requiresRestart ? 1 : 0);
    return offset;
  }

  /// Returns a field-by-field description.
  ///
  /// Without this override, interpolating an instance into a log line prints
  /// only `Instance of 'SensorConfig'`.
  @override
  String toString() => 'SensorConfig(sensorId: $sensorId, '
      'sensorName: $sensorName, isActive: $isActive, '
      'minThreshold: $minThreshold, maxThreshold: $maxThreshold, '
      'sampleRate: $sampleRate, units: $units, description: $description, '
      'calibrationDate: $calibrationDate, requiresRestart: $requiresRestart)';
}
/// Decodes one binary message and prints what was received.
///
/// The first byte of [data] selects the message type; the remaining bytes are
/// handed to that type's `deserialize`. Empty input is ignored, and a type
/// byte that matches no [MessageType] is reported as unknown. Exceptions
/// raised while decoding propagate to the caller.
void handleBinaryMessage(Uint8List data) {
  if (data.isEmpty) {
    return;
  }

  final reader = BinaryReader(ByteData.sublistView(data));
  final messageTypeId = reader.readUInt8();

  // Resolve the type byte to the first enum value with that id, falling back
  // to `unknown` when none matches.
  var messageType = MessageType.unknown;
  for (final candidate in MessageType.values) {
    if (candidate.id == messageTypeId) {
      messageType = candidate;
      break;
    }
  }

  switch (messageType) {
    case MessageType.statusUpdate:
      print('Received status update: ${StatusUpdate.deserialize(reader)}');
      break;
    case MessageType.sensorData:
      print('Received sensor data: ${SensorData.deserialize(reader)}');
      break;
    case MessageType.telemetryData:
      print('Received telemetry data: ${TelemetryData.deserialize(reader)}');
      break;
    case MessageType.userProfile:
      print('Received user profile: ${UserProfile.deserialize(reader)}');
      break;
    case MessageType.sensorConfig:
      print('Received sensor config: ${SensorConfig.deserialize(reader)}');
      break;
    case MessageType.unknown:
      print('Received unknown message type: $messageTypeId');
      break;
  }
}
/// A single, shared scratch buffer that every outgoing message is serialized
/// into.
///
/// A message that needs more than 2048 bytes makes `serialize` throw, which
/// [sendMessage] catches and logs.
final ByteData _globalBuffer = ByteData(2048);

/// Serializes [messageInstance] and sends it over [ws] as one binary message.
///
/// [messageInstance] must provide `int serialize(ByteData buffer, int offset)`
/// returning the offset just past the last byte written. Serialization and
/// send errors are caught and printed rather than rethrown.
void sendMessage(WebSocket ws, dynamic messageInstance) {
  try {
    final endOffset = messageInstance.serialize(_globalBuffer, 0) as int;
    // Copy the bytes out of the shared buffer before handing them to the
    // socket. The sink is asynchronous and its API does not promise to copy
    // the argument, so passing a view would let the next sendMessage call
    // overwrite a frame that has not been written yet.
    final messageBytes =
        Uint8List.fromList(Uint8List.sublistView(_globalBuffer, 0, endOffset));
    ws.add(messageBytes);
  } catch (e) {
    print('Serialization or send error: $e');
  }
}
/// Runs once the connection is established: logs the connection and sends
/// one sample [TelemetryData] followed by one sample [UserProfile] over [ws].
void handleOpen(WebSocket ws) {
  print('Connected to WebSocket server.');

  // Sample telemetry message stamped with the current time.
  sendMessage(
    ws,
    TelemetryData(
      DateTime.now().millisecondsSinceEpoch, // timestamp
      34.0191, // latitude
      -118.4912, // longitude
      250.5, // altitude
      95.7, // speed
      180, // heading
      75, // fuelLevel
      90.2, // engineTemp
      32.5, // tirePressure
      12.8, // batteryVoltage
    ),
  );

  // Sample user-profile message.
  sendMessage(
    ws,
    UserProfile(
      12345, // userId
      'john_doe', // username
      1672531200000, // registrationDate
      1672617600000, // lastLoginDate
      true, // isVerified
      false, // isAdmin
      5, // avatarId
      150, // sessionCount
      'abcdef123456', // emailHash
      'dark', // preferredTheme
    ),
  );
}
/// Handles one incoming WebSocket event.
///
/// Any `List<int>` payload is treated as a binary message and passed to
/// [handleBinaryMessage]; everything else is logged as text. Binary events
/// are documented as `List<int>`, so testing only for [Uint8List] would
/// report a binary frame delivered as a plain list as a text message.
void handleMessage(dynamic message) {
  if (message is! List<int>) {
    print('Received text message: $message');
    return;
  }

  // Reuse the list when it is already a Uint8List; copy otherwise.
  final bytes = message is Uint8List ? message : Uint8List.fromList(message);
  try {
    // Use a try/catch block to prevent a malformed message from crashing the
    // application.
    handleBinaryMessage(bytes);
  } catch (e) {
    print('Security Alert: Malformed message received. $e');
  }
}
/// Logs that the connection closed, including the close [code] and [reason]
/// (either may be null).
void handleClose(int? code, String? reason) =>
    print('Connection closed with code $code: $reason');
/// Logs an [error] reported by the WebSocket stream.
void handleError(error) => print('Connection error: $error');
/// Entry point: connects to the WebSocket server, wires up the message, close
/// and error handlers, and schedules [handleOpen] one second later.
///
/// A failure to connect is caught and printed.
// NOTE(review): the URL uses the `wss` scheme and no path, while the Go server
// in this listing logs `ws://localhost:8080/ws` — confirm which server this
// client is meant to reach.
void main() async {
  const serverUrl = 'wss://localhost:8080';
  try {
    final socket = await WebSocket.connect(serverUrl);

    // Report the close code and reason once the stream is done.
    void onDone() => handleClose(socket.closeCode, socket.closeReason);

    socket.listen(handleMessage, onError: handleError, onDone: onDone);

    // Call handleOpen after a delay, once the listeners are in place.
    Future.delayed(const Duration(seconds: 1), () => handleOpen(socket));
  } catch (error) {
    print('Could not connect to WebSocket server: $error');
  }
}
package main
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"net/http"
"time"
"golang.org/x/net/websocket"
)
// MessageType identifies the kind of binary message carried by a frame. It is
// compared against the first byte of every incoming message.
type MessageType uint8

// Known message type identifiers. Any other value is treated as unknown.
const (
	StatusUpdateType  MessageType = 1
	SensorDataType    MessageType = 2
	TelemetryDataType MessageType = 3
	UserProfileType   MessageType = 4
	SensorConfigType  MessageType = 5
)
// Endianness defines the byte order for serialization.
//
// It must be a variable: binary.LittleEndian is a package-level variable, not
// a constant expression, so declaring this with const does not compile.
var Endianness = binary.LittleEndian

// BinaryReader is a helper that reads typed values sequentially from a
// bytes.Buffer. Every method returns an error instead of panicking when the
// buffer holds too few bytes.
type BinaryReader struct {
	buf *bytes.Buffer
}

// NewBinaryReader creates a BinaryReader positioned at the start of data.
func NewBinaryReader(data []byte) *BinaryReader {
	return &BinaryReader{buf: bytes.NewBuffer(data)}
}

// ReadUInt8 reads a single byte from the buffer.
func (r *BinaryReader) ReadUInt8() (uint8, error) {
	return r.buf.ReadByte()
}

// ReadUInt16 reads a 16-bit unsigned integer in Endianness byte order.
func (r *BinaryReader) ReadUInt16() (uint16, error) {
	var v uint16
	err := binary.Read(r.buf, Endianness, &v)
	return v, err
}

// ReadUInt32 reads a 32-bit unsigned integer in Endianness byte order.
func (r *BinaryReader) ReadUInt32() (uint32, error) {
	var v uint32
	err := binary.Read(r.buf, Endianness, &v)
	return v, err
}

// ReadUInt64 reads a 64-bit unsigned integer in Endianness byte order.
func (r *BinaryReader) ReadUInt64() (uint64, error) {
	var v uint64
	err := binary.Read(r.buf, Endianness, &v)
	return v, err
}

// ReadFloat32 reads a 32-bit float in Endianness byte order.
func (r *BinaryReader) ReadFloat32() (float32, error) {
	var v float32
	err := binary.Read(r.buf, Endianness, &v)
	return v, err
}

// ReadFloat64 reads a 64-bit float in Endianness byte order.
func (r *BinaryReader) ReadFloat64() (float64, error) {
	var v float64
	err := binary.Read(r.buf, Endianness, &v)
	return v, err
}

// ReadBoolean reads one byte and reports whether it is non-zero.
func (r *BinaryReader) ReadBoolean() (bool, error) {
	b, err := r.buf.ReadByte()
	if err != nil {
		return false, err
	}
	return b != 0, nil
}

// ReadString reads exactly length bytes and returns them as a string. The
// caller supplies the length; no length prefix is read here.
//
// An error is returned when length is negative, exceeds 1024 bytes, or is
// larger than the number of bytes remaining in the buffer.
func (r *BinaryReader) ReadString(length int) (string, error) {
	// Add a sane maximum length to prevent memory-based DoS attacks.
	const maxStringLength = 1024
	// A negative length would make the allocation below panic. Callers
	// convert a uint32 with int(...), which can go negative where int is
	// 32 bits wide.
	if length < 0 {
		return "", fmt.Errorf("invalid negative string length %d", length)
	}
	if length > maxStringLength {
		return "", fmt.Errorf("string length %d exceeds maximum allowed size %d", length, maxStringLength)
	}
	buf := make([]byte, length)
	n, err := r.buf.Read(buf)
	if err != nil {
		return "", err
	}
	if n < length {
		return "", fmt.Errorf("read only %d bytes, expected %d", n, length)
	}
	return string(buf), nil
}
// Deserializable is implemented by every message type that can be decoded
// from the wire. Deserialize fills the receiver from r, reading fields in wire
// order, and returns the first read error encountered. processMessage calls it
// after the type byte has been consumed.
type Deserializable interface {
Deserialize(r *BinaryReader) error
}
// StatusUpdate is a device status message.
type StatusUpdate struct {
	DeviceID     uint16
	Status       uint8
	BatteryLevel uint8
	Timestamp    uint64
}

// Deserialize fills su from r, reading DeviceID, Status, BatteryLevel and
// Timestamp in that order. It stops at and returns the first read error.
func (su *StatusUpdate) Deserialize(r *BinaryReader) error {
	var err error
	if su.DeviceID, err = r.ReadUInt16(); err != nil {
		return err
	}
	if su.Status, err = r.ReadUInt8(); err != nil {
		return err
	}
	if su.BatteryLevel, err = r.ReadUInt8(); err != nil {
		return err
	}
	su.Timestamp, err = r.ReadUInt64()
	return err
}
// SensorData is a single sensor reading.
type SensorData struct {
	SensorID  uint16
	Timestamp uint64
	Value     float32
}

// Deserialize fills sd from r, reading SensorID, Timestamp and Value in that
// order. It stops at and returns the first read error.
func (sd *SensorData) Deserialize(r *BinaryReader) error {
	var err error
	if sd.SensorID, err = r.ReadUInt16(); err != nil {
		return err
	}
	if sd.Timestamp, err = r.ReadUInt64(); err != nil {
		return err
	}
	sd.Value, err = r.ReadFloat32()
	return err
}
// TelemetryData is a ten-field telemetry message.
type TelemetryData struct {
	Timestamp      uint64
	Latitude       float64
	Longitude      float64
	Altitude       float64
	Speed          float64
	Heading        uint32
	FuelLevel      uint8
	EngineTemp     float64
	TirePressure   float32
	BatteryVoltage float64
}

// Deserialize fills td from r, reading the fields in declaration order. It
// stops at and returns the first read error.
func (td *TelemetryData) Deserialize(r *BinaryReader) error {
	var err error
	if td.Timestamp, err = r.ReadUInt64(); err != nil {
		return err
	}
	if td.Latitude, err = r.ReadFloat64(); err != nil {
		return err
	}
	if td.Longitude, err = r.ReadFloat64(); err != nil {
		return err
	}
	if td.Altitude, err = r.ReadFloat64(); err != nil {
		return err
	}
	if td.Speed, err = r.ReadFloat64(); err != nil {
		return err
	}
	if td.Heading, err = r.ReadUInt32(); err != nil {
		return err
	}
	if td.FuelLevel, err = r.ReadUInt8(); err != nil {
		return err
	}
	if td.EngineTemp, err = r.ReadFloat64(); err != nil {
		return err
	}
	if td.TirePressure, err = r.ReadFloat32(); err != nil {
		return err
	}
	td.BatteryVoltage, err = r.ReadFloat64()
	return err
}
// UserProfile is a user profile message. Each string field is preceded on the
// wire by a 32-bit byte length.
type UserProfile struct {
	UserID           uint32
	Username         string
	RegistrationDate uint64
	LastLoginDate    uint64
	IsVerified       bool
	IsAdmin          bool
	AvatarID         uint32
	SessionCount     uint32
	EmailHash        string
	PreferredTheme   string
}

// Deserialize fills up from r, reading the fields in declaration order. It
// stops at and returns the first read error.
func (up *UserProfile) Deserialize(r *BinaryReader) error {
	var (
		err    error
		strLen uint32 // byte length prefix of the string field read next
	)

	if up.UserID, err = r.ReadUInt32(); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	if up.Username, err = r.ReadString(int(strLen)); err != nil {
		return err
	}

	if up.RegistrationDate, err = r.ReadUInt64(); err != nil {
		return err
	}
	if up.LastLoginDate, err = r.ReadUInt64(); err != nil {
		return err
	}
	if up.IsVerified, err = r.ReadBoolean(); err != nil {
		return err
	}
	if up.IsAdmin, err = r.ReadBoolean(); err != nil {
		return err
	}
	if up.AvatarID, err = r.ReadUInt32(); err != nil {
		return err
	}
	if up.SessionCount, err = r.ReadUInt32(); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	if up.EmailHash, err = r.ReadString(int(strLen)); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	up.PreferredTheme, err = r.ReadString(int(strLen))
	return err
}
// SensorConfig is a sensor configuration message. Each string field is
// preceded on the wire by a 32-bit byte length.
type SensorConfig struct {
	SensorID        uint16
	SensorName      string
	IsActive        bool
	MinThreshold    float64
	MaxThreshold    float64
	SampleRate      uint32
	Units           string
	Description     string
	CalibrationDate uint64
	RequiresRestart bool
}

// Deserialize fills sc from r, reading the fields in declaration order. It
// stops at and returns the first read error.
func (sc *SensorConfig) Deserialize(r *BinaryReader) error {
	var (
		err    error
		strLen uint32 // byte length prefix of the string field read next
	)

	if sc.SensorID, err = r.ReadUInt16(); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	if sc.SensorName, err = r.ReadString(int(strLen)); err != nil {
		return err
	}

	if sc.IsActive, err = r.ReadBoolean(); err != nil {
		return err
	}
	if sc.MinThreshold, err = r.ReadFloat64(); err != nil {
		return err
	}
	if sc.MaxThreshold, err = r.ReadFloat64(); err != nil {
		return err
	}
	if sc.SampleRate, err = r.ReadUInt32(); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	if sc.Units, err = r.ReadString(int(strLen)); err != nil {
		return err
	}

	if strLen, err = r.ReadUInt32(); err != nil {
		return err
	}
	if sc.Description, err = r.ReadString(int(strLen)); err != nil {
		return err
	}

	if sc.CalibrationDate, err = r.ReadUInt64(); err != nil {
		return err
	}
	sc.RequiresRestart, err = r.ReadBoolean()
	return err
}
// Message describes a message type that can encode itself: Serialize writes
// the message into w and returns an error on failure.
// NOTE(review): no type in the visible part of this file defines a Serialize
// method, and nothing visible references this interface — confirm whether it
// is still needed.
type Message interface {
Serialize(w *bytes.Buffer) error
}
// processMessage decodes msg from reader and logs the outcome: the decoded
// value on success, or the deserialization error on failure. msgType is used
// only for the log line.
func processMessage(msgType MessageType, msg Deserializable, reader *BinaryReader) {
	err := msg.Deserialize(reader)
	if err == nil {
		log.Printf("Received message type %d (%T): %+v", msgType, msg, msg)
		return
	}
	log.Printf("Error deserializing message type %d (%T): %v", msgType, msg, err)
}
// handleMessage decodes one incoming binary message. The first byte selects
// the message type and the rest is deserialized into the matching struct by
// processMessage. Empty input is ignored; an unrecognized type byte is logged.
func handleMessage(data []byte) {
	if len(data) == 0 {
		return
	}

	reader := NewBinaryReader(data)
	rawType, err := reader.ReadUInt8()
	if err != nil {
		log.Printf("Error reading message type: %v", err)
		return
	}

	// Pick the empty message value to decode into, based on the type byte.
	var target Deserializable
	kind := MessageType(rawType)
	switch kind {
	case StatusUpdateType:
		target = &StatusUpdate{}
	case SensorDataType:
		target = &SensorData{}
	case TelemetryDataType:
		target = &TelemetryData{}
	case UserProfileType:
		target = &UserProfile{}
	case SensorConfigType:
		target = &SensorConfig{}
	default:
		log.Printf("Received unknown message type: %d", rawType)
		return
	}

	processMessage(kind, target, reader)
}
// wsHandler handles a single WebSocket connection. It is called for each new
// client and returns, closing the connection, when the client disconnects or
// a receive error makes the connection unusable.
func wsHandler(ws *websocket.Conn) {
	defer ws.Close()

	remote := ws.Request().RemoteAddr
	log.Printf("New client connected from %s", remote)

	// Keep the connection open and listen for messages.
	for {
		var msg []byte
		err := websocket.Message.Receive(ws, &msg)
		if err == nil {
			handleMessage(msg)
			continue
		}

		if err.Error() == "EOF" {
			log.Printf("Client disconnected: %s", remote)
			return
		}

		log.Printf("Error receiving message from client %s: %v", remote, err)

		// An oversized frame is a per-message problem, so keep serving the
		// client (presumably the next Receive drains it — confirm against
		// the websocket package). Any other error means the connection is
		// broken; retrying would fail the same way every time and spin
		// this loop forever, so stop.
		if err == websocket.ErrFrameTooLarge {
			continue
		}
		return
	}
}
// main registers the WebSocket handler at "/ws" on the default HTTP mux and
// serves on port 8080. It exits through log.Fatal if the server stops with an
// error.
func main() {
	const listenAddr = ":8080"

	http.Handle("/ws", websocket.Handler(wsHandler))

	log.Println("WebSocket server started on ws://localhost:8080/ws")
	if err := http.ListenAndServe(listenAddr, nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
// A WebSocket client for a custom binary protocol in Node.js,
// using a single, pre-allocated unsafe buffer.
const WebSocket = require('ws');
const util = require('util');
// Numeric identifiers for the custom message types. Each message class below
// exposes the matching value as its static `messageTypeId` and writes it as
// the first byte when serializing.
const MessageType = {
  statusUpdate: 1,
  sensorData: 2,
  telemetryData: 3,
  userProfile: 4,
  sensorConfig: 5,
  unknown: 0,
};
// A forward-only cursor over a Node.js Buffer with bounds checking.
// Every read first verifies that the requested number of bytes is available
// and throws a RangeError otherwise, so a truncated or hostile message can
// never cause a read past the end of the buffer.
class BinaryReader {
  // Throws a TypeError when `buffer` is not a Buffer.
  constructor(buffer) {
    if (!Buffer.isBuffer(buffer)) {
      throw new TypeError('Input must be a Buffer.');
    }
    this.buffer = buffer;
    this.offset = 0;
  }

  // Internal helper for bounds checking.
  // Throws a RangeError unless `byteCount` is a non-negative integer that
  // fits between the current offset and the end of the buffer. Negative or
  // fractional counts would otherwise pass the upper-bound test and move the
  // cursor backwards or to a non-integer position.
  _checkBounds(byteCount) {
    if (!Number.isInteger(byteCount) || byteCount < 0) {
      throw new RangeError(`Byte count must be a non-negative integer, got ${byteCount}.`);
    }
    if (this.offset + byteCount > this.buffer.length) {
      throw new RangeError(`Cannot read ${byteCount} bytes, out of bounds. Current offset: ${this.offset}, Buffer length: ${this.buffer.length}`);
    }
  }

  // Advances the cursor by `count` bytes without reading them.
  skipBytes(count) {
    this._checkBounds(count);
    this.offset += count;
  }

  // Reads an unsigned 8-bit integer.
  readUInt8() {
    this._checkBounds(1);
    const value = this.buffer.readUInt8(this.offset);
    this.offset += 1;
    return value;
  }

  // Reads a little-endian unsigned 16-bit integer.
  readUInt16LE() {
    this._checkBounds(2);
    const value = this.buffer.readUInt16LE(this.offset);
    this.offset += 2;
    return value;
  }

  // Reads a little-endian unsigned 32-bit integer.
  readUInt32LE() {
    this._checkBounds(4);
    const value = this.buffer.readUInt32LE(this.offset);
    this.offset += 4;
    return value;
  }

  // Reads a little-endian unsigned 64-bit integer and returns it as a BigInt.
  readBigUInt64LE() {
    this._checkBounds(8);
    const value = this.buffer.readBigUInt64LE(this.offset);
    this.offset += 8;
    return value;
  }

  // Reads a little-endian 32-bit float.
  readFloatLE() {
    this._checkBounds(4);
    const value = this.buffer.readFloatLE(this.offset);
    this.offset += 4;
    return value;
  }

  // Reads a little-endian 64-bit float.
  readDoubleLE() {
    this._checkBounds(8);
    const value = this.buffer.readDoubleLE(this.offset);
    this.offset += 8;
    return value;
  }

  // Reads one byte and returns true when it is non-zero.
  readBoolean() {
    this._checkBounds(1);
    const value = this.buffer.readUInt8(this.offset);
    this.offset += 1;
    return value !== 0;
  }

  // Reads `length` bytes and decodes them as UTF-8.
  // The length is capped so that a forged length field cannot request a
  // massive string and exhaust memory. Throws a RangeError when `length`
  // exceeds the cap, is not a non-negative integer, or runs past the end of
  // the buffer.
  readString(length) {
    // Add a sane maximum length for string fields to prevent memory-based DoS attacks.
    const MAX_STRING_LENGTH = 1024; // 1 KB
    if (length > MAX_STRING_LENGTH) {
      throw new RangeError(`String length of ${length} exceeds maximum allowed size of ${MAX_STRING_LENGTH}.`);
    }
    this._checkBounds(length);
    const value = this.buffer.toString('utf8', this.offset, this.offset + length);
    this.offset += length;
    return value;
  }
}
// A status-update message.
// Serialized form, little-endian: 1-byte type tag, deviceId (uint16),
// status (uint8), batteryLevel (uint8), timestamp (uint64).
class StatusUpdate {
  static messageTypeId = MessageType.statusUpdate;
  // Serialized size in bytes, including the type tag.
  static fixedSize = 1 + 2 + 1 + 1 + 8;

  // Validates and stores the fields. Throws an Error when a field is not a
  // number, deviceId is outside 0..65535, or batteryLevel is outside 0..100.
  constructor(deviceId, status, batteryLevel, timestamp) {
    const isNumber = (candidate) => typeof candidate === 'number';

    if (!isNumber(deviceId) || deviceId < 0 || deviceId > 65535) {
      throw new Error('Invalid deviceId');
    }
    if (!isNumber(status)) {
      throw new Error('Invalid status');
    }
    if (!isNumber(batteryLevel) || batteryLevel < 0 || batteryLevel > 100) {
      throw new Error('Invalid batteryLevel');
    }
    if (!isNumber(timestamp)) {
      throw new Error('Invalid timestamp');
    }

    this.deviceId = deviceId;
    this.status = status;
    this.batteryLevel = batteryLevel;
    this.timestamp = timestamp;
  }

  // Decodes the fields from `reader`. No type tag is read here. The 64-bit
  // timestamp is converted from BigInt to Number.
  static deserialize(reader) {
    const deviceId = reader.readUInt16LE();
    const status = reader.readUInt8();
    const batteryLevel = reader.readUInt8();
    const timestamp = Number(reader.readBigUInt64LE());
    return new StatusUpdate(deviceId, status, batteryLevel, timestamp);
  }

  // Writes the type tag and all fields into `buffer` starting at `offset`
  // and returns the offset just past the last byte written.
  serialize(buffer, offset) {
    let cursor = offset;

    buffer.writeUInt8(StatusUpdate.messageTypeId, cursor);
    cursor += 1;

    buffer.writeUInt16LE(this.deviceId, cursor);
    cursor += 2;

    buffer.writeUInt8(this.status, cursor);
    cursor += 1;

    buffer.writeUInt8(this.batteryLevel, cursor);
    cursor += 1;

    buffer.writeBigUInt64LE(BigInt(this.timestamp), cursor);
    cursor += 8;

    return cursor;
  }
}
// A single sensor reading.
// Serialized form, little-endian: 1-byte type tag, sensorId (uint16),
// timestamp (uint64), value (float32).
class SensorData {
  static messageTypeId = MessageType.sensorData;
  // Serialized size in bytes, including the type tag.
  static fixedSize = 1 + 2 + 8 + 4;

  // Validates and stores the fields. Throws an Error when a field is not a
  // number or sensorId is outside 0..65535.
  constructor(sensorId, timestamp, value) {
    const isNumber = (candidate) => typeof candidate === 'number';

    if (!isNumber(sensorId) || sensorId < 0 || sensorId > 65535) {
      throw new Error('Invalid sensorId');
    }
    if (!isNumber(timestamp)) {
      throw new Error('Invalid timestamp');
    }
    if (!isNumber(value)) {
      throw new Error('Invalid value');
    }

    this.sensorId = sensorId;
    this.timestamp = timestamp;
    this.value = value;
  }

  // Decodes the fields from `reader`. No type tag is read here. The 64-bit
  // timestamp is converted from BigInt to Number.
  static deserialize(reader) {
    const sensorId = reader.readUInt16LE();
    const timestamp = Number(reader.readBigUInt64LE());
    const value = reader.readFloatLE();
    return new SensorData(sensorId, timestamp, value);
  }

  // Writes the type tag and all fields into `buffer` starting at `offset`
  // and returns the offset just past the last byte written.
  serialize(buffer, offset) {
    let cursor = offset;

    buffer.writeUInt8(SensorData.messageTypeId, cursor);
    cursor += 1;

    buffer.writeUInt16LE(this.sensorId, cursor);
    cursor += 2;

    buffer.writeBigUInt64LE(BigInt(this.timestamp), cursor);
    cursor += 8;

    buffer.writeFloatLE(this.value, cursor);
    cursor += 4;

    return cursor;
  }
}
// Telemetry message carrying ten numeric fields.
//
// Serialized form (little-endian, 66 bytes): message type (1), timestamp
// (uint64), latitude/longitude/altitude/speed (float64 each), heading
// (uint32), fuelLevel (uint8), engineTemp (float64), tirePressure (float32),
// batteryVoltage (float64).
class TelemetryData {
  static messageTypeId = MessageType.telemetryData;
  static fixedSize = 1 + 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8;

  /**
   * Every argument must be a number; the first one that is not raises
   * `Error('Invalid <name>')`.
   */
  constructor(timestamp, latitude, longitude, altitude, speed, heading, fuelLevel, engineTemp, tirePressure, batteryVoltage) {
    // Keyed in parameter order so validation and assignment follow it too.
    const fields = {
      timestamp,
      latitude,
      longitude,
      altitude,
      speed,
      heading,
      fuelLevel,
      engineTemp,
      tirePressure,
      batteryVoltage,
    };
    for (const [label, candidate] of Object.entries(fields)) {
      if (typeof candidate !== 'number') throw new Error(`Invalid ${label}`);
    }
    Object.assign(this, fields);
  }

  /**
   * Reads the payload fields (type byte already consumed) from `reader` in
   * wire order and builds a TelemetryData.
   * @param {object} reader Sequential binary reader.
   * @returns {TelemetryData}
   */
  static deserialize(reader) {
    // Arguments are evaluated left to right, which is the wire order.
    return new TelemetryData(
      Number(reader.readBigUInt64LE()), // timestamp
      reader.readDoubleLE(), // latitude
      reader.readDoubleLE(), // longitude
      reader.readDoubleLE(), // altitude
      reader.readDoubleLE(), // speed
      reader.readUInt32LE(), // heading
      reader.readUInt8(), // fuelLevel
      reader.readDoubleLE(), // engineTemp
      reader.readFloatLE(), // tirePressure
      reader.readDoubleLE() // batteryVoltage
    );
  }

  /**
   * Writes this message, including the leading type byte, into `buffer`.
   * @param {Buffer} buffer Destination buffer.
   * @param {number} offset Index at which to start writing.
   * @returns {number} Offset just past the last byte written.
   */
  serialize(buffer, offset) {
    let cursor = offset;
    // Writes one fixed-width field at the cursor, then advances it.
    const put = (method, value, width) => {
      buffer[method](value, cursor);
      cursor += width;
    };

    put('writeUInt8', TelemetryData.messageTypeId, 1);
    put('writeBigUInt64LE', BigInt(this.timestamp), 8);
    put('writeDoubleLE', this.latitude, 8);
    put('writeDoubleLE', this.longitude, 8);
    put('writeDoubleLE', this.altitude, 8);
    put('writeDoubleLE', this.speed, 8);
    put('writeUInt32LE', this.heading, 4);
    put('writeUInt8', this.fuelLevel, 1);
    put('writeDoubleLE', this.engineTemp, 8);
    put('writeFloatLE', this.tirePressure, 4);
    put('writeDoubleLE', this.batteryVoltage, 8);
    return cursor;
  }
}
// User profile message. Variable length: each string is written as a uint32
// byte length followed by its UTF-8 bytes.
//
// Wire order after the type byte: userId (uint32), username (string),
// registrationDate (uint64), lastLoginDate (uint64), isVerified (uint8),
// isAdmin (uint8), avatarId (uint32), sessionCount (uint32), emailHash
// (string), preferredTheme (string). All integers are little-endian.
class UserProfile {
  static messageTypeId = MessageType.userProfile;

  /**
   * Validates the type of every argument; the first mismatch raises
   * `Error('Invalid <name>')`. `userId` must additionally be non-negative.
   */
  constructor(userId, username, registrationDate, lastLoginDate, isVerified, isAdmin, avatarId, sessionCount, emailHash, preferredTheme) {
    if (typeof userId !== 'number' || userId < 0) throw new Error('Invalid userId');
    // Remaining checks are pure typeof tests, listed in parameter order.
    const typeChecks = [
      ['username', username, 'string'],
      ['registrationDate', registrationDate, 'number'],
      ['lastLoginDate', lastLoginDate, 'number'],
      ['isVerified', isVerified, 'boolean'],
      ['isAdmin', isAdmin, 'boolean'],
      ['avatarId', avatarId, 'number'],
      ['sessionCount', sessionCount, 'number'],
      ['emailHash', emailHash, 'string'],
      ['preferredTheme', preferredTheme, 'string'],
    ];
    for (const [label, candidate, expected] of typeChecks) {
      if (typeof candidate !== expected) throw new Error(`Invalid ${label}`);
    }
    Object.assign(this, {
      userId,
      username,
      registrationDate,
      lastLoginDate,
      isVerified,
      isAdmin,
      avatarId,
      sessionCount,
      emailHash,
      preferredTheme,
    });
  }

  /**
   * Reads the payload fields (type byte already consumed) from `reader` in
   * wire order and builds a UserProfile.
   * @param {object} reader Sequential binary reader.
   * @returns {UserProfile}
   */
  static deserialize(reader) {
    // A string is a uint32 length followed by that many bytes.
    const readText = () => reader.readString(reader.readUInt32LE());

    const userId = reader.readUInt32LE();
    const username = readText();
    const registrationDate = Number(reader.readBigUInt64LE());
    const lastLoginDate = Number(reader.readBigUInt64LE());
    const isVerified = reader.readBoolean();
    const isAdmin = reader.readBoolean();
    const avatarId = reader.readUInt32LE();
    const sessionCount = reader.readUInt32LE();
    const emailHash = readText();
    const preferredTheme = readText();
    return new UserProfile(userId, username, registrationDate, lastLoginDate, isVerified, isAdmin, avatarId, sessionCount, emailHash, preferredTheme);
  }

  /**
   * Writes this message, including the leading type byte, into `buffer`.
   * @param {Buffer} buffer Destination buffer.
   * @param {number} offset Index at which to start writing.
   * @returns {number} Offset just past the last byte written.
   */
  serialize(buffer, offset) {
    // Encode all strings first so their byte lengths are known.
    const [nameUtf8, hashUtf8, themeUtf8] = [this.username, this.emailHash, this.preferredTheme]
      .map((text) => Buffer.from(text, 'utf8'));

    let cursor = offset;
    // Writes one fixed-width field at the cursor, then advances it.
    const put = (method, value, width) => {
      buffer[method](value, cursor);
      cursor += width;
    };
    // Writes a uint32 length prefix followed by the raw bytes.
    const putBytes = (bytes) => {
      put('writeUInt32LE', bytes.length, 4);
      bytes.copy(buffer, cursor);
      cursor += bytes.length;
    };

    put('writeUInt8', UserProfile.messageTypeId, 1);
    put('writeUInt32LE', this.userId, 4);
    putBytes(nameUtf8);
    put('writeBigUInt64LE', BigInt(this.registrationDate), 8);
    put('writeBigUInt64LE', BigInt(this.lastLoginDate), 8);
    put('writeUInt8', this.isVerified ? 1 : 0, 1);
    put('writeUInt8', this.isAdmin ? 1 : 0, 1);
    put('writeUInt32LE', this.avatarId, 4);
    put('writeUInt32LE', this.sessionCount, 4);
    putBytes(hashUtf8);
    putBytes(themeUtf8);
    return cursor;
  }
}
// Data class for Sensor Configuration. Variable length: each string is
// written as a uint32 byte length followed by its UTF-8 bytes.
//
// Wire order after the type byte: sensorId (uint16), sensorName (string),
// isActive (uint8), minThreshold (float64), maxThreshold (float64),
// sampleRate (uint32), units (string), description (string),
// calibrationDate (uint64), requiresRestart (uint8). Little-endian.
class SensorConfig {
  static messageTypeId = MessageType.sensorConfig;

  /**
   * @param {number} sensorId Integer in [0, 65535]; written as a uint16.
   * @param {string} sensorName
   * @param {boolean} isActive
   * @param {number} minThreshold
   * @param {number} maxThreshold
   * @param {number} sampleRate Written as a uint32.
   * @param {string} units
   * @param {string} description
   * @param {number} calibrationDate Non-negative integer; written as a uint64.
   * @param {boolean} requiresRestart
   * @throws {Error} If any argument fails validation.
   */
  constructor(sensorId, sensorName, isActive, minThreshold, maxThreshold, sampleRate, units, description, calibrationDate, requiresRestart) {
    // Input validation.
    // `typeof x === 'number'` alone accepts NaN and fractions: NaN slips past
    // the range comparisons and is silently written as 0. Require an integer.
    if (!Number.isInteger(sensorId) || sensorId < 0 || sensorId > 65535) throw new Error('Invalid sensorId');
    if (typeof sensorName !== 'string') throw new Error('Invalid sensorName');
    if (typeof isActive !== 'boolean') throw new Error('Invalid isActive');
    if (typeof minThreshold !== 'number') throw new Error('Invalid minThreshold');
    if (typeof maxThreshold !== 'number') throw new Error('Invalid maxThreshold');
    if (typeof sampleRate !== 'number') throw new Error('Invalid sampleRate');
    if (typeof units !== 'string') throw new Error('Invalid units');
    if (typeof description !== 'string') throw new Error('Invalid description');
    // BigInt() in serialize() throws on non-integers, and the unsigned write
    // rejects negatives; fail here with a clear message instead.
    if (!Number.isInteger(calibrationDate) || calibrationDate < 0) throw new Error('Invalid calibrationDate');
    if (typeof requiresRestart !== 'boolean') throw new Error('Invalid requiresRestart');
    this.sensorId = sensorId;
    this.sensorName = sensorName;
    this.isActive = isActive;
    this.minThreshold = minThreshold;
    this.maxThreshold = maxThreshold;
    this.sampleRate = sampleRate;
    this.units = units;
    this.description = description;
    this.calibrationDate = calibrationDate;
    this.requiresRestart = requiresRestart;
  }

  /**
   * Reads the payload fields (type byte already consumed) from `reader` in
   * wire order and builds a SensorConfig.
   * @param {object} reader Sequential binary reader.
   * @returns {SensorConfig}
   */
  static deserialize(reader) {
    const sensorId = reader.readUInt16LE();
    const nameLength = reader.readUInt32LE();
    const sensorName = reader.readString(nameLength);
    const isActive = reader.readBoolean();
    const minThreshold = reader.readDoubleLE();
    const maxThreshold = reader.readDoubleLE();
    const sampleRate = reader.readUInt32LE();
    const unitsLength = reader.readUInt32LE();
    const units = reader.readString(unitsLength);
    const descLength = reader.readUInt32LE();
    const description = reader.readString(descLength);
    const calibrationDate = reader.readBigUInt64LE();
    const requiresRestart = reader.readBoolean();
    return new SensorConfig(sensorId, sensorName, isActive, minThreshold, maxThreshold, sampleRate, units, description, Number(calibrationDate), requiresRestart);
  }

  /**
   * Writes this message, including the leading type byte, into `buffer`.
   * @param {Buffer} buffer Destination buffer.
   * @param {number} offset Index at which to start writing.
   * @returns {number} Offset just past the last byte written.
   */
  serialize(buffer, offset) {
    // Encode strings first so their byte lengths are known.
    const nameBytes = Buffer.from(this.sensorName, 'utf8');
    const unitsBytes = Buffer.from(this.units, 'utf8');
    const descBytes = Buffer.from(this.description, 'utf8');
    buffer.writeUInt8(SensorConfig.messageTypeId, offset);
    offset += 1;
    buffer.writeUInt16LE(this.sensorId, offset);
    offset += 2;
    buffer.writeUInt32LE(nameBytes.length, offset);
    offset += 4;
    nameBytes.copy(buffer, offset);
    offset += nameBytes.length;
    buffer.writeUInt8(this.isActive ? 1 : 0, offset);
    offset += 1;
    buffer.writeDoubleLE(this.minThreshold, offset);
    offset += 8;
    buffer.writeDoubleLE(this.maxThreshold, offset);
    offset += 8;
    buffer.writeUInt32LE(this.sampleRate, offset);
    offset += 4;
    buffer.writeUInt32LE(unitsBytes.length, offset);
    offset += 4;
    unitsBytes.copy(buffer, offset);
    offset += unitsBytes.length;
    buffer.writeUInt32LE(descBytes.length, offset);
    offset += 4;
    descBytes.copy(buffer, offset);
    offset += descBytes.length;
    buffer.writeBigUInt64LE(BigInt(this.calibrationDate), offset);
    offset += 8;
    buffer.writeUInt8(this.requiresRestart ? 1 : 0, offset);
    offset += 1;
    return offset;
  }
}
/**
 * Maps the first byte of a message to the matching MessageType value.
 * @param {number} firstByte Leading byte of the message.
 * @returns {number} The MessageType value strictly equal to `firstByte`, or
 *   MessageType.unknown when no entry matches.
 */
function getMessageType(firstByte) {
  let resolved = MessageType.unknown;
  for (const name in MessageType) {
    const candidate = MessageType[name];
    if (candidate === firstByte) {
      resolved = candidate;
      break;
    }
  }
  return resolved;
}
/**
 * Decodes one binary message and logs the resulting object.
 *
 * The first byte selects the message class; the rest of the data is handed
 * to that class's `deserialize` through a shared BinaryReader. Empty input
 * is ignored and unrecognized type bytes are logged.
 * @param {Buffer} data Raw message bytes.
 */
function handleBinaryMessage(data) {
  if (data.length === 0) return;

  const reader = new BinaryReader(data);
  const messageType = reader.readUInt8();

  // Pick the decoder and log label first, then decode and log in one place.
  let decoder;
  let label;
  switch (messageType) {
    case MessageType.statusUpdate:
      decoder = StatusUpdate;
      label = 'Received status update:';
      break;
    case MessageType.sensorData:
      decoder = SensorData;
      label = 'Received sensor data:';
      break;
    case MessageType.telemetryData:
      decoder = TelemetryData;
      label = 'Received telemetry data:';
      break;
    case MessageType.userProfile:
      decoder = UserProfile;
      label = 'Received user profile:';
      break;
    case MessageType.sensorConfig:
      decoder = SensorConfig;
      label = 'Received sensor config:';
      break;
    default:
      console.log('Received unknown message type:', messageType);
      return;
  }

  const decoded = decoder.deserialize(reader);
  console.log(label, util.inspect(decoded, { depth: null }));
}
// A single, globally accessible scratch buffer that messages are serialized
// into before being sent.
// It is allocated with Buffer.allocUnsafe to avoid zero-filling, a minor
// optimization that comes with the risk of exposing old memory data: any
// bytes that are not overwritten before sending contain stale memory.
// A size of 2048 bytes should be sufficient for the example messages.
const GLOBAL_BUFFER = Buffer.allocUnsafe(2048);
// Set to true by sendMessage while it is using GLOBAL_BUFFER and reset to
// false when it finishes.
let isBufferLocked = false;
/**
 * Serializes a message into the shared scratch buffer and sends a private
 * copy of the serialized bytes over the WebSocket connection.
 *
 * Errors during serialization or sending are logged, not thrown.
 * @param {WebSocket} ws The WebSocket instance.
 * @param {object} messageInstance The message object to serialize and send;
 *   its `serialize(buffer, offset)` must return the end offset.
 * @param {number} size The total size of the serialized message in bytes.
 */
function sendMessage(ws, messageInstance, size) {
  // A minimal attempt to prevent re-entrant use. Not a robust solution.
  if (isBufferLocked) {
    console.error('Buffer is busy, skipping message send.');
    // In a real application, you would queue this message for later.
    return;
  }
  isBufferLocked = true;
  try {
    if (!Number.isInteger(size) || size < 0) {
      throw new RangeError(`Invalid message size (${size}).`);
    }
    // Check if the serialized size exceeds the buffer's capacity to prevent
    // buffer overruns.
    if (size > GLOBAL_BUFFER.length) {
      throw new RangeError(`Message size (${size}) exceeds global buffer capacity (${GLOBAL_BUFFER.length}).`);
    }
    const written = messageInstance.serialize(GLOBAL_BUFFER, 0);
    // GLOBAL_BUFFER comes from allocUnsafe: sending more bytes than were
    // just written would transmit stale memory, so insist on an exact match.
    if (written !== size) {
      throw new RangeError(`Serialized ${written} bytes but declared size is ${size}.`);
    }
    // Send a copy, not a view. A view (slice/subarray) shares memory with
    // GLOBAL_BUFFER, and the socket may still hold the bytes queued when the
    // next message overwrites the scratch buffer.
    ws.send(Buffer.from(GLOBAL_BUFFER.subarray(0, size)));
  } catch (e) {
    console.error('Serialization or send error:', e);
  } finally {
    isBufferLocked = false;
  }
}
/**
 * 'open' event handler: logs the connection and sends one example
 * TelemetryData message followed by one example UserProfile message.
 * @param {WebSocket} ws The WebSocket instance.
 */
function handleOpen(ws) {
  console.log('Connected to WebSocket server.');

  // sendMessage needs the serialized size up front, so each message is
  // serialized once into the scratch buffer just to measure it.
  const dispatch = (message) => sendMessage(ws, message, message.serialize(GLOBAL_BUFFER, 0));

  // Example of sending new message types
  dispatch(new TelemetryData(Date.now(), 34.0191, -118.4912, 250.5, 95.7, 180, 75, 90.2, 32.5, 12.8));
  dispatch(new UserProfile(12345, 'john_doe', 1672531200000, 1672617600000, true, false, 5, 150, 'abcdef123456', 'dark'));
}
/**
 * Handles the 'message' event for the WebSocket connection.
 *
 * Binary frames are decoded with handleBinaryMessage; decoding errors are
 * logged instead of propagating. Text frames are logged as strings.
 * @param {Buffer | string} message The received message.
 * @param {boolean} [isBinary] Frame type flag supplied by the WebSocket
 *   library as the second listener argument. Libraries that deliver text
 *   frames as Buffers rely on it to tell the two apart. When omitted, the
 *   message is treated as binary exactly when it is a Buffer.
 */
function handleMessage(message, isBinary) {
  // A Buffer check alone misroutes text frames into the binary parser when
  // the library hands over every payload as a Buffer.
  const binary = typeof isBinary === 'boolean' ? isBinary : message instanceof Buffer;
  if (binary) {
    try {
      // Use a try/catch block to prevent a malformed message from crashing the application.
      handleBinaryMessage(message);
    } catch (e) {
      console.error('Security Alert: Malformed message received.', e);
    }
  } else {
    console.log('Received text message:', message.toString());
  }
}
/**
 * 'close' event handler: logs the close code and the reason text.
 * @param {number} code The WebSocket close code.
 * @param {Buffer} reason The reason for the close; converted with toString().
 */
function handleClose(code, reason) {
  const reasonText = reason.toString();
  console.log(`Connection closed with code ${code}: ${reasonText}`);
}
/**
 * 'error' event handler: logs the error to stderr.
 * @param {Error} error The error object.
 */
function handleError(error) {
  const prefix = 'Connection error:';
  console.error(prefix, error);
}
/**
 * Entry point: opens the WebSocket connection and attaches the event
 * handlers. Errors thrown while creating the socket or registering the
 * listeners are logged.
 */
async function main() {
  const serverUrl = 'wss://localhost:8080';
  try {
    const socket = new WebSocket(serverUrl);
    // Event name -> listener, registered in this order.
    const listeners = [
      ['open', () => handleOpen(socket)],
      ['message', handleMessage],
      ['close', handleClose],
      ['error', handleError],
    ];
    for (const [eventName, listener] of listeners) {
      socket.on(eventName, listener);
    }
  } catch (e) {
    console.error('Could not connect to WebSocket server:', e);
  }
}
main();
package main
import (
"log"
"net/http"
"time"
"unsafe" // Import the unsafe package to work with pointers and memory
"golang.org/x/net/websocket"
)
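// A note on memory alignment:
// Go inserts padding between struct fields to satisfy alignment requirements
// (for example, on 64-bit targets a uint64 that follows a uint16 and two
// uint8 fields starts at offset 8, not 4). The in-memory layout of the
// structs below therefore does NOT match a tightly packed wire format.
// Casting received bytes onto them with unsafe is only correct if the sender
// pads and aligns its messages identically; otherwise decode field by field.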
// StatusUpdate represents a fixed-size status message from a device.
//
// The JavaScript client in this gist writes the payload packed as
// deviceId (2 bytes), status (1), batteryLevel (1), timestamp (8), all
// little-endian, for 12 bytes after the type byte.
// NOTE(review): on 64-bit targets Go presumably aligns Timestamp to 8 bytes,
// making this struct larger than the packed payload — confirm before relying
// on unsafe.Sizeof for length checks.
type StatusUpdate struct {
// DeviceID identifies the reporting device.
DeviceID uint16
// Status is the raw status code.
Status uint8
// BatteryLevel is the raw battery level value.
BatteryLevel uint8
// Timestamp is the message timestamp as sent by the client.
Timestamp uint64
}
// SensorData represents a sensor reading.
//
// The JavaScript client in this gist writes the payload packed as
// sensorId (2 bytes), timestamp (8), value (4-byte float), all
// little-endian, for 14 bytes after the type byte.
// NOTE(review): Go presumably pads after SensorID so Timestamp is aligned;
// the struct layout then differs from the packed payload — confirm.
type SensorData struct {
// SensorID identifies the sensor.
SensorID uint16
// Timestamp is the reading's timestamp as sent by the client.
Timestamp uint64
// Value is the sensor reading.
Value float32
}
// TelemetryData represents a fixed-size telemetry message.
//
// The JavaScript client in this gist writes the fields in the order declared
// here, packed and little-endian: 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8 = 65
// bytes after the type byte.
// NOTE(review): the 1-byte FuelLevel is followed by a float64, which Go
// presumably aligns with padding, so the struct layout differs from the
// packed payload — confirm.
type TelemetryData struct {
Timestamp uint64
Latitude float64
Longitude float64
Altitude float64
Speed float64
Heading uint32
FuelLevel uint8
EngineTemp float64
TirePressure float32
BatteryVoltage float64
}
// UserProfileFixed groups the fixed-width fields of a UserProfile message.
//
// UserProfile is an example of a message with variable-length data: the
// strings (username, email hash) are sent as a uint32 length followed by the
// bytes and cannot be part of a fixed-size struct, so processMessage slices
// them out of the payload separately.
// NOTE(review): the JavaScript client in this gist writes the username
// between userId and registrationDate, so these fields are not contiguous on
// the wire — confirm the decoder reads them in the client's order.
type UserProfileFixed struct {
UserID uint32
RegistrationDate uint64
LastLoginDate uint64
// IsVerified and IsAdmin are sent by the client as single bytes (1 or 0).
IsVerified bool
IsAdmin bool
AvatarID uint32
SessionCount uint32
}
// These constants define the message types, replacing the enum from the
// FlatBuffers schema. The first byte of the message buffer will be this type.
// Values start at 1 (iota + 1) and increase by one per constant; processMessage
// reports any other first byte as an unknown message type.
const (
MessageTypeStatusUpdate byte = iota + 1 // 1
MessageTypeSensorData // 2
MessageTypeTelemetryData // 3
MessageTypeUserProfile // 4
MessageTypeSensorConfig // 5
)
// processMessage decodes one binary message and logs its fields.
//
// data[0] is the message type; the rest is the payload. Payloads are decoded
// field by field as tightly packed little-endian values, matching what the
// JavaScript client in this gist serializes. The structs are NOT overlaid on
// the payload with unsafe casts: Go pads struct fields for alignment, so the
// struct layout differs from the packed wire layout, and the payload (which
// starts one byte into the receive buffer) is not suitably aligned for such
// a cast anyway.
//
// Empty, truncated and unknown messages are logged and dropped.
// SensorConfig messages have no decoder here and are reported as unknown.
func processMessage(data []byte) {
	if len(data) == 0 {
		log.Println("Received empty message.")
		return
	}

	// Little-endian readers. Callers must bounds-check before using them.
	u16 := func(b []byte) uint16 { return uint16(b[0]) | uint16(b[1])<<8 }
	u32 := func(b []byte) uint32 {
		return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
	}
	u64 := func(b []byte) uint64 { return uint64(u32(b)) | uint64(u32(b[4:]))<<32 }
	// Float readers reinterpret the IEEE 754 bit pattern held in an aligned
	// local variable (the same conversion math.Float32frombits performs).
	f32 := func(b []byte) float32 {
		bits := u32(b)
		return *(*float32)(unsafe.Pointer(&bits))
	}
	f64 := func(b []byte) float64 {
		bits := u64(b)
		return *(*float64)(unsafe.Pointer(&bits))
	}

	// The first byte of the message is our custom message type.
	messageType := data[0]
	// The rest of the buffer is the payload.
	payload := data[1:]

	switch messageType {
	case MessageTypeStatusUpdate:
		// Wire layout: deviceId(2) status(1) batteryLevel(1) timestamp(8).
		const wireSize = 2 + 1 + 1 + 8
		if len(payload) < wireSize {
			log.Println("Payload too small for StatusUpdate")
			return
		}
		su := StatusUpdate{
			DeviceID:     u16(payload[0:]),
			Status:       payload[2],
			BatteryLevel: payload[3],
			Timestamp:    u64(payload[4:]),
		}
		log.Printf("Received StatusUpdate: DeviceID=%d, Status=%d, BatteryLevel=%d, Timestamp=%d",
			su.DeviceID, su.Status, su.BatteryLevel, su.Timestamp)

	case MessageTypeSensorData:
		// Wire layout: sensorId(2) timestamp(8) value(4).
		const wireSize = 2 + 8 + 4
		if len(payload) < wireSize {
			log.Println("Payload too small for SensorData")
			return
		}
		sd := SensorData{
			SensorID:  u16(payload[0:]),
			Timestamp: u64(payload[2:]),
			Value:     f32(payload[10:]),
		}
		log.Printf("Received SensorData: SensorID=%d, Timestamp=%d, Value=%.2f",
			sd.SensorID, sd.Timestamp, sd.Value)

	case MessageTypeTelemetryData:
		// Wire layout: timestamp(8) latitude(8) longitude(8) altitude(8)
		// speed(8) heading(4) fuelLevel(1) engineTemp(8) tirePressure(4)
		// batteryVoltage(8).
		const wireSize = 8 + 8 + 8 + 8 + 8 + 4 + 1 + 8 + 4 + 8
		if len(payload) < wireSize {
			log.Println("Payload too small for TelemetryData")
			return
		}
		td := TelemetryData{
			Timestamp:      u64(payload[0:]),
			Latitude:       f64(payload[8:]),
			Longitude:      f64(payload[16:]),
			Altitude:       f64(payload[24:]),
			Speed:          f64(payload[32:]),
			Heading:        u32(payload[40:]),
			FuelLevel:      payload[44],
			EngineTemp:     f64(payload[45:]),
			TirePressure:   f32(payload[53:]),
			BatteryVoltage: f64(payload[57:]),
		}
		log.Printf("Received TelemetryData: Timestamp=%d, Lat=%.2f, Lon=%.2f, Alt=%.2f, Speed=%.2f, Heading=%d",
			td.Timestamp, td.Latitude, td.Longitude, td.Altitude, td.Speed, td.Heading)

	case MessageTypeUserProfile:
		// Wire layout: userId(4) usernameLen(4) username registrationDate(8)
		// lastLoginDate(8) isVerified(1) isAdmin(1) avatarId(4)
		// sessionCount(4) emailHashLen(4) emailHash themeLen(4) theme.
		rest := payload
		// take returns the next n bytes and advances, or reports false when
		// fewer than n bytes remain. n < 0 guards uint32 lengths that do not
		// fit in int on 32-bit platforms.
		take := func(n int) ([]byte, bool) {
			if n < 0 || n > len(rest) {
				return nil, false
			}
			chunk := rest[:n]
			rest = rest[n:]
			return chunk, true
		}

		idBytes, ok := take(4)
		if !ok {
			log.Println("Payload too small for UserProfileFixed")
			return
		}
		lenBytes, ok := take(4)
		if !ok {
			log.Println("Invalid payload for UserProfile (username length missing)")
			return
		}
		nameBytes, ok := take(int(u32(lenBytes)))
		if !ok {
			log.Println("Invalid payload for UserProfile (username out of bounds)")
			return
		}
		fixed, ok := take(8 + 8 + 1 + 1 + 4 + 4)
		if !ok {
			log.Println("Payload too small for UserProfileFixed")
			return
		}
		lenBytes, ok = take(4)
		if !ok {
			log.Println("Invalid payload for UserProfile (email hash length missing)")
			return
		}
		hashBytes, ok := take(int(u32(lenBytes)))
		if !ok {
			log.Println("Invalid payload for UserProfile (email hash out of bounds)")
			return
		}
		// The trailing preferred-theme string is not logged, so it is left unread.

		upFixed := UserProfileFixed{
			UserID:           u32(idBytes),
			RegistrationDate: u64(fixed[0:]),
			LastLoginDate:    u64(fixed[8:]),
			IsVerified:       fixed[16] != 0,
			IsAdmin:          fixed[17] != 0,
			AvatarID:         u32(fixed[18:]),
			SessionCount:     u32(fixed[22:]),
		}
		username := string(nameBytes)
		emailHash := string(hashBytes)
		log.Printf("Received UserProfile: UserID=%d, Username=%s, IsVerified=%t, IsAdmin=%t, AvatarID=%d, SessionCount=%d, EmailHash=%s",
			upFixed.UserID, username, upFixed.IsVerified, upFixed.IsAdmin, upFixed.AvatarID, upFixed.SessionCount, emailHash)

	default:
		log.Printf("Received unknown message type: %d", messageType)
	}
}
// wsHandler handles a single WebSocket connection. It's called for each new
// client and returns (closing the connection) when the client disconnects or
// a receive fails in a way the connection cannot recover from.
func wsHandler(ws *websocket.Conn) {
	defer ws.Close()
	remote := ws.Request().RemoteAddr
	log.Printf("New client connected from %s", remote)
	for {
		var msg []byte
		// websocket.Message.Receive stores the frame payload in msg.
		if err := websocket.Message.Receive(ws, &msg); err != nil {
			// io.EOF is matched by its text to avoid adding an import here.
			if err.Error() == "EOF" {
				log.Printf("Client disconnected: %s", remote)
				return
			}
			log.Printf("Error receiving message from client %s: %v", remote, err)
			// An oversized frame is skipped by the next Receive call, so the
			// connection stays usable. Any other error (reset, timeout,
			// protocol violation) would repeat on every call; looping on it
			// would spin forever, so give up on this connection.
			if err == websocket.ErrFrameTooLarge {
				continue
			}
			return
		}
		// We pass the raw byte slice to our processMessage function.
		processMessage(msg)
	}
}
// main starts the WebSocket server on port 8080 and blocks until it fails.
func main() {
	// Register the WebSocket handler at the "/ws" endpoint.
	http.Handle("/ws", websocket.Handler(wsHandler))

	// An explicit http.Server (nil Handler = http.DefaultServeMux) lets us
	// bound how long a client may take to send its request headers, which
	// plain http.ListenAndServe cannot do.
	server := &http.Server{
		Addr:              ":8080",
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Listen and serve on port 8080.
	log.Println("WebSocket server started on ws://localhost:8080/ws")
	if err := server.ListenAndServe(); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment