pyarrow flight error: Could not finish writing before closing

335 Views Asked by At

I have this code t hat I'm using to execute queries in Dremio using pyarrow flight:

class DremioConnector:
    env: str
    auth_token: str

    def __init__(self, env: str,  auth_token: str):
        self.env = env
        self.auth_token = auth_token

    def get_dremio_client(self):
        dremio_url = get_url_from_template(DREMIO_URL, self.env)

        return FlightClient(f"grpc+tls://{dremio_url}:*****", tls_root_certs=get_cert(),
                            middleware=[DremioClientAuthMiddlewareFactory(), CookieMiddlewareFactory()], **{}, )

    def get_dremio_credentials(self):
       
        ...
        ...

        return username, password

    def __create_flight_call_options(self, username: str, password: str, client: FlightClient) -> FlightCallOptions:
        headers: list[Any] = []

        bearer_token = client.authenticate_basic_token(username, password, FlightCallOptions(headers=headers))
        headers.append(bearer_token)
        return FlightCallOptions(headers=headers)

    def run_query(self, query: str, username: str, password: str, client: FlightClient) -> FlightStreamReader:
        flight_desc = FlightDescriptor.for_command(query)
        flight_info = client.get_flight_info(flight_desc, self.__create_flight_call_options(username, password, client))
        reader = client.do_get(flight_info.endpoints[0].ticket, self.__create_flight_call_options(username, password, client))
        return reader


connector = DremioConnector('env', "auth_token")

dremio_client = connector.get_dremio_client()

recently I've started getting this error: E pyarrow._flight.FlightInternalError: Could not finish writing before closing

when calling

bearer_token = client.authenticate_basic_token(username, password, FlightCallOptions(headers=headers))

Does anyone know what might be the issue? I have no idea why this error started popping up, since it was working before. Am I doing something wrong here?

1

There are 1 best solutions below

1
On

Here is the code I use for connecting to Dremio using PyArrow, works pretty well for Dremio Cloud, hopefully this helps.

Simple PyArrow Client for Dremio

#----------------------------------
# IMPORTS
#----------------------------------
## Import Pyarrow
from pyarrow import flight
from pyarrow.flight import FlightClient
import duckdb
import pyarrow.dataset as ds
import polars as pl
import pandas as pd

class DremioConnection:
    
    def __init__(self, token, location):
        self.token = token
        self.location = location
        self.headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
    ]
        self.client = FlightClient(location=(location))
        
    def query(self, query, client, headers):
        ## Options for Query
        options = flight.FlightCallOptions(headers=headers)
        
        ## Get ticket to for query execution, used to get results
        flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)
    
        ## Get Results (Return Value a FlightStreamReader)
        results = client.do_get(flight_info.endpoints[0].ticket, options)
        return results
        
    # Returns a FlightStreamReader
    def toArrow(self, query):
        return self.query(query, self.client, self.headers)
    
    #Returns a DuckDB Relation
    def toDuckDB(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        my_ds = ds.dataset(source=[table])
        return duckdb.arrow(my_ds)

    #Returns a Polars Dataframe
    def toPolars(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        df = pl.from_arrow(table)
        return df

    #Returns a Polars Dataframe
    def toPolars(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        df = streamReader.read_pandas()
        return df