I am new to Rust and am writing a simple application that streams some values over gRPC using Tonic. These values are initially acquired from an external library as a BoxStream (Pin<Box<dyn Stream>>), and Tonic's API requires something that implements Stream (which, as far as I can tell, the pinned box does not).
Tonic's streaming example uses a ReceiverStream to convert an mpsc channel into a stream and spins off a task to push values into it. This would require a stream lifetime of 'static, which is not an option for my actual implementation because the lifetime of my stream is tied to the type that returns it.
What is the best way to turn my Pin<Box<dyn Stream>> into something that implements Stream and that I can hand to Tonic?
src/main.rs (This will not compile, since BoxStream<'static, Entry> does not implement IntoStreamingRequest)
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::IntoStreamingRequest;

struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();

    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec![
            Entry { key: String::from("value1") },
            Entry { key: String::from("value2") },
        ];
        for entry in entries {
            yield entry;
        }
    })
}
Cargo.toml
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"
Compilation Error provided:
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
--> src\main.rs:12:26
|
12 | let request = stream.into_streaming_request();
| ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
|
::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
|
408 | pub struct Pin<P> {
| -----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sync`
|
::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
|
27 | pub trait Stream {
| ----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sized`
| doesn't satisfy `_: Sync`
|
= note: the following trait bounds were not satisfied:
`Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
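The problem is that tonic implements IntoStreamingRequest only for types that are both Send and Sync. The compiler note above says as much: the Sync bound on the pinned box is the one that is not satisfied.
But BoxStream is not Sync. It is defined as Pin<Box<dyn Stream<Item = T> + Send + 'a>>, so its trait object promises Send and nothing more.
Instead of using BoxStream, you should copy its definition and add an additional + Sync bound, for example Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> as the return type of api_function.
And because the stream returned by the stream!() macro is already Send + Sync, your code will compile fine.
PS: remove the unnecessary type hint on the stream binding in main, since it would coerce the value back to a BoxStream and drop the Sync bound again.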
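The Send versus Sync point can be checked without pulling in tonic or futures. The following is only a std-only sketch under stated assumptions, not the real crates: the Stream trait, the into_streaming_request function, the SyncBoxStream alias name, the Entries type and the drain helper are all hypothetical stand-ins that mirror the shapes discussed above. With the real crates, the alias would be written against futures::Stream instead.

```rust
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for futures::Stream (assumption: only the required method is modelled).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A pinned pointer to a stream is itself a stream; futures has an equivalent impl.
impl<P> Stream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Stream,
{
    type Item = <P::Target as Stream>::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.get_mut().as_mut().poll_next(cx)
    }
}

// Same shape as BoxStream, plus Sync (the alias name is hypothetical).
type SyncBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

// Stand-in for the bound described above: the stream itself must be Send + Sync.
fn into_streaming_request<S>(stream: S) -> S
where
    S: Stream + Send + Sync + 'static,
{
    stream
}

struct Entry {
    key: String,
}

// Hypothetical always-ready stream, used here in place of stream! { ... }.
struct Entries(std::vec::IntoIter<Entry>);

impl Stream for Entries {
    type Item = Entry;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Entry>> {
        Poll::Ready(self.0.next())
    }
}

fn api_function() -> SyncBoxStream<'static, Entry> {
    let entries = vec![
        Entry { key: String::from("value1") },
        Entry { key: String::from("value2") },
    ];
    Box::pin(Entries(entries.into_iter()))
}

// Waker that does nothing; enough for a stream that never returns Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream until it ends (or reports Pending) and collects the items.
fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}

fn main() {
    // Accepted only because the alias carries `+ Sync`; with `+ Send` alone
    // the `S: Sync` bound on into_streaming_request is not satisfied.
    let request = into_streaming_request(api_function());
    let keys: Vec<String> = drain(request).into_iter().map(|e| e.key).collect();
    println!("{:?}", keys); // prints ["value1", "value2"]
}
```

Changing the alias back to `+ Send + 'a` only should reproduce an unsatisfied Sync bound at the call to into_streaming_request, which is the same kind of failure the compiler reports in the question.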