Connecting to arrow flight with dremio_client in python

I’m just beginning my Dremio journey and wanted to get some data out of Dremio via arrow flight. I did some googling and ended up with these useful resources:

and arrow/middleware.py at master · apache/arrow · GitHub

From those I was able to cobble together working code that would make the connect_to_dremio_flight_server_endpoint function fetch data:

connect_to_dremio_flight_server_endpoint(hostname="dremio-dev", flightport=32010, username=username, 
                                              password=password, sqlquery=sql, tls=False, certs=False)

But that required 300 lines of code and many functions. A bit more than it seemed I should have to use. Then a bit more googling revealed that there exists a Dremio Client python library. There was much rejoicing and I read the docs and installed the magic pixies in my machine.

I expected to be able to do something like

# One-shot query helper from the community dremio_client package.
from dremio_client.flight import query

# Query text and the Flight endpoint it should run against.
sql = 'SELECT * FROM mytable'
hostname = 'dremio-dev'
port=32010

# NOTE: username and password are not defined in this snippet; they are
# assumed to have been set earlier in the session.
query(sql, hostname=hostname, port=port, username=username, password=password)

But that (and every other method I have tried) gets me some version of the following error:

---------------------------------------------------------------------------
FlightUnauthenticatedError                Traceback (most recent call last)
<ipython-input-24-f6a1fc28cf57> in <module>
      7 password=tokens[2]
      8 
----> 9 query(sql, hostname=hostname, port=port, username=username, password=password)

