googleapis/python-language

KeyError in LanguageServiceClient with 'grpc_asyncio' transport

nemanja-m opened this issue · 5 comments

Hi,

When language_v1.LanguageServiceClient is used with the grpc_asyncio transport, a KeyError is raised on any language service method call (e.g. annotate_text).

After investigating, I found that the issue is in LanguageServiceGrpcAsyncIOTransport.__init__. The stubs are set to an empty dictionary after super().__init__, which causes a KeyError on service method calls.

Environment details

  • OS type and version: macOS Catalina v10.15.6
  • Python version: 3.8.5
  • pip version: 20.3
  • google-cloud-language version: 2.0.0

Steps to reproduce

  1. Create a language_v1.LanguageServiceClient client with transport="grpc_asyncio".
  2. Call any service method.

Code example

from google.cloud import language_v1

client = language_v1.LanguageServiceClient(transport="grpc_asyncio")

document = language_v1.Document(
    content="Example content.",
    language="en",
    type_=language_v1.Document.Type.PLAIN_TEXT,
)

response = client.annotate_text(
    request={
        "document": document,
        "features": {"extract_document_sentiment": True, "classify_text": True},
        "encoding_type": language_v1.EncodingType.UTF8,
    }
)

Stack trace

Traceback (most recent call last):
  File "example.py", line 11, in <module>
    response = client.annotate_text(
  File "/Users/nemanja/.pyenv/versions/example/lib/python3.8/site-packages/google/cloud/language_v1/services/language_service/client.py", line 725, in annotate_text
    rpc = self._transport._wrapped_methods[self._transport.annotate_text]
KeyError: <grpc.aio._channel.UnaryUnaryMultiCallable object at 0x10abc6d90>
Error in sys.excepthook:

Original exception was:

Hi,

I'll come take a closer look tomorrow, but my guess is that this is from using the asyncio transport with the regular client. Could you try with the async client instead?

It seems that we have the same problem, because LanguageServiceAsyncClient uses LanguageServiceClient with the grpc_asyncio transport.

This means that LanguageServiceGrpcAsyncIOTransport is used as the transport class, and _stubs will be set to an empty dict after super().__init__ here.
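To make the hypothesis above concrete, here is a minimal sketch of how a dictionary keyed by stub objects can miss once the stub cache is reset. ToyBaseTransport, ToyTransport and lookup are hypothetical names invented for illustration, not the library's classes, and the sketch does not confirm that this is what the library actually does.

```python
class ToyBaseTransport:
    """Hypothetical stand-in for a base transport (not the library's class)."""

    def __init__(self):
        self._prep_wrapped_methods()

    def _prep_wrapped_methods(self):
        # The key is whatever object the property returns at this moment.
        self._wrapped_methods = {self.annotate_text: "wrapped annotate_text"}


class ToyTransport(ToyBaseTransport):
    """Hypothetical transport that lazily creates and caches one stub."""

    def __init__(self, reset_after_super):
        self._stubs = {}
        super().__init__()
        if reset_after_super:
            # Assumption under test: clearing the cache after the base
            # constructor has already built the wrapped-method dictionary.
            self._stubs = {}

    @property
    def annotate_text(self):
        if "annotate_text" not in self._stubs:
            self._stubs["annotate_text"] = object()  # placeholder stub
        return self._stubs["annotate_text"]


def lookup(transport):
    """Mimic the client-side lookup and report whether it succeeds."""
    try:
        return transport._wrapped_methods[transport.annotate_text]
    except KeyError:
        return "KeyError"


kept = lookup(ToyTransport(reset_after_super=False))
reset = lookup(ToyTransport(reset_after_super=True))
```

With the cache left alone, the property returns the same object that was used as the key. After a reset, the property builds a fresh object that was never registered, so the lookup fails.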

Hi,

The plain grpc transport does the same thing with stubs and I haven't seen reports of it causing issues when used with the regular client. The stubs get populated when individual methods are called for the first time.

@property
def analyze_sentiment(
    self,
) -> Callable[
    [language_service.AnalyzeSentimentRequest],
    language_service.AnalyzeSentimentResponse,
]:
    r"""Return a callable for the analyze sentiment method over gRPC.
    Analyzes the sentiment of the provided text.
    Returns:
        Callable[[~.AnalyzeSentimentRequest],
                ~.AnalyzeSentimentResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    if "analyze_sentiment" not in self._stubs:
        self._stubs["analyze_sentiment"] = self.grpc_channel.unary_unary(
            "/google.cloud.language.v1.LanguageService/AnalyzeSentiment",
            request_serializer=language_service.AnalyzeSentimentRequest.serialize,
            response_deserializer=language_service.AnalyzeSentimentResponse.deserialize,
        )
    return self._stubs["analyze_sentiment"]
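The lazy pattern in the property above can be tried in isolation. The following is only a sketch: FakeChannel and LazyTransport are hypothetical stand-ins that need no gRPC installation, and the fake channel simply counts how many stubs it is asked to create.

```python
class FakeChannel:
    """Hypothetical stand-in for a gRPC channel; counts stub creations."""

    def __init__(self):
        self.created = 0

    def unary_unary(self, path, request_serializer=None, response_deserializer=None):
        self.created += 1

        def stub(request):
            # A real stub would send the request; this one just echoes it.
            return f"{path} <- {request}"

        return stub


class LazyTransport:
    """Hypothetical transport that creates each stub on first access."""

    def __init__(self, channel):
        self.grpc_channel = channel
        self._stubs = {}

    @property
    def analyze_sentiment(self):
        # Create the stub only if it is not cached yet, then reuse it.
        if "analyze_sentiment" not in self._stubs:
            self._stubs["analyze_sentiment"] = self.grpc_channel.unary_unary(
                "/google.cloud.language.v1.LanguageService/AnalyzeSentiment"
            )
        return self._stubs["analyze_sentiment"]


channel = FakeChannel()
transport = LazyTransport(channel)
stubs_before = channel.created          # nothing is created at construction
first = transport.analyze_sentiment     # first access creates the stub
second = transport.analyze_sentiment    # second access reuses the cached stub
```

An empty _stubs dictionary at construction time is therefore not a problem in itself, as long as every caller goes through the property.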

The synchronous client fetches pre-wrapped methods from self._transport._wrapped_methods.

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.analyze_sentiment]

def _prep_wrapped_messages(self, client_info):
    # Precompute the wrapped methods.
    self._wrapped_methods = {
        self.analyze_sentiment: gapic_v1.method.wrap_method(
            self.analyze_sentiment,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
                ),
            ),
            default_timeout=600.0,
            client_info=client_info,
        ),

This dictionary is only used by (and at the moment only works for) the synchronous client. The async_client wraps the methods with a different function, gapic_v1.method_async.wrap_method.

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
    self._client._transport.analyze_sentiment,
    default_retry=retries.Retry(
        initial=0.1,
        maximum=60.0,
        multiplier=1.3,
        predicate=retries.if_exception_type(
            exceptions.ServiceUnavailable, exceptions.DeadlineExceeded,
        ),
    ),
    default_timeout=600.0,
    client_info=DEFAULT_CLIENT_INFO,
)

Even if the async methods were in self._transport._wrapped_methods, I don't think you could use the async_client with the synchronous transport, or the synchronous client with the async transport. The async client expects an Awaitable and awaits the call, whereas the synchronous client uses the return value directly.

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

# Send the request.
response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
# Done; return the response.
return response
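The difference between the two call styles can be seen without gRPC. In this sketch, sync_rpc and async_rpc are hypothetical stand-ins for the wrapped RPC callables. Plain functions and coroutines are used here, whereas the real async transport returns gRPC call objects, so this only illustrates the general await mismatch.

```python
import asyncio


def sync_rpc(request):
    """Hypothetical blocking RPC: returns the response directly."""
    return f"response to {request}"


async def async_rpc(request):
    """Hypothetical async RPC: returns an awaitable."""
    return f"response to {request}"


def call_sync(rpc, request):
    # Mirrors the synchronous client: use the return value as the response.
    return rpc(request)


async def call_async(rpc, request):
    # Mirrors the async client: await the return value.
    return await rpc(request)


sync_result = call_sync(sync_rpc, "ping")
async_result = asyncio.run(call_async(async_rpc, "ping"))

# Sync caller + async RPC: the "response" is an un-awaited coroutine.
leaked = call_sync(async_rpc, "ping")
got_coroutine = asyncio.iscoroutine(leaked)
leaked.close()  # avoid a "never awaited" warning

# Async caller + sync RPC: awaiting a plain string fails.
try:
    asyncio.run(call_async(sync_rpc, "ping"))
    mixed_error = None
except TypeError as exc:
    mixed_error = type(exc).__name__
```

Matching pairs return the response. Each mismatched pair fails in its own way, which is why the clients and transports are not interchangeable.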

Hi,

I've tried the async client with grpc_asyncio again, and everything works (sorry for the earlier incorrect results). So the conclusion is that the sync language client should not be configured with grpc_asyncio.

Thank you!

@Neenu1995 Yes, exactly! The sync client will only work with grpc, and the async client will only work with grpc_asyncio.
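For anyone landing here later, a minimal sketch of the working combination might look like the following. It assumes google-cloud-language 2.x is installed and application default credentials are configured. The annotate helper is a hypothetical name, and the import is placed inside it only so the snippet can be loaded without the package.

```python
import asyncio


async def annotate(text):
    """Hypothetical helper: call annotate_text through the async client."""
    # Imported here so this sketch can be loaded without the package installed.
    from google.cloud import language_v1

    # The async client is paired with the grpc_asyncio transport by default,
    # so no transport argument is passed.
    client = language_v1.LanguageServiceAsyncClient()

    document = language_v1.Document(
        content=text,
        language="en",
        type_=language_v1.Document.Type.PLAIN_TEXT,
    )

    # The async client's methods are coroutines and must be awaited.
    return await client.annotate_text(
        request={
            "document": document,
            "features": {"extract_document_sentiment": True},
            "encoding_type": language_v1.EncodingType.UTF8,
        }
    )


# Usage (requires network access and credentials):
# response = asyncio.run(annotate("Example content."))
```

For blocking code, the equivalent is language_v1.LanguageServiceClient() with its default transport and no await.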