|
package ben.test; |
|
|
|
import com.fasterxml.jackson.core.JsonFactory; |
|
import com.fasterxml.jackson.core.JsonParser; |
|
import com.fasterxml.jackson.core.JsonToken; |
|
import com.fasterxml.jackson.core.async.ByteBufferFeeder; |
|
import com.fasterxml.jackson.core.async.NonBlockingInputFeeder; |
|
import org.springframework.core.io.buffer.DataBuffer; |
|
import org.springframework.core.io.buffer.DataBufferUtils; |
|
import org.springframework.http.MediaType; |
|
import org.springframework.web.bind.annotation.PostMapping; |
|
import org.springframework.web.bind.annotation.RequestBody; |
|
import org.springframework.web.bind.annotation.RestController; |
|
import reactor.core.publisher.Flux; |
|
import reactor.core.publisher.SynchronousSink; |
|
import reactor.core.scheduler.Schedulers; |
|
|
|
import java.io.IOException; |
|
import java.io.InputStream; |
|
import java.nio.ByteBuffer; |
|
|
|
@RestController |
|
public class StreamingController { |
|
private final JsonFactory jsonFactory = JsonFactory.builder().build(); |
|
|
|
@PostMapping(value = "/stream-json", consumes = MediaType.APPLICATION_JSON_VALUE) |
|
public Flux<String> streamJson(@RequestBody Flux<DataBuffer> body) throws IOException { |
|
return Flux.create(sink -> { |
|
try (JsonParser parser = jsonFactory.createNonBlockingByteBufferParser()) { |
|
ByteBufferFeeder feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder(); |
|
|
|
body.subscribe( |
|
buf -> processBuffer(buf, feeder, parser, sink), // Process each buffer chunk |
|
sink::error, // Handle errors |
|
() -> completeProcessing(feeder, sink) // Complete the stream |
|
); |
|
|
|
} catch (IOException e) { |
|
sink.error(e); |
|
} |
|
}); |
|
|
|
} |
|
|
|
|
|
private void processBuffer(DataBuffer buf, ByteBufferFeeder feeder, JsonParser parser, reactor.core.publisher.FluxSink<String> sink) { |
|
try (DataBuffer.ByteBufferIterator it = buf.readableByteBuffers()) { |
|
it.forEachRemaining(bbuf -> parseJsonChunk(bbuf, feeder, parser, sink)); |
|
} catch (Exception e) { |
|
sink.error(e); |
|
} |
|
} |
|
|
|
private void parseJsonChunk(ByteBuffer bbuf, ByteBufferFeeder feeder, JsonParser parser, reactor.core.publisher.FluxSink<String> sink) { |
|
try { |
|
feeder.feedInput(bbuf); // Feed chunk to parser |
|
|
|
JsonToken token; |
|
while ((token = parser.nextToken()) != null) { // Process available tokens |
|
if (token == JsonToken.FIELD_NAME) { |
|
sink.next("Field: " + parser.currentName()); |
|
} |
|
} |
|
} catch (IOException e) { |
|
sink.error(e); |
|
} |
|
} |
|
|
|
private void completeProcessing(ByteBufferFeeder feeder, reactor.core.publisher.FluxSink<String> sink) { |
|
feeder.endOfInput(); // Signal no more data |
|
sink.complete(); |
|
} |
|
|
|
} |