/scratch/shared/anaconda3/lib/python3.7/site-packages/dremio_client/flight/__init__.py in query(sql, client, hostname, port, username, password, pandas, tls_root_certs_filename)
     79         """
     80         if not client:
---> 81             client = connect(hostname, port, username, password, tls_root_certs_filename)
     82 
     83         info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))

/scratch/shared/anaconda3/lib/python3.7/site-packages/dremio_client/flight/__init__.py in connect(hostname, port, username, password, tls_root_certs_filename)
     50             c = flight.FlightClient(location)
     51         if username:
---> 52             c.authenticate(HttpDremioClientAuthHandler(username, password if password else ""))
     53         return c
     54 

/scratch/shared/anaconda3/lib/python3.7/site-packages/pyarrow/_flight.pyx in pyarrow._flight.FlightClient.authenticate()

/scratch/shared/anaconda3/lib/python3.7/site-packages/pyarrow/_flight.pyx in pyarrow._flight.check_flight_status()

FlightUnauthenticatedError: gRPC returned unauthenticated error, with message: 

It’s a bit of a cliff hanger as it looks like I should be getting more message, but it ends abruptly after “with message:”

Any tips on why my cobbled together bits (which I have included below) work while the package does not?

I presume my pip install dremio-client installed version 0.14.0.

My Dremio install build info is as follows:

Build
12.1.0-202101041749050132-55c827cb
Edition
AWS Edition (activated)

Here’s the code I stole from the aforementioned links which I used to extract data successfully:

import argparse
import sys

import pyarrow
import pyarrow.flight
import pyarrow.csv as csv
from pyarrow import flight


def list_flights(args, client, connection_args={}):
    """Print a summary of every flight and action reported by *client*.

    For each flight the descriptor (path or command), record count, byte
    count, endpoint count and schema are printed; for each action its type
    and description. ``args`` and ``connection_args`` are accepted but not
    used by this function.
    """
    def _or_unknown(count):
        # A negative count is reported as "Unknown" instead of the number.
        return count if count >= 0 else "Unknown"

    print('Flights\n=======')
    for info in client.list_flights():
        descriptor = info.descriptor
        kind = descriptor.descriptor_type
        if kind == pyarrow.flight.DescriptorType.PATH:
            print("Path:", descriptor.path)
        elif kind == pyarrow.flight.DescriptorType.CMD:
            print("Command:", descriptor.command)
        else:
            print("Unknown descriptor type")

        print("Total records:", _or_unknown(info.total_records))
        print("Total bytes:", _or_unknown(info.total_bytes))
        print("Number of endpoints:", len(info.endpoints))
        print("Schema:")
        print(info.schema)
        print('---')

    print('\nActions\n=======')
    for available in client.list_actions():
        print("Type:", available.type)
        print("Description:", available.description)
        print('---')


def do_action(args, client, connection_args={}):
    """Run the action named by ``args.action_type`` and print each result.

    The action is sent with an empty (zero-byte) body. An
    ``ArrowIOError`` raised while running it is printed rather than
    propagated. ``connection_args`` is accepted but not used.
    """
    try:
        empty_body = pyarrow.allocate_buffer(0)
        request = pyarrow.flight.Action(args.action_type, empty_body)
        print('Running action', args.action_type)
        for reply in client.do_action(request):
            print("Got result", reply.body.to_pybytes())
    except pyarrow.lib.ArrowIOError as err:
        print("Error calling action:", err)


def push_data(args, client, connection_args={}):
    """Read the CSV file ``args.file`` and upload it to the Flight server.

    The table is read with ``csv.read_csv``, its row count and the head
    of its pandas form are printed, and it is then written through
    ``client.do_put`` under a path descriptor built from the file name.
    ``connection_args`` is accepted but not used.
    """
    source_path = args.file
    print('File Name:', source_path)

    table = csv.read_csv(source_path)
    print('Table rows=', str(len(table)))
    print(table.to_pandas().head())

    descriptor = pyarrow.flight.FlightDescriptor.for_path(source_path)
    # do_put returns a pair; only the writer half is needed here.
    writer, _ = client.do_put(descriptor, table.schema)
    writer.write_table(table)
    writer.close()


def get_flight(args, client, connection_args={}):
    """Fetch a flight by path or command and print its data.

    A path descriptor is used when ``args.path`` is truthy, otherwise a
    command descriptor built from ``args.command``. For every location of
    every endpoint in the returned flight info, a new ``FlightClient`` is
    opened with ``connection_args``, the endpoint's ticket is fetched and
    the result is printed as a pandas object.
    """
    path_parts = args.path
    if path_parts:
        descriptor = pyarrow.flight.FlightDescriptor.for_path(*path_parts)
    else:
        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)

    flight_info = client.get_flight_info(descriptor)
    for endpoint in flight_info.endpoints:
        ticket = endpoint.ticket
        print('Ticket:', ticket)
        for location in endpoint.locations:
            print(location)
            # Each location gets its own client for the download.
            location_client = pyarrow.flight.FlightClient(
                location, **connection_args)
            print(location_client.do_get(ticket).read_pandas())


def _add_common_arguments(parser):
    """Register the shared connection options on *parser*.

    Adds ``--tls`` (flag), ``--tls-roots`` (path to trusted certificates),
    ``--mtls CERTFILE KEYFILE`` (two values) and the positional ``host``.
    """
    parser.add_argument('--tls', action='store_true',
                        help='Enable transport-level security')
    parser.add_argument('--tls-roots', default=None,
                        help='Path to trusted TLS certificate(s)')
    # The help text used to be a copy of the --tls help; describe what the
    # two values actually are so --help distinguishes the options.
    parser.add_argument("--mtls", nargs=2, default=None,
                        metavar=('CERTFILE', 'KEYFILE'),
                        help="Enable mutual TLS using the given client "
                             "certificate and private key files")
    parser.add_argument('host', type=str,
                        help="Address or hostname to connect to")

def connect_to_dremio_flight_server_endpoint(hostname, flightport, username, password, sqlquery,
                                             tls, certs):
    """
    Connect to a Dremio Flight server endpoint, authenticate, and
    optionally run a query and retrieve its result set.

    Parameters
    ----------
    hostname : host name or address of the Flight endpoint.
    flightport : port of the Flight endpoint.
    username, password : credentials passed to ``authenticate_basic_token``.
    sqlquery : SQL text to run; when falsy the function only authenticates.
    tls : truthy to connect over ``grpc+tls`` instead of ``grpc+tcp``.
    certs : path to a file of trusted root certificates; required when
        ``tls`` is truthy.

    Returns
    -------
    The value of ``reader.read_pandas()`` for the query, or ``None`` when
    no ``sqlquery`` was given.

    Raises
    ------
    SystemExit
        With exit status 1 when ``tls`` is requested without ``certs``.
    Exception
        Any other error is printed and then re-raised unchanged.
    """

    try:
        # Default to use an unencrypted TCP connection.
        scheme = "grpc+tcp"
        connection_args = {}

        if tls:
            # Connect to the server endpoint with an encrypted TLS connection.
            print('[INFO] Enabling TLS connection')
            scheme = "grpc+tls"
            if not certs:
                print('[ERROR] Trusted certificates must be provided to establish a TLS connection')
                # Exit with a non-zero status so callers and shells see the
                # failure; a bare sys.exit() would report success.
                sys.exit(1)
            print('[INFO] Trusted certificates provided')
            # TLS certificates are provided in a list of connection arguments.
            with open(certs, "rb") as root_certs:
                connection_args["tls_root_certs"] = root_certs.read()

        # Two WLM settings can be provided upon initial authentication
        # with the Dremio Server Flight Endpoint:
        # - routing-tag
        # - routing queue
        initial_options = flight.FlightCallOptions(headers=[
            (b'routing-tag', b'test-routing-tag'),
            (b'routing-queue', b'Low Cost User Queries')
        ])
        client_auth_middleware = DremioClientAuthMiddlewareFactory()
        client = flight.FlightClient("{}://{}:{}".format(scheme, hostname, flightport),
                                     middleware=[client_auth_middleware], **connection_args)

        # Authenticate with the server endpoint.
        bearer_token = client.authenticate_basic_token(username, password, initial_options)
        print('[INFO] Authentication was successful')

        if sqlquery:
            # Construct FlightDescriptor for the query result set.
            flight_desc = flight.FlightDescriptor.for_command(sqlquery)
            print('[INFO] Query: ', sqlquery)

            # In addition to the bearer token, a query context can also
            # be provided as an entry of FlightCallOptions.
            # options = flight.FlightCallOptions(headers=[
            #     bearer_token,
            #     (b'schema', b'test.schema')
            # ])

            # Retrieve the schema of the result set.
            options = flight.FlightCallOptions(headers=[bearer_token])
            schema = client.get_schema(flight_desc, options)
            print('[INFO] GetSchema was successful')
            print('[INFO] Schema: ', schema)

            # Get the FlightInfo message to retrieve the Ticket corresponding
            # to the query result set. The descriptor built above is reused
            # rather than constructing an identical one.
            flight_info = client.get_flight_info(flight_desc, options)
            print('[INFO] GetFlightInfo was successful')
            print('[INFO] Ticket: ', flight_info.endpoints[0].ticket)

            # Retrieve the result set as a stream of Arrow record batches.
            reader = client.do_get(flight_info.endpoints[0].ticket, options)
            print('[INFO] Reading query results from Dremio')
            return reader.read_pandas()

    except Exception as exception:
        # SystemExit is not an Exception subclass, so the TLS
        # misconfiguration exit above is not reported a second time here.
        print("[ERROR] Exception: {}".format(repr(exception)))
        raise

class DremioClientAuthMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Factory handing out a DremioClientAuthMiddleware for each call.

    The factory also stores the most recent call credential that one of
    its middlewares reported through ``set_call_credential``.
    """

    def __init__(self):
        # Empty until a middleware reports a credential.
        self.call_credential = list()

    def set_call_credential(self, call_credential):
        """Replace the stored credential with *call_credential*."""
        self.call_credential = call_credential

    def start_call(self, info):
        """Return a new middleware bound to this factory; *info* is unused."""
        middleware = DremioClientAuthMiddleware(self)
        return middleware


class DremioClientAuthMiddleware(flight.ClientMiddleware):
    """
    A ClientMiddleware that extracts the bearer token from
    the authorization header returned by the Dremio
    Flight Server Endpoint.
    Parameters
    ----------
    factory : DremioClientAuthMiddlewareFactory
        The factory to set call credentials if an
        authorization header with bearer token is
        returned by the Dremio server.
    """

    def __init__(self, factory):
        self.factory = factory

    def received_headers(self, headers):
        """Store the authorization header of a response on the factory.

        *headers* is treated as a mapping from header name to a sequence
        of string values. The header name is matched case-insensitively.
        When the response carries no authorization header the factory's
        stored credential is left untouched.
        """
        auth_header_key = 'authorization'
        authorization_header = []
        for key in headers:
            if key.lower() == auth_header_key:
                # Look the values up under the key exactly as received; the
                # lowercase literal would miss a differently-cased key.
                authorization_header = headers.get(key)
        if not authorization_header:
            # No authorization header on this response. Indexing the empty
            # list here used to raise IndexError from inside the middleware.
            return
        self.factory.set_call_credential([
            b'authorization', authorization_header[0].encode("utf-8")])


def parse_arguments():
    """
    Build the command-line parser for the script and parse ``sys.argv``.

    Returns the populated namespace with ``hostname``, ``flightport``,
    ``username``, ``password``, ``sqlquery``, ``tls`` and
    ``trustedCertificates`` attributes.
    """
    # (option strings, add_argument keyword arguments), in display order.
    option_specs = (
        (('-host', '--hostname'),
         dict(type=str, help='Dremio co-ordinator hostname', default='localhost')),
        (('-port', '--flightport'),
         dict(type=str, help='Dremio flight server port', default='32010')),
        (('-user', '--username'),
         dict(type=str, help='Dremio username', required=True)),
        (('-pass', '--password'),
         dict(type=str, help='Dremio password', required=True)),
        (('-query', '--sqlquery'),
         dict(type=str, help='SQL query to test', required=False)),
        (('-tls', '--tls'),
         dict(dest='tls', help='Enable encrypted connection',
              required=False, default=False, action='store_true')),
        (('-certs', '--trustedCertificates'),
         dict(type=str,
              help='Path to trusted certificates for encrypted connection',
              required=False)),
    )

    cli = argparse.ArgumentParser()
    for option_strings, settings in option_specs:
        cli.add_argument(*option_strings, **settings)
    return cli.parse_args()

I had a chat via email with @Jonny at Dremio to figure out what’s going on. Jonny has a great perspective and helped me understand that the issue is Dremio being a bit ahead of the open source community library. Here’s Jonny’s response:

Essentially what has happened is that the flight authentication protocol was upgraded from auth0 to auth2 in a very recent version. As dremio_client is a community project it is often a little behind. One thing I would add is that with flight being upgraded to auth2 this is the point at which the protocol is being stabilized, so these issues with the dremio_client should not occur again. I have seen that someone in the community has started working on this - here - so I expect that the issues should be rectified relatively soon.

here’s the GitHub issue where this feature is being tracked:

Hi Team,
I am facing the same issue. I followed the steps mentioned above and in this link

But having the same issue.

Can someone please help me with this?

Thanks

/arrow/cpp/src/arrow/python/flight.cc:354: Python client middleware failed in StartCall: Unknown error: Did not receive authorization header back from server.. Detail: Python exception: Exception
[ERROR] Exception: FlightUnavailableError('gRPC returned unavailable error, with message: Socket closed')
Traceback (most recent call last):
  File "/usr/local/bin/cre_business", line 33, in <module>
    sys.exit(load_entry_point('dl-cre', 'console_scripts', 'cre_business')())
  File "/usr/local/bin/cre_business", line 25, in importlib_load_entry_point
    return next(matches).load()
  File "/usr/local/lib/python3.10/importlib/metadata/__init__.py", line 171, in load
    module = import_module(match.group('module'))
  File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/statement_quality_reports/src/business_rule/business_rule_new.py", line 81, in <module>
    connect_to_dremio_flight_server_endpoint(host, port, username, password,
  File "/usr/local/statement_quality_reports/src/common/example.py", line 214, in connect_to_dremio_flight_server_endpoint
    bearer_token = client.authenticate_basic_token(username, password,
  File "pyarrow/_flight.pyx", line 1245, in pyarrow._flight.FlightClient.authenticate_basic_token
  File "pyarrow/_flight.pyx", line 69, in pyarrow._flight.check_flight_status
pyarrow._flight.FlightUnavailableError: gRPC returned unavailable error, with message: Socket closed