On the gRPC server side:
CANCELLED: client cancelled
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:365)
On the gRPC client side:
io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
    at io.grpc.Status.asRuntimeException(Status.java:539)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
I have started a gRPC server on an AWS EC2 instance as follows:
server = ServerBuilder.forPort(GRPC_SERVER_PORT)
        .addService(ServerInterceptors.intercept((BindableService) new GrpcServiceImpl(), new AuthServerInterceptor()))
        .build()
        .start();
I have submitted a job on a Databricks cluster, from which a bidirectional gRPC channel is created as follows:
private static StreamObserver<InspectCommand> inspectCommandStreamObserver;

// Client creation
ManagedChannel channel = ManagedChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT)
        .useTransportSecurity()
        .intercept(new AuthClientInterceptor(authToken))
        .enableRetry()
        .maxRetryAttempts(2)
        .build();
LOGGER.info("channel created");
GrpcServiceDefinitionGrpc.GrpcServiceDefinitionStub grpcStub = GrpcServiceDefinitionGrpc.newStub(channel);
createBidirectionalGrpcStreaming(grpcStub);

private void createBidirectionalGrpcStreaming(GrpcServiceDefinitionGrpc.GrpcServiceDefinitionStub grpcStub) {
    inspectCommandStreamObserver = grpcStub.inspectClient(new StreamObserver<InspectCommand>() {
        @Override
        public void onNext(InspectCommand inspectCommand) {
            try {
                // code for command processing
                inspectCommandStreamObserver.onNext(response);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void onError(Throwable t) {
            // Drop the failed stream and immediately open a new one
            inspectCommandStreamObserver = null;
            createBidirectionalGrpcStreaming(grpcStub);
            t.printStackTrace();
        }

        @Override
        public void onCompleted() {
            LOGGER.info("Communication completed.");
        }
    });
    LOGGER.info("Signalling ready");
    // Send the "ready" message to the gRPC server
    byte[] commandBytes = "Waiting for command".getBytes(StandardCharsets.UTF_8);
    InspectCommand readyCommand = InspectCommand.newBuilder()
            .setCommand(ByteString.copyFrom(commandBytes))
            .build();
    inspectCommandStreamObserver.onNext(readyCommand);
}
As soon as the job runs on the Databricks cluster, the above code is executed and the ready message is sent to the gRPC server to set up bidirectional communication.
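A side note on the client's onError handler above: it reopens the stream immediately and without any limit, so a persistent failure would turn into a tight reconnect loop. Below is a minimal sketch of how the reconnect attempts could be spaced out with a capped exponential backoff, using only the JDK. The class name `ReconnectBackoff` and its parameters are hypothetical and not part of gRPC or of my actual code.

```java
/** Computes capped exponential backoff delays for reconnect attempts (hypothetical helper). */
final class ReconnectBackoff {
    private final long initialDelayMillis;
    private final long maxDelayMillis;
    private int attempt = 0;

    ReconnectBackoff(long initialDelayMillis, long maxDelayMillis) {
        if (initialDelayMillis <= 0 || maxDelayMillis < initialDelayMillis) {
            throw new IllegalArgumentException("need 0 < initialDelayMillis <= maxDelayMillis");
        }
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Returns the delay to wait before the next attempt and advances the attempt counter. */
    synchronized long nextDelayMillis() {
        long delay = initialDelayMillis;
        // Double the delay once per previous attempt, never exceeding the cap.
        for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
            delay = (delay <= maxDelayMillis / 2) ? delay * 2 : maxDelayMillis;
        }
        attempt++;
        return delay;
    }

    /** Resets the counter, e.g. once a message has been received on the new stream. */
    synchronized void reset() {
        attempt = 0;
    }
}
```

In onError, the call to createBidirectionalGrpcStreaming(grpcStub) could then be scheduled on a ScheduledExecutorService after nextDelayMillis() instead of being invoked directly, with reset() called from onNext. This only limits the reconnect rate; it does not explain why the stream is cancelled in the first place.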
Now, on the gRPC server side:
private final ConcurrentMap<String, StreamObserver<InspectCommand>> inspectCommandStreamObservers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, StreamObserver<DynamicMethodResponse>> mvcResponseObservers = new ConcurrentHashMap<>();

@Override
public StreamObserver<InspectCommand> inspectClient(StreamObserver<InspectCommand> responseObserver) {
    return new StreamObserver<InspectCommand>() {
        @Override
        public void onNext(InspectCommand command) {
            synchronized (lock) {
                inspectCommandStreamObservers.put(command.getHttpSessionId(), responseObserver);
            }
            // Process the received command from Databricks
            if (mvcResponseObservers.containsKey(command.getCommandId())) {
                LOGGER.info("Returning response to MVC");
                mvcResponseObservers.get(command.getCommandId())
                        .onNext(DynamicMethodResponse.newBuilder().setInspectCommandResponse(command.getCommand()).build());
                mvcResponseObservers.get(command.getCommandId()).onCompleted();
                mvcResponseObservers.remove(command.getCommandId());
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onCompleted() {
            // Communication completed
        }
    };
}
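
As an aside, the containsKey / get / get / remove sequence on mvcResponseObservers above is not atomic, so two concurrent messages with the same command ID could both reach the same observer. A minimal sketch of collapsing it into a single remove() call follows. The `Observer` interface is a stand-in I am assuming here so the sketch compiles with the JDK alone (the real code uses io.grpc.stub.StreamObserver), and `PendingResponses` is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Stand-in for io.grpc.stub.StreamObserver, reduced to the two calls used here. */
interface Observer<T> {
    void onNext(T value);
    void onCompleted();
}

/** Holds observers that are waiting for a response, keyed by command ID (hypothetical helper). */
final class PendingResponses<T> {
    private final ConcurrentMap<String, Observer<T>> pending = new ConcurrentHashMap<>();

    void register(String commandId, Observer<T> observer) {
        pending.put(commandId, observer);
    }

    /** Delivers the response to the waiting observer, if any; returns whether one was found. */
    boolean complete(String commandId, T response) {
        // remove() is atomic, so at most one caller obtains the observer for a given ID.
        Observer<T> observer = pending.remove(commandId);
        if (observer == null) {
            return false;
        }
        observer.onNext(response);
        observer.onCompleted();
        return true;
    }

    int size() {
        return pending.size();
    }
}
```

This is unrelated to the cancellation itself; it only tightens the server-side bookkeeping.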
The bidirectional channel is created, and the ready message sent from the Databricks cluster is received by the gRPC server.
Expectation: the bidirectional channel should be long-running.
Actual: a CANCELLED: client cancelled error occurs every minute.