Last active
May 11, 2018 00:58
-
-
Save eungju/9a859df374a4ef091fffcbc9340bfb38 to your computer and use it in GitHub Desktop.
RawHTTP libthrift TTransport
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.athaydes.rawhttp.core.EagerBodyReader; | |
import com.athaydes.rawhttp.core.EagerHttpResponse; | |
import com.athaydes.rawhttp.core.MethodLine; | |
import com.athaydes.rawhttp.core.RawHttp; | |
import com.athaydes.rawhttp.core.RawHttpHeaders; | |
import com.athaydes.rawhttp.core.RawHttpRequest; | |
import org.apache.thrift.transport.TTransport; | |
import org.apache.thrift.transport.TTransportException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.ByteArrayInputStream; | |
import java.io.ByteArrayOutputStream; | |
import java.io.Closeable; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.net.InetSocketAddress; | |
import java.net.Socket; | |
import java.net.SocketException; | |
import java.net.URI; | |
/** | |
* HTTP implementation of the TTransport interface. Used for working with a | |
* Thrift web services implementation (using for example TServlet). | |
* <p> | |
* This class offers an implementations of the HTTP transport. | |
*/ | |
public class TRawHttpSocket extends TTransport { | |
private static final Logger LOGGER = LoggerFactory.getLogger(TRawHttpSocket.class.getName()); | |
private URI uri; | |
private int connectTimeout; | |
private int socketTimeout; | |
private RawHttp rawHttp = new RawHttp(); | |
private MethodLine requestLine; | |
private Socket socket; | |
private InputStream socketIn; | |
private OutputStream socketOut; | |
private ByteArrayOutputStream requestBuffer; | |
private ByteArrayInputStream responseBuffer; | |
public TRawHttpSocket(URI uri, int connectTimeout, int socketTimeout) { | |
this.uri = uri; | |
this.connectTimeout = connectTimeout; | |
this.socketTimeout = socketTimeout; | |
requestLine = new MethodLine("POST", uri, "HTTP/1.1"); | |
} | |
@Override | |
public boolean isOpen() { | |
if (socket == null) { | |
return false; | |
} | |
return socket.isConnected(); | |
} | |
@Override | |
public void open() throws TTransportException { | |
if (isOpen()) { | |
throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); | |
} | |
if (uri.getScheme() == null || !"http".equals(uri.getScheme())) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open non-http scheme."); | |
} | |
if (uri.getHost() == null || uri.getHost().length() == 0) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); | |
} | |
if (uri.getPort() <= 0 || uri.getPort() > 65535) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + uri.getPort()); | |
} | |
if (uri.getRawPath() == null || uri.getRawPath().length() == 0) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null path."); | |
} | |
socket = new Socket(); | |
try { | |
socket.setSoLinger(false, 0); | |
socket.setTcpNoDelay(true); | |
socket.setKeepAlive(true); | |
socket.setSoTimeout(socketTimeout); | |
} catch (SocketException sx) { | |
LOGGER.error("Could not configure socket.", sx); | |
} | |
try { | |
socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), connectTimeout); | |
socketIn = socket.getInputStream(); | |
socketOut = socket.getOutputStream(); | |
requestBuffer = new ByteArrayOutputStream(); | |
responseBuffer = new ByteArrayInputStream(new byte[0]); | |
} catch (IOException iox) { | |
close(); | |
throw new TTransportException(TTransportException.NOT_OPEN, iox); | |
} | |
} | |
@Override | |
public void close() { | |
closeQuietly(requestBuffer); | |
requestBuffer = null; | |
closeQuietly(socket); | |
socket = null; | |
} | |
@Override | |
public int read(byte[] buf, int off, int len) throws TTransportException { | |
if (responseBuffer == null) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream"); | |
} | |
int bytesRead; | |
try { | |
bytesRead = responseBuffer.read(buf, off, len); | |
} catch (/*IO*/Exception iox) { | |
throw new TTransportException(TTransportException.UNKNOWN, iox); | |
} | |
if (bytesRead < 0) { | |
throw new TTransportException(TTransportException.END_OF_FILE); | |
} | |
return bytesRead; | |
} | |
@Override | |
public void write(byte[] buf, int off, int len) throws TTransportException { | |
if (requestBuffer == null) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream"); | |
} | |
try { | |
requestBuffer.write(buf, off, len); | |
} catch (/*IO*/Exception iox) { | |
throw new TTransportException(TTransportException.UNKNOWN, iox); | |
} | |
} | |
@Override | |
public void flush() throws TTransportException { | |
if (requestBuffer == null) { | |
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream"); | |
} | |
byte[] requestBody = requestBuffer.toByteArray(); | |
requestBuffer.reset(); | |
RawHttpRequest request = new RawHttpRequest(requestLine, | |
RawHttpHeaders.Builder.newBuilder() | |
.with("Host", uri.getHost() + ":" + uri.getPort()) | |
.with("User-Agent", "Java/TRawHttpSocket") | |
.with("Accept", "application/x-thrift") | |
.with("Content-Type", "application/x-thrift") | |
.with("Content-Length", String.valueOf(requestBody.length)) | |
.build(), | |
new EagerBodyReader(requestBody)); | |
EagerHttpResponse<?> response; | |
try { | |
request.writeTo(socketOut); | |
response = rawHttp.parseResponse(socketIn).eagerly(true); | |
} catch (IOException iox) { | |
throw new TTransportException(iox); | |
} | |
int responseCode = response.getStatusCode(); | |
byte[] responseBody = response.getBody().get().asBytes(); | |
if (responseCode != 200) { | |
throw new TTransportException("HTTP Response code: " + responseCode); | |
} | |
responseBuffer = new ByteArrayInputStream(responseBody); | |
for (String each : response.getHeaders().get("Connection")) { | |
if ("close".equals(each)) { | |
close(); | |
break; | |
} | |
} | |
} | |
private static void closeQuietly(Closeable resource) { | |
if (resource != null) { | |
try { | |
resource.close(); | |
} catch (IOException iox) { | |
LOGGER.warn("Could not close resource.", iox); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment