Incremental data reflection with Iceberg

Hi,

The Dremio Iceberg doc calls out that incremental reflection is not supported for Iceberg tables.
Is there a roadmap for supporting this in the future?

Thanks,
Kyle

Hi @kyleahn

We do support field-based incremental refreshes when the Iceberg table is not in the Hadoop catalog (basically, not in a filesystem source). So, Hive and Glue are supported:

https://docs.dremio.com/software/data-formats/apache-iceberg/#iceberg-catalogs-in-dremio

We are also currently looking into snapshot-based incremental refreshes.

Can you tell me more about your incremental reflection use case?

Thanks

Hi Benny,

So basically, we plan to have n (can be any number) Dremio datasets, and we plan to refresh them frequently as soon as data arrives, with a minimum interval of 10s. So you could imagine that if the data is streamed in, the Dremio datasets will be refreshed every 10s. Our stack is Flink + Iceberg. We are currently using the Hadoop catalog, but we can certainly attempt to use Glue or Hive. Which is better supported in Dremio?

Thanks,
Kyle

All of the catalogs are well supported. Is your new data append-only, or will you rewrite history? Today, for incremental refreshes, we only support append-only data, and the Dremio dataset has limitations: no joins and only a single group-by (see the Dremio docs). For your use case with short refresh intervals like 10s, you also want to be thinking about table maintenance such as small file compaction and old snapshot expiration. Neither of these is automated today for reflections.

A good use case for incremental reflections on big data is if the source is billions of rows and the Dremio dataset is a summary view that is incrementally updated every hour. The filter to find new records in the source should hit partition columns or only process new files.

Since your source data is in Iceberg, ideally you’d try to incrementally update the Dremio dataset using snapshot ID diffing.
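
For example, a summary view along these lines (table and column names are purely illustrative) stays within those limits and groups by the partition column, so the refresh only needs to touch new partitions/files:

CREATE OR REPLACE VIEW sales_hourly_summary AS
SELECT event_date, region, COUNT(*) AS orders, SUM(amount) AS revenue
FROM lake.sales_events            -- Iceberg source partitioned by event_date
GROUP BY event_date, region;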


First, thanks a lot! This is great to know.

I see. No joins is fine for now, but I would be curious to know what the roadmap is for query feature support.

File compaction and snapshot expiration are definitely things we plan to handle with some custom maintenance logic.
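
For context, what I have in mind is roughly the standard Iceberg maintenance procedures, assuming Spark SQL with the Iceberg extensions enabled (catalog and table names are placeholders):

-- Compact the small files produced by frequent 10s commits
CALL my_catalog.system.rewrite_data_files(table => 'db.events');

-- Expire old snapshots so table metadata doesn't grow unbounded
CALL my_catalog.system.expire_snapshots(
  table => 'db.events',
  older_than => TIMESTAMP '2023-03-21 00:00:00',
  retain_last => 10
);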

Append-only is a bummer though. I don’t have a good understanding of how well a full data reflection refresh scales; that I would have to try out.

I will try the Hive catalog and update the thread.

We have a setup somewhat similar to what you are describing, although we commit data every 60 seconds in order to reduce the amount of maintenance that needs to be done, as Benny mentions here:

We have tried reflections on Iceberg tables before, but we aren’t using them on Iceberg tables or large sources, for a couple of reasons.
One is that the lack of maintenance would kill reflection performance over time on tables with frequent updates, unless your use cases allow refreshing only hourly or daily. The other is the lack of incremental updates on filesystem tables (more on that later).

What we ended up doing is creating and maintaining corresponding Iceberg tables in our data pipelines instead and then register those tables as external reflections in Dremio. The upsides are:

  • Updates/rollbacks to main tables and “reflection” tables can be managed together and commits happen at the same time (although not atomically across tables)
  • Data can be written and rewritten with sorting (or other tweaks) to improve performance
  • You can run several Dremio clusters using the same tables without having to run reflections per cluster

Some things to be mindful of though:

  • It is more complicated this way and requires more work. We often prototype a reflection using the UI and, if useful, later promote it into our data pipeline
  • External reflections are less visible in the UI so more documentation is needed to convey that information to the users

Hope it helps


It doesn’t scale too well if you have really large datasets. Dremio calculates the reflection by running the full query, which also happens when running the first refresh on an incrementally refreshing reflection. This has given us a few surprises:

  1. Running an incremental reflection on a rather large Oracle table ran for so long that the Oracle transaction log ran out of space (because it runs in a transaction). We ended up doing change data capture to Kafka instead and writing out that topic as Parquet/Iceberg
  2. Running a nightly reflection refresh on a large Iceberg table, we had to cap the reflection to cover only the latest week of data - otherwise the reflection refresh wouldn’t complete in reasonable time

Regarding 2., we had a use case for having a raw reflection but with different sorting than the original Iceberg table. Since Dremio runs the reflection refresh for all partitions at once, running a reflection refresh on billions of records with sorting would simply take too long. Given several TB of RAM and NVMe spill drives, doing a raw reflection for a week’s worth of data would still take 1-4 hours, which was more time and resources than we were willing to spend.


We’re doing this in Spark, which, given Iceberg tables, runs the query/rollup one partition at a time (or several, depending on your configuration)


Wow, thank you so much @wundi. Your example really helps me understand what I need to verify before fully committing to this design.

Some questions if you don’t mind :slight_smile:

What we ended up doing is creating and maintaining corresponding Iceberg tables in our data pipelines instead and then register those tables as external reflections in Dremio.

This is indeed what we currently do. We already have Iceberg tables created and maintained by Flink streaming & batch data pipelines with the Glue catalog. Thus, updates/rollbacks, data compaction, and other data tweaking are all available to us. I’ve gotta do more research about external data reflections; it seems that no reflection refresh is required. Do you find the performance of external reflections to be less capable than raw data reflections?

Also, are external reflections only available in Dremio Cloud? We are self-hosting Dremio on EKS.

It doesn’t scale too well if you have really large datasets. Dremio calculates the reflection by running the full query, which also happens when running the first refresh on an incrementally refreshing reflection. This has given us a few surprises:

With external reflections and the Flink-managed Iceberg tables, I assume that your points 1 and 2 are no longer concerns. Is that correct?

Lastly, I am curious how well supported external reflections are in Dremio in general. I have quickly done some research to understand more about external reflections and how we would automate external reflection registration using the API, but there does not seem to be any documentation. The doc below does not even mention EXTERNAL as one of the reflection types.

https://docs.dremio.com/software/rest-api/reflections/reflection-summary/#:~:text=454c-a7bb-a9a8b5eca224-,reflectionType,-String

Thank you so much!

No problem - glad it’s helpful :slightly_smiling_face:

They refresh in the same way as any other Iceberg table (assuming you’re using Iceberg - it can be anything really). I seem to remember there being a support setting for it and it defaulting to one minute, but I can’t seem to find it. That said, you always have the option to trigger it manually via SQL:

ALTER TABLE table1 REFRESH METADATA;

There might be a REST API for it, if you prefer.

In terms of performance, Dremio’s query planner doesn’t really care if the reflection is external or not. That said, external reflections give you full control over the Iceberg table backing the reflections, so you are free to optimise manifests, rewrite data etc. however you please. So performance and capability wise, external reflections are as good or better than regular ones. But the flexibility of external reflections comes with added responsibility and toil, since it’s on you to keep data up-to-date (also across reflections) and do all the necessary maintenance.

For me, the big win of having Dremio manage the reflections is the convenience. Dremio analyses how reflections depend on each other and refreshes them in a sensible order, such that one reflection builds on top of the others rather than on base tables. On top of that, Dremio keeps track of when reflections should be refreshed to satisfy your liveness criteria. However, your requirement of 10 second refresh intervals is not a good fit here.

Correct.

We run the OSS version on-prem, so external reflections are available in all versions.

Since we’re running the OSS version, we mix and match the REST API and SQL queries, since often only one or the other has the capabilities needed.

We use SQL to define and register external reflections. There is good documentation for the SQL approach here
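
Roughly, the registration looks something like this (dataset paths and the reflection name are just illustrative; the linked docs have the exact syntax):

-- Register a pipeline-maintained Iceberg table as an external reflection
ALTER DATASET lake.sales_summary_view
  CREATE EXTERNAL REFLECTION sales_summary_ext
  USING glue.analytics.sales_summary_rollup;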

You’re welcome!


They refresh in the same way as any other Iceberg table (assuming you’re using Iceberg - it can be anything really)

Just to clarify my understanding, does the refreshing of the external reflection for the Iceberg tables run on your Spark cluster? Or does it run on your Dremio cluster?

To be more detailed, if Spark streams data into Iceberg tables, at what point does the external data reflection refresh? Would you be able to walk me through the lifecycle of streaming/batch events arriving at the Iceberg table, and how that gets updated in the external reflection so Dremio can query the latest data, please?

Thank you! Your time is so valuable so I will do further research on my end, too :slight_smile:

At the moment, no. There’s nothing hindering us as such, but our current use cases aren’t that latency sensitive so we just rely on Dremio’s internal polling of the Iceberg metadata for both the base table as well as the one backing the external reflection.

Ingesting into the base table and the table backing the external reflection is split into two Spark jobs (although it could have been the same job). The job ingesting into the base table issues a commit to the Iceberg table at the end, after which Dremio will eventually discover a new commit has been made to that table. This part updates the base table.
Our second Spark job watches the base Iceberg table for changes. Once new commit(s) have been made to the base table, the Spark job then updates the rollup table and issues a commit to that table (which becomes a new snapshot of that Iceberg table, complete with additional metadata etc.). As with the base table, Dremio eventually discovers the new commit and will use that when serving queries using the external reflection.

For low-latency use cases like yours, the above adds more latency than needed. Spark uses polling of the Iceberg metadata to discover new commits, which can be avoided if the base and reflection tables are part of the same job (Flink or Spark). Dremio polls as well, so notifying Dremio when you know data has changed will cut away some of that latency too.
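
For example, the job that writes the tables could trigger the refresh itself right after each commit, rather than waiting for Dremio's polling (table paths here are illustrative):

-- Issued by the ingestion job immediately after an Iceberg commit
ALTER TABLE s3lake.db.base_table REFRESH METADATA;
ALTER TABLE s3lake.db.rollup_table REFRESH METADATA;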


Gotcha. This makes perfect sense. This is super helpful, and I wish I could have sat down with you for a quick coffee :grinning:

This points me in the right direction in designing our new data query engine. Thank you so much. You have made my day :slight_smile:


Excellent! :smile:

Yeah, that might have been a bit quicker, but at least everyone gets to benefit this way :slightly_smiling_face:

It was a pleasure! I’ve received a lot of help in my time - it’s nice to be able to pay it forward sometimes :blush:

@wundi Thanks for sharing this information about how you guys are using external reflections. This is very useful for the community. You are exactly right about the planner treating external reflections the same as other raw or aggregation reflections from a matching/substitution point of view.

If the external reflection’s target table is a native Iceberg table, then no ALTER TABLE REFRESH METADATA command should be needed. Every query against this table will consult the Iceberg catalog to find the current metadata pointer and then scan from the latest snapshot.

However, if the external reflection’s target table is a filesystem dataset that has been promoted into a Dremio dataset using the unlimited splits feature (which uses Iceberg metadata behind the scenes), then metadata refresh is needed to see the latest data.


Hi Benny, thanks for the further clarification.

If the external reflection’s target table is a native Iceberg table, then no ALTER TABLE REFRESH METADATA command should be needed.

Could you clarify what “native Iceberg table” means in this context? I am not sure what defines a native Iceberg table.

@kyleahn “Native Iceberg” means the actual data in the lake is in Iceberg format. Even for Parquet data, Dremio stores the metadata of that Parquet data in Iceberg format.


Appreciate you sharing that insight @Benny_Chow. I have some issues verifying that behaviour though, at least on the OSS version (versions 23.0.1 and 24.0.0). Here’s my testing process:

I have a native Iceberg dataset, written using the Hadoop catalog, stored in an “Amazon S3” Object storage source. Since the table is a native Iceberg table, the following statement shouldn’t apply:

Testing using the above table and setup I found that:

  1. When I query the table, only the current metadata file is fetched, along with the corresponding manifest list etc. Specifically, for this table the current .../metadata/v18.metadata.json is fetched; there are no checks for whether v19.metadata.json exists, nor any requests for .../metadata/version-hint.text. So, although Dremio fetches metadata per query, it does not check whether a new metadata version exists.

  2. Metadata is automatically refreshed periodically, but the interval varies somewhat (I observed ~1-3 minutes). Whether the data is queried or not does not seem to make a difference, nor does changing the “Metadata → Metadata Refresh → Fetch every” setting on the S3 source.

In regard to 1., I tried spinning up a Nessie catalog and configuring services.nessie.remote-uri in dremio.conf, but the Nessie catalog does not receive requests when running queries in Dremio. This behaviour is consistent with using a Hadoop catalog, but not with:

In regard to 2., I don’t really know what triggers metadata refreshes. I didn’t run any ALTER TABLE REFRESH METADATA commands, querying the table doesn’t matter, and the table has no direct reflections nor any views or reflections referencing it. Observing the traffic to our S3 store shows:

# S3 source metadata refresh configuration initially set to 1 minute
2023-03-23T09:03:12 - Dremio query 
2023-03-23T09:03:51 - Automatic refresh
2023-03-23T09:05:01 - Dremio query (8x)
...
2023-03-23T09:05:08 - Dremio query
2023-03-23T09:06:20 - Automatic refresh
2023-03-23T09:07:12 - Automatic refresh
2023-03-23T09:08:50 - Automatic refresh
# 2023-03-23T09:10:10 - Change S3 source metadata refresh to 5 minutes
2023-03-23T09:11:01 - Automatic refresh
2023-03-23T09:12:51 - Automatic refresh
2023-03-23T09:15:01 - Automatic refresh
2023-03-23T09:17:11 - Automatic refresh
2023-03-23T09:18:51 - Automatic refresh
...

The full trace is here: s3 metadata request trace.log.zip (6.1 KB)

@Benny_Chow I am somewhat puzzled by the above, but maybe you can clarify? Maybe some of this only applies to the Cloud/Arctic version and not to the software edition (OSS and/or Enterprise)? Or is a proper metastore configuration like Hive or Glue needed? It would be rather dreadful if the latter is true, but maybe that’s a separate thread altogether.

Hi @wundi

I checked with some colleagues and here’s the expected metadata refresh behavior by Iceberg catalog:

For Arctic/Nessie catalogs, Dremio does not cache the Iceberg metadata (such as snapshot id, schema, partition spec, planning stats, etc) and always accesses the latest metadata/data.

For the Hadoop catalog, Dremio does cache the Iceberg metadata because always fetching it from a remote filesystem like S3 could add a couple hundred milliseconds to several seconds to planning time. So, metadata refresh does come into play here, and ALTER TABLE REFRESH METADATA can force an immediate refresh.

For the Glue/Hive catalogs, Dremio also caches the Iceberg metadata, but the expiry duration is much shorter and is governed by dremio.metadata_expiry_check_interval_in_secs (default 60s). This is because it is less expensive to gather the Iceberg metadata (i.e. metadata.json) from these metastore catalogs than from a filesystem. This support option is only applicable to native Iceberg tables stored in a Glue/Hive catalog.
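
If the default doesn't suit you, the support key can be changed from the Support page in the UI; setting it via SQL should work as well, along these lines (the value here is just an example):

ALTER SYSTEM SET "dremio.metadata_expiry_check_interval_in_secs" = 30;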

Appreciate the effort @Benny_Chow.

Maybe some of my confusion is based on what is meant when referring to a Nessie catalog and how to configure it. To my knowledge, the options are:

  1. Arctic (which uses Nessie under the covers) is available in the “Dremio Cloud” version only. There is good UI support for adding/managing catalogs, as seen here. The Nessie catalogs are managed by Dremio

  2. Configure services.nessie.remote-uri in dremio.conf when using “Dremio Software” and set iceberg.catalog_type=nessie as a connection property on the AWS S3 source. To my knowledge, the OSS and Enterprise versions are the same in this regard, but please correct me if I am wrong. There is no UI support, and working with Iceberg tables requires that they be defined as an Iceberg table in the AWS S3 source, in the same way as it works for adding Iceberg tables using a Hadoop catalog.

In terms of the metadata refresh behaviour you describe, I assume that:

it actually only applies to 1. as described above, meaning only to Arctic catalogs in Dremio Cloud?

In regard to 2., my testing showed behaviour consistent with your description of the Hadoop catalog:

So the question then becomes, for those of us not able to use Dremio Cloud, whether there is a way to use Nessie catalogs that I haven’t described above, in order to achieve the refresh behaviour outlined in 1. (or in general)?

There’s a more general question about Nessie and catalog support in the “Dremio Software” version, but I’ll save that for a new post so that this one doesn’t go completely off-topic :slightly_smiling_face: