5fac67a4-d272-4f93-858d-28948ce785be.zip (17.7 KB)
@balaji.ramaswamy Attached is the query profile downloaded from Dremio’s UI. I am using Dremio OSS version 25.2.0.
To give you some context: I am capturing CDC events from MySQL using Debezium and pushing the data to Redpanda (Kafka). I then use Tabular’s Iceberg sink connector to push it to the Nessie catalog. I am running Dremio, Nessie, Postgres, Redpanda, MinIO and MySQL all in Docker containers on my local laptop.
I have set up the Iceberg sink connector as shown below:
# Register the Iceberg sink connector: POST the connector definition named
# "realtimeAnalytics-Nessie-Iceberg" as JSON to the /connectors endpoint on
# localhost:8083.
#
# The payload is single-quoted so the shell passes it to curl verbatim
# (the regex "(.*)" and the double quotes need no escaping).
#
# NOTE(review): the configuration keys below are reproduced exactly as posted,
# since they record what was actually run. Several look misspelled and should
# be confirmed against the connector / Kafka Connect documentation:
#   - "iceberg.tales.default-partition-by"  -> probably "iceberg.tables.default-partition-by"
#   - "iceberg.tables.cdc_field"            -> probably "iceberg.tables.cdc-field"
#   - "errors.logs.enabled"                 -> probably "errors.log.enable"
#   - "errors.logs.include.messages"        -> probably "errors.log.include.messages"
# If these are indeed unrecognized keys, the corresponding settings are
# presumably not taking effect - TODO confirm.
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "realtimeAnalytics-Nessie-Iceberg",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "errors.logs.enabled": "true",
    "errors.logs.include.messages": "true",
    "topics.regex": "realtimeAnalytics.(.*)",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "__table",
    "iceberg.tables.cdc_field": "__op",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.default-id-columns": "id",
    "iceberg.tables.upsert-mode-enabled": "true",
    "iceberg.tales.default-partition-by": "__source_ts_ms",
    "iceberg.control.commit.interval-ms": "60000",
    "iceberg.tables.auto-create-props.gc.enabled": "false",
    "iceberg.tables.auto-create-props.write.metadata.delete-after-commit.enabled": "false",
    "iceberg.tables.auto-create-props.write.update.mode": "copy-on-write",
    "iceberg.tables.auto-create-props.write.delete.mode": "copy-on-write",
    "iceberg.tables.auto-create-props.write.merge.mode": "copy-on-write",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.authentication.type": "NONE",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    "iceberg.catalog.uri": "http://nessie:19120/api/v2",
    "iceberg.catalog.ref": "main",
    "iceberg.catalog.default-namespace": "pyiceberg_demo",
    "iceberg.catalog.warehouse": "s3a://warehouse/NessieData",
    "iceberg.catalog.s3.endpoint": "http://minio:9000",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.s3.path-style-access": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schema.registry.url": "http://redpanda:8081",
    "key.converter.schemas.enable": "false",
    "key.converter.use.latest.version": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda:8081",
    "value.converter.schemas.enable": "false",
    "value.converter.use.latest.version": "true",
    "log4j.logger.io.tabular.iceberg.connect": "DEBUG",
    "log4j.logger.org.apache.iceberg": "DEBUG"
  }
}'
And my MySQL Debezium CDC source connector setup looks like this:
# Register the Debezium MySQL source connector: POST the connector definition
# named "realtimeAnalytics-src-mysql" as JSON to the /connectors endpoint on
# localhost:8083.
#
# The payload is single-quoted so the shell passes it to curl verbatim.
# Configuration keys and values are reproduced exactly as posted; only the
# shell quoting, the "--" option prefixes and the line continuations were
# restored so the command can be pasted into a shell and run.
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "realtimeAnalytics-src-mysql",
  "config": {
    "connect.keep.alive": "false",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.allowPublicKeyRetrieval": "true",
    "database.hostname": "mysql",
    "database.include.list": "inventory",
    "database.password": "dbz",
    "database.port": "3306",
    "database.server.id": "184054",
    "database.user": "debezium",
    "decimal.handling.mode": "double",
    "enable.time.adjuster": "false",
    "exactly.once.source.support": "enabled",
    "gtid.source.filter.dml.events": "false",
    "include.query": "false",
    "include.schema.changes": "true",
    "key.converter.json.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.realtimeAnalytics",
    "skipped.operations": "none",
    "snapshot.mode": "when_needed",
    "table.ignore.builtin": "false",
    "tasks.max": "1",
    "tombstones.on.delete": "true",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "-1",
    "topic.creation.enable": "true",
    "topic.prefix": "realtimeAnalytics",
    "value.converter.json.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.tombstone.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,db,table,lsn,source.ts_ms",
    "transforms.unwrap.add.header": "db",
    "exactly.once.support": "requested"
  }
}'