richardwilly98/elasticsearch-river-mongodb

Is it possible to index only documents whose oplog transactions occur after a certain datetime?

dittu opened this issue · 7 comments

Hi,

When indexing, the river fetches all the oplog entries for the MongoDB collection and indexes them. Is there a way to query the oplog on its 'ts' field, so that the index is created or updated only for operations after a given timestamp?

Thanks,

Aditya

Hi,

In the current release there is no way to do that by configuration.

The last timestamp is stored in the _river index, under the type ${es.river.name}, in the document with id ${mongo.db.name}.${mongo.collection.name}.

For example:

curl -XGET localhost:9200/_river/river76/mydb76.mycollec76?pretty=true
{
  "_index" : "_river",
  "_type" : "river76",
  "_id" : "mydb76.mycollec76",
  "_version" : 1,
  "exists" : true,
  "_source" : {"mongodb":{"_last_ts":"{ \"$ts\" : 1373913931 , \"$inc\" : 1}"}}
}

So you could set this value manually before creating the river.

I could also add an options/last_timestamp parameter in a future release if required.
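If that parameter were added, the river settings might look like the sketch below. Note this is only a proposal: last_timestamp does not exist in the current release, and the db, collection and index names are illustrative, taken from the example above.

```json
{
  "type": "mongodb",
  "mongodb": {
    "db": "mydb76",
    "collection": "mycollec76",
    "options": {
      "last_timestamp": { "$ts": 1373913931, "$inc": 1 }
    }
  },
  "index": { "name": "myindex76" }
}
```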

Thanks,
Richard.

Hi,

Thank you for the quick response. To give you a quick idea of my setup: I am using Elasticsearch with the MongoDB river from a .NET application, and for all operations against Elasticsearch I use PlainElastic.Net as the .NET client.

When following the step you mentioned to set the timestamp, I get a JsonParserError because of the way the .NET client executes the command, although the same request works through curl. The only way I could implement this in my application would be through the river options. Could you please add the last_timestamp parameter to the options?

Thanks,

Aditya

Hi,

I will include this feature in the next release.
In the meantime can you please provide more details about the error you are getting?
I believe it should also be possible to set _last_ts from any Elasticsearch client.
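For example, here is a minimal Python sketch of the request body any HTTP client would need to send (index and id names taken from my earlier example). The key detail is that _last_ts is a JSON string, so the timestamp is encoded twice:

```python
import json

# _last_ts is itself a JSON *string* (a serialized BSONTimestamp),
# not a nested object, so the timestamp must be encoded twice.
last_ts = json.dumps({"$ts": 1373913931, "$inc": 1})
body = json.dumps({"mongodb": {"_last_ts": last_ts}})

# Any client can then PUT this body, e.g.:
#   PUT /_river/river76/mydb76.mycollec76
print(body)
```

A client that builds _last_ts as a nested JSON object instead of a string will produce a document the river cannot read back.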

Thanks,
Richard.

Hi,

It was a mistake on my end regarding setting _last_ts: I was not building the JSON in the proper format.
However, setting _last_ts makes no difference to my indexing process.

In my oplog, the number of inserted documents for a collection called "queryreadyproducts" is 811499. I set _last_ts from an operation that happened towards the end of the collection.

I am having the following problems:

  1. When I execute the steps below, nothing is indexed at all.
  2. If I change the id in step 5 from "brandviewdata.queryreadyproducts" to "_meta", everything from the oplog is indexed, without any filtering on the "_last_ts" parameter I set.

Please go through the steps and let me know where I am making a mistake.

Here is the log of steps I followed from the beginning:

  1. HEAD http://localhost:9200/queryreadyproducts
  2. Updating index settings

PUT http://localhost:9200/queryreadyproducts
{
  "index": {
    "analysis": {
      "filter": {
        "replacementfilter": { "type": "pattern_replace", "pattern": "'", "replacement": "" },
        "stopfilter": { "type": "stop", "stopwords": [ "and", "or" ] }
      },
      "analyzer": {
        "fulltext": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": [ "replacementfilter", "stopfilter", "lowercase" ]
        }
      }
    }
  }
}

  3. Put Mapping

PUT http://localhost:9200/queryreadyproducts/queryreadyproduct/_mapping
{
  "queryreadyproduct": {
    "type": "object",
    "_all": { "enabled": false },
    "dynamic": false,
    "properties": {
      "Name": { "type": "string", "analyzer": "fulltext", "index": "analyzed" },
      "CustomerId": { "type": "string", "index": "not_analyzed" },
      "ProductId": { "type": "string", "index": "not_analyzed" },
      "CategoryId": { "type": "string", "index": "not_analyzed" },
      "CustomHierarchyId": { "type": "string", "null_value": "", "index": "not_analyzed" },
      "ContextualSKU": { "type": "string", "analyzer": "standard", "null_value": "" },
      "Volume": { "type": "double", "null_value": "0", "index": "not_analyzed" },
      "VolumeMeasureId": { "type": "string", "null_value": "", "index": "not_analyzed" },
      "VolumeString": { "type": "string", "null_value": "", "analyzer": "standard" },
      "BrandId": { "type": "string", "index": "not_analyzed" },
      "ManufacturerId": { "type": "string", "index": "not_analyzed" },
      "Branded": { "type": "boolean", "index": "not_analyzed" },
      "SupplierId": { "type": "string", "null_value": "", "index": "not_analyzed" },
      "ImageId": { "type": "string", "null_value": "", "index": "not_analyzed" },
      "GTIN": { "type": "string", "null_value": "", "analyzer": "keyword" },
      "CreatedOn": { "type": "date", "index": "not_analyzed" }
    }
  }
}

  4. For setting _last_ts

PUT http://192.168.100.34:9200/_river/queryreadyproducts/brandviewdata.queryreadyproducts?pretty=true
{
  "_index": "_river",
  "type": "queryreadyproducts",
  "_id": "brandviewdata.queryreadyproducts",
  "exists": true,
  "_source": {
    "mongodb": {
      "_last_ts": { "$ts": 1373647861, "$inc": 1 }
    }
  }
}

  5. Creating river

PUT http://localhost:9200/_river/queryreadyproducts/brandviewdata.queryreadyproducts
{
  "type": "mongodb",
  "mongodb": {
    "servers": [ { "host": "localhost", "port": "27017" } ],
    "db": "brandviewdata",
    "collection": "queryreadyproducts",
    "options": {
      "exclude_fields": [ "Attributes", "Mappings" ],
      "drop_collection": false
    }
  },
  "index": { "name": "queryreadyproducts", "type": "queryreadyproduct" }
}

Thanks,

Aditya A

Hi,

Please try:

curl -XPUT http://192.168.100.34:9200/_river/queryreadyproducts/brandviewdata.queryreadyproducts -d '
{
  "mongodb": {
    "_last_ts": "{ \"$ts\": 1373647861, \"$inc\": 1 }"
  }
}'

_last_ts is a BSONTimestamp. It seems that a BSONTimestamp object serialized to JSON should have this escaped-string format.
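To make the double encoding concrete, here is a small Python sketch (standard library only) that decodes a stored _source value of the shape shown earlier in this thread:

```python
import json

# A stored river _source, as returned by a GET on the river document
# (shape taken from earlier in this thread).
source = '{"mongodb": {"_last_ts": "{ \\"$ts\\" : 1373647861 , \\"$inc\\" : 1}"}}'

doc = json.loads(source)                      # first pass: the document itself
ts = json.loads(doc["mongodb"]["_last_ts"])   # second pass: the timestamp string

seconds, increment = ts["$ts"], ts["$inc"]
print(seconds, increment)  # 1373647861 1
```

The two json.loads calls mirror what the river does when reading _last_ts back, which is why a plain nested object in that field fails to parse.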

It's a very useful feature! Thank you for implementing it! Could you document it as well?