Last active
August 9, 2023 19:28
-
-
Save ShigeoTejima/330de9f695c81d745017735bea4acf4a to your computer and use it in GitHub Desktop.
Coding according to trace ProcessChangeEventHeader in pub-sub-api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo.java | |
import com.google.protobuf.ByteString; | |
import com.salesforce.eventbus.protobuf.*; | |
import io.grpc.CallCredentials; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.Metadata; | |
import io.grpc.stub.StreamObserver; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericData; | |
import org.apache.avro.generic.GenericDatumReader; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.io.BinaryDecoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.util.Utf8; | |
import java.io.ByteArrayInputStream; | |
import java.io.IOException; | |
import java.util.*; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.stream.Collectors; | |
public class Demo { | |
public static void main(String[] args) { | |
String loginEndpoint = System.getenv("loginEndpoint"); | |
String tenantId = System.getenv("tenantId"); | |
String accessToken = System.getenv("accessToken"); | |
String topicName = System.getenv("topicName"); | |
Objects.requireNonNull(loginEndpoint, "loginEndpoint required in environment variable."); | |
Objects.requireNonNull(tenantId, "tenantId required in environment variable."); | |
Objects.requireNonNull(accessToken, "accessToken required in environment variable."); | |
Objects.requireNonNull(topicName, "topicName required in environment variable."); | |
Demo demo = new Demo(loginEndpoint, tenantId, accessToken, topicName); | |
demo.startApp(); | |
demo.stopApp(); | |
} | |
private final Subscriber subscriber; | |
private final String topicName; | |
Demo() { | |
// for hello | |
this.subscriber = null; | |
this.topicName = null; | |
} | |
Demo(String loginEndpoint, String tenantId, String accessToken, String topicName) { | |
this.subscriber = new Subscriber(loginEndpoint, tenantId, accessToken); | |
this.topicName = topicName; | |
} | |
public String hello() { | |
return "Hello"; | |
} | |
void startApp() { | |
this.subscriber.startSubscription( | |
new StreamObserver<>() { | |
@Override | |
public void onNext(FetchResponse fetchResponse) { | |
for (ConsumerEvent ce : fetchResponse.getEventsList()) { | |
try { | |
Schema schema = subscriber.getSchema(ce.getEvent().getSchemaId()); | |
GenericRecord eventPayload = deserialize(schema, ce.getEvent().getPayload()); | |
System.out.println(eventPayload.toString()); | |
List<String> changedFields = getFieldListFromBitmap(schema, (GenericData.Record) eventPayload.get("ChangeEventHeader"), "changedFields"); | |
if (!changedFields.isEmpty()) { | |
System.out.println("=== Changed Fields ==="); | |
changedFields.forEach(System.out::println); | |
System.out.println("======================"); | |
} | |
System.out.println(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
System.out.println("onError"); | |
throwable.printStackTrace(); | |
subscriber.isActive.set(false); | |
} | |
@Override | |
public void onCompleted() { | |
System.out.println("onCompleted"); | |
subscriber.isActive.set(false); | |
} | |
private GenericRecord deserialize(Schema schema, ByteString payload) throws IOException { | |
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); | |
ByteArrayInputStream in = new ByteArrayInputStream(payload.toByteArray()); | |
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); | |
return reader.read(null, decoder); | |
} | |
private List<String> getFieldListFromBitmap(Schema schema, GenericData.Record eventHeader, String fieldName) { | |
GenericData.Array<Utf8> utf8value = (GenericData.Array<Utf8>)eventHeader.get(fieldName); | |
List<String> values = utf8value.stream().map(v -> v.toString()).collect(Collectors.toList()); | |
expandBitmap(schema, values); | |
return values; | |
} | |
private void expandBitmap(Schema schema, List<String> values) { | |
if (Objects.isNull(values) || values.isEmpty()) { | |
return; | |
} | |
if (values.get(0).startsWith("0x")) { | |
String bitMap = values.get(0); | |
values.addAll(0, fieldNamesFromBitmap(schema, bitMap)); | |
values.remove(bitMap); | |
} | |
if (values.get(values.size() - 1).contains("-")) { | |
for (ListIterator<String> itr = values.listIterator(); itr.hasNext();) { | |
String[] bitmapMapString = itr.next().split("-"); | |
if (bitmapMapString.length < 2) { | |
continue; | |
} | |
Schema.Field parentField = schema.getFields().get(Integer.valueOf(bitmapMapString[0])); | |
Schema childSchema = getValueSchema(parentField.schema()); | |
if (childSchema.getType().equals(Schema.Type.RECORD)) { | |
int nestedSize = childSchema.getFields().size(); | |
String parentFieldName = parentField.name(); | |
List<String> fullFiledNames = fieldNamesFromBitmap(childSchema, bitmapMapString[1]).stream() | |
.map(col -> parentFieldName + "." + col) | |
.collect(Collectors.toList()); | |
if (!fullFiledNames.isEmpty()) { | |
itr.remove(); | |
if (fullFiledNames.size() == nestedSize) { | |
itr.add(parentFieldName); | |
} else { | |
fullFiledNames.stream() | |
.forEach(itr::add); | |
} | |
} | |
} | |
} | |
} | |
} | |
private List<String> fieldNamesFromBitmap(Schema schema, String bitmap) { | |
BitSet bitSet = convertHexStringToBitSet(bitmap); | |
List<String> fieldNames = bitSet.stream() | |
.mapToObj(pos -> schema.getFields().get(pos).name()) | |
.collect(Collectors.toList()); | |
return fieldNames; | |
} | |
private BitSet convertHexStringToBitSet(String hex) { | |
String s = hex.substring(2); | |
int len = s.length(); | |
byte[] bytes = new byte[len / 2]; | |
for (int i = 0; i < len; i += 2) { | |
bytes[i / 2] = (byte)((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16)); | |
} | |
len /= 2; | |
byte[] reversedBytes = new byte[len]; | |
for (int i = 0; i < len; i++) { | |
reversedBytes[i] = bytes[len - i - 1]; | |
} | |
return BitSet.valueOf(reversedBytes); | |
} | |
private Schema getValueSchema(Schema schema) { | |
if (schema.getType() == Schema.Type.UNION) { | |
List<Schema> types = schema.getTypes(); | |
if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL) { | |
return types.get(1); | |
} else if (types.size() == 2 && types.get(0).getType() == Schema.Type.STRING) { | |
return schema.getTypes().get(1); | |
} else if (types.size() == 3 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.STRING) { | |
return schema.getTypes().get(2); | |
} | |
} | |
return schema; | |
} | |
}, | |
5, | |
this.topicName, | |
ReplayPreset.LATEST, | |
null); | |
} | |
void stopApp() { | |
this.subscriber.close(); | |
} | |
static class Subscriber { | |
public static AtomicBoolean isActive = new AtomicBoolean(false); | |
private final ManagedChannel channel; | |
private final PubSubGrpc.PubSubStub asyncStub; | |
private final PubSubGrpc.PubSubBlockingStub blockingStub; | |
private final Map<String, Schema> schemaCache; | |
private StreamObserver<FetchRequest> serverStream; | |
Subscriber(String loginEndpoint, String tenantId, String accessToken) { | |
String grpcHost = "api.pubsub.salesforce.com"; | |
int grpcPort = 7443; | |
this.channel = ManagedChannelBuilder.forAddress(grpcHost, grpcPort).build(); | |
SessionTokenService sessionTokenService = new SessionTokenService(); | |
APISessionCredentials apiSessionCredentials = sessionTokenService.loginWithAccessToken(loginEndpoint, tenantId, accessToken); | |
this.asyncStub = PubSubGrpc.newStub(channel).withCallCredentials(apiSessionCredentials); | |
this.blockingStub = PubSubGrpc.newBlockingStub(channel).withCallCredentials(apiSessionCredentials); | |
this.schemaCache = new ConcurrentHashMap<>(); | |
isActive.set(true); | |
} | |
void startSubscription( | |
StreamObserver<FetchResponse> responseObserver, | |
int providedBatchSize, | |
String providedTopicName, | |
ReplayPreset providedReplayPreset, | |
ByteString providedReplayId | |
) { | |
this.serverStream = this.asyncStub.subscribe(responseObserver); | |
FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder() | |
.setNumRequested(providedBatchSize) | |
.setTopicName(providedTopicName) | |
.setReplayPreset(providedReplayPreset); | |
if (providedReplayPreset == ReplayPreset.CUSTOM) { | |
fetchRequestBuilder.setReplayId((providedReplayId)); | |
} | |
serverStream.onNext(fetchRequestBuilder.build()); | |
while (isActive.get()) { | |
synchronized (this) { | |
try { | |
this.wait(5_000); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
} | |
void close() { | |
if (Objects.nonNull(this.serverStream)) { | |
this.serverStream.onCompleted(); | |
} | |
try { | |
this.channel.shutdown().awaitTermination(20, TimeUnit.SECONDS); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
Schema getSchema(String schemaId) { | |
return schemaCache.computeIfAbsent(schemaId, id -> { | |
SchemaRequest request = SchemaRequest.newBuilder().setSchemaId(id).build(); | |
String schemaJson = this.blockingStub.getSchema(request).getSchemaJson(); | |
return new Schema.Parser().parse(schemaJson); | |
}); | |
} | |
} | |
static class SessionTokenService { | |
APISessionCredentials loginWithAccessToken(String loginEndpoint, String tenantId, String accessToken) { | |
return new APISessionCredentials(loginEndpoint, tenantId, accessToken); | |
} | |
} | |
static class APISessionCredentials extends CallCredentials { | |
private final String instanceUrl; | |
private final String tenantId; | |
private final String accessToken; | |
APISessionCredentials(String instanceUrl, String tenantId, String accessToken) { | |
this.instanceUrl = instanceUrl; | |
this.tenantId = tenantId; | |
this.accessToken = accessToken; | |
} | |
@Override | |
public void applyRequestMetadata(RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) { | |
Metadata metadata = new Metadata(); | |
metadata.put(keyOf("instanceUrl"), instanceUrl); | |
metadata.put(keyOf("tenantId"), tenantId); | |
metadata.put(keyOf("accessToken"), accessToken); | |
metadataApplier.apply(metadata); | |
} | |
@Override | |
public void thisUsesUnstableApi() { | |
// nop | |
} | |
private Metadata.Key<String> keyOf(String name) { | |
return Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- pom.xml --> | |
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>org.example</groupId> | |
<artifactId>demo</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<properties> | |
<maven.compiler.source>17</maven.compiler.source> | |
<maven.compiler.target>17</maven.compiler.target> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<grpc.version>1.57.1</grpc.version> | |
</properties> | |
<dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>io.grpc</groupId> | |
<artifactId>grpc-bom</artifactId> | |
<version>${grpc.version}</version> | |
<type>pom</type> | |
<scope>import</scope> | |
</dependency> | |
</dependencies> | |
</dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>javax.annotation</groupId> | |
<artifactId>javax.annotation-api</artifactId> | |
<version>1.3.2</version> | |
</dependency> | |
<dependency> | |
<groupId>io.grpc</groupId> | |
<artifactId>grpc-protobuf</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>io.grpc</groupId> | |
<artifactId>grpc-stub</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>io.grpc</groupId> | |
<artifactId>grpc-netty-shaded</artifactId> | |
<scope>runtime</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.avro</groupId> | |
<artifactId>avro</artifactId> | |
<version>1.11.2</version> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-api</artifactId> | |
<version>5.9.3</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<extensions> | |
<extension> | |
<groupId>kr.motd.maven</groupId> | |
<artifactId>os-maven-plugin</artifactId> | |
<version>1.7.1</version> | |
</extension> | |
</extensions> | |
<plugins> | |
<plugin> | |
<groupId>org.xolstice.maven.plugins</groupId> | |
<artifactId>protobuf-maven-plugin</artifactId> | |
<version>0.6.1</version> | |
<configuration> | |
<protocArtifact>com.google.protobuf:protoc:3.23.4:exe:${os.detected.classifier}</protocArtifact> | |
<pluginId>grpc-java</pluginId> | |
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> | |
</configuration> | |
<executions> | |
<execution> | |
<goals> | |
<goal>compile</goal> | |
<goal>compile-custom</goal> | |
</goals> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment