BigQuery Storage Read API with C++: Deserializing Data

I'm trying to implement the "Download table data in the Avro data format" method from this example, but I don't know how to implement the deserialization. The sample leaves it as a stub:

namespace {
void ProcessRowsInAvroFormat(
    ::google::cloud::bigquery::storage::v1::AvroSchema const&,
    ::google::cloud::bigquery::storage::v1::AvroRows const&) {
  // Code to deserialize avro rows should be added here.
}
}  // namespace

I installed the Apache Avro C++ library and wrote the following code:

bool bq::ReadSessionFromSchema(std::string project_id, std::string dataset_id, std::string table_id)
try
{
    auto table_name = "projects/" + project_id + "/datasets/" + dataset_id + "/tables/" + table_id;
    int max_stream_count = 1;

    google::cloud::bigquery::storage::v1::ReadSession read_session;
    read_session.set_table(table_name);
    read_session.set_data_format(google::cloud::bigquery::storage::v1::DataFormat::AVRO);
    read_session.mutable_read_options()->set_row_restriction(R"(state_name = "Kentucky")");

    auto session = read_client->CreateReadSession("projects/" + project_id, read_session, max_stream_count);
    if (!session)
    {
        std::cerr << session.status() << "\n";
        return false;
    }

    std::cout << "ReadSession successfully created: " << session->name()
              << ".\n";
    constexpr int kRowOffset = 0;
    auto read_rows = read_client->ReadRows(session->streams(0).name(), kRowOffset);

    std::int64_t num_rows = 0;
    for (auto const &row : read_rows)
    {
        if (row.ok())
        {
            num_rows += row->row_count();
            std::cout << "row count: " << row->row_count() << std::endl;

            [](::google::cloud::bigquery::storage::v1::AvroSchema const &schema,
               ::google::cloud::bigquery::storage::v1::AvroRows const &rows)
            {
                auto vs = avro::compileJsonSchemaFromString(schema.schema());
                std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(
                    reinterpret_cast<uint8_t const *>(rows.serialized_binary_rows().data()),
                    rows.serialized_binary_rows().size());

                avro::DecoderPtr d = avro::validatingDecoder(vs, avro::binaryDecoder());
                avro::GenericDatum datum(vs);

                d->init(*in);
                // Decodes only a single record from the stream; the remaining
                // rows in serialized_binary_rows() are never read.
                avro::decode(*d, datum);

                if (datum.type() == avro::AVRO_RECORD)
                {
                    const avro::GenericRecord &r = datum.value<avro::GenericRecord>();
                    std::cout << "Field-count: " << r.fieldCount() << std::endl;
                    for (auto i = 0; i < r.fieldCount(); i++)
                    {
                        const avro::GenericDatum &f0 = r.fieldAt(i);

                        if (f0.type() == avro::AVRO_STRING)
                        {
                            std::cout << "string: " << f0.value<std::string>() << std::endl;
                        }
                        else if (f0.type() == avro::AVRO_INT)
                        {
                            std::cout << "int: " << f0.value<int>() << std::endl;
                        }
                        else if (f0.type() == avro::AVRO_LONG)
                        {
                            std::cout << "long: " << f0.value<long>() << std::endl;
                        }
                        else
                        {
                            std::cout << f0.type() << std::endl;
                        }
                    }
                }

            }(session->avro_schema(), row->avro_rows());
        }
    }

    std::cout << num_rows << " rows read from table: " << table_name << "\n";
    return true;
}
catch (google::cloud::Status const &status)
{
    std::cerr << "google::cloud::Status thrown: " << status << "\n";
    return false;
}

The BigQuery session gives me 3 chunks, containing 41, 27, and 10 rows respectively. But with this code, I can only print the first row of each chunk.

I want to print all of the rows I receive from the session.

The original data is here; I copied this table into my own project.

Expected result (78 rows):

project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
string: 21221
string: Trigg County
string: Kentucky
long: 17300
long: 19
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
........<More rows>
........<More rows>
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
........<More rows>
........<More rows>
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
........<More rows>
........<More rows>
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19

Actual result (only 3 rows):

project id: MY_PROJECT_ID
ReadSession successfully created: projects/MY_PROJECT_ID/locations/asia-northeast3/sessions/<MY_SESSION_ID>.
row count: 41
Field-count: 25
string: 21083
string: Graves County
string: Kentucky
long: 32460
long: 227
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
row count: 27
Field-count: 25
string: 21013
string: Bell County
string: Kentucky
long: 33180
long: 223
long: 0
long: 0
long: 0
long: 3
long: 30
long: 26
long: 0
long: 4
long: 0
long: 0
long: 10
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 1
row count: 10
Field-count: 25
string: 21015
string: Boone County
string: Kentucky
long: 17140
long: 187
long: 0
long: 0
long: 0
long: 0
long: 51
long: 0
long: 0
long: 18
long: 0
long: 0
long: 0
long: 0
long: 0
long: 0
long: 62
long: 0
long: 0
long: 0
long: 16
long: 12
78 rows read from table: projects/MY_PROJECT_ID/datasets/MY_DATASET_ID/tables/covid_19

1 Answer (accepted):

After spending a lot of time toying with the code, I found a working version, shown below.

Use avro::GenericReader::read() to load rows from the avro::InputStream sequentially. After parsing a row, call avro::GenericReader::drain() to discard the current row's remaining state and position the decoder at the next row in the stream.
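The core of the fix, extracted from the full function below (same variable names, shown here only to highlight the pattern):

avro::GenericReader gr(vs, d);
d->init(*in);
avro::GenericDatum datum(vs);

for (int64_t i = 0; i < count; i++)
{
    gr.read(*d, datum, vs); // decode the next row into datum
    // ... inspect datum ...
    gr.drain();             // discard leftovers and move to the next row
}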

bool bq::ReadSessionFromSchema(std::string project_id, std::string dataset_id, std::string table_id)
try
{
    auto table_name = "projects/" + project_id + "/datasets/" + dataset_id + "/tables/" + table_id;
    int max_stream_count = 1;

    google::cloud::bigquery::storage::v1::ReadSession read_session;
    read_session.set_table(table_name);
    read_session.set_data_format(google::cloud::bigquery::storage::v1::DataFormat::AVRO);
    read_session.mutable_read_options()->set_row_restriction(R"(state_name = "Kentucky")");

    auto session = read_client->CreateReadSession("projects/" + project_id, read_session, max_stream_count);
    if (!session)
    {
        std::cerr << session.status() << "\n";
        return false;
    }

    std::cout << "ReadSession successfully created: " << session->name()
              << ".\n";
    constexpr int kRowOffset = 0;
    auto read_rows = read_client->ReadRows(session->streams(0).name(), kRowOffset);

    std::int64_t num_rows = 0;
    for (auto const &row : read_rows)
    {
        if (row.ok())
        {
            num_rows += row->row_count();
            std::cout << "row count: " << row->row_count() << std::endl;

            [](::google::cloud::bigquery::storage::v1::AvroSchema const &schema,
               ::google::cloud::bigquery::storage::v1::AvroRows const &rows,
               int64_t count)
            {
                const avro::ValidSchema vs = avro::compileJsonSchemaFromString(schema.schema());
                std::istringstream iss(rows.serialized_binary_rows(), std::ios::binary);
                std::unique_ptr<avro::InputStream> in = avro::istreamInputStream(iss);

                avro::DecoderPtr d = avro::validatingDecoder(vs, avro::binaryDecoder());
                avro::GenericReader gr(vs, d);
                d->init(*in);

                avro::GenericDatum datum(vs);

                // The chunk holds `count` records back-to-back; decode each one.
                for (auto i = 0; i < count; i++)
                {
                    gr.read(*d, datum, vs);

                    if (datum.type() == avro::AVRO_RECORD)
                    {
                        const avro::GenericRecord &r = datum.value<avro::GenericRecord>();
                        std::cout << "Field-count: " << r.fieldCount() << std::endl;
                        for (auto j = 0; j < r.fieldCount(); j++)
                        {
                            const avro::GenericDatum &f0 = r.fieldAt(j);

                            if (f0.type() == avro::AVRO_STRING)
                            {
                                std::cout << "string: " << f0.value<std::string>() << std::endl;
                            }
                            else if (f0.type() == avro::AVRO_INT)
                            {
                                std::cout << "int: " << f0.value<int>() << std::endl;
                            }
                            else if (f0.type() == avro::AVRO_LONG)
                            {
                                std::cout << "long: " << f0.value<long>() << std::endl;
                            }
                            else
                            {
                                std::cout << f0.type() << std::endl;
                            }
                        }
                    }

                    // Drop any undecoded remainder of the current datum so the
                    // next read() starts at the following record.
                    gr.drain();
                }
            }(session->avro_schema(), row->avro_rows(), row->row_count());
        }
    }

    std::cout << num_rows << " rows read from table: " << table_name << "\n";
    return true;
}
catch (google::cloud::Status const &status)
{
    std::cerr << "google::cloud::Status thrown: " << status << "\n";
    return false;
}
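
For completeness: since AvroRows.serialized_binary_rows() is just row_count() binary-encoded Avro records concatenated back-to-back (there is no Avro file header), a simpler variant that skips avro::GenericReader and calls avro::decode() in a loop should also work. This is a minimal, untested sketch, not code from the BigQuery sample; the function name and parameters are illustrative:

#include <avro/Compiler.hh>
#include <avro/Decoder.hh>
#include <avro/Generic.hh>
#include <avro/Specific.hh>
#include <avro/Stream.hh>
#include <avro/ValidSchema.hh>

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

// Decode `count` concatenated Avro records held in `payload`, using the
// JSON schema string that the read session reports.
void DecodeConcatenatedRows(std::string const &schema_json,
                            std::string const &payload, std::int64_t count)
{
    avro::ValidSchema vs = avro::compileJsonSchemaFromString(schema_json);
    std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(
        reinterpret_cast<std::uint8_t const *>(payload.data()), payload.size());

    avro::DecoderPtr d = avro::binaryDecoder();
    d->init(*in);

    avro::GenericDatum datum(vs);
    for (std::int64_t i = 0; i < count; ++i)
    {
        // Each call consumes exactly one record from the stream.
        avro::decode(*d, datum);
        auto const &record = datum.value<avro::GenericRecord>();
        std::cout << "Field-count: " << record.fieldCount() << "\n";
    }
}

It would be called per chunk as DecodeConcatenatedRows(session->avro_schema().schema(), row->avro_rows().serialized_binary_rows(), row->row_count()).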