Query pushdown on elasticsearch

I have an Elasticsearch index that contains two fields called "header" and "control". Both of these contain nested JSON structures. I've pasted a sample below and attached a full example.

master.systemx.trade.eod.control.zip (1.2 KB)

I’m running a query in Dremio like the below:

SELECT header, data.header.batchId AS batchId, control, "_index", "_type", "_uid", "_id"
FROM ES."master.systemx.trade.eod.control".data AS data WHERE data.header.batchId = '1517350156032'

When I profile the query that Dremio pushes down to Elasticsearch, I see that Dremio does a "select all" on the index, whereas one would expect an ES query that filters on "data.header.batchId".

ElasticScan(table=[[ES, master.systemx.trade.eod.control, data]], resource=[master.systemx.trade.eod.control/data], columns=[[header, control, _index, _type, _uid, _id]], pushdown
=[{
  "from" : 0,
  "size" : 1000,
  "query" : {
    "match_all" : {
      "boost" : 1.0
    }
  }
}])

For very large indexes this simple query takes a long time because Dremio pulls all the data from the index, whereas running the query with Lucene directly on Elasticsearch is instantaneous.

eg: http://elastic.mydomain.com:9200/master.systemx.trade.eod.control/_search?pretty&q=header.batchId:1517350156032
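For comparison, the filtered query one might expect to see pushed down could look something like the sketch below (assuming the default `.keyword` sub-field exists on `header.batchId`, which is typical for dynamically mapped string fields):

```json
POST master.systemx.trade.eod.control/_search
{
  "query": {
    "term": {
      "header.batchId.keyword": "1517350156032"
    }
  }
}
```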

I know creating a reflection on the index would definitely help, but is there a way for Dremio to push the query down to Elasticsearch including the filter on a field that is part of a JSON structure, or is this a limitation?

Sample from elasticsearch:

  {
    "_index" : "master.systemx.trade.eod.control",
    "_type" : "data",
    "_id" : "AWFJIXuEBISOowAUEIxL",
    "_score" : 1.0,
    "_source" : {
      "header" : {
        "messageId" : "systemx.ctrl.1517350386841",
        "batchId" : "1517350156032",
        "sourceSystem" : "systemx",
        "secondarySourceSystem" : null,
        "sourceSystemCreationTimestamp" : "2018-01-30T22:13:06.841Z",
        "sentBy" : "systemx",
        "sentTo" : "MYSYSTEM",
        "messageType" : "Control",
        "schemaVersion" : "0.4.16-SNAPSHOT",
        "processing" : "Batch"
      },
      "control" : {
        "action" : "End",
        "subject" : "EOD",
        "eodDate" : "2018-01-30",
        "details" : "Trade Data Batch End",
        "batchSizeIntended" : 33850,
        "batchSizeSent" : 33850
      }
    }
  },
  {
    "_index" : "master.systemx.trade.eod.control",
    "_type" : "data",
    "_id" : "AWJntEJZcqKwS7XmuU_E",
    "_score" : 1.0,
    "_source" : {
      "header" : {
        "messageId" : "systemx.ctrl.1522158289293",
        "batchId" : "1522158098381",
        "sourceSystem" : "systemx",
        "secondarySourceSystem" : null,
        "sourceSystemCreationTimestamp" : "2018-03-27T13:44:49.293Z",
        "sentBy" : "systemx",
        "sentTo" : "MYSYSTEM",
        "messageType" : "Control",
        "schemaVersion" : "0.4.21",
        "processing" : "Batch"
      },
      "control" : {
        "action" : "End",
        "subject" : "EOD",
        "eodDate" : "2018-03-23",
        "details" : "Trade Data Batch End",
        "batchSizeIntended" : 34525,
        "batchSizeSent" : 34525
      }
    }
  },
  ...

Hi @igreg

Can you please send us the profile for this job?

Share a Query Profile

Thanks
@balaji.ramaswamy

Hi @balaji.ramaswamy

Please see the profile attached.

eb1e1ec2-adfd-4c2f-9ee3-dc7699766b5a.zip (8.9 KB)

I found that if I check the option below under Advanced Options in the Elasticsearch source configuration, the query runs reasonably quickly and I can see a proper Elasticsearch query pushed down:

"Use index/doc fields when pushing down aggregates and filters on analyzed and normalized fields (may produce unexpected results)"

Is this the correct setting, and what does "(may produce unexpected results)" mean?

ElasticScan(table=[[ES, master.systemx.trade.eod.control, data]], resource=[master.systemx.trade.eod.control/data], columns=[[header, control, _index, _type, _uid, _id]], pushdown
=[{
  "from" : 0,
  "size" : 500,
  "query" : {
    "match" : {
      "header.batchId" : {
        "query" : "1517350156032",
        "operator" : "OR",
        "prefix_length" : 0,
        "max_expansions" : 50000,
        "fuzzy_transpositions" : false,
        "lenient" : false,
        "zero_terms_query" : "NONE",
        "boost" : 1.0
      }
    }
  }
}])
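For context, the "(may produce unexpected results)" caveat relates to `header.batchId` being an analyzed `text` field: a `match` query analyzes the search input, so a multi-token value can match documents that share only some of the tokens. A sketch of the difference, assuming the default `.keyword` sub-field is present (Kibana console style, with `//` comments):

```json
// match on the analyzed field: the input is tokenized, so a value like
// "1517350156032 foo" could still match documents containing only the
// token "1517350156032"
{ "query": { "match": { "header.batchId": "1517350156032" } } }

// term on the keyword sub-field: exact, unanalyzed comparison
{ "query": { "term": { "header.batchId.keyword": "1517350156032" } } }
```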

Hi @igreg

Somehow we are not able to read the profile you uploaded. Can you try again?

Do you have the mapping for this index?

Hi @balaji.ramaswamy

Here is the profile.
eb1e1ec2-adfd-4c2f-9ee3-dc7699766b5a.zip (8.9 KB)

Thanks @igreg

Any chance you have the mapping for the ES index "master.systemx.trade.eod.control".data?

We want to know more details about the column data.header.batchId.

Thanks
@balaji.ramaswamy

Hi @balaji.ramaswamy

Here is the mapping for the ES index:

{
  "master.systemx.trade.eod.control" : {
    "mappings" : {
      "data" : {
        "dynamic_date_formats" : [
          "yyyy-MM-dd",
          "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
        ],
        "properties" : {
          "control" : {
            "properties" : {
              "action" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "batchSizeIntended" : {
                "type" : "long"
              },
              "batchSizeSent" : {
                "type" : "long"
              },
              "details" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "eodDate" : {
                "type" : "date",
                "format" : "yyyy-MM-dd"
              },
              "subject" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "header" : {
            "properties" : {
              "batchId" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "messageId" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "messageType" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "processing" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "schemaVersion" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "sentBy" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "sentTo" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "sourceSystem" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "sourceSystemCreationTimestamp" : {
                "type" : "date",
                "format" : "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
              }
            }
          }
        }
      }
    }
  }
}
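Worth noting from this mapping: `header.batchId` is an analyzed `text` field, with `keyword` only available as a sub-field. One way to see how a value is tokenized is the `_analyze` API; a purely numeric value like the batch IDs here stays a single token, which is why the pushed-down `match` query happens to return the expected results, whereas a value containing separators would be split into multiple tokens (sketch only):

```json
POST master.systemx.trade.eod.control/_analyze
{
  "field": "header.batchId",
  "text": "1517350156032"
}
```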

Hi @balaji.ramaswamy

Just wondering if you were able to check? I'm curious whether I need the setting below checked for the Elasticsearch source, and when it can produce unexpected results:

Use index/doc fields when pushing down aggregates and filters on analyzed and normalized fields (may produce unexpected results)

Hi @igreg

Sorry for the delay. Can you create the index in Kibana and copy-paste the exact command? Also, click the wrench icon in Kibana, click "Copy as cURL", and send me the exact curl command.

Thanks
@balaji.ramaswamy

Hi @balaji.ramaswamy

Here is the create index command:

PUT master.systemx.trade.eod.control
{
  "settings" : {
    "number_of_shards" : 1
  },
  "mappings" : {
    "data" : {
      "dynamic_date_formats" : [
        "yyyy-MM-dd",
        "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
      ],
      "properties" : {
        "control" : {
          "properties" : {
            "action" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "batchSizeIntended" : {
              "type" : "long"
            },
            "batchSizeSent" : {
              "type" : "long"
            },
            "details" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "eodDate" : {
              "type" : "date",
              "format" : "yyyy-MM-dd"
            },
            "subject" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "header" : {
          "properties" : {
            "batchId" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "messageId" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "messageType" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "processing" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "schemaVersion" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "sentBy" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "sentTo" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "sourceSystem" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "sourceSystemCreationTimestamp" : {
              "type" : "date",
              "format" : "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
            }
          }
        }
      }
    }
  }
}

Hi @balaji.ramaswamy

We are running a prototype with Dremio and are still facing a similar issue with Dremio 3.3.1 and pushdowns to Elasticsearch. It seems Dremio does a "match_all" on almost any query. Can you please help take a look?

Hi @igreg

Can you please send me the profile from 3.3.1?