Skip to content

Instantly share code, notes, and snippets.

@muhdkhokhar
Created February 15, 2025 11:55
Show Gist options
  • Save muhdkhokhar/80cc4e84b8f11ee6a96cf1027e62c811 to your computer and use it in GitHub Desktop.
Save muhdkhokhar/80cc4e84b8f11ee6a96cf1027e62c811 to your computer and use it in GitHub Desktop.
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Runs a named-parameter SQL query and pushes every result row to a JSON generator,
 * one row at a time, without materialising the whole result in memory.
 *
 * <p>NOTE(review): {@code JsonGenerator} is referenced by this class but no import for it
 * is visible in this file; confirm which library's type is intended and import it.
 */
public class StreamQueryUtil {
    private static final Logger log = LoggerFactory.getLogger(StreamQueryUtil.class);
    private final NamedParameterJdbcTemplate jdbcTemplate;

    /**
     * @param jdbcTemplate template used to prepare and execute the query
     */
    public StreamQueryUtil(NamedParameterJdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Executes {@code sql} with the given parameters and, for every row returned, maps the
     * row to a column-name/value map, hands it to {@link #writeObject} and logs it.
     *
     * <p>The rows are consumed <em>inside</em> the statement callback. The prepared statement
     * (and therefore its result set) is only safe to use while the callback is running, so a
     * lazily evaluated stream must not be returned out of it and read afterwards.
     *
     * @param jsonGenerator destination passed to {@link #writeObject} for each row
     * @param sql           SQL text with named parameters
     * @param parameterMap  values for the named parameters in {@code sql}
     */
    public void queryToWriteStream(JsonGenerator jsonGenerator, String sql, MapSqlParameterSource parameterMap) {
        Integer rowCount = jdbcTemplate.execute(sql, parameterMap, ps -> {
            Integer sessionId = getSessionId(ps.getConnection());
            if (sessionId != null) {
                log.info("Captured Session ID: {}", sessionId);
            } else {
                log.error("Failed to capture session ID.");
            }

            RowMapper<Map<String, Object>> rowMapper = new ColumnMapRowMapper();
            int rowNum = 0;
            // try-with-resources closes the ResultSet even when mapping or writing fails.
            // A SQLException thrown here propagates out of the callback to the template.
            try (ResultSet rs = ps.executeQuery()) {
                // rs.next() is called exactly once per row, so no row can be skipped.
                while (rs.next()) {
                    Map<String, Object> row = rowMapper.mapRow(rs, rowNum++);
                    writeObject(jsonGenerator, row);
                    log.info("{}", row);
                }
            }
            return rowNum;
        });
        log.debug("Streamed {} row(s)", rowCount);
    }

    /**
     * Looks up the database session id of {@code conn} by running {@code SELECT @@SPID}.
     *
     * <p>NOTE(review): {@code @@SPID} looks like T-SQL (SQL Server / Sybase) syntax; on other
     * databases this query presumably fails and the method returns {@code null} - confirm.
     *
     * @param conn connection whose session id is requested
     * @return the session id, or {@code null} if the query returned no row or failed
     */
    private Integer getSessionId(Connection conn) {
        try (PreparedStatement ps = conn.prepareStatement("SELECT @@SPID");
             ResultSet rs = ps.executeQuery()) {
            return rs.next() ? rs.getInt(1) : null;
        } catch (SQLException e) {
            // Best effort: the session id is only used for logging, so do not fail the query.
            log.error("Failed to retrieve session ID", e);
            return null;
        }
    }

    /**
     * Writes one mapped row to the JSON generator. Currently a placeholder with no behavior.
     *
     * @param jsonGenerator destination for the serialized row
     * @param row           column-name/value map for a single result row
     */
    private void writeObject(JsonGenerator jsonGenerator, Map<String, Object> row) {
        // Implement JSON serialization logic here
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment