Bulk Resolution Support
austince opened this issue · 13 comments
Similar to the ES bulk API, do you think it is feasible to add bulk resolution support to limit the number of network requests? It seems rare to want to resolve just a single entity if you're doing NER on a decently large piece of text.
We've released a potential implementation for this in our fork's 2.0.0 release, would love to get your feedback on the API + all!
@austince I agree, this would be a useful feature, and not an edge case either. Here are my thoughts on your linked fork.
Feedback
I like the API that you've implemented in your fork. The API matches the format of the Elasticsearch Bulk API, which is important for ease of use and adoption.
POST /_zentity/resolution/_bulk[?PARAMS]
{ PARAMS }
{ PAYLOAD }
...
POST /_zentity/resolution/ENTITY_TYPE/_bulk[?PARAMS]
{ PARAMS }
{ PAYLOAD }
...
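For example, a two-job request against the entity-type-scoped endpoint might look like this (the entity type, params, and attribute values are just illustrative):

POST /_zentity/resolution/person/_bulk
{ "max_hops": 2 }
{ "attributes": { "email": [ "alice@example.net" ] } }
{ }
{ "attributes": { "phone": [ "555-0100" ] } }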
I tested the API on the zentity sandbox. I was delighted to see that it worked, including in the Kibana Dev Tools Console. You've successfully implemented the NDJSON request format.
If I correctly understand the code for the bulk handler, you've implemented bulk requests using asynchronous requests with concurrency control, which was a great choice for performance.
So far my only critique is that when a validation error exists in one of the embedded requests, it fails the entire bulk request, when it should fail just that one request and set "errors": true in the response. Error handling works as expected when the error occurs at query time, such as an invalid matcher clause.
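For example, when the second embedded request has a validation error, I'd expect a response along these lines (shape suggested here to mirror the Elasticsearch Bulk API, not what the fork currently returns):

{
  "took": 64,
  "errors": true,
  "items": [
    { "took": 58, "hits": { "total": 2, "hits": [ ... ] } },
    { "error": { "type": "validation_exception", "reason": "..." }, "status": 400 }
  ]
}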
Bulk operations more generally
Thinking out loud for a moment about bulk operations more generally in zentity. While this conversation is about bulk resolution, it makes sense to consider bulk operations more broadly so that an eventual implementation of bulk model management doesn't conflict or cause confusion with bulk resolution.
While working on a prototype user interface for managing entity models in zentity, I've decided it would be useful to include an endpoint for bulk model management, too.
POST /_zentity/models/_bulk[?PARAMS]
{ PARAMS }
{ PAYLOAD }
...
POST /_zentity/models/ENTITY_TYPE/_bulk[?PARAMS]
{ PARAMS }
{ PAYLOAD }
...
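One possible shape for those pairs, borrowing the action/metadata convention from the Elasticsearch Bulk API (the action names here are just a sketch):

POST /_zentity/models/_bulk
{ "create": { "entity_type": "person" } }
{ "attributes": { ... }, "resolvers": { ... }, "matchers": { ... }, "indices": { ... } }
{ "delete": { "entity_type": "organization" } }
{ }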
Looking at the syntax, I expect this would be a seamless additional feature, and wouldn't conflict or create confusion with your bulk resolution endpoint. I think we're good there.
Side effects and breaking changes
One small issue to account for is that the proposed endpoint creates a "special" word that an entity model would not be allowed to use.
Currently in zentity it is valid to create an entity model named "_bulk" and resolve it. Example:
POST /_zentity/models/_bulk
{ ... }
POST /_zentity/resolution/_bulk
{ ... }
With the new bulk endpoint, a model named "_bulk" would no longer be usable.
While I think it's unlikely that anyone would insist on requiring an entity model named "_bulk", the broader issue is that we should account for reserved words as we introduce endpoints that would override user-defined variables.
I do think your endpoint syntax should stay the way it is, because it matches the syntax of the Elasticsearch Bulk API. Instead I think we should impose a breaking change that limits the acceptable format of entity model names.
Proposed change
Entity model names should follow the index name requirements in Elasticsearch. Among these requirements is that the name cannot start with an underscore (_). This would prevent entity model names from conflicting with special endpoints such as _bulk.
This would create a breaking change (though likely easily fixable) for any users that have an entity model whose name does not meet these syntax requirements.
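As a rough sketch of that rule in Java (approximating the Elasticsearch index name restrictions rather than reusing its internal validator; the class and method names are just illustrative):

import java.util.Locale;

public final class EntityModelNames {

    // Characters that Elasticsearch forbids in index names.
    private static final String FORBIDDEN_CHARS = "\\/*?\"<>| ,#:";

    // Approximate check against the Elasticsearch index name rules: lowercase
    // only, no forbidden characters, and no leading '_', '-', or '+', which
    // keeps names like "_bulk" from colliding with special endpoints.
    public static void validate(String name) {
        if (name == null || name.isEmpty() || name.length() > 255) {
            throw new IllegalArgumentException("Entity model name must be 1-255 characters");
        }
        if (!name.equals(name.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Entity model name must be lowercase");
        }
        char first = name.charAt(0);
        if (first == '_' || first == '-' || first == '+') {
            throw new IllegalArgumentException("Entity model name must not start with '_', '-', or '+'");
        }
        for (char c : name.toCharArray()) {
            if (FORBIDDEN_CHARS.indexOf(c) >= 0) {
                throw new IllegalArgumentException("Entity model name contains illegal character: " + c);
            }
        }
        if (name.equals(".") || name.equals("..")) {
            throw new IllegalArgumentException("Entity model name must not be '.' or '..'");
        }
    }
}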
Conclusion
I'd love to pull this feature into the official branch. Let me know if you're interested.
If so, there are other considerations I'll want to address. I wouldn't want to perform a straight merge, because there are other features and implementations in your fork that I would prefer to introduce (or exclude) individually. Considering the bulk feature in isolation, I'll still want to take a closer look to assess whether it's coupled with your new query builder job implementation (vs. the current string-building job implementation). The queries I've run on your fork appear to be slower (~15-20%) and I suspect it's due to the need to serialize the objects from the builders to JSON. But I'm prepared to make a surgical merge if needed.
Hey @davemoore- , thanks so much for digging in and for your thoughtful feedback. I'd definitely be interested in contributing a version back and totally understand your hesitation to include more of the other features in the fork. I think now that I understand how the jobs work, I can implement the bulk aspects without touching the Job class once we decide on a few of those details.
That's a really nice note about the validation errors, and easy enough to try implementing a fix. Also, do you have any advice on streaming JSON from the body instead of parsing it as a whole, or about choosing threadpools (either a main one or maintaining a separate one) for the bulk concurrent jobs? Understanding those bits would help in implementing other _bulk operations across the routes.
For the breaking changes around model names, I totally agree with the approach of introducing well-defined rules and the ES index rules look perfect.
Thanks for the find on the performance hit! Likely right that the query builders are contributing to it. Are you seeing this just from the queries in the zentity-sandbox?
advice on streaming json from the body instead of parsing it as a whole
Can you clarify this a bit? e.g. are you envisioning a persistent connection that continuously feeds input to zentity, which then lazily splits by newlines, runs the jobs, and feeds the responses back to the client?
advice on choosing threadpools for the bulk concurrent jobs
I hadn't considered making purposeful choices of threads before, relying instead on the NodeClient (which Elasticsearch gives its plugins) to use its threadpool however it chooses to. If I'm reading the Node class correctly, the same client and thread pool are passed to all plugins. If true, then it may be wise to use the given thread pool instead of creating new threads; it suggests to me that the node itself was intended to manage threads for the many native plugins that require them.
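Concretely, I'm picturing something like handing each bulk job to one of the node's managed executors instead of creating our own threads (just a sketch; the choice of pool is open):

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.threadpool.ThreadPool;

class BulkJobExecutor {

    private final NodeClient client;

    BulkJobExecutor(NodeClient client) {
        this.client = client;
    }

    // Hand each job to the executor the node already manages for generic work,
    // rather than creating and owning a separate thread pool in the plugin.
    void submit(Runnable job) {
        client.threadPool().executor(ThreadPool.Names.GENERIC).execute(job);
    }
}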
Are you seeing this just from the queries in the zentity-sandbox
Yes this was just a quick test: two sandboxes side-by-side, same data and resolution job, running each job many times to account for query caching, and observing the reported response times.
Can you clarify this a bit? e.g. are you envisioning a persistent connection that continuously feeds input to zentity, which then lazily splits by newlines, runs the jobs, and feeds the responses back to the client?
Yeah, I think something to that effect, or at least lazily splitting the newlines and running jobs. If we can't stream to the client and need to collect all the results first, I think that's fine. I'm mainly looking at RestHandler#supportsContentStream and whether it's possible to use that to natively stream NDJSON lines.
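Roughly, I'm picturing the handler opting into streamed content and, for now, eagerly splitting the body by newlines, something like this sketch (names are placeholders, not what's implemented in the fork):

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.rest.BaseRestHandler;

public abstract class BulkResolutionHandler extends BaseRestHandler {

    // Tells Elasticsearch this endpoint accepts newline-delimited content,
    // the same flag the built-in _bulk handler relies on.
    @Override
    public boolean supportsContentStream() {
        return true;
    }

    // Eagerly split the NDJSON body into non-empty lines (param/payload pairs).
    // A streaming variant would consume these lazily instead.
    static List<String> splitLines(String body) {
        List<String> lines = new ArrayList<>();
        for (String line : body.split("\n")) {
            if (!line.trim().isEmpty()) {
                lines.add(line);
            }
        }
        return lines;
    }
}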
For the threadpools, that makes sense to me. I know the NodeClient gives you easy access to that ThreadPool as well, so we can use one of those managed pools to run the jobs in the bulk requests concurrently.
Now that 1.6.2 is out @davemoore- , do you think it's time to try this one? I'm up for taking it, though I'll likely be slow. My initial thoughts are to try to use a GroupedActionListener to wait for all jobs to complete, though I foresee a few drawbacks with this approach:
- Need to know exactly how many actions are being waited for on creation, so we would need to deserialize and parse the entire body before kicking off any actions (see the sketch after this list). This doesn't pose an immediate issue, but it would make it more difficult to refactor into a streaming approach later.
- Probably more important, I'm not sure how we'd enforce concurrency limits in a straightforward way. Even if we execute "batches" of grouped listeners, all waited on by one higher-level listener, we'd run into a suboptimal waiting situation: every job in a "batch" would have to finish before the next batch could kick off.
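Roughly what I mean by the first point, assuming the Elasticsearch 7.x GroupedActionListener constructor that takes a delegate listener and a group size (the job and result types are hypothetical stand-ins):

import java.util.Collection;
import java.util.List;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;

class BulkDispatcher {

    // Hypothetical stand-ins for the fork's job and result types.
    interface ResolutionJob { void run(ActionListener<JobResult> listener); }
    interface JobResult {}

    // The whole NDJSON body must already be parsed into `jobs` so the group
    // size is known up front, which is what makes a streaming refactor harder.
    void dispatch(List<ResolutionJob> jobs, ActionListener<Collection<JobResult>> onAllDone) {
        GroupedActionListener<JobResult> grouped =
            new GroupedActionListener<>(onAllDone, jobs.size());
        for (ResolutionJob job : jobs) {
            job.run(grouped); // every job reports into the same grouped listener
        }
    }
}

Note this also shows the second drawback: every job kicks off at once, so the grouped listener waits for completion but does nothing to bound concurrency.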
Maybe these will get easier once we start digging in. Let me know what you think!
@austince Definitely, let's do it! No worries about going slow.
I agree with your thinking. A third challenge possibly is that the repeated use of async actions in one call could lead to a really large call stack. At least that's how I interpret what's happening in our use of async in the Job class, which recursively calls itself after each query. I imagine that this approach could lead to a stack overflow if applied to an eventual streaming implementation of bulk actions (and large single resolution jobs of transactional data).
Some more thoughts:
- Ideally the method performing bulk actions will be decoupled enough from the REST handlers that it could be used for both bulk resolution and bulk model management (a feature I'd like to include in one bulk-themed release). Not the end of the world if we have to duplicate some code, but it's something to aim for.
- It'd be worth checking how Elasticsearch performs bulk actions -- or better yet if there's a public bulk action method in the Elasticsearch Java API that's decoupled enough that we could simply call it. Maybe wishful thinking, but definitely worth checking because it could save us a lot of thinking.
Do you want to take the lead on this? While you do I'm happy to contribute and also work in parallel on a project that will benefit from this feature (zentity-ui; a little too early for an initial commit to a public repo, but I can let you know when it's out there if you're interested).
Sounds good, I'll let you know how I make out with it! Those are all good notes -- hopefully someone's solved these upstream.
I couldn't find any clear way that this is currently handled in Elasticsearch. It seems like Bulk is implemented at the Transport level, which buries it a little deep, and I'm not sure ActionListeners are used there. There's also a Client#bulk method, but that just delegates to the transport. So I've tried to implement some ActionListeners based on GroupedActionListener. I've pushed dcd266b, which is quite rough but shows my general approach. I don't think we'll have issues with the call stack, but concurrency is definitely something that will be tricky (we'll need to order the results better).
On another note, I've had some issues with the docker-compose testing setup, so I haven't been able to test this yet. It seems like the use of volumes makes it difficult to restart tests, and there may be a small race condition in the Resolution IT test where the @BeforeClass hooks are both trying to start a cluster. But hoping to make progress there!
Anyway, let me know what you think about the general direction if you have time :)
Looks like the changes didn't break single-job resolution though: https://github.com/austince/zentity/runs/1944061442
Nice find on GroupedActionListener -- hopefully that helps us get to a reasonably simple solution. I might have used it if I'd known about it when refactoring the Job class (though it's working so I'll leave it as-is and keep it in mind if needed). Thanks for investigating. I like what I see so far in your draft. I'll leave some comments as you progress and after I get the 1.7.0 release out (aiming for today).
Good to know about the potential race condition in docker-compose. I don't think I've run into that behavior yet. Might be good to turn that into an issue if it keeps happening.
Yeah, I'm mostly satisfied with the solution using a GroupedActionListener, though the lack of ordering guarantees has forced me to customize it a bit so we can have deterministic result ordering. I'll convert it from a "draft PR" once the tests are passing and I'm happy enough -- good luck on the 1.7.0 release! Falling a bit into callback hell.
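The core of it is basically an index-addressed result slot per request plus a completion counter, along these lines (a simplified sketch, not the exact PR code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

class OrderedResultCollector<T> {

    private final AtomicReferenceArray<T> results;
    private final AtomicInteger remaining;
    private final Consumer<List<T>> onComplete;

    OrderedResultCollector(int size, Consumer<List<T>> onComplete) {
        this.results = new AtomicReferenceArray<>(size);
        this.remaining = new AtomicInteger(size);
        this.onComplete = onComplete;
    }

    // Each result is stored at the position of its originating request, so the
    // response order matches the NDJSON request order no matter which job
    // finishes first.
    void onResult(int index, T result) {
        results.set(index, result);
        if (remaining.decrementAndGet() == 0) {
            List<T> ordered = new ArrayList<>(results.length());
            for (int i = 0; i < results.length(); i++) {
                ordered.add(results.get(i));
            }
            onComplete.accept(ordered);
        }
    }
}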
For the docker-compose issue, I've seen it cause timeouts a few times in CI as well: https://github.com/zentity-io/zentity/runs/1947271843#step:5:707
Actually, they all just passed and it seems to be in a good spot for review -- let me know if there's anything I can do to help out in the meantime!