I'm trying to implement the "Download table data in the Avro data format" method from this example, but I don't know how to fill in this part:
namespace {
// Placeholder from the Google Cloud sample this question is based on.
// It receives one ReadRowsResponse chunk: the session's Avro schema plus
// a blob of back-to-back Avro-encoded records. The sample leaves the
// deserialization (one avro::decode per record) to the reader.
void ProcessRowsInAvroFormat(
::google::cloud::bigquery::storage::v1::AvroSchema const&,
::google::cloud::bigquery::storage::v1::AvroRows const&) {
// Code to deserialize avro rows should be added here.
}
} // namespace
I installed the Apache Avro C++ library and wrote the following code:
bool bq::ReadSessionFromSchema(std::string project_id, std::string dataset_id, std::string table_id)
try
{
auto table_name = "projects/" + project_id + "/datasets/" + dataset_id + "/tables/" + table_id;
int max_stream_count = 1;
google::cloud::bigquery::storage::v1::ReadSession read_session;
read_session.set_table(table_name);
read_session.set_data_format(google::cloud::bigquery::storage::v1::DataFormat::AVRO);
read_session.mutable_read_options()->set_row_restriction(R"(state_name = "Kentucky")");
auto session = read_client->CreateReadSession("projects/" + project_id, read_session, max_stream_count);
if (!session)
{
std::cerr << session.status() << "\n";
return false;
}
std::cout << "ReadSession successfully created: " << session->name()
<< ".\n";
constexpr int kRowOffset = 0;
auto read_rows = read_client->ReadRows(session->streams(0).name(), kRowOffset);
std::int64_t num_rows = 0;
for (auto const &row : read_rows)
{
if (row.ok())
{
num_rows += row->row_count();
std::cout << row->row_count() << std::endl;
[](::google::cloud::bigquery::storage::v1::AvroSchema const &schema,
::google::cloud::bigquery::storage::v1::AvroRows const &rows)
{
auto vs = avro::compileJsonSchemaFromString(schema.schema());
std::unique_ptr<avro::InputStream> in = avro::memoryInputStream((uint8_t *)(rows.serialized_binary_rows().data()), rows.serialized_binary_rows().size());
avro::DecoderPtr d = avro::validatingDecoder(vs, avro::binaryDecoder());
avro::GenericDatum datum(vs);
d->init(*in);
avro::decode(*d, datum);
if (datum.type() == avro::AVRO_RECORD)
{
const avro::GenericRecord &r = datum.value<avro::GenericRecord>();
std::cout << "Field-count: " << r.fieldCount() << std::endl;
for (auto i = 0; i < r.fieldCount(); i++)
{
const avro::GenericDatum &f0 = r.fieldAt(i);
if (f0.type() == avro::AVRO_STRING)
{
std::cout << "string: " << f0.value<std::string>() << std::endl;
}
else if (f0.type() == avro::AVRO_INT)
{
std::cout << "int: " << f0.value<int>() << std::endl;
}
else if (f0.type() == avro::AVRO_LONG)
{
std::cout << "long: " << f0.value<long>() << std::endl;
}
else
{
std::cout << f0.type() << std::endl;
}
}
}
}(session->avro_schema(), row->avro_rows());
}
}
std::cout << num_rows << " rows read from table: " << table_name << "\n";
return true;
}
catch (google::cloud::Status const &status)
{
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return false;
}
The BigQuery session gives me 3 chunks, containing 41 rows, 27 rows, and 10 rows respectively. But with this code, I can only print the first row of each chunk.
I want to print all of the rows I receive from the session.
Original Data is here and I copied this table to my own project.
Expect Result.(78 rows)
project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
string: 21221
string: Trigg County
string: Kentucky
long: 17300
long: 19
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
........<More rows>
........<More rows>
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
........<More rows>
........<More rows>
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
........<More rows>
........<More rows>
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19
Actual Result.(Only 3 rows)
project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19
After spending a lot of time experimenting with the code, I found a working approach, described below.
Use `avro::GenericReader::read()` to load data from the `avro::InputStream` sequentially. After parsing each row, use `avro::GenericReader::drain()` to discard the current row and read the next one from the `avro::InputStreamPtr`.