Skip to content

Instantly share code, notes, and snippets.

@l1x
Last active December 11, 2024 13:37
Show Gist options
  • Save l1x/a982f4c64a9e3030515cc8b8d4032e6a to your computer and use it in GitHub Desktop.
Save l1x/a982f4c64a9e3030515cc8b8d4032e6a to your computer and use it in GitHub Desktop.
Reading and writing from/to Kafka with ZSTD compressed messages from Javascript
const { Kafka, logLevel, Partitioners, CompressionTypes, CompressionCodecs } =
require("kafkajs");
const ZstdCodec = require("@kafkajs/zstd");
const compressionParams = { level: 1 };
const decompressionParams = {};
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec(
compressionParams,
decompressionParams,
);
const kafka = new Kafka({
clientId: "my-consumer-client",
brokers: [
"b-2.kafkadevcluster.qmy2pc.c3.kafka.eu-west-1.amazonaws.com:9092",
"b-1.kafkadevcluster.qmy2pc.c3.kafka.eu-west-1.amazonaws.com:9092",
],
connectionTimeout: 10000,
requestTimeout: 15000,
retry: {
retries: 10,
factor: 2,
maxRetryTime: 60000,
initialRetryTime: 500,
},
logLevel: logLevel.INFO,
ack: "all",
});
const consumer = kafka.consumer({ groupId: "my-consumer-group" });
const run = async () => {
// Connect to Kafka broker
await consumer.connect();
console.log("Connected to Kafka");
// Subscribe to a Kafka topic
await consumer.subscribe({ topic: "test-topic", fromBeginning: true }); // Change topic name
// Start consuming messages
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`Received message: ${message.value.toString()}`);
console.log(`From topic: ${topic}, Partition: ${partition}`);
// Add your custom message processing logic here
},
});
};
// Start the consumer
run().catch(console.error);

Error:

#18 101.6 error /data/node_modules/cppzst: Command failed.
#18 101.6 Exit code: 1
#18 101.6 Command: node-gyp rebuild
#18 101.6 Arguments: 
#18 101.6 Directory: /data/node_modules/cppzst
#18 101.6 Output:
#18 101.6 gyp info it worked if it ends with ok
#18 101.6 gyp info using [email protected]
#18 101.6 gyp info using [email protected] | linux | arm64
#18 101.6 gyp info find Python using Python version 3.7.3 found at "/usr/bin/python3"
#18 101.6 gyp info spawn /usr/bin/python3
#18 101.6 gyp info spawn args [
#18 101.6 gyp info spawn args   '/usr/local/lib/node_modules/npm/node_modules/node-gyp/gyp/gyp_main.py',
#18 101.6 gyp info spawn args   'binding.gyp',
#18 101.6 gyp info spawn args   '-f',
#18 101.6 gyp info spawn args   'make',
#18 101.6 gyp info spawn args   '-I',
#18 101.6 gyp info spawn args   '/data/node_modules/cppzst/build/config.gypi',
#18 101.6 gyp info spawn args   '-I',
#18 101.6 gyp info spawn args   '/usr/local/lib/node_modules/npm/node_modules/node-gyp/addon.gypi',
#18 101.6 gyp info spawn args   '-I',
#18 101.6 gyp info spawn args   '/root/.cache/node-gyp/16.17.0/include/node/common.gypi',
#18 101.6 gyp info spawn args   '-Dlibrary=shared_library',
#18 101.6 gyp info spawn args   '-Dvisibility=default',
#18 101.6 gyp info spawn args   '-Dnode_root_dir=/root/.cache/node-gyp/16.17.0',
#18 101.6 gyp info spawn args   '-Dnode_gyp_dir=/usr/local/lib/node_modules/npm/node_modules/node-gyp',
#18 101.6 gyp info spawn args   '-Dnode_lib_file=/root/.cache/node-gyp/16.17.0/<(target_arch)/node.lib',
#18 101.6 gyp info spawn args   '-Dmodule_root_dir=/data/node_modules/cppzst',
#18 101.6 gyp info spawn args   '-Dnode_engine=v8',
#18 101.6 gyp info spawn args   '--depth=.',
#18 101.6 gyp info spawn args   '--no-parallel',
#18 101.6 gyp info spawn args   '--generator-output',
#18 101.6 gyp info spawn args   'build',
#18 101.6 gyp info spawn args   '-Goutput_dir=.'
#18 101.6 gyp info spawn args ]
#18 101.6 gyp info spawn make
#18 101.6 gyp info spawn args [ 'BUILDTYPE=Release', '-C', 'build' ]
#18 101.6 make: Entering directory '/data/node_modules/cppzst/build'
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/debug.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/entropy_common.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/error_private.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/fse_decompress.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/pool.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/threading.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/xxhash.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/common/zstd_common.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/fse_compress.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/hist.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/huf_compress.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_compress.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_compress_literals.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_compress_sequences.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_compress_superblock.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_double_fast.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_fast.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_lazy.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_ldm.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstd_opt.o
#18 101.6   CC(target) Release/obj.target/compress/deps/zstd/lib/compress/zstdmt_compress.o
#18 101.6   AR(target) Release/obj.target/deps/compress.a
#18 101.6   COPY Release/compress.a
#18 101.6   CXX(target) Release/obj.target/compressor/src/common/stream_coder.o
#18 101.6 In file included from /root/.cache/node-gyp/16.17.0/include/node/v8.h:30,
#18 101.6                  from /root/.cache/node-gyp/16.17.0/include/node/node.h:73,
#18 101.6                  from ../../nan/nan.h:60,
#18 101.6                  from ../src/common/stream_coder.h:4,
#18 101.6                  from ../src/common/stream_coder.cc:1:
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h: In function 'void v8::internal::PerformCastCheck(T*)':
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:38: error: 'remove_cv_t' is not a member of 'std'
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                       ^~~~~~~~~~~
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:38: note: suggested alternative: 'remove_cv'
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                       ^~~~~~~~~~~
#18 101.6                                       remove_cv
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:38: error: 'remove_cv_t' is not a member of 'std'
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:38: note: suggested alternative: 'remove_cv'
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                       ^~~~~~~~~~~
#18 101.6                                       remove_cv
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:50: error: template argument 2 is invalid
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                                   ^
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:63: error: '::Perform' has not been declared
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                                                ^~~~~~~
#18 101.6 /root/.cache/node-gyp/16.17.0/include/node/v8-internal.h:492:63: note: suggested alternative: 'herror'
#18 101.6              !std::is_same<Data, std::remove_cv_t<T>>::value>::Perform(data);
#18 101.6                                                                ^~~~~~~
#18 101.6                                                                herror
#18 101.6 make: *** [compressor.target.mk:124: Release/obj.target/compressor/src/common/stream_coder.o] Error 1
#18 101.6 make: Leaving directory '/data/node_modules/cppzst/build'
#18 101.6 gyp ERR! build error 
#18 101.6 gyp ERR! stack Error: `make` failed with exit code: 2
#18 101.6 gyp ERR! stack     at ChildProcess.onExit (/usr/local/lib/node_modules/npm/node_modules/node-gyp/lib/build.js:194:23)
#18 101.6 gyp ERR! stack     at ChildProcess.emit (node:events:513:28)
#18 101.6 gyp ERR! stack     at Process.ChildProcess._handle.onexit (node:internal/child_process:291:12)
#18 101.6 gyp ERR! System Linux 5.10.124-linuxkit
#18 101.6 gyp ERR! command "/usr/local/bin/node" "/usr/local/lib/node_modules/npm/node_modules/node-gyp/bin/node-gyp.js" "rebuild"
#18 101.6 gyp ERR! cwd /data/node_modules/cppzst
#18 101.6 gyp ERR! node -v v16.17.0
#18 101.6 gyp ERR! node-gyp -v v9.0.0
#18 101.6 gyp ERR! not ok
#18 101.6 info Visit https://yarnpkg.com/en/docs/cli/install for documentation about this command.

