Strange planner behavior (and performance consequences)

Hi there,

Our team is facing some performance issues with one of the most common queries we execute on Dremio.
We have a large number of parquet files organized this way:

month/device/data.parquet

so we have data split by time and “source”. When we query the data filtering on time and device, the planner smartly selects only the relevant parquet files, using “device” and, I suppose, the parquet metadata on the time column: if I want device “A” data from Jan 15 to Jan 20, Dremio knows that it can find that data only in “2019-01/A/data.parquet”. However, if we add another WHERE clause, Dremio starts loading non-relevant data as well before filtering. This makes no sense, since an extra WHERE clause only restricts the output to a subset of the data we would get without it.

Since it’s not so easy to share our data (we usually deal with billions of rows split into thousands of parquet files on S3), here is a way to reproduce the situation.

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path

# One row per minute for a full week
df = pd.DataFrame({'Tstart': [datetime(2019, 1, 1) + timedelta(minutes=m) for m in range(60 * 24 * 7)]})

# Write one parquet file per (day, category) pair: 7 days x 3 categories = 21 files
for day in [datetime(2019, 1, 1) + timedelta(days=d) for d in range(7)]:
    for cat in ['A', 'B', 'C']:
        partition = df.copy()
        partition['Tend'] = partition['Tstart'] + np.timedelta64(59, 's')
        # keep only the rows belonging to this day, so Tstart matches the partition
        partition = partition[partition['Tstart'].dt.day == day.day]
        partition['val'] = np.random.random(size=partition.shape[0])
        partition['foo'] = np.random.choice(['x', 'y', 'z'], partition.shape[0])
        out_path = Path(f"/var/data/test/partitions/{day.strftime('%Y-%m-%d')}/{cat}/data.parquet")
        out_path.parent.mkdir(exist_ok=True, parents=True)
        partition.to_parquet(out_path, index=False)
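
As a quick sanity check (assuming the same /var/data/test/partitions path used above), this confirms that 21 files are produced and that each one holds exactly one day of data:

import pandas as pd
from pathlib import Path

root = Path("/var/data/test/partitions")
files = sorted(root.glob("*/*/data.parquet"))
print(len(files))  # expected: 21 (7 days x 3 categories)

for f in files:
    part = pd.read_parquet(f)
    # every file should contain 1440 rows, one per minute of its day
    assert len(part) == 1440
    assert part['Tstart'].dt.strftime('%Y-%m-%d').nunique() == 1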

The script creates a group of 21 parquet files: “dir0” is the day and “dir1” is the “category”. Inside every parquet file, the column “Tstart” contains only timestamps that match the “day” partition. This is the view (named “test”) I define on the dataset:

SELECT Tstart, Tend, val, foo, TO_DATE(dir0, 'YYYY-MM-DD') part_day, dir1 part_cat FROM partitions

Now, this is a query (#1) that Dremio can smartly process:

SELECT * FROM test
WHERE
Tstart >= '2019-01-01 12:00'
and Tstart < '2019-01-01 15:00'
and part_cat = 'A'

In the job panel, it is reported that this query reads 180 values and returns 180. Awesome: Dremio is reading only the “2019-01-01/A/data.parquet” file, and only a part of it (every data.parquet contains 1440 rows, one per minute of the day, and the window from 12:00 to 15:00 is exactly 180 minutes).
Then this query (#2):

SELECT * FROM test
WHERE
Tstart >= '2019-01-01 6:00'
and Tstart < '2019-01-02 18:00'
and part_cat = 'B'

reads 2160 rows and returns 2160. Brilliant: that is exactly 1080 rows from the “2019-01-01/B/data.parquet” file (the 18 hours from 06:00 to midnight) plus 1080 rows from the “2019-01-02/B/data.parquet” file (midnight to 18:00).
Now the “problem”: go back to the first query and add a WHERE clause (#3):

SELECT * FROM test
WHERE
Tstart >= '2019-01-01 12:00'
and Tstart < '2019-01-01 15:00'
and part_cat = 'A'
and foo = 'x'

This query reads 3346 rows (more than the same query without the clause on “foo”) and returns 61 rows. The question is: why does it read more rows than the first query?
I tried to understand where exactly 3346 comes from, and it turns out that this is the number of rows returned by the same query without the time constraints (#4):

SELECT * FROM test
WHERE
part_cat = 'A'
and foo = 'x'

So essentially, by adding a constraint on “foo”, Dremio decides to read all the foo = ‘x’ rows for part_cat = ‘A’ across the whole dataset (category “A” holds 7 × 1440 = 10080 rows in total, roughly a third of which have foo = ‘x’, hence the ~3346; the exact count depends on the random draw), forgetting that with the constraints on Tstart and part_cat it could immediately reduce the search space to a single file, as it does in the first query.
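
To make the point concrete, here is a pandas sketch (not Dremio, just an illustration on the generated test files, with the path assumed to match the repro script above) showing that all three predicates of query #3 can be answered from the same 180 rows that query #1 reads:

import pandas as pd

df = pd.read_parquet("/var/data/test/partitions/2019-01-01/A/data.parquet")

# the time + category predicates alone already narrow the data to 180 rows of this one file
in_range = (df['Tstart'] >= '2019-01-01 12:00') & (df['Tstart'] < '2019-01-01 15:00')
print(in_range.sum())  # 180, the rows query #1 reads

# adding foo = 'x' can only shrink that set further (~60 rows; 61 in the run above)
print((in_range & (df['foo'] == 'x')).sum())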
Of course, since we humans are smarter, we can try to help Dremio by adding a constraint on the other partition level (the day) as well. We can do this because we know (and Dremio does not) that the Tstart column is strictly bound to the partition. (#5)

SELECT * FROM test
WHERE
Tstart >= '2019-01-01 12:00'
and Tstart < '2019-01-01 15:00'
and part_cat = 'A'
and part_day = '2019-01-01'
and foo = 'x'

This time Dremio reads 474 rows and outputs the same 61 rows, as expected. But we are still reading more than needed: the first query reads 180 rows, and we know that the 61 rows we want are for sure a subset of those 180.
The performance on this small test dataset is not an issue, but our real dataset is organized in months, not days, and the cardinality of both “part_cat” and “part_day” (“part_month”) is much higher (respectively ~5000 and ~50, both growing).

The performance impact on the real dataset is very noticeable, so we wonder whether this is a known behavior that will be addressed in the future, or whether we just have to deal with it.
Thanks

@Luca

Would you be able to provide us the job profiles with and without “foo” in the FILTER?

Sure
without_foo.zip (7.1 KB)
with_foo.zip (7.6 KB)

Looks like the “with foo” profile reads more and has a “filter” stage, while the “without foo” one does not.

Hi @Luca,

SELECT * FROM test
WHERE
Tstart >= '2019-01-01 12:00'
and Tstart < '2019-01-01 15:00'
and part_cat = 'A'
and part_day = '2019-01-01'
and foo = 'x'

When I run this query with your test dataset, Dremio only looks at a single partition. In the Planning tab, if you go to the Final Physical Transformation, you’ll see this recorded as the “splits” count. For example:

00-08 ParquetScan(table=["local-data".partitions], columns=[Tstart, Tend, val, foo, dir0, dir1], splits=[1], filters=[[Filter on foo: equal(foo, 'x') ]]) : rowType = RecordType(TIMESTAMP(3) Tstart, TIMESTAMP(3) Tend, DOUBLE val, VARCHAR(65536) foo, VARCHAR(65536) dir0, VARCHAR(65536) dir1): rowcount = 216.0, cumulative cost = {216.0 rows, 1296.0 cpu, 1296.0 io, 1296.0 network, 0.0 memory}, id = 2390

That’s all fine: the “partition pruning” is working because you’ve put filters on the partitioned columns part_day and part_cat.

Then the parquet scan looks at ~500 records, which is equal to the number of records with foo = x in the single parquet file in that single partition (/2019-01-01/A).
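
For reference, a small check of that count against the generated test data (path assumed to match the repro script; the exact number varies with the random draw):

import pandas as pd

df = pd.read_parquet("/var/data/test/partitions/2019-01-01/A/data.parquet")
print(len(df))                   # 1440 rows in the file
print((df['foo'] == 'x').sum())  # roughly 1440 / 3 = ~480 (474 in the run above)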

However, the filter on Tstart is then applied on top of this scan. Your other query demonstrated that this Tstart filter is actually more selective, so why not push that down first? Or why not push both filters down to the Parquet scan?

Let me get back to you on those questions…
