I have a Spark job in my data pipeline, orchestrated with Apache Airflow, which runs daily and writes a new partition to an Iceberg table partitioned by the timestamp column ‘event_time’. Since I may need to rerun the pipeline, I use the overwritePartitions() method of the DataFrameWriterV2 API to update the data in the table. I am planning to create a reflection on this table and want to include the reflection refresh in the pipeline as well, using the REST APIs.
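A minimal sketch of the write step described above, assuming a PySpark DataFrame that holds one day's data. The function name and the table name `catalog.db.events` are placeholders for illustration, not names from my actual job.

```python
from typing import Any


def overwrite_event_partitions(df: Any, table: str) -> None:
    """Write df into an Iceberg table, replacing only the partitions it touches.

    df is expected to be a pyspark.sql.DataFrame. overwritePartitions() is the
    DataFrameWriterV2 dynamic overwrite: every partition that has rows in df is
    replaced and all other partitions are left alone, which is what makes a
    rerun for the same event_time safe.
    """
    df.writeTo(table).overwritePartitions()


# Usage inside the Spark job (table name is a placeholder):
#   overwrite_event_partitions(daily_df, "catalog.db.events")
```

Because the rerun rewrites an existing partition instead of appending new event_time values, this is the write pattern the reflection refresh has to cope with.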
The pipeline I am planning is:
- Run the Spark job.
- Create a raw reflection, with the same partition column as the table, using the REST APIs if the reflection does not already exist.
API used to create - Reflection | Dremio Documentation
API used to check reflection exists - Reflection | Dremio Documentation
- Refresh the reflection using the endpoints below, setting the refresh type to incremental and the refresh field to event_time.
Change settings to incremental - Table | Dremio Documentation
Trigger refresh - Table | Dremio Documentation
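The reflection part of the pipeline above can be sketched roughly as follows. This is only an illustration under assumptions: the `/api/v3/...` paths and JSON field names are what I understand the Dremio v3 REST API to use and should be checked against the documentation for the installed version. The `send` callable, the function names, the reflection name and the period values are hypothetical; `send` stands for whatever HTTP client wrapper (with authentication) the Airflow task provides.

```python
from typing import Callable, Optional, Sequence

API = "/api/v3"  # assumed base path of the Dremio REST API

# send(method, path, body) -> parsed JSON response; injected by the caller.
Send = Callable[[str, str, Optional[dict]], dict]


def reflection_payload(dataset_id: str, name: str,
                       display_fields: Sequence[str], partition_field: str) -> dict:
    """Body for creating a raw reflection partitioned like the table (assumed schema)."""
    return {
        "type": "RAW",
        "name": name,
        "datasetId": dataset_id,
        "enabled": True,
        "displayFields": [{"name": f} for f in display_fields],
        "partitionFields": [{"name": partition_field}],
    }


def incremental_policy(refresh_field: str) -> dict:
    """Refresh policy requesting incremental refresh on one field (assumed schema)."""
    return {
        "method": "INCREMENTAL",
        "refreshField": refresh_field,
        "refreshPeriodMs": 86_400_000,   # placeholder: 1 day
        "gracePeriodMs": 259_200_000,    # placeholder: 3 days
    }


def refresh_after_spark_job(send: Send, dataset_id: str, reflection_name: str,
                            display_fields: Sequence[str],
                            refresh_field: str = "event_time") -> None:
    # Step 2: create the raw reflection only if one with this name is missing.
    existing = send("GET", f"{API}/dataset/{dataset_id}/reflection", None).get("data", [])
    if not any(r.get("name") == reflection_name for r in existing):
        send("POST", f"{API}/reflection",
             reflection_payload(dataset_id, reflection_name, display_fields, refresh_field))

    # Step 3a: switch the table's refresh settings to incremental.
    table = send("GET", f"{API}/catalog/{dataset_id}", None)
    table["accelerationRefreshPolicy"] = incremental_policy(refresh_field)
    send("PUT", f"{API}/catalog/{dataset_id}", table)

    # Step 3b: trigger the refresh.
    send("POST", f"{API}/catalog/{dataset_id}/refresh", None)
```

Passing `send` in keeps the sketch free of any particular HTTP library, so the same function can be called from an Airflow task with a real client or exercised with a stub.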
The approach above works fine for normal runs (when the data contains new values of event_time). However, I found that the incremental refresh in step #3 above does not work when the pipeline is rerun, because the rerun overwrites data for an event_time value that already exists in the table. I think this is due to the limitation mentioned in this link.
The only option that makes both scenarios work for me is a full refresh, which I want to avoid because it needs more resources and time, and the refresh time grows day by day.
I tried the flow below without any luck:
- Delete the existing data from the table for the event_time value the run is going to write
- Create the reflection if it does not exist
- Refresh the reflection incrementally, with event_time as the incremental field
- Execute the Spark job
- Execute step #3 again
I think the specific feature I am asking for is already available in version 24.2, which has not been released for dremio-oss yet. Is there a workaround I can use to avoid a full refresh in my pipeline until the feature is available in dremio-oss?
I have already looked at this class in the dremio-oss code base, thinking I could put in some hacks via code, but I was unable to understand the logic that causes this behavior. Here it is comparing the schema hash of the dataset with that of the reflection dataset.