Created
July 15, 2021 15:09
-
-
Save rac021/3b0d3a54537bf5174ff7e8bdfd5ef1d6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main ; | |
import io.vertx.core.AsyncResult; | |
import io.vertx.core.Promise; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.VertxOptions; | |
import io.vertx.core.impl.cpu.CpuCoreSensor; | |
import io.vertx.pgclient.PgConnectOptions; | |
import io.vertx.pgclient.PgPool; | |
import io.vertx.sqlclient.Cursor; | |
import io.vertx.sqlclient.PoolOptions; | |
import io.vertx.sqlclient.PreparedStatement; | |
import io.vertx.sqlclient.Row; | |
import io.vertx.sqlclient.RowSet; | |
import io.vertx.sqlclient.SqlConnection; | |
import io.vertx.sqlclient.Transaction; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
public class VertxPgClient { | |
static CountDownLatch countDownLatch ; | |
public static void main( String[] args ) throws InterruptedException { | |
String sqlQuery = "SELECT ...." ; | |
PgConnectOptions connectOption = getOptions( "5432" , | |
"localhost" , | |
"MY_DB" , | |
"admin" , | |
"admin" ) ; | |
PoolOptions poolOptions = new PoolOptions().setMaxSize( 12 ) ; | |
VertxOptions options = new VertxOptions().setWorkerPoolSize( 50 ) | |
.setEventLoopPoolSize( 3 * CpuCoreSensor.availableProcessors() ) | |
.setWarningExceptionTime( 5 ) | |
.setWarningExceptionTimeUnit( TimeUnit.SECONDS ) | |
.setMaxEventLoopExecuteTime(10) | |
.setMaxEventLoopExecuteTimeUnit( TimeUnit.SECONDS ) | |
.setMaxWorkerExecuteTime(10) | |
.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS ) ; | |
Vertx vertx = Vertx.vertx(options) ; | |
PgPool pgPoolclient = PgPool.pool( vertx, connectOption , poolOptions ); | |
countDownLatch = new CountDownLatch(1) ; | |
runSqlQuery( pgPoolclient, sqlQuery ); | |
countDownLatch.await() ; | |
pgPoolclient.close() ; | |
vertx.close() ; | |
} | |
private static void runSqlQuery( PgPool pgPoolclient , | |
String sqlQuery ) { | |
pgPoolclient.getConnection( (AsyncResult<SqlConnection> ar0) -> { | |
if (ar0.succeeded()) { | |
SqlConnection connection = ar0.result(); | |
connection.prepare(sqlQuery , (AsyncResult<PreparedStatement> ar1) -> { | |
if (ar1.succeeded()) { | |
PreparedStatement pq = ar1.result(); | |
Cursor cursor = pq.cursor(); | |
connection.begin( (AsyncResult<Transaction> ar2) -> { | |
if (ar2.succeeded()) { | |
Transaction tx = ar2.result() ; | |
Vertx.currentContext().executeBlocking( (Promise<Object> promise ) -> { | |
cursor.read( 5 , ar3 -> { | |
if (ar3.succeeded()) { | |
RowSet<Row> rows = ar3.result(); | |
rows.iterator().forEachRemaining( row -> { | |
System.out.println( "row = " + row ) ; | |
}); | |
while ( cursor.hasMore() ) { | |
cursor.read( 5 , ar4 -> { | |
if (ar4.succeeded()) { | |
RowSet<Row> rows1 = ar4.result(); | |
rows1.iterator().forEachRemaining( row -> { | |
System.out.println( "row1 = " + row ) ; | |
}); | |
} else if (ar4.failed()) { | |
ar4.cause().printStackTrace() ; | |
} | |
}); | |
} | |
tx.commit() ; | |
if( ! cursor.isClosed() ) cursor.close() ; | |
connection.close() ; | |
countDownLatch.countDown() ; | |
} | |
}); | |
}, false , hdlr -> { | |
tx.commit() ; | |
if( ! cursor.isClosed() ) cursor.close() ; | |
connection.close() ; | |
countDownLatch.countDown() ; | |
}); | |
} else { | |
if ( ! cursor.isClosed() ) cursor.close() ; | |
connection.close() ; | |
countDownLatch.countDown() ; | |
} | |
}); | |
} | |
}).exceptionHandler( hdlr -> { | |
hdlr.getCause().printStackTrace(); | |
countDownLatch.countDown() ; | |
connection.close(); | |
}); | |
} else { | |
ar0.cause().printStackTrace(); | |
countDownLatch.countDown() ; | |
} | |
}); | |
} | |
private static PgConnectOptions getOptions( String db_port , | |
String db_host , | |
String db_name , | |
String db_user , | |
String db_password ) throws NumberFormatException { | |
return new PgConnectOptions().setPort ( Integer.parseInt(db_port) ) | |
.setHost ( db_host ) | |
.setDatabase( db_name ) | |
.setUser ( db_user ) | |
.setPassword( db_password ) | |
.setConnectTimeout(10_000 ) ; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment