Skip to content

Instantly share code, notes, and snippets.

@viggin543
Created February 22, 2025 13:34
Show Gist options
  • Save viggin543/16bc1fa683fb0f022e9ab5ee179d7c08 to your computer and use it in GitHub Desktop.
Save viggin543/16bc1fa683fb0f022e9ab5ee179d7c08 to your computer and use it in GitHub Desktop.
ingest.java
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
*/
import net.snowflake.ingest.streaming.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Example on how to use the Streaming Ingest client APIs.
*
* <p>Please read the README.md file for detailed steps
* https://github.com/snowflakedb/snowflake-ingest-java/blob/master/README.md
*/
public class SnowflakeStreamingIngestExample {
// Please follow the example in profile_streaming.json.example to see the required properties, or
// if you have already set up profile.json with Snowpipe before, all you need is to add the "role"
// property. If the "role" is not specified, the default user role will be applied.
public static void main(String[] args) throws Exception {
Properties props = getConnectionProps();
try (SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.
builder("VISUALLY_STREAM").
setProperties(props).
build()) {
// Create an open channel request on table MY_TABLE, note that the corresponding
// db/schema/table needs to be present
// Example: create or replace table MY_TABLE(c1 number);
SnowflakeStreamingIngestChannel channel1 = client.openChannel(
OpenChannelRequest.builder("MY_CHANNEL4")
.setDBName("JITSU")
.setSchemaName("PUBLIC")
.setTableName("FOO")
.setOnErrorOption(
OpenChannelRequest.OnErrorOption.CONTINUE)
.build());
final int totalRowsInTable = 2;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();
row.put("USER_ANONYMOUS_ID", "testingeststream" + val);
row.put("ALIAS", "SNOWSTREAM_TEST");
row.put("DOC_PATH", "/test");
// Insert the row with the current offset_token
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// Simply throw if there is an exception, or you can do whatever you want with the
// erroneous row
throw response.getInsertErrors().get(0).getException();
}
}
// If needed, you can check the offset_token registered in Snowflake to make sure everything
// is committed
final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1; // 0 based offset_token
final int maxRetries = 10;
int retryCount = 0;
do {
String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
if (offsetTokenFromSnowflake != null
&& offsetTokenFromSnowflake.equals(String.valueOf(expectedOffsetTokenInSnowflake))) {
System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
break;
}
retryCount++;
} while (retryCount < maxRetries);
// Close the channel, the function internally will make sure everything is committed (or throw
// an exception if there is any issue)
channel1.close().get();
}
}
private static Properties getConnectionProps() throws IOException {
Properties props = new Properties();
props.put("user", "xxx");
props.put("port", "443");
props.put("name", "stream");
props.put("scheme", "https");
props.put("ssl", "on");
props.put("warehouse", "COMPUTE_WH");
props.put("role", "SYSADMIN");
props.put("database", "JISTU");
props.put("connect_string", "jdbc:snowflake://xxx.us-central1.gcp.snowflakecomputing.com:443?warehouse=COMPUTE_WH");
props.put("host", "xxx.us-central1.gcp.snowflakecomputing.com");
props.put("schema", "PUBLIC");
props.put("url", "https://xxx.us-central1.gcp.snowflakecomputing.com:443");
props.put("account", "xxx");
props.put("private_key", xxx
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
""");
return props;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment