Unable to Query Iceberg Data in Dremio - Data Written by Kafka Connector

Issue Description
I am using a Kafka Connect Sink Connector to write data to Iceberg tables with the following configuration:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=2
topics=transactions
iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=table
iceberg.tables.auto-create-enabled=true
iceberg.tables.default-id-columns=id
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.default-partition-by=date

What I’ve Tried

• I verified that the data files exist in the S3 bucket. Path: warehouse/bill_cbfaff37-06d3-4437-a4cd-67e96e3ba498/data/date=2024-03-04

• I ran:

ALTER TABLE nessie.juantu_aaa REFRESH METADATA;

The data is written successfully, and I can see the files in the S3 data directory. However, when I try to query these tables in Dremio, no data is returned.
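
To see what Dremio actually recognizes for the table, its Iceberg metadata can also be queried directly (a sketch, assuming Dremio’s Iceberg metadata functions work against the Nessie source; the table name is the one from the REFRESH METADATA example above):

-- Sketch: check whether Dremio sees any snapshots or data files for this table.
SELECT * FROM TABLE(table_history('nessie.juantu_aaa'));
SELECT * FROM TABLE(table_files('nessie.juantu_aaa'));

If these return no rows, Dremio is not picking up the snapshots the connector committed.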

Suspected Issue

It seems Dremio is unable to read the Iceberg metadata or data files generated by the Kafka Connector. This might be due to a difference in how metadata or partitions are handled between the two systems.

Questions

  1. Does Dremio support querying Iceberg tables written by external tools like Kafka Connect, especially when using upsert mode?

  2. Are there specific configurations needed in Dremio or the connector to ensure compatibility?

  3. Is there a way to force Dremio to recognize these data files?

Any insights or suggestions would be greatly appreciated!

Sounds like you’re hitting the same issues as we did. See Issues with equality deletes in 25.0.0 - and 25.1.0 (hadoop catalog)

Dremio doesn’t support reading tables with equality deletes, and it is still an issue in 25.2.0. If you do a full rewrite of the table after each update it works, but that’s not really a viable option.
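
For reference, the full-rewrite workaround can be run from Spark with the Iceberg extensions enabled. A minimal sketch (the nessie catalog name and db.my_table are placeholders for your own setup, and depending on the Iceberg version a separate cleanup of dangling delete files may still be needed):

-- Sketch: compact the table so outstanding equality deletes are applied to the data files.
-- delete-file-threshold => 1 forces rewriting every data file that has delete files attached.
CALL nessie.system.rewrite_data_files(
  table => 'db.my_table',
  options => map('delete-file-threshold', '1')
);

Running this after every update effectively turns the pipeline into copy-on-write, which is why it isn’t a viable option for streaming upserts.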

Append-only tables written by the connector work, though.

I am facing the same issue too. Any chance of a fix coming soon? @balaji.ramaswamy

I am able to send data to Nessie from Kafka Connect using the Tabular Iceberg sink in upsert mode, but the error that appears is:

Equality delete file s3a://warehouse/NessieData/customers_ed5957c7-42cd-4ec2-a734-95a62e9dce5d/data/00001-1733738404355-5e477cf1-a992-4dbf-9f73-47ca43d289c5-00002.parquet is a global delete file. Equality delete files saved with an unpartitioned spec are treated as global deletes, which are not supported.

There is no problem with append-only mode…
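
If it helps, the delete files the connector wrote can be listed from Spark to confirm they really are equality deletes against the unpartitioned spec (a sketch; the nessie Spark catalog name is an assumption, and the namespace/table come from the error above and my connector config):

-- Sketch: list the delete files recorded in the table metadata.
-- content = 2 marks equality deletes; deletes written against an unpartitioned spec are
-- what Dremio reports as "global" deletes.
SELECT file_path, content, spec_id
FROM nessie.pyiceberg_demo.customers.delete_files;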

@bardan @Barry Can you please attach query profiles of the failed jobs?

5fac67a4-d272-4f93-858d-28948ce785be.zip (17.7 KB)

@balaji.ramaswamy Here is the query profile, downloaded from Dremio’s UI. I am using Dremio OSS 25.2.0.

Just so you have some context: I am capturing CDC from MySQL using Debezium and pushing the data to Redpanda (Kafka). Then I am using Tabular’s Iceberg sink connector to push it to the Nessie catalog. I am running Dremio, Nessie, Postgres, Redpanda, MinIO, and MySQL all in Docker containers on my local laptop.

I have set up the Iceberg sink connector like this:

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "realtimeAnalytics-Nessie-Iceberg",
    "config": {
      "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
      "tasks.max": "2",
      "errors.logs.enabled": "true",
      "errors.logs.include.messages": "true",
      "topics.regex": "realtimeAnalytics.(.*)",
      "iceberg.tables.dynamic-enabled": "true",
      "iceberg.tables.route-field": "__table",
      "iceberg.tables.cdc_field": "__op",
      "iceberg.tables.auto-create-enabled": "true",
      "iceberg.tables.default-id-columns": "id",
      "iceberg.tables.upsert-mode-enabled": "true",
      "iceberg.tales.default-partition-by": "__source_ts_ms",
      "iceberg.control.commit.interval-ms": "60000",
      "iceberg.tables.auto-create-props.gc.enabled": "false",
      "iceberg.tables.auto-create-props.write.metadata.delete-after-commit.enabled": "false",
      "iceberg.tables.auto-create-props.write.update.mode": "copy-on-write",
      "iceberg.tables.auto-create-props.write.delete.mode": "copy-on-write",
      "iceberg.tables.auto-create-props.write.merge.mode": "copy-on-write",
      "iceberg.tables.evolve-schema-enabled": "true",
      "iceberg.catalog.authentication.type": "NONE",
      "iceberg.catalog.client.region": "us-east-1",
      "iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
      "iceberg.catalog.uri": "http://nessie:19120/api/v2",
      "iceberg.catalog.ref": "main",
      "iceberg.catalog.default-namespace": "pyiceberg_demo",
      "iceberg.catalog.warehouse": "s3a://warehouse/NessieData",
      "iceberg.catalog.s3.endpoint": "http://minio:9000",
      "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "iceberg.catalog.s3.path-style-access": "true",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schema.registry.url": "http://redpanda:8081",
      "key.converter.schemas.enable": "false",
      "key.converter.use.latest.version": "true",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://redpanda:8081",
      "value.converter.schemas.enable": "false",
      "value.converter.use.latest.version": "true",
      "log4j.logger.io.tabular.iceberg.connect": "DEBUG",
      "log4j.logger.org.apache.iceberg": "DEBUG"
    }
  }'
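
To double-check that the connector is creating tables and committing snapshots on the Nessie side, I query them from a Spark session pointed at the same Nessie catalog (a sketch; the nessie Spark catalog name is an assumption, and customers is one of the tables from the error above):

-- Sketch: confirm the sink created the tables and keeps committing snapshots.
SHOW TABLES IN nessie.pyiceberg_demo;
SELECT committed_at, snapshot_id, operation
FROM nessie.pyiceberg_demo.customers.snapshots;

If the connector is committing, the snapshot list should keep growing even though Dremio returns no rows.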

And my MySQL Debezium CDC setup looks like this:

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "realtimeAnalytics-src-mysql",
    "config": {
      "connect.keep.alive": "false",
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.allowPublicKeyRetrieval": "true",
      "database.hostname": "mysql",
      "database.include.list": "inventory",
      "database.password": "dbz",
      "database.port": "3306",
      "database.server.id": "184054",
      "database.user": "debezium",
      "decimal.handling.mode": "double",
      "enable.time.adjuster": "false",
      "exactly.once.source.support": "enabled",
      "gtid.source.filter.dml.events": "false",
      "include.query": "false",
      "include.schema.changes": "true",
      "key.converter.json.schemas.enable": "false",
      "key.converter.schemas.enable": "false",
      "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
      "schema.history.internal.kafka.topic": "schema-changes.realtimeAnalytics",
      "skipped.operations": "none",
      "snapshot.mode": "when_needed",
      "table.ignore.builtin": "false",
      "tasks.max": "1",
      "tombstones.on.delete": "true",
      "topic.creation.default.partitions": "1",
      "topic.creation.default.replication.factor": "-1",
      "topic.creation.enable": "true",
      "topic.prefix": "realtimeAnalytics",
      "value.converter.json.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.delete.tombstone.handling.mode": "rewrite",
      "transforms.unwrap.add.fields": "op,db,table,lsn,source.ts_ms",
      "transforms.unwrap.add.header": "db",
      "exactly.once.support": "requested"
    }
  }'
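
On the MySQL side, Debezium CDC only works if row-based binary logging is enabled; a quick sanity check (standard MySQL, not specific to this setup):

-- Sketch: verify MySQL is configured for Debezium CDC (binlog enabled, ROW format).
SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');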

I faced the same issue, but I fixed the upsert error by setting the partition with the property 'iceberg.tables.default-partition-by'. The data is still not visible from Dremio, though.

Hi @balaji.ramaswamy, have you had time to go through this in the last week? This is turning out to be a major roadblock for what I am trying to implement. Thanks!

@bardan At this point, it looks like some more work is needed on this front to support all scenarios

Ok. Looking forward to it… Any ETA?

Hi @bardan, I see that we have an internal engineering task to implement support for reading Iceberg tables with global equality deletes, but it is not scheduled for work. We’ll update Dremio Community and the docs when this is available.

@ben
Thanks for the information. In that case, using Spark looks like the only option: capture the data from Redpanda (Kafka) and then do the upsert/delete operations against the Nessie catalog. I am pursuing the use of Spark for the upsert and am working on building the logic to do this. I will share my findings and steps once I have completed it.
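
A rough sketch of the Spark side I have in mind is a copy-on-write MERGE per micro-batch (the nessie catalog name and the updates view are placeholders, and the id and __op columns follow the Debezium/ExtractNewRecordState fields from the config above):

-- Sketch: apply a batch of CDC rows as an upsert/delete with copy-on-write semantics.
-- 'updates' is a temp view built from the records consumed from Redpanda.
MERGE INTO nessie.pyiceberg_demo.customers AS t
USING updates AS s
  ON t.id = s.id
WHEN MATCHED AND s.__op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED AND s.__op <> 'd' THEN INSERT *;

With write.merge.mode and write.delete.mode set to copy-on-write (as in the auto-create props above), the MERGE rewrites data files instead of producing equality deletes, so Dremio should be able to read the result.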

Or is there any other alternative that you can suggest?

@bardan I don’t know of any workaround, unfortunately.