I'm using Polars with ConnectorX: why do I get a DataFrame type error?


I'm trying to get the table description from a Postgres database. I use ConnectorX with an Arrow2 destination and I'm trying to store the result in a Polars DataFrame.

Here's the connection definition:

use connectorx::prelude::*;
use polars_core;
use std::convert::TryFrom;

let source_conn = SourceConn::try_from(
    "postgresql://postgres:[email protected]:5432/oceanics?cxprotocol=binary",
)
.expect("parse conn str failed");
let queries = &[CXQuery::from(
    "SELECT * FROM information_schema.columns where table_name = 'spicyperfmanager'",
)];
let destination: Arrow2Destination =
    get_arrow2(&source_conn, None, queries).expect("run failed");

Then I try to extract the data from the query:

let df: polars_core::frame::DataFrame = destination.polars().unwrap();
println!("{:?}", df);

But I get the following error:

error[E0308]: mismatched types
   --> src/data.rs:16:45
    |
16  |     let df: polars_core::frame::DataFrame = destination.polars().unwrap();
    |             -----------------------------   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `DataFrame`, found `polars_core::frame::DataFrame`
    |             |
    |             expected due to this
    |
    = note: `polars_core::frame::DataFrame` and `DataFrame` have similar names, but are actually distinct types

Here are the Cargo dependencies:

[dependencies]
connectorx = { version = "0.3.1", features = ["dst_arrow", "src_postgres", "dst_arrow2"] }
polars-core = "0.30.0"

I tried using the polars::prelude DataFrame and the polars_core::frame DataFrame, but I can't figure out what I'm doing wrong with the types.


2 Answers

Answer by Chayim Friedman

connectorx uses polars v0.20.0, which is incompatible with v0.30.0. So you actually have two versions of polars in your dependency tree, and their types are distinct and incompatible.
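You can confirm the duplication with Cargo itself: cargo tree's --duplicates flag lists every package that appears in the tree under more than one version, and polars-core should show up twice:

cargo tree --duplicates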

Downgrade your polars to v0.20.0, and it should work.
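A minimal sketch of the corresponding Cargo.toml, assuming v0.20.0 really is the polars version that connectorx 0.3.1 pins (verify against the cargo tree output above):

[dependencies]
connectorx = { version = "0.3.1", features = ["dst_arrow", "src_postgres", "dst_arrow2"] }
polars-core = "0.20.0"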

Crates should usually re-export the crates whose types appear in their public API, to avoid situations like this, but it appears connectorx does not re-export its polars or polars_core (although it does expose polars_io, polars_lazy and polars_time).
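For illustration, if connectorx did re-export polars_core, you could name the exact DataFrame type it returns and the mismatch could not arise. This is hypothetical code (connectorx 0.3.1 has no such re-export):

use connectorx::polars_core::frame::DataFrame; // hypothetical re-export

let df: DataFrame = destination.polars().unwrap(); // both sides now share one crate version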

Answer by Anatoly Bugakov

As of now, polars 0.35 does not depend on arrow2, but it does depend on arrow-array >= 41. So what I did was leverage connectorx's dst_arrow feature instead.

First I convert arrow-array's Field and Array to their polars-arrow counterparts and then to a Series; see batch_to_df in particular.

use connectorx::{source_router::SourceConn, sql::CXQuery, get_arrow};
use polars::{prelude::{DataFrame, Field}, series::Series, functions::concat_df_diagonal};
use polars_arrow::{array::Array, datatypes::Field as PolarsArrowField};
use arrow_array::RecordBatch;

pub fn conn() {
    let uri = format!(
        "mysql://{0}:{1}@{2}:{3}/{4}?cxprotocol=binary",
        "username", "pwd", "localhost", 3306, "ultima"
    );

    let source_conn = SourceConn::try_from(uri.as_str()).expect("parse conn str failed");

    let queries = &[CXQuery::from("SELECT DISTINCT * FROM xyz")];
    // dst_arrow produces plain arrow-array RecordBatches rather than arrow2 chunks.
    let destination = get_arrow::get_arrow(&source_conn, None, queries).expect("run failed");

    let data = destination.arrow().unwrap();

    let _df = record_batches_to_df(data);
}

pub fn record_batches_to_df<I>(batches: I) -> DataFrame
where
    I: IntoIterator<Item = RecordBatch>,
{
    // Convert each batch into its own DataFrame, then concatenate them
    // diagonally (any column missing from a batch is filled with nulls).
    let dfs: Vec<DataFrame> = batches.into_iter().map(batch_to_df).collect();
    concat_df_diagonal(&dfs).unwrap()
}

pub fn batch_to_df(batch: RecordBatch) -> DataFrame {
    let mut columns = vec![];
    batch
        .schema()
        .all_fields()
        .into_iter()
        .zip(batch.columns())
        .for_each(|(f, c)| {
            // arrow-array Field -> polars-arrow Field -> polars Field.
            let polars_field = Field::from(&PolarsArrowField::from(f));
            // arrow-array Array -> boxed polars-arrow Array.
            let chunk: Box<dyn Array> = From::from(c.as_ref());
            // Safety: the chunk and the dtype both come from the same schema entry.
            let s = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    polars_field.name.as_str(),
                    vec![chunk],
                    polars_field.data_type(),
                )
            };
            columns.push(s);
        });
    DataFrame::from_iter(columns)
}

P.S. I leave this step out, but the one small catch is that you need to know the schema of your DataFrame upfront, since all Arrays come back as Binary and need to be cast; see Series::cast. A sketch follows below.
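A minimal sketch of that cast step, assuming polars 0.35; target_schema is a hypothetical value holding the dtypes you expect:

use polars::prelude::*;

// Hypothetical helper: cast every Binary column of `df` to the dtype recorded
// for it in `target_schema`, leaving all other columns untouched.
fn cast_binary_columns(df: &DataFrame, target_schema: &Schema) -> PolarsResult<DataFrame> {
    let columns = df
        .get_columns()
        .iter()
        .map(|s| match target_schema.get(s.name()) {
            Some(dtype) if s.dtype() == &DataType::Binary => s.cast(dtype),
            _ => Ok(s.clone()),
        })
        .collect::<PolarsResult<Vec<Series>>>()?;
    DataFrame::new(columns)
}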

Also, if you get compilation issues due to dependency resolution, see my question on that.