Upcoming support for Apache Iceberg partition transforms?

The new Dremio 19 has support for Iceberg tables on S3 (if created using Spark 3). However, Iceberg partition transforms are not supported, as stated in the documentation.

We have rather large, existing Iceberg tables on S3-compatible storage, which use partition transforms extensively (day/hour, bucket and truncate) in order to get good query performance and keep overhead/costs low.

Will Dremio’s current Iceberg limitations be lifted in a future release? As I see it, hidden partitioning is one of Iceberg’s main selling points. I appreciate that it would affect a lot of Dremio’s aggregate reflection functionality, which may not be a priority, but could it at least be supported for external tables?

@wundi This work is currently in progress but involves many tickets. There is currently no exact ETA for the release it will land in; please check back in a couple of months for a better target date.

Hi Balaji.

Just wanted to check back in. Appreciate all the work that has been done in Dremio 19 and 20 to support Iceberg tables with partition transformations (I believe the Iceberg documentation is now outdated, in that regard).

Partition pruning has not been implemented (yet) except for identity transformations. @balaji.ramaswamy - are you aware whether that is upcoming as well for the read path? Appreciate any insight you are allowed to give.

Write support would be a big win as well, but I believe that could come separately, since it touches a lot of areas like UI, reflections, CTAS (and it wouldn’t make sense without the read path anyway).

@wundi Yes. With Dremio 20, this is available under the support key "dremio.iceberg.spec_evol_and_transformation.enabled" (read support). With Dremio 21, the plan is to have full support enabled by default.

Appreciate your response @Piyush_Vinay_Hurpade , but I think you are misunderstanding my question. We have already enabled the dremio.iceberg.spec_evol_and_transformation.enabled support option, and queries on Iceberg tables with hidden partitioning do work. It is partition pruning using hidden partitioning that is not implemented (except for the identity transform).

Maybe it’s clearer with an example. Assume a table like the following, created using Spark:

CREATE TABLE prod.db.day_transform (
id bigint,
data string,
ts timestamp)
USING iceberg
PARTITIONED BY (days(ts))

If you insert data for some days:

INSERT INTO prod.db.day_transform (id, data, ts) VALUES (1, 'data1', '2022-02-13 12:34:56')
INSERT INTO prod.db.day_transform (id, data, ts) VALUES (2, 'data2', '2022-02-14 12:34:56')
INSERT INTO prod.db.day_transform (id, data, ts) VALUES (3, 'data3', '2022-02-15 12:34:56')
INSERT INTO prod.db.day_transform (id, data, ts) VALUES (4, 'data4', '2022-02-16 12:34:56')

-then when you execute the following query:

SELECT * FROM prod.db.day_transform where ts between '2022-02-14 00:00:00' and '2022-02-14 20:00:00'

Dremio doesn’t prune files for partitions ts=2022-02-13, ts=2022-02-15 and ts=2022-02-16, although it could do so using the metadata available in Iceberg.
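To make the expected behavior concrete, here is a minimal Python sketch of pruning on a day transform. It is not Dremio or Iceberg code: day_transform and prune_day_partitions are hypothetical names, timestamps are assumed to be UTC, and the only thing taken from Iceberg is that the day transform yields whole days since 1970-01-01.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Iceberg-style day transform: whole days since 1970-01-01 (ts assumed UTC)."""
    return (ts.date() - EPOCH).days


def prune_day_partitions(partition_days, lower, upper):
    """Keep only partitions that can contain rows with lower <= ts <= upper.

    The range predicate on ts is projected onto the partition values by
    applying the same transform to both bounds.
    """
    lo, hi = day_transform(lower), day_transform(upper)
    return [d for d in partition_days if lo <= d <= hi]


# Partition values produced by the four INSERT statements above.
partitions = [day_transform(datetime(2022, 2, d, 12, 34, 56)) for d in (13, 14, 15, 16)]

kept = prune_day_partitions(
    partitions,
    datetime(2022, 2, 14, 0, 0, 0),
    datetime(2022, 2, 14, 20, 0, 0),
)
kept_dates = [date.fromordinal(EPOCH.toordinal() + d).isoformat() for d in kept]
print(kept_dates)  # prints ['2022-02-14']
```

Only the 2022-02-14 partition survives, so files for the other three days would never need to be opened.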

Had I used an identity transform, like so:

CREATE TABLE prod.db.identity_transform (
id bigint,
data string,
ts timestamp,
ts_day date)
USING iceberg
PARTITIONED BY (ts_day)

INSERT INTO prod.db.identity_transform (id, data, ts, ts_day) VALUES (1, 'data1', '2022-02-13 12:34:56', '2022-02-13')
INSERT INTO prod.db.identity_transform (id, data, ts, ts_day) VALUES (2, 'data2', '2022-02-14 12:34:56', '2022-02-14')
INSERT INTO prod.db.identity_transform (id, data, ts, ts_day) VALUES (3, 'data3', '2022-02-15 12:34:56', '2022-02-15')
INSERT INTO prod.db.identity_transform (id, data, ts, ts_day) VALUES (4, 'data4', '2022-02-16 12:34:56', '2022-02-16')

SELECT * FROM prod.db.identity_transform where ts_day = '2022-02-14' AND ts between '2022-02-14 00:00:00' and '2022-02-14 20:00:00'

-then Dremio is able to prune partitions not having ts_day='2022-02-14'.

However, having to use the latter ts_day style strips away much of the value that hidden partitioning offers in the first place.

So, to reframe my question - in a future version of Dremio (21 or later), will we be able to define and query Iceberg tables defined using non-identity partition transforms like the day_transform table, but with partition pruning and the performance offered by tables defined using identity partition transforms like the identity_transform table?

I hope that makes my question clear?

dremio.iceberg.spec_evol_and_transformation.enabled also enables pruning. In the execution profile, you should be able to see ManifestFile Filter and ManifestList Filter in the physical plan, which show the prune condition.

I do see ManifestList Filter and ManifestFile Filter in the execution profile, when a query filters on a field used for partitioning - as long as the transform function is the identity function. Specifically, given the table (created using Spark):

CREATE TABLE iceberg_identity_date (timestamp timestamp, partition_day date) USING iceberg partitioned by (days(timestamp), partition_day)

-and data (inserted using Spark):

INSERT INTO iceberg_identity_date (timestamp, partition_day) VALUES (timestamp 'now', current_date)
INSERT INTO iceberg_identity_date (timestamp, partition_day) VALUES (timestamp 'yesterday', date_add(current_date, -1))

The following query in Dremio filters using the manifest list and manifest files, because the partition transformation on partition_day is the identity function:

SELECT * FROM iceberg_identity_date
where partition_day = current_date

-as seen in the final transformation in the execution profile:

00-07 TableFunction(columns=[splitsIdentity, splits, colIds], ManifestFile Filter AnyColExpression=[(ref(name="partition_day") >= 19040 and ref(name="partition_day") <= 19040)]) : rowType = …
00-08 IcebergManifestList(table=[minio."steen-test"."default".iceberg_identity_date], columns=[splitsIdentity, splits, colIds], splits=[1], ManifestList Filter Expression =[(ref(name="partition_day") >= 19040 and ref(name="partition_day") <= 19040)]) : rowType = …
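As a side note on reading the plan: the literal 19040 in the filter looks like a date encoded as days since 1970-01-01, which is how Iceberg represents date values. A small Python check under that assumption (the helper name is mine, not something from Dremio):

```python
from datetime import date, timedelta


def days_since_epoch_to_date(days: int) -> date:
    """Decode an Iceberg-style date value (days since 1970-01-01)."""
    return date(1970, 1, 1) + timedelta(days=days)


decoded = days_since_epoch_to_date(19040)
print(decoded)  # prints 2022-02-17
```

That matches the day the query was run, so the range 19040 to 19040 is simply partition_day = current_date pushed down to the manifests.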

However, if we query by timestamp, filtering using manifest list and files doesn’t happen, as the partition transformation is days(timestamp):

SELECT * FROM iceberg_identity_date
where "timestamp" = current_timestamp

-as seen in the execution profile:

00-09 TableFunction(columns=[splitsIdentity, splits, colIds]) : rowType = …
00-10 IcebergManifestList(table=[minio."steen-test"."default".iceberg_identity_date], columns=[splitsIdentity, splits, colIds], splits=[1]) : rowType =…

A simpler way is to look at the input records reported by the query, where the former has none and the latter has 2, but both return 0 records.

Looking in the Dremio source code, com.dremio.exec.store.iceberg.IcebergUtils:getInvalidColumnsForPruning(...) does the filtering of partitioning columns used for pruning, where non-identity transformations are one of the filter criteria:


if (entry.getKey() != 0 || !partitionField.transform().isIdentity()) {

-which is used when generating splits:


/**
* we can not send partition column value for 1. nonIdentity columns or 2. columns which was not partition but later added as partition columns.
* because in case 1. information will be partial and scan will get incorrect values
* in case2. initially when column was not partition we don’t have value.
*/
if(invalidColumnsForPruning != null && invalidColumnsForPruning.contains(fileSchema.findField(field.sourceId()).name())) {
continue;
}

The code here documents that non-identity columns won’t be used for pruning. I am sure pruning is working as intended, for the moment, because supporting pruning on non-identity transforms seems to require some more work than on simple identity ones.
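To show how I read that condition, here is a rough Python model of it. This is only a sketch of my understanding and not the actual Dremio implementation: the data structure and the function name are hypothetical, and I am assuming that entry.getKey() is the partition spec ID.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical model: partition spec ID -> list of (source column, transform name).
PartitionSpecs = Dict[int, List[Tuple[str, str]]]


def invalid_columns_for_pruning(specs: PartitionSpecs) -> Set[str]:
    """Columns excluded from pruning, mirroring the quoted condition:
    any field from a spec other than spec 0, or with a non-identity transform."""
    invalid: Set[str] = set()
    for spec_id, fields in specs.items():
        for column, transform in fields:
            if spec_id != 0 or transform != "identity":
                invalid.add(column)
    return invalid


# The iceberg_identity_date table from above: days(timestamp) plus identity on partition_day.
specs = {0: [("timestamp", "day"), ("partition_day", "identity")]}
print(invalid_columns_for_pruning(specs))  # prints {'timestamp'}
```

Under this reading, partition_day stays eligible for pruning while timestamp is excluded, which matches the two execution profiles shown earlier.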

That said, coming back to my original question - would non-identity transform support “just” need some more work and come sometime in a future release, or is something actually hindering this (either technically or simply due to priority)?

I understand if you aren’t able to comment on the roadmap or make any promises - it would just be great input when deciding how to optimally design our data lake, while still being able to use Dremio properly on top.

@wundi I just tested your example on Dremio 23.x and I see only one data file was read and only one row, looks like it is working. Can you please check?

Appreciate the followup @balaji.ramaswamy. Yes, it does work as intended now. Since Dremio 21.x there has been full support for partition transforms as well as partition evolution 👍

@wundi

Thanks a lot for the confirmation