Cancel Read from grpc streaming rpc

This concerns the gRPC library. On the server side of a bidirectional streaming RPC, how can I cancel the read of the next element from the stream without cancelling the stream itself?

Here is a small code example of the problem and what I would like to achieve:

#include <atomic>
#include <chrono>
#include <thread>

#include <grpcpp/grpcpp.h>
#include "my_service.h"

using grpc::ServerContext;
using grpc::ServerReaderWriter;
using grpc::Status;

class MyServiceImpl : public MyService::Service {
    public:
        // Bidirectional streaming handler
        Status my_rpc_call(ServerContext* context, ServerReaderWriter<RPCResponse, RPCRequest>* stream) override {
            RPCRequest request;
            while (stream->Read(&request)) {
                // Problem: Read() blocks until the client sends a request or closes the stream
                // do something here
            }

            /*
            desired (pseudocode: Read() has no overload that takes a cancel flag)

            // thread listening for user input
            std::atomic_bool cancel_read{false};
            std::jthread cancel_periodically_thread([&cancel_read] {
                for (int i(0); true; ++i) {
                    std::this_thread::sleep_for(std::chrono::seconds(i)); // simulate times when user cancels read request
                    cancel_read = true;
                }
            });

            while (true) {
                cancel_read = false;
                while (stream->Read(&request, cancel_read)) {
                    // should return early once cancel_read is set
                    // do something here
                }
                stream->Write("please input something"); // incorrect syntax, just to convey meaning
            }
            */

            return Status::OK;
        }
};

I do not want to cancel the entire stream, just make the call to stream->Read(..) return. I also do not know in advance when to cancel it: that is determined by external circumstances, e.g. by a user.

The same question applies to reading on the client side.

There are 2 best solutions below

1
Terry On

There is no way to "cancel" a particular message in a stream. You have to read the message in order to receive the next one. Your application can of course choose to not do anything with a particular message.
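
If the goal is only to stop waiting, not to drop the message, one common workaround with the synchronous API is to move the blocking Read() onto a dedicated thread and let the handler wait on a queue that a cancel signal can wake. Below is a minimal sketch of that idea, not gRPC API: FakeStream is a hypothetical stand-in for ServerReaderWriter so the example compiles without gRPC, CancellableReader and its Result enum are made-up names, and std::string stands in for RPCRequest. The pending Read() is never cancelled; it stays in flight and its message is delivered by a later Next().

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a gRPC stream: Read() blocks until a message
// arrives or the stream is closed, similar to ServerReaderWriter::Read().
class FakeStream {
public:
    bool Read(std::string* out) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return closed_ || !pending_.empty(); });
        if (pending_.empty()) return false;  // closed and drained
        *out = std::move(pending_.front());
        pending_.pop_front();
        return true;
    }
    void Send(std::string msg) {
        std::lock_guard<std::mutex> lock(mu_);
        pending_.push_back(std::move(msg));
        cv_.notify_all();
    }
    void Close() {
        std::lock_guard<std::mutex> lock(mu_);
        closed_ = true;
        cv_.notify_all();
    }
private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::string> pending_;
    bool closed_ = false;
};

// Hypothetical helper: owns the only thread that ever calls Read().
template <typename Stream>
class CancellableReader {
public:
    enum class Result { kMessage, kCancelled, kClosed };

    explicit CancellableReader(Stream* stream)
        : reader_([this, stream] {
              std::string msg;
              while (stream->Read(&msg)) {  // blocking read stays on this thread
                  std::lock_guard<std::mutex> lock(mu_);
                  inbox_.push_back(std::move(msg));
                  cv_.notify_all();
              }
              std::lock_guard<std::mutex> lock(mu_);
              closed_ = true;
              cv_.notify_all();
          }) {}

    // The stream must be closed before destruction, otherwise join() blocks.
    ~CancellableReader() { reader_.join(); }

    // Wakes a handler blocked in Next(); the pending Read() is untouched.
    void Cancel() {
        std::lock_guard<std::mutex> lock(mu_);
        cancelled_ = true;
        cv_.notify_all();
    }

    // Waits for a message, a cancel request, or end of stream.
    Result Next(std::string* out) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return cancelled_ || closed_ || !inbox_.empty(); });
        if (cancelled_) {
            cancelled_ = false;  // one Cancel() interrupts one Next()
            return Result::kCancelled;
        }
        if (!inbox_.empty()) {
            *out = std::move(inbox_.front());
            inbox_.pop_front();
            return Result::kMessage;
        }
        return Result::kClosed;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::string> inbox_;
    bool cancelled_ = false;
    bool closed_ = false;
    std::thread reader_;  // declared last so the other members exist before it starts
};
```

In a real handler, the loop would call Next() instead of stream->Read(), and on kCancelled it could call stream->Write(...) from the handler thread before waiting again. This relies on the synchronous API allowing one Read() and one Write() to run concurrently on the same stream from different threads, while concurrent Read() calls are not allowed, which is why a single reader thread owns all reads.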

0
Eugene On

You should consider switching to the callback interface so that you do not have to block on reads. See this gRPC example of a callback server for a streaming API: RouteGuideImpl::ListFeatures