Batching
itolosa opened this issue · 18 comments
Hi there,
I've been working with this library for a while, and I'm using it to query an endpoint that can receive multiple queries at once. The API returns the responses in the same order as the requests.
I've found that other libraries implement this feature under the name of batching, and I've successfully implemented it in Python too. My question is: have you considered this already, and are you accepting PRs for it?
I'd be glad to contribute
Something like transport-level batching? Yes, that could be useful.
- Do you know a public GraphQL server configured for this that we could use for tests?
- On which transport do you plan to implement this?
You would probably need to create execute_batch, execute_batch_sync and execute_batch_async methods in client.py, which have the same arguments as the execute methods but take a list of documents, variable_values and operation_names instead of a single element, and return a list of answers.
Any help is greatly appreciated. Please check the CONTRIBUTING.md file. The PR needs to have 100% test code coverage to be accepted.
Do you know a public GraphQL server configured for this that we could use for tests?
I have one from a crypto exchange. We just need to generate a token, but it's pretty straightforward; I've managed to obtain one using a temporary email.
On which transport do you plan to implement this?
I'd create another transport that derives from the HTTP / requests one. In my case, BatchingTransport.
Actually, I have a working prototype of this on another public repo with 100% coverage, using the github ci tool. I've used github secrets to store the api token.
I've used threads and futures to implement this, but we could do better than that. As a prototype though it's ok.
You have to use the execution result instead of a dict to keep an open line of communication with the transport processor.
Check out this:
https://github.com/itolosa/orionx-api-client/blob/master/orionx_api_client/transports/batch.py
The client should take care not to wait for the data from the execution result, so part of the validation should be split out of it into a helper function. In my case it is called as_completed.
I'm sorry, I've realized I haven't explained myself enough
My implementation uses the same client you have, and it receives a single document each time. The clever part is that if you execute it many times in a row within a short period of time, a small delay in the transport code allows it to pack the documents together before sending the entire batch to the server. So every time you submit an execution, the transport is waiting in another thread for extra documents in a queue. After this short wait, the batcher starts processing, and then the cycle repeats. Every result is sent to the client using a special ExecutionResult that has a future injected through the constructor in order to achieve lazy loading, so it fetches and waits for the result of the future if and only if you get a value from it. The transport delivers each of the responses in the list received from the server through the corresponding future, using .set_result or .set_exception.
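To make the timer-and-queue idea above concrete, here is a minimal sketch using only the standard library. It is not the code from the linked repository: TimedBatcher, send_batch, interval and max_size are hypothetical names, and send_batch stands in for whatever sends one batch to the server and returns the responses in request order.

```python
import queue
import threading
import time
from concurrent.futures import Future
from typing import Any, Callable, List, Tuple


class TimedBatcher:
    """Collects requests for a short interval, then sends them as one batch."""

    def __init__(
        self,
        send_batch: Callable[[List[Any]], List[Any]],  # hypothetical batch sender
        interval: float = 0.05,
        max_size: int = 10,
    ) -> None:
        self._send_batch = send_batch
        self._interval = interval
        self._max_size = max_size
        self._queue: "queue.Queue[Tuple[Any, Future]]" = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, request: Any) -> Future:
        """Enqueue one request and return a future for its result."""
        future: Future = Future()
        self._queue.put((request, future))
        return future

    def _run(self) -> None:
        while True:
            # Block until the first request arrives, then wait a little for more.
            batch = [self._queue.get()]
            deadline = time.monotonic() + self._interval
            while len(batch) < self._max_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    batch.append(self._queue.get(timeout=remaining))
                except queue.Empty:
                    break
            try:
                results = self._send_batch([request for request, _ in batch])
                if len(results) != len(batch):
                    raise ValueError("batch response length mismatch")
            except Exception as exc:
                # A failed batch fails every request that was part of it.
                for _, future in batch:
                    future.set_exception(exc)
            else:
                # Responses are assumed to come back in request order.
                for (_, future), result in zip(batch, results):
                    future.set_result(result)


# Usage with a fake sender that records each batch it receives.
calls: List[List[Any]] = []


def fake_send(requests: List[Any]) -> List[Any]:
    calls.append(list(requests))
    return [f"result:{request}" for request in requests]


batcher = TimedBatcher(fake_send, interval=0.2)
futures = [batcher.submit(name) for name in ("q1", "q2", "q3")]
results = [future.result(timeout=5) for future in futures]
```

Requests submitted within the interval normally end up in the same call to the sender, but that depends on timing, so the sketch only guarantees that each future receives the result matching its own request.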
I hope I've explained myself enough, if not let me know, I'm glad to contribute
I see.
So you want to mix two different things:
- batching (to merge multiple queries in a single one)
- lazy loading (to avoid needing to send a query if the result is not used)
It's too bad you want to do that with a sync transport as that would have been much more straightforward with async...
Now, if I understand correctly, in your batch example it does not actually help to wait using a timer, as you first have to provide all the queries, and then you run as_completed to wait for the answers after the timer has expired. But in that case the user knows all the queries they want to make, as the code is sync, so it would be better for them to simply provide all the queries and run an execute_batch method manually (the two queries would be sent in a single batch query, but without needing to wait).
So that code with the timer only starts to be useful if the queries are provided from different threads. In that case you don't know when multiple queries from different threads might be needed in the same time frame, and waiting might in some cases be useful. For that use case, we could call future.result() inside the execute method in order not to return a Future, the downside being that you need to use different threads to make different queries.
The reason I'm not really sure about returning a Future from the transport is that if you check the client.py file of gql, you'll see we can already return a Dict or an ExecutionResult from the execute methods, and for that we have ugly @overload declarations all over the place. It's getting quite complicated.
In any case we need to separate the concerns: add only an execute_batch method in the current transports to send a batch request directly, and manage the batching in client.py, so that batching functionality can easily be added to any transport.
We could split this in multiple PRs:
- a first PR with manual batching implemented for RequestsHTTPTransport and execute_batch methods in client.py
- a second PR with the queue and the timer implemented in client.py, allowing queries to be executed in batch by specifying batch=True in an execute method
Note: I don't understand how your lazy loading could work. In your code the query will be sent after the timer unless the user cancels the future beforehand. But if the user simply receives the future and does not bother to get the data, then the future is not cancelled and the query will still be sent to the server, even if future.result() is never called.
Right! I agree with your concerns. It's better if we split all the different variations of transports. In this case, what I call "lazy loading" is not the official meaning of the term. What it actually means is that the result is not yet fetched at the exact moment you ask for it, but it will be in the future, so it's more like a "promise". In my case the requests are never cancelled; what the delay does is wait a couple of milliseconds so that a bunch of documents are enqueued by the client and gathered by the transport, so they can be sent to the server at once. It's a useful trick that I believe I've seen in other implementations of GraphQL clients. But I agree with you 100% that this kind of trick is not optimal. I chose it because it's more transparent to the user: they use the client as usual and simply set the batching=True parameter in the client. The usage is then the same from the outside, and the result of every execute is an ExecutionResult (or actually a delayed ExecutionResult) that fills its values from the future stored inside it when a property is accessed, or when it's asked to do so explicitly through the as_completed utility. In that last case, asking the future for the data blocks the flow of the program until the transport has filled in the data.
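A rough sketch of what such a "delayed" result could look like, assuming the transport resolves a standard concurrent.futures.Future with a plain dict. DelayedResult and its ready property are hypothetical names for illustration only; they are not part of gql.

```python
from concurrent.futures import Future
from typing import Any, Dict, List, Optional


class DelayedResult:
    """Hypothetical result object that resolves its future only on access."""

    def __init__(self, future: "Future[Dict[str, Any]]") -> None:
        self._future = future

    def _resolved(self) -> Dict[str, Any]:
        # Blocks until the transport calls set_result or set_exception.
        return self._future.result()

    @property
    def data(self) -> Optional[Dict[str, Any]]:
        return self._resolved().get("data")

    @property
    def errors(self) -> Optional[List[Any]]:
        return self._resolved().get("errors")

    @property
    def ready(self) -> bool:
        # Non-blocking check, useful for an as_completed-style helper.
        return self._future.done()


# Usage: the object can be handed out before the response exists.
future: Future = Future()
result = DelayedResult(future)
was_ready_before = result.ready

# Later, the transport fills in the response for this request.
future.set_result({"data": {"continent": {"name": "Europe"}}})
name = result.data["continent"]["name"]
```

Note that the request itself is still sent whether or not data is ever read; only the waiting is deferred.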
I think it's a good idea to start with the sync batch first that receives a collection of documents and returns another collection with the results. I'll open a PR for it.
Thanks!
I think it's a good idea to start with the sync batch first that receives a collection of documents and returns another collection with the results. I'll open a PR for it.
Alright. To be clear this should update the existing RequestsHTTPTransport and not create another transport.
Yes, the design would be to add a Transport#execute_batch and a Client#execute_batch. In the beginning just to the sync transport, raising a non-implemented error for the rest.
One thing I didn't understand is what you meant by:
It's too bad you want to do that with a sync transport as that would have been much more straightforward with async...
Is it applicable in this case? I think you mean that instead of using threads and futures, a simple async/promise would have been more straightforward, right?
One thing I didn't understand is what you meant by:
It's too bad you want to do that with a sync transport as that would have been much more straightforward with async...
Is it applicable in this case? I think you mean that instead of using threads and futures, a simple async/promise would have been more straightforward, right?
Right
Right
Agreed. I actually wrote this code about six years ago, when I didn't know about async.
I'm struggling a little with the signature of the .execute_batch method. Since .execute receives a single document, this new method would receive an iterable of them, but the same goes for variable_values and operation_name. I see a couple of solutions here:
- Receive iterables for every param:
documents=[<document>,...], variable_values=[<variable_name>,...], operation_names=[<operation_name>,...]
- Receive an iterable of a data-structure that contains those 3 values:
queries=[<document | variable_values | operation_name>, ...]
The biggest problem with solution 1 is that it seems very dangerous to me, since the user may end up mixing up the order of the params and sending a query whose <document, variables, operation> elements do not correspond to each other.
Solution 2 seems better, but it implies the user has to build a Query data structure or an ugly list that contains all the sub-queries, and it seems very inconsistent with the execute method.
If I had to choose, I'd go with the Query option, and execute_batch would receive queries=Iterable[Query].
Do you have any idea about it? Thanks
Also, I believe some queries don't need variables or an operation name. So if we decide to receive an iterable for every parameter, we'll end up with some None values in between.
transport.execute_batch(
    documents=[doc1, doc2, doc3],
    variable_values=[None, variables2, None],
    operation_names=["operation1", None, "operation3"],
)
With a data structure instead:
transport.execute_batch(
    queries=[
        GraphQLQuery(document=doc1, operation_name="operation1"),
        GraphQLQuery(document=doc2, variable_values=variables2),
        GraphQLQuery(document=doc3, operation_name="operation3"),
    ],
)
Yes, using a dataclass seems to be a much better option indeed, but I would use something like Operation instead of GraphQLQuery to better correspond to GraphQL terminology.
The spec says: Request
Therefore:
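from dataclasses import dataclass
from typing import Any, Dict, Optional
from graphql import DocumentNode

@dataclass(frozen=True)
class GQLRequest:
    document: DocumentNode
    variable_values: Optional[Dict[str, Any]] = None
    operation_name: Optional[str] = None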
GQLRequest instead of Request, to avoid conflicts with the requests library.
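For illustration, here is how a list of such requests might be turned into a single batch body. This sketch assumes the server follows the common convention of accepting a JSON array of {"query", "variables", "operationName"} objects and answering with an array in the same order; SimpleRequest is a hypothetical stand-in that stores the document as a plain string, so the example needs nothing beyond the standard library.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class SimpleRequest:
    """Hypothetical stand-in for GQLRequest with a string document."""

    document: str
    variable_values: Optional[Dict[str, Any]] = None
    operation_name: Optional[str] = None


def to_payload(request: SimpleRequest) -> Dict[str, Any]:
    """Build the JSON object for one request, omitting unset fields."""
    payload: Dict[str, Any] = {"query": request.document}
    if request.variable_values is not None:
        payload["variables"] = request.variable_values
    if request.operation_name is not None:
        payload["operationName"] = request.operation_name
    return payload


def build_batch_body(requests: List[SimpleRequest]) -> str:
    """Serialize all requests as one JSON array (assumed batch format)."""
    return json.dumps([to_payload(request) for request in requests])


def pair_responses(
    requests: List[SimpleRequest], responses: List[Dict[str, Any]]
) -> List[Tuple[SimpleRequest, Dict[str, Any]]]:
    """Match responses to requests by position, as the server keeps the order."""
    if len(requests) != len(responses):
        raise ValueError("expected one response per request")
    return list(zip(requests, responses))


query = "query getContinent($code: ID!) { continent(code: $code) { name } }"
batch = [
    SimpleRequest(query, variable_values={"code": "EU"}),
    SimpleRequest(query, operation_name="getContinent"),
]
body = build_batch_body(batch)
```

Because each request carries its own variables and operation name, there are no parallel lists to keep aligned and no None placeholders.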
I've opened a PR. Draft for now, I need to adjust the docstrings. Please let me know if you have any thoughts on anything. Thanks
@itolosa I made PR #436 to implement automatic batching with sync transports, inspired by your code.
Could you please take a look?
You can test it with a max batch of 3 on the countries backend with the following command:
pytest tests/test_requests_batch.py::test_requests_sync_batch_auto --run-online -s
I looked at your code and it seems OK. I can see that your implementation requires the user to send the executions using threads. While this is correct at the implementation level, and also has the advantage of not breaking the interface of the client, I believe there's an alternative way to improve this a little bit. I mean, if we're already using a thread and a queue in the client (so the implementation is thread-safe), why not remove the wait for the future result inside the client:
request = GraphQLRequest(
    document,
    variable_values=variable_values,
    operation_name=operation_name,
)
future_result = self._execute_future(request)
result = future_result.result()  # <<< remove this. client.py:848
and find a way to allow the user to call execute in this way:
session.execute(query1)
session.execute(query2)
...
internally making a single batch request with those queries:
# >>> to the server
[
query1,
query2
]
I know that the real issue is that Client#execute has Union[Dict[str, Any], ExecutionResult] as its return type. But what if we create some other classes that act like those objects but receive the future in the constructor, so that they only call future_result.result() when it's needed?
What do you think about it?
Let's talk about use cases.
Use case 1: automatic batching
What is the main use for automatically batching GraphQL requests together?
In javascript, for a modern web page, you could have a multitude of components making GraphQL requests independently at the load of a page. That could be a bit inefficient so waiting a bit for all those queries to be made and sending them in a single request could be better.
This is how apollo-link-batch-http (the main library for batching requests I think) operates, allowing the developers of the components not to care about it and not changing the components, not changing the methods used to send the requests.
Note that in this case:
- this is only possible because all those components are running concurrently
- the wait is necessary as apollo-link-batch-http does not have control and does not know exactly when a request would be made
That approach is how it is currently implemented in PR #436, which introduces automatic batching by adding the batch_interval and batch_max parameters to the Client, but it only makes sense if we receive requests concurrently, that is, from different threads in sync mode.
If we don't receive requests concurrently, meaning from a single thread, then it does not make any sense to wait if we want to combine multiple requests in a batch!
In that case we know exactly which are the requests to be sent in the batch and it would make more sense to let the developer manually decide which requests are to be combined.
This brings us to the next use case.
Use case 2: manual batching
For that use case, we are in a single thread and we want to send multiple GraphQL requests.
This would be the case for most of the Python code in my opinion.
Here we know exactly all the requests which are supposed to be sent as we have the control of the flow of execution.
So in this case you can make a list of requests manually and use the execute_batch method.
We could also use the automatic batching feature, getting futures from multiple requests as you proposed and waiting for the batch_interval, but that would not make sense: we would be waiting for nothing, as we already know all the requests that are supposed to be in the batch.
Current state
In fact, what you want to do is already possible by calling session._execute_future directly and getting ExecutionResult instances.
I just made two new tests to demonstrate:
with client as session:
    request_eu = GraphQLRequest(query, variable_values={"continent_code": "EU"})
    future_result_eu = session._execute_future(request_eu)
    request_af = GraphQLRequest(query, variable_values={"continent_code": "AF"})
    future_result_af = session._execute_future(request_af)
    result_eu = future_result_eu.result().data
    result_af = future_result_af.result().data
    assert result_eu["continent"]["name"] == "Europe"
    assert result_af["continent"]["name"] == "Africa"
and the equivalent code with execute_batch, without waiting:
with client as session:
    request_eu = GraphQLRequest(query, variable_values={"continent_code": "EU"})
    request_af = GraphQLRequest(query, variable_values={"continent_code": "AF"})
    result_eu, result_af = session.execute_batch([request_eu, request_af])
    assert result_eu["continent"]["name"] == "Europe"
    assert result_af["continent"]["name"] == "Africa"
Conclusion
Either we have control of the flow of execution and we should use execute_batch manually without a wait period, or we don't, because the requests are coming from different threads, and we should use the automatic batching functionality, waiting batch_interval for new requests whenever a new request arrives.
I guess we could make a session.execute_future method, using _execute_future, which returns a future that either returns the data or raises a TransportQueryError instead of returning the ExecutionResult, but I don't really see the point of promoting a use case that does not make sense in practice.
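For reference, a minimal sketch of what such a wrapper could do, written against standard-library futures only. data_future, Result and QueryError are hypothetical stand-ins (for the proposed execute_future, ExecutionResult and TransportQueryError respectively), not existing gql names.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


class QueryError(Exception):
    """Hypothetical stand-in for TransportQueryError."""

    def __init__(self, errors: List[Any]) -> None:
        super().__init__(str(errors))
        self.errors = errors


@dataclass
class Result:
    """Hypothetical stand-in for ExecutionResult."""

    data: Optional[Dict[str, Any]] = None
    errors: Optional[List[Any]] = None


def data_future(result_future: Future) -> Future:
    """Return a future that yields result.data or raises QueryError."""
    out: Future = Future()

    def _forward(done: Future) -> None:
        try:
            result = done.result()
        except BaseException as exc:
            # Transport failures and cancellation are passed through.
            out.set_exception(exc)
            return
        if result.errors:
            out.set_exception(QueryError(result.errors))
        else:
            out.set_result(result.data)

    # Runs immediately if result_future is already done.
    result_future.add_done_callback(_forward)
    return out


# Usage: one successful result and one result carrying GraphQL errors.
ok_source: Future = Future()
ok = data_future(ok_source)
ok_source.set_result(Result(data={"continent": {"name": "Africa"}}))
ok_data = ok.result(timeout=1)

failed_source: Future = Future()
failed = data_future(failed_source)
failed_source.set_result(Result(errors=[{"message": "boom"}]))
failed_error = failed.exception(timeout=1)
```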
Thank you for your thorough explanation.
The way I proposed to use this feature has another value for me, mostly about personal style and software design, but that's for another time. I have no interest in turning this issue into a flame war. Also, you're right that I can use execute as I want, as in your example.
I'll review your code in a few hours and let you know if I find something to improve.
Thanks!