How to implement bidirectional HTTP/2 communication in Java/Kotlin

338 Views Asked by At

I have a PHP script which accepts a long-running HTTP/2 connection and processes pieces of data as they are streamed from the client. The amount of data streamed from client to server until the connection is closed is expected to be in realm of 5 GB.

After each piece of data, the server responds with a JSON, containing the related information.

The requirements are as follows:

  • The PHP script cannot be a standalone application (i .e. a separate PHP server with WebSocket support is not an option hence the HTTP/2 attempt)
  • This must happen on a single connection and can't be split up into multiple requests (for reasons I am not going to disclose now)
  • The data being streamed is binary.

I have tried using Ktor, OkHttp and the Java 11 HTTP Client but the problem I get is that the response body can't be used until the request has finished. The important aspect here is that it is bidirectional and interleaved. The RFC for HTTP/2 says that this behavior is supported.

The idea is that some thread writes data into an OutputStream, which is fed into the HTTP client using a pipe. Same thread uses the InputStream from the connection to receive the response. From what I understood, the second part never happens because the HTTP client is waiting for the OutputStream to be closed.

Here is my current attempt at OkHttp:

val request = Request.Builder()
    .url("/media".customApiUrl)
    .post(object : RequestBody() {
        override fun contentType(): MediaType = "application/octet-stream".toMediaType()

        override fun writeTo(sink: BufferedSink) {
            sink.writeAll(pipeIn.source())
        }
    })
    .build()
OkHttpClient().newBuilder()
    .protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE))
    .build()
    .newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            TODO("Not yet implemented")
        }

        override fun onResponse(call: Call, response: Response) {
            // this is never called
            logger.debug("Media session executing")
        }

    })

This was my attempt at Ktor:

        logger.debug("Making HTTP call")
        httpClient.preparePut("/media".customApiUrl) {
            contentType(ContentType("application", "octet-stream"))
            setBody(pipeIn)
            logger.debug("Media session ready to execute")
        }.execute {
            // This is never called (checked using breakpoint)
            logger.debug("Media session executing")
        }
        // this is also never called but that's fine
        logger.info("Media session closed")

This is how the streams are created:

        val pipeOut = PipedOutputStream()
        val pipeIn = PipedInputStream().also { it.connect(pipeOut) }

How pipeOut is used and where it is stored has been omitted for brevity as it is irrelevant. The debug log statement should be called, even if no byte has been transmitted yet in either direction so that the InputStream from the response can be used.

I would accept any Java or Kotlin response that runs on the JVM and supports HTTP/2. The InputStream and OutputStream of the connection should be readable and writable, respectively, at any time, even if not all of the bytes have been transmitted to the server yet.

Edit: I am also debugging the PHP script at the moment. A suggestion for a solution of above mentioned scenario or correction of my code would be appreciated.

1

There are 1 best solutions below

0
sbordet On

[Disclaimer, I work on the Jetty Project.]

This is possible with Jetty's HttpClient, exactly like you want:

HTTP2Client http2Client = new HTTP2Client();
HttpClient client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
client.start();

OutputStreamRequestContent content = new OutputStreamRequestContent();
InputStreamResponseListener listener = new InputStreamResponseListener();

client.newRequest("http://localhost:8080/path")
    .method(HttpMethod.POST)
    .body(content)
    .send(listener);

// Optionally, get the response (with headers) 
// from the server and verify it is a 200.
Response response = listener.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());

InputStream input = listener.getInputStream();
OutputStream output = content.getOutputStream();

// Write some content and expect the response
// (assuming it is echoed back in this example).
output.write('a');
int read = input.read();
assertEquals('a', read);

output.write('b');
read = input.read();
assertEquals('b', read);

// Terminate.
output.close();
read = input.read();
assertEquals(-1, read);

client.stop();