meteorhacks/meteor-aggregate

Publish function returned an array of non-Cursors

Opened this issue ยท 22 comments

Hello,

I' am returning an aggregation on meteor publish and i receiving the following error:

Exception from sub TySCANsTYoXHQCKyq Error: Publish function returned an array of non-Cursors

How do i enable that the aggregate returns an array of cursors?

You can't do that.
Use a Meteor method instead.

But what about reactivity? Could you give me an example please how do i create that with meteor method server/client side?

You can't do reactive aggregations. If needed, call the meteor method again and again time to time.

Correct me if I'am wrong but the aggregate returns only one document with thousand entries of my collection. How do i handle that on the client side with one document? It sound like I'am losing what meteor makes so powerful. I'am thinking about iterating over the result from the array with cursors and merge it with the aggregate result-set. What do you think?

I'am using the $geoNear aggregate to receive the distance. That all what i need.

Found a workaround here http://ask.ttwait.com/que/26861070 which is working, but i think i'll become some performance issues.

var self = this;
  // This works for Meteor 1.0
  var db = MongoInternals.defaultRemoteCollectionDriver().mongo.db;

  var posts = db.collection("mycollection").aggregate(pipeline,// Need to wrap the callback so it gets called in a Fiber.
    Meteor.bindEnvironment(
      function(err, result) {
        // Add each of the results to the subscription.
        _.each(result, function(e) {
          self.added("mycollection", e._id, e);
        });
        self.ready();
      },
      function(error) {
        Meteor._debug( "Error doing aggregation: " + error);
      }
    ));

You may need to call this.added in your publication. Josh explains it well in this article

I'm also trying to use $geoNear to return a collection sorted by distance. I need to observe added/removed messaged and am unsure how to do this with aggregation.

mrn3 commented

@arunoda , you said to mimic reactivity, "If needed, call the meteor method again and again time to time."

How do you do this? Does the helper have a setTimeout or something like that so it keeps calling publication? Some sample code would help. Thanks.

full disclosure: I'm a newbie. But I might have an answer:

Template.myTemplate.rendered = function() {
 Meteor.setInterval(function () {
//do your stuff
  }, 500)
};

Someone competent should check it first.

Results of collection.find() can be used everywhere: a publish function, server methods and so on. It is expected that collection.aggregate() would have same functionality.

Also cursor.observeChanges() could be used to add reactivity to aggregations. Though it is not trivial maybe to build code that would work work any collection.

I'm getting the same error and I can't understand how to make this work if I can't use on client side or make a publish. Someone can give an example?

This is my server-side (just an example to check if aggregate works)

Meteor.publish 'metrics-invoices', ->
  Invoices.aggregate([
      {
        '$match': {'Status': 1}
      }
    ])

Getting metrics-invoices id G26qqg5pLhrn5JgFz TypeError

What should I do to return this to client-side?

I ended up just running the find() using the ids retreived with aggregate. It's definitely not the best solution but for the time being it would do it:

// I just want to retreive the nearest child per each parent

Meteor.publish('findNearestChildren', function(opts) {

    childIds = Children.aggregate([{
        $geoNear: {
            near: {type: 'Point', coordinates: [Number(opts.lng), Number(opts.lat)]},
            distanceField: 'distance',
            maxDistance: 5000,
            spherical: true,
            sort: -1
        }
    }, {
        $group: {
            _id: "$parentId",
            childId: { $first: "$_id" }
        }
    }]).map(function(child) { return child.childId });

    return Children.find({_id: {$in: childIds}})
});

@cellulosa, in your example you loose the "distanceField" that would normally be added to the documents by the aggrigate correct? If this is the case, and you don't need the "distanceField", it's cleaner and probably more efficient to just use the $near operator in a standard find query and not use aggregate at all.

@danielparas Yep the goal was then to rebuild an array also with distanceField, because I actually need it - but I still didn't find a way. Is is resolved in meteor 1.2?

For me, comming back here after get this done a few months ago. Simply create a Meteor.method on server-side and Meteor.call it on client-side. Very simple ๐Ÿ‘

Hi @danielparas, but then you are publishing the entire collection and doing the filtering with the method? Or are you pushing back with the method only the relevant results? Could you please provide a working example?

Hey @cellulosa what I meant in my original comment is that since you're not getting the distance field might as well go for a non-aggrigate implementation as follows. (location is the field that contains the geojson point for the child document.)

Meteor.publish('findNearestChildren', function(opts) {
    selector =    {
      location:
        { $near :
          {
            $geometry: { type: "Point",  coordinates: [Number(opts.lng), Number(opts.lat)] },
            $minDistance: 1000,
            $maxDistance: 5000
          }
        }
      }

    projections = {}

    return Children.find(selector, projections)
});

But as you correctly said, this will return the full collection - did not notice the $group segment of you pipeline when writing my original comment! I guess using an aggrigate is the only way since you need to group.

What if you tried something on these lines? Should give you the distance returned by the geoNear (Disclaimer - I did not test it out! - just an idea for you to experiment with)

Meteor.publish('findNearestChildren', function(opts) {

    this = sub

    result = Children.aggregate([{
        $geoNear: {
            near: {type: 'Point', coordinates: [Number(opts.lng), Number(opts.lat)]},
            distanceField: 'distance',
            maxDistance: 5000,
            spherical: true,
            sort: -1
        }
    }, {
        $group: {
            _id: "$parentId",
            children: { $push: "$$ROOT" }
        }
    }]);

    _.each(result, function(parent){
      _.each(parent.children, function(child){
        sub.added('children', child._id, child)
      });
    });

    sub.ready()

});

Another way could be this. (working for me atm)

Meteor.publish('findNearestChildren', function(opts) {
  let initializing = 1, run = (action) => {
    // Define our aggregation pipeline ( aggregate(pipeline) )
    Children.aggregate([
      {
        $geoNear: {
          near: {type: 'Point', coordinates: [Number(opts.lng), Number(opts.lat)]},
          distanceField: 'distance',
          maxDistance: 5000,
          spherical: true,
          sort: -1
        }
      }, 
      {
        $group: {
          _id: "$parentId",
          children: { $push: "$$ROOT" }
        }
      }
    ]).forEach((e) => {
      this[action]('offers-price', e._id, e)
      this.ready()
    })
  }
  // Run the aggregation initially to add some data to our aggregation collection
  run('added')
    // Track any changes on the collection we are going to use for aggregation
  let handle = Children.find({}).observeChanges({
    added(id) {
      // observeChanges only returns after the initial `added` callbacks
      // have run. Until then, we don't want to send a lot of
      // `self.changed()` messages - hence tracking the
      // `initializing` state.
      if ( initializing && initializing-- )
        run('changed')
    },
    removed(id) {
      run('changed')
    },
    changed(id) {
      run('changed')
    },
    error(err) {
      throw new Meteor.Error('Uh oh! something went wrong!', err.message)
    }
  })
  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  this.onStop(function () {
    handle.stop();
  })
})

This observes changes and if necessary reruns the aggregation.