How do I return rows from my query with odbc_api?

53 Views Asked by At

To test Rust on real-life data, I would like to make a connection to an ODBC database. I have made some progress, but I'm struggling to retrieve the output from the cursor.

The best I have achieved so far is printing 'Hurray' 200 times, but this is not what I am looking for. I would really appreciate it if someone could guide me on how to push data to a Polars DataFrame. Does anyone know how I could accomplish this?

fn main() {
    use odbc_api::{ConnectionOptions, Cursor, Environment};
    let connection_string = connection_string();
    let env = Environment::new().unwrap();
    let conn = env
        .connect_with_connection_string(&connection_string, ConnectionOptions::default())
        .unwrap();
    let result_set = conn
        .execute(&String::from("SELECT TOP 200 * FROM pub.table"), ())
        .unwrap();
    if let Some(mut cursor) = result_set {
        loop {
            match cursor.next_row() {
                Ok(Some(_cursor_row)) => {
                    println!("Hurray!")
                }
                Ok(None) => {
                    break;
                }
                Err(err) => {
                    eprintln!("Error while fetching next row: {:?}", err);
                    break;
                }
            }
        }
    } else {
        eprintln!("Result set is None");
    }
}
1

There are 1 best solutions below

0
Hein Burgmans On

I found a working solution:

use anyhow::Error;
use odbc_api::{buffers::TextRowSet, ConnectionOptions, Cursor, Environment, ResultSetMetadata};
use polars::error::PolarsResult;
use polars::frame::DataFrame;
use polars::prelude::*;

use std::{
    ffi::CStr,
    fs::File,
    io::{stdout, Write},
    path::PathBuf,
    time::Instant,
};

fn connection_string() -> String {
    let hostname = String::from();
    let port = String::from();
    let database = String::from();
    let username = String::from();
    let password = String::from();
    let driver = String::from("{Progress OpenEdge 11.7 Driver}");
    let connection_string = format!(
        "DRIVER={};HostName={};DATABASENAME={};PORTNUMBER={};LogonID={};PASSWORD={}",
        driver, hostname, database, port, username, password
    );
    connection_string
}
const BATCH_SIZE: usize = 100000;
fn extract_data() -> Result<(), Error> {
    let out = stdout();
    let mut writer = csv::Writer::from_writer(out);
    let connection_string = connection_string();
    let env = Environment::new()?;
    let conn =
        env.connect_with_connection_string(&connection_string, ConnectionOptions::default())?;
    match conn.execute(
        &String::from("SELECT top 200 * FROM pub.table"),
        (),
    )? {
        Some(mut cursor) => {
            let mut headline: Vec<String> = cursor.column_names()?.collect::<Result<_, _>>()?;
            writer.write_record(headline)?;
            let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &mut cursor, Some(4096))?;
            let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;
            while let Some(batch) = row_set_cursor.fetch()? {
                // Within a batch, iterate over every row
                for row_index in 0..batch.num_rows() {
                    // Within a row iterate over every column
                    let record = (0..batch.num_cols())
                        .map(|col_index| {
                            let column_data = batch.at(col_index, row_index).unwrap_or(&[]);
                            // Convert non-UTF-8 bytes to UTF-8
                            String::from_utf8_lossy(column_data).to_string()
                        })
                        .collect::<Vec<String>>();

                    // Writes row as csv
                    writer.write_record(&record)?;
                }
            }
        }
        None => {
            eprintln!("Query came back empty. No output has been created.");
        }
    }
    Ok(())
}

fn dataframe_creator() -> PolarsResult<DataFrame> {
    let df = CsvReader::from_path("output.csv")?
        .has_header(true)
        .finish();
    df
}

fn main() {
    let now = Instant::now();
    let _ = extract_data();
    let pl_df = dataframe_creator();
    println!("{:#?}", pl_df);
    let elapsed = now.elapsed();
    println!("Elapsed {:#?}", elapsed);
}

Unfortunately, I have to export the data to a CSV file and then import it into a DataFrame, which means the same Python script runs faster.