Understanding how the Parquet reader works, and a suggestion

I have a dataset that currently contains 16 billion records, and that’s only year-to-date data. The data is stored in Parquet files in a partitioned directory structure, as suggested in the Dremio docs.

As you can probably imagine, I’m looking to aggregate this data to an hourly level to make it easier to work with. This works fine on a subset of the data, but I’m unable to create a reflection that contains the full aggregation. I’ve tried setting the reflection refresh to incremental, but I keep getting out-of-memory errors, so I gave up.

My current solution is a Python script that first prepares SQL queries for smaller chunks of the dataset, then sends a bunch of concurrent queries via turbodbc and stores the results as new Parquet files, which then become a new data source in Dremio.
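For reference, a simplified sketch of that approach (not the exact script; the DSN name “Dremio”, the output directory, and the partition values are placeholders) looks something like this:

import os
from concurrent.futures import ThreadPoolExecutor

import pyarrow.parquet as pq
import turbodbc

# One query per daily chunk; the chunk columns are a subset of the group-by
# columns, so the partial results can simply sit side by side as new files.
QUERY_TEMPLATE = """
select dir0, dir1, dir2, dimension1, dimension2,
       count(distinct value) as distinct_values
from dataset
where dir0 = '{d0}' and dir1 = '{d1}'
group by dir0, dir1, dir2, dimension1, dimension2
"""

def run_chunk(chunk):
    d0, d1 = chunk
    connection = turbodbc.connect(dsn="Dremio")   # placeholder DSN name
    cursor = connection.cursor()
    cursor.execute(QUERY_TEMPLATE.format(d0=d0, d1=d1))
    table = cursor.fetchallarrow()                # Arrow table straight from turbodbc
    pq.write_table(table, f"out/agg_{d0}_{d1}.parquet")
    connection.close()

os.makedirs("out", exist_ok=True)
chunks = [("2018", "01"), ("2018", "02")]         # placeholder partition values

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(run_chunk, chunks))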

I read another question earlier tonight where Kelly from Dremio stated that:

Dremio includes a vectorized Parquet reader that reads directly to Arrow, so you should see some nice benefits when Parquet is the source if you’re seeing reads as the bottleneck (assuming I/O isn’t the limiting factor, of course).

This made me wonder whether the out-of-memory errors I’m getting are because the Parquet reader is trying to ingest everything at the same time. (I’m guessing you probably want some query profiles; I’ll see what I can do.)

Next, I started thinking it would be great to have a little more control over how the query is executed. In my example, I first aggregate the data from seconds to minutes and then to hours, so I can easily run the queries on daily subsets without missing any data. One idea would be a clause in the SQL that tells the query planner how to break the query into parallel pieces, maybe something like this:

select
  dir0,
  dir1,
  dir2,
  dimension1,
  dimension2,
  count(distinct value)
from dataset
group by
  dir0,
  dir1,
  dir2,
  dimension1,
  dimension2
concurrent by
  dir0,
  dir1,
  dir2,
  dimension1

It wouldn’t surprise me one bit if some version of this is already being done, but I think it would be a nice addition!

/Niclas

Can you describe your deployment and how large the Parquet data is?

Right now it’s a test environment on Google Cloud: 164 GB of memory and 24 cores, running Windows Server 2016.

The Parquet data is roughly 16 billion rows and ~50 columns, and something like 0.75 TB on disk at the moment.

Ideally, I would be able to create a reflection for the aggregation (first to minute, then to hour) that is updated incrementally as new files arrive, but the first run keeps failing on me.

A different way of phrasing this could be:

In cases where Dremio is creating a reflection or a table (reflections being CTAS under the covers anyway), and both the inputs and outputs are partitioned, could Dremio intelligently break the work into incremental operations on each relevant partition (or set of partitions), rather than attempting to tackle the entire table at once, when table statistics indicate it will be a costly operation?
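To make the idea concrete, here is a rough pseudocode sketch of what that could look like (estimated_rows, list_partitions, run_sql and append_to_target are purely hypothetical helpers, not Dremio APIs, and the threshold is arbitrary):

ROW_THRESHOLD = 1_000_000_000   # arbitrary illustrative cut-off

AGG_TEMPLATE = """
select dir0, dir1, dir2, dimension1, dimension2,
       count(distinct value) as distinct_values
from dataset
where {predicate}
group by dir0, dir1, dir2, dimension1, dimension2
"""

def build_reflection():
    # Small enough: build the reflection in a single pass, as today.
    if estimated_rows("dataset") < ROW_THRESHOLD:
        append_to_target(run_sql(AGG_TEMPLATE.format(predicate="1 = 1")))
        return
    # Too big for one pass: materialize partition by partition instead,
    # so no single query has to hold the whole table in memory at once.
    for d0, d1 in list_partitions("dataset"):
        predicate = f"dir0 = '{d0}' and dir1 = '{d1}'"
        append_to_target(run_sql(AGG_TEMPLATE.format(predicate=predicate)))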


Exactly, well put!

Glad that it makes sense to someone! :slight_smile:

That would be a wonderful technique or feature. It would potentially allow very large datasets to be optimized on the initial load of a reflection, which is where I find myself waiting for paint to dry on very large ones.

The workaround, like the technique above, is to do this partitioning at the data source level, but that defeats one of the benefits of Dremio.


@roos.niclas just out of curiosity - you mentioned you are running on Windows Server 2016; does that mean you are using the Dremio Windows app?

Yes, currently running on Windows, but I’ve also done some tests on a cluster of Unix machines and gotten the same behavior.

You should see different performance behavior between our Windows & Unix deployments.
FYI per https://docs.dremio.com/quickstart/windows.html -

Dremio Windows app is pre-configured to use a maximum 2GB of heap memory and 4GB of direct memory.

Windows won’t leverage your full hardware power, while Unix implementations will.

I’ve tried changing the heap and direct memory sizes, both on Windows and Unix, but I get roughly the same issues. The only error I’ve seen exclusively on the Windows machine is something about a message in the RPC communication being too big, which causes everything to fail.

@desidero @roos.niclas @dealercrm thanks for all the input – we can see this optimization being useful. We’ll make sure to consider this (or some variation) as we think through items for the planning/execution roadmap.

Also, we’re already working on features for our next major release that should help with this scenario (i.e. not being able to build a reflection due to lack of memory).