Solution:

export CXXFLAGS="-std=c++17"
export CXX=g++
[tasks.fmt]
description = "FMT"
run = "deno fmt"
[tools]
bun = "latest"
deno = "latest"
node = "lts"
{
"dependencies": {
"@kafkajs/zstd": "^0.1.1",
"kafkajs": "^2.2.4",
"node-gyp": "^11.0.0"
}
}
const { Kafka, logLevel, Partitioners } = require("kafkajs");
const kafka = new Kafka({
clientId: "my-consumer-client",
brokers: [
"b-2.kafkadevcluster.qmy2pc.c3.kafka.eu-west-1.amazonaws.com:9092",
"b-1.kafkadevcluster.qmy2pc.c3.kafka.eu-west-1.amazonaws.com:9092",
],
connectionTimeout: 10000,
requestTimeout: 15000,
retry: {
retries: 10,
factor: 2,
maxRetryTime: 60000,
initialRetryTime: 500,
},
logLevel: logLevel.INFO,
ack: "all",
});
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
if (topics.indexOf("test-topic") == -1) {
const createTopic = await admin.createTopics({
topics: [
{
topic: "test-topic",
replicationFactor: 2,
numPartitions: 2,
},
],
});
createTopic
? console.log("Topic CREATED!")
: console.log("TOPIC CREATION ERROR!");
}
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
const produceMessage = async () => {
try {
await producer.connect();
console.log("Producer connected.");
const promises = [...Array(100).keys()].map(async (element) => {
const result = await producer.send({
topic: "test-topic", // The Kafka topic
messages: [
{ value: `Hello, Kafka! This is event: ${element}` },
],
});
console.log("Message sent:", result);
});
await Promise.all(promises);
} catch (error) {
console.error("Error sending message:", error);
} finally {
console.log("Finally start");
await producer.disconnect();
console.log("Finally end");
}
};
await produceMessage();
console.log("The end");
process.exit(0);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment