I’m just beginning my Dremio journey and wanted to get some data out of Dremio via Arrow Flight. After some googling I ended up with these useful resources:
[first link missing] and `arrow/middleware.py` (master branch of the apache/arrow repository on GitHub)
From those I cobbled together working code in which the `connect_to_dremio_flight_server_endpoint`
function fetches data:
# Call the hand-rolled helper defined further down in this post; username,
# password and sql are assumed to be defined beforehand (not shown here).
connect_to_dremio_flight_server_endpoint(hostname="dremio-dev", flightport=32010, username=username,
password=password, sqlquery=sql, tls=False, certs=False)
But that took 300 lines of code and many functions — more than it seemed I should need. A bit more googling revealed that there is a Dremio Client Python library. There was much rejoicing; I read the docs and installed the magic pixies on my machine.
I expected to be able to do something like this:
# Attempt the same query through the dremio-client package's Flight helper.
from dremio_client.flight import query
sql = 'SELECT * FROM mytable'
hostname = 'dremio-dev'
port=32010
# username and password are assumed to be defined earlier (not shown here).
query(sql, hostname=hostname, port=port, username=username, password=password)
But that (and every other approach I have tried) gets me some version of the following error:
---------------------------------------------------------------------------
FlightUnauthenticatedError Traceback (most recent call last)
<ipython-input-24-f6a1fc28cf57> in <module>
7 password=tokens[2]
8
----> 9 query(sql, hostname=hostname, port=port, username=username, password=password)
/scratch/shared/anaconda3/lib/python3.7/site-packages/dremio_client/flight/__init__.py in query(sql, client, hostname, port, username, password, pandas, tls_root_certs_filename)
79 """
80 if not client:
---> 81 client = connect(hostname, port, username, password, tls_root_certs_filename)
82
83 info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))
/scratch/shared/anaconda3/lib/python3.7/site-packages/dremio_client/flight/__init__.py in connect(hostname, port, username, password, tls_root_certs_filename)
50 c = flight.FlightClient(location)
51 if username:
---> 52 c.authenticate(HttpDremioClientAuthHandler(username, password if password else ""))
53 return c
54
/scratch/shared/anaconda3/lib/python3.7/site-packages/pyarrow/_flight.pyx in pyarrow._flight.FlightClient.authenticate()
/scratch/shared/anaconda3/lib/python3.7/site-packages/pyarrow/_flight.pyx in pyarrow._flight.check_flight_status()
FlightUnauthenticatedError: gRPC returned unauthenticated error, with message:
It’s a bit of a cliffhanger: it looks like there should be more to the message, but it ends abruptly after “with message:”.
Any tips on why my cobbled-together code (included below) works while the package does not?
I presume `pip install dremio-client`
installed version 0.14.0.
My Dremio install build info is as follows:
Build
12.1.0-202101041749050132-55c827cb
Edition
AWS Edition (activated)
Here’s the code I borrowed from the links above and used to extract data successfully:
import argparse
import sys
import pyarrow
import pyarrow.flight
import pyarrow.csv as csv
from pyarrow import flight
def list_flights(args, client, connection_args=None):
    """Print every flight and every action advertised by a Flight server.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments. Unused; accepted so all sub-commands
        share one signature.
    client : pyarrow.flight.FlightClient
        Connected client used to call ``list_flights`` and ``list_actions``.
    connection_args : dict, optional
        Unused; accepted for signature compatibility with the other commands.
        Defaults to None instead of a shared mutable ``{}``.
    """
    print('Flights\n=======')
    # The loop variable is named ``flight_info`` rather than ``flight`` so it
    # does not shadow the ``pyarrow.flight`` module imported as ``flight``.
    for flight_info in client.list_flights():
        descriptor = flight_info.descriptor
        if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
            print("Path:", descriptor.path)
        elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
            print("Command:", descriptor.command)
        else:
            print("Unknown descriptor type")
        # Negative totals are treated as "not reported" and printed as Unknown.
        total_records = flight_info.total_records
        print("Total records:",
              total_records if total_records >= 0 else "Unknown")
        total_bytes = flight_info.total_bytes
        print("Total bytes:",
              total_bytes if total_bytes >= 0 else "Unknown")
        print("Number of endpoints:", len(flight_info.endpoints))
        print("Schema:")
        print(flight_info.schema)
        print('---')

    print('\nActions\n=======')
    for action in client.list_actions():
        print("Type:", action.type)
        print("Description:", action.description)
        print('---')
def do_action(args, client, connection_args=None):
    """Invoke one Flight action with an empty body and print each result.

    Parameters
    ----------
    args : argparse.Namespace
        Must provide ``action_type``, the name of the action to run.
    client : pyarrow.flight.FlightClient
        Connected client used to call ``do_action``.
    connection_args : dict, optional
        Unused; accepted for signature compatibility with the other commands.

    Errors raised while running the action are printed, not propagated.
    """
    buf = pyarrow.allocate_buffer(0)
    action = pyarrow.flight.Action(args.action_type, buf)
    print('Running action', args.action_type)
    try:
        for result in client.do_action(action):
            print("Got result", result.body.to_pybytes())
    # Flight RPC failures raise pyarrow.flight.FlightError subclasses, which
    # are not ArrowIOError (an IOError alias); catch both so server-side
    # errors are reported instead of escaping the handler.
    except (pyarrow.lib.ArrowIOError, pyarrow.flight.FlightError) as e:
        print("Error calling action:", e)
def push_data(args, client, connection_args=None):
    """Read a CSV file and upload it to the Flight server with DoPut.

    Parameters
    ----------
    args : argparse.Namespace
        Must provide ``file``, the CSV path; it is also used as the flight
        descriptor path.
    client : pyarrow.flight.FlightClient
        Connected client used to call ``do_put``.
    connection_args : dict, optional
        Unused; accepted for signature compatibility with the other commands.
    """
    print('File Name:', args.file)
    my_table = csv.read_csv(args.file)
    print('Table rows=', str(len(my_table)))
    df = my_table.to_pandas()
    print(df.head())
    writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema)
    try:
        writer.write_table(my_table)
    finally:
        # Close the writer even if the write fails so it is not left open.
        writer.close()
def get_flight(args, client, connection_args=None):
    """Resolve a flight by path or command and print the data of each endpoint.

    Parameters
    ----------
    args : argparse.Namespace
        ``path`` (a sequence of path segments) selects a path descriptor;
        otherwise ``command`` is used as a command descriptor.
    client : pyarrow.flight.FlightClient
        Client used for ``get_flight_info``. It is also used to redeem tickets
        of endpoints that list no locations.
    connection_args : dict, optional
        Extra keyword arguments for each per-location ``FlightClient``.
        Defaults to None instead of a shared mutable ``{}``.
    """
    if connection_args is None:
        connection_args = {}
    if args.path:
        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
    else:
        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)

    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        print('Ticket:', endpoint.ticket)
        if not endpoint.locations:
            # Per the Flight protocol, an endpoint with no locations is
            # redeemed on the service that issued the ticket, i.e. the client
            # we already have. Without this branch the data was never read.
            reader = client.do_get(endpoint.ticket)
            print(reader.read_pandas())
            continue
        for location in endpoint.locations:
            print(location)
            get_client = pyarrow.flight.FlightClient(location,
                                                     **connection_args)
            reader = get_client.do_get(endpoint.ticket)
            df = reader.read_pandas()
            print(df)
def _add_common_arguments(parser):
parser.add_argument('--tls', action='store_true',
help='Enable transport-level security')
parser.add_argument('--tls-roots', default=None,
help='Path to trusted TLS certificate(s)')
parser.add_argument("--mtls", nargs=2, default=None,
metavar=('CERTFILE', 'KEYFILE'),
help="Enable transport-level security")
parser.add_argument('host', type=str,
help="Address or hostname to connect to")
def connect_to_dremio_flight_server_endpoint(hostname, flightport, username, password, sqlquery,
                                             tls, certs, *,
                                             routing_tag='test-routing-tag',
                                             routing_queue='Low Cost User Queries'):
    """
    Connect to a Dremio Flight server endpoint, authenticate, and optionally run a query.

    Parameters
    ----------
    hostname : str
        Dremio coordinator host.
    flightport : int or str
        Arrow Flight port (e.g. 32010).
    username, password : str
        Credentials passed to ``FlightClient.authenticate_basic_token``.
    sqlquery : str or None
        SQL to execute. When falsy, the function only authenticates and
        returns None.
    tls : bool
        Use ``grpc+tls`` instead of ``grpc+tcp``.
    certs : str or False
        Path to trusted root certificates; required when *tls* is true.
    routing_tag, routing_queue : str or None, keyword-only
        Dremio WLM settings sent as headers on the authentication call. Pass
        None to omit a header. The defaults are the values previously
        hard-coded here.

    Returns
    -------
    pandas.DataFrame or None
        The full result set (all endpoints concatenated) when *sqlquery* is
        given, otherwise None.

    Raises
    ------
    SystemExit
        With a non-zero status when *tls* is requested without *certs*.
    Exception
        Any other error is printed and re-raised.
    """
    try:
        # Default to use an unencrypted TCP connection.
        scheme = "grpc+tcp"
        connection_args = {}
        if tls:
            # Connect to the server endpoint with an encrypted TLS connection.
            print('[INFO] Enabling TLS connection')
            scheme = "grpc+tls"
            if certs:
                print('[INFO] Trusted certificates provided')
                # TLS certificates are provided in a list of connection arguments.
                with open(certs, "rb") as root_certs:
                    connection_args["tls_root_certs"] = root_certs.read()
            else:
                print('[ERROR] Trusted certificates must be provided to establish a TLS connection')
                # Exit with a failure status; a bare sys.exit() reports success.
                sys.exit(1)

        # Two WLM settings can be provided upon initial authentication
        # with the Dremio Server Flight Endpoint: routing-tag and routing-queue.
        initial_headers = []
        if routing_tag is not None:
            initial_headers.append((b'routing-tag', routing_tag.encode('utf-8')))
        if routing_queue is not None:
            initial_headers.append((b'routing-queue', routing_queue.encode('utf-8')))
        initial_options = flight.FlightCallOptions(headers=initial_headers)

        client_auth_middleware = DremioClientAuthMiddlewareFactory()
        client = flight.FlightClient("{}://{}:{}".format(scheme, hostname, flightport),
                                     middleware=[client_auth_middleware], **connection_args)

        # Authenticate; the returned (header, value) pair is sent on later calls.
        bearer_token = client.authenticate_basic_token(username, password, initial_options)
        print('[INFO] Authentication was successful')

        if not sqlquery:
            return None

        # Construct FlightDescriptor for the query result set; reused below.
        flight_desc = flight.FlightDescriptor.for_command(sqlquery)
        print('[INFO] Query: ', sqlquery)

        # A query context, e.g. (b'schema', b'test.schema'), could be added as
        # another header next to the bearer token.
        options = flight.FlightCallOptions(headers=[bearer_token])
        schema = client.get_schema(flight_desc, options)
        print('[INFO] GetSchema was successful')
        print('[INFO] Schema: ', schema)

        # Get the FlightInfo message to retrieve the Ticket(s) for the result set.
        flight_info = client.get_flight_info(flight_desc, options)
        print('[INFO] GetFlightInfo was successful')

        # The result set is the union of every endpoint; reading only the
        # first would silently drop rows if the server splits the result.
        tables = []
        for endpoint in flight_info.endpoints:
            print('[INFO] Ticket: ', endpoint.ticket)
            reader = client.do_get(endpoint.ticket, options)
            print('[INFO] Reading query results from Dremio')
            tables.append(reader.read_all())
        if not tables:
            return flight_info.schema.empty_table().to_pandas()
        return pyarrow.concat_tables(tables).to_pandas()
    except Exception as exception:
        print("[ERROR] Exception: {}".format(repr(exception)))
        raise
class DremioClientAuthMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Create one DremioClientAuthMiddleware per outgoing call.

    The factory holds shared state: every middleware it creates reports a
    captured authorization header back through ``set_call_credential``.
    """

    def __init__(self):
        # Empty until a middleware stores [b'authorization', <token bytes>].
        self.call_credential = []

    def start_call(self, info):
        """Return a new middleware bound to this factory (``info`` is unused)."""
        middleware = DremioClientAuthMiddleware(self)
        return middleware

    def set_call_credential(self, call_credential):
        """Replace the stored credential with *call_credential*."""
        self.call_credential = call_credential
class DremioClientAuthMiddleware(flight.ClientMiddleware):
    """
    A ClientMiddleware that extracts the bearer token from
    the authorization header returned by the Dremio
    Flight Server Endpoint.

    Parameters
    ----------
    factory : DremioClientAuthMiddlewareFactory
        The factory whose ``set_call_credential`` is called when a response
        carries an authorization header.
    """

    def __init__(self, factory):
        self.factory = factory

    def received_headers(self, headers):
        """Store the authorization header from *headers*, if one is present.

        Header names are matched case-insensitively, and the last match wins.
        A response without an authorization header leaves the stored
        credential untouched; previously it raised IndexError.
        """
        auth_header_key = 'authorization'
        authorization_header = []
        for key, values in headers.items():
            if key.lower() == auth_header_key:
                # Read the matched key's values; looking up the lowercase name
                # would miss a differently-cased key.
                authorization_header = values
        if not authorization_header:
            return
        token = authorization_header[0]
        # Text header values arrive as str; tolerate bytes as well.
        if isinstance(token, str):
            token = token.encode("utf-8")
        self.factory.set_call_credential([b'authorization', token])
def parse_arguments():
    """
    Parse the script's command-line options from ``sys.argv``.

    Returns
    -------
    argparse.Namespace
        Attributes: hostname, flightport (a str), username, password,
        sqlquery, tls (bool) and trustedCertificates.
    """
    # Each entry: (short flag, long flag, keyword arguments for add_argument).
    # Registration order is preserved so --help lists options as before.
    option_specs = (
        ('-host', '--hostname',
         {'type': str, 'help': 'Dremio co-ordinator hostname',
          'default': 'localhost'}),
        ('-port', '--flightport',
         {'type': str, 'help': 'Dremio flight server port',
          'default': '32010'}),
        ('-user', '--username',
         {'type': str, 'help': 'Dremio username', 'required': True}),
        ('-pass', '--password',
         {'type': str, 'help': 'Dremio password', 'required': True}),
        ('-query', '--sqlquery',
         {'type': str, 'help': 'SQL query to test', 'required': False}),
        ('-tls', '--tls',
         {'dest': 'tls', 'help': 'Enable encrypted connection',
          'required': False, 'default': False, 'action': 'store_true'}),
        ('-certs', '--trustedCertificates',
         {'type': str,
          'help': 'Path to trusted certificates for encrypted connection',
          'required': False}),
    )
    cli = argparse.ArgumentParser()
    for short_flag, long_flag, options in option_specs:
        cli.add_argument(short_flag, long_flag, **options)
    return cli.parse_args()