[Ray[serve] gRPC] Unable to pickle protobuf objects
What happened + What you expected to happen
I followed the documentation with an identical (1:1) proto file, but I keep getting the following errors when running the app.
Serializing 'Multiplexing' <function GrpcDeployment.Multiplexing at 0x111c63240>...
!!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
Detected 2 global variables. Checking serializability...
Serializing 'serve' <module 'ray.serve' from '/Users/.../...app-v2/.venv/lib/python3.11/site-packages/ray/serve/__init__.py'>...
Serializing 'UserDefinedResponse2' <class 'src.app.protos.user_defined_protos_pb2.UserDefinedResponse2'>...
!!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
Serializing 'RegisterExtension' <function Message.RegisterExtension at 0x101aedda0>...
Serializing '_SetListener' <function Message._SetListener at 0x101aede40>...
Serializing '__getstate__' <function Message.__getstate__ at 0x101aedee0>...
Serializing '__reduce__' <function Message.__reduce__ at 0x101aee020>...
Serializing '__setstate__' <function Message.__setstate__ at 0x101aedf80>...
Serializing '__unicode__' <function Message.__unicode__ at 0x101aed120>...
Serializing 'ByteSize' <method 'ByteSize' of 'google._upb._message.Message' objects>...
Serializing 'Clear' <method 'Clear' of 'google._upb._message.Message' objects>...
Serializing 'ClearExtension' <method 'ClearExtension' of 'google._upb._message.Message' objects>...
Serializing 'ClearField' <method 'ClearField' of 'google._upb._message.Message' objects>...
Serializing 'CopyFrom' <method 'CopyFrom' of 'google._upb._message.Message' objects>...
Serializing 'DESCRIPTOR' <google._upb._message.Descriptor object at 0x111c7ae40>...
!!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
================================================================================
Variable:
FailTuple(DESCRIPTOR [obj=<google._upb._message.Descriptor object at 0x111c7ae40>, parent=<class 'src.app.protos.user_defined_protos_pb2.UserDefinedResponse2'>])
FailTuple(DESCRIPTOR [obj=<google._upb._message.Descriptor object at 0x111c7ae40>, parent=<class 'src.app.protos.user_defined_protos_pb2.UserDefinedResponse2'>])
was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
================================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================
A similar issue was posted by someone else here: https://discuss.ray.io/t/keep-getting-error-typeerror-cannot-pickle-classmethod-descriptor-object/10153
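The trace above walks the members of an object and reports the ones that cannot be pickled. A minimal standard-library sketch of the same idea is shown here, assuming a `threading.Lock` as a stand-in for the non-picklable `Descriptor`; `find_unpicklable_attrs` and `Holder` are hypothetical names for illustration and are not part of Ray or protobuf.

```python
import pickle
import threading


def find_unpicklable_attrs(obj):
    """Map attribute name -> error text for attributes that pickle rejects."""
    failures = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle raises TypeError or PicklingError
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


class Holder:
    """Hypothetical object with one picklable and one non-picklable member."""

    def __init__(self):
        self.greeting = "hello"
        # Stand-in (assumption) for an object like the protobuf Descriptor.
        self.lock = threading.Lock()


for attr, error in find_unpicklable_attrs(Holder()).items():
    print(f"{attr}: {error}")
```

Only `lock` is reported, which mirrors how the trace singles out `DESCRIPTOR` while the other members pass.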
Versions / Dependencies
grpcio-tools==1.60.0
grpcio==1.60.0
protobuf==4.25.1
ray[serve]==2.22.0
I have tried many versions and combinations, including the latest grpcio-tools, grpcio, and protobuf as well as very old ones, and the issue is still the same.
I am running Python 3.11.9 on Apple silicon.
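When comparing environments across machines, it can help to print the versions from the interpreter that actually runs the app. The sketch below uses only the standard library; the package list is taken from the versions above, and `installed_versions` is a hypothetical helper name.

```python
import platform
from importlib import metadata

# Distribution names as listed in this report.
PACKAGES = ["grpcio-tools", "grpcio", "protobuf", "ray"]


def installed_versions(packages):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


print(f"python {platform.python_version()} on {platform.machine()}")
for name, version in installed_versions(PACKAGES).items():
    print(f"{name}=={version}" if version else f"{name}: not installed")
```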
Reproduction script
Please follow the official documentation https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html
Issue Severity
High: It blocks me from completing my task.
@mgierada I just tried it again on Ray master and it has no issue. The only difference in the dependencies is that I'm on protobuf==4.21.12, so I'm not sure if that's the cause.
Otherwise, you can try the files generated with my version to see if that solves the issue:
#### user_defined_protos_pb2.py ####
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: user_defined_protos.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x19user_defined_protos.proto\x12\x11userdefinedprotos\"?\n\x12UserDefinedMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06origin\x18\x02 \x01(\t\x12\x0b\n\x03num\x18\x03 \x01(\x03\"4\n\x13UserDefinedResponse\x12\x10\n\x08greeting\x18\x01 \x01(\t\x12\x0b\n\x03num\x18\x02 \x01(\x03\"\x15\n\x13UserDefinedMessage2\"(\n\x14UserDefinedResponse2\x12\x10\n\x08greeting\x18\x01 \x01(\t\"*\n\tImageData\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\x10\n\x08\x66ilename\x18\x02 \x01(\t\"4\n\nImageClass\x12\x0f\n\x07\x63lasses\x18\x01 \x03(\t\x12\x15\n\rprobabilities\x18\x02 \x03(\x02\x32\xae\x02\n\x12UserDefinedService\x12Y\n\x08__call__\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse\x12_\n\x0cMultiplexing\x12&.userdefinedprotos.UserDefinedMessage2\x1a\'.userdefinedprotos.UserDefinedResponse2\x12\\\n\tStreaming\x12%.userdefinedprotos.UserDefinedMessage\x1a&.userdefinedprotos.UserDefinedResponse0\x01\x32\x64\n\x1aImageClassificationService\x12\x46\n\x07Predict\x12\x1c.userdefinedprotos.ImageData\x1a\x1d.userdefinedprotos.ImageClassB:\n#io.ray.examples.user_defined_protosB\x11UserDefinedProtosP\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_defined_protos_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n#io.ray.examples.user_defined_protosB\021UserDefinedProtosP\001'
  _USERDEFINEDMESSAGE._serialized_start=48
  _USERDEFINEDMESSAGE._serialized_end=111
  _USERDEFINEDRESPONSE._serialized_start=113
  _USERDEFINEDRESPONSE._serialized_end=165
  _USERDEFINEDMESSAGE2._serialized_start=167
  _USERDEFINEDMESSAGE2._serialized_end=188
  _USERDEFINEDRESPONSE2._serialized_start=190
  _USERDEFINEDRESPONSE2._serialized_end=230
  _IMAGEDATA._serialized_start=232
  _IMAGEDATA._serialized_end=274
  _IMAGECLASS._serialized_start=276
  _IMAGECLASS._serialized_end=328
  _USERDEFINEDSERVICE._serialized_start=331
  _USERDEFINEDSERVICE._serialized_end=633
  _IMAGECLASSIFICATIONSERVICE._serialized_start=635
  _IMAGECLASSIFICATIONSERVICE._serialized_end=735
# @@protoc_insertion_point(module_scope)
#### user_defined_protos_pb2_grpc.py ####
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import user_defined_protos_pb2 as user__defined__protos__pb2
class UserDefinedServiceStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.__call__ = channel.unary_unary(
            '/userdefinedprotos.UserDefinedService/__call__',
            request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
        )
        self.Multiplexing = channel.unary_unary(
            '/userdefinedprotos.UserDefinedService/Multiplexing',
            request_serializer=user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
            response_deserializer=user__defined__protos__pb2.UserDefinedResponse2.FromString,
        )
        self.Streaming = channel.unary_stream(
            '/userdefinedprotos.UserDefinedService/Streaming',
            request_serializer=user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            response_deserializer=user__defined__protos__pb2.UserDefinedResponse.FromString,
        )


class UserDefinedServiceServicer(object):
    """Missing associated documentation comment in .proto file."""

    def __call__(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Multiplexing(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def Streaming(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_UserDefinedServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
        '__call__': grpc.unary_unary_rpc_method_handler(
            servicer.__call__,
            request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
            response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
        ),
        'Multiplexing': grpc.unary_unary_rpc_method_handler(
            servicer.Multiplexing,
            request_deserializer=user__defined__protos__pb2.UserDefinedMessage2.FromString,
            response_serializer=user__defined__protos__pb2.UserDefinedResponse2.SerializeToString,
        ),
        'Streaming': grpc.unary_stream_rpc_method_handler(
            servicer.Streaming,
            request_deserializer=user__defined__protos__pb2.UserDefinedMessage.FromString,
            response_serializer=user__defined__protos__pb2.UserDefinedResponse.SerializeToString,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'userdefinedprotos.UserDefinedService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class UserDefinedService(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def __call__(request,
                 target,
                 options=(),
                 channel_credentials=None,
                 call_credentials=None,
                 insecure=False,
                 compression=None,
                 wait_for_ready=None,
                 timeout=None,
                 metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/__call__',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Multiplexing(request,
                     target,
                     options=(),
                     channel_credentials=None,
                     call_credentials=None,
                     insecure=False,
                     compression=None,
                     wait_for_ready=None,
                     timeout=None,
                     metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.UserDefinedService/Multiplexing',
            user__defined__protos__pb2.UserDefinedMessage2.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse2.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def Streaming(request,
                  target,
                  options=(),
                  channel_credentials=None,
                  call_credentials=None,
                  insecure=False,
                  compression=None,
                  wait_for_ready=None,
                  timeout=None,
                  metadata=None):
        return grpc.experimental.unary_stream(request, target, '/userdefinedprotos.UserDefinedService/Streaming',
            user__defined__protos__pb2.UserDefinedMessage.SerializeToString,
            user__defined__protos__pb2.UserDefinedResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
class ImageClassificationServiceStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.Predict = channel.unary_unary(
            '/userdefinedprotos.ImageClassificationService/Predict',
            request_serializer=user__defined__protos__pb2.ImageData.SerializeToString,
            response_deserializer=user__defined__protos__pb2.ImageClass.FromString,
        )


class ImageClassificationServiceServicer(object):
    """Missing associated documentation comment in .proto file."""

    def Predict(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_ImageClassificationServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
        'Predict': grpc.unary_unary_rpc_method_handler(
            servicer.Predict,
            request_deserializer=user__defined__protos__pb2.ImageData.FromString,
            response_serializer=user__defined__protos__pb2.ImageClass.SerializeToString,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'userdefinedprotos.ImageClassificationService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class ImageClassificationService(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def Predict(request,
                target,
                options=(),
                channel_credentials=None,
                call_credentials=None,
                insecure=False,
                compression=None,
                wait_for_ready=None,
                timeout=None,
                metadata=None):
        return grpc.experimental.unary_unary(request, target, '/userdefinedprotos.ImageClassificationService/Predict',
            user__defined__protos__pb2.ImageData.SerializeToString,
            user__defined__protos__pb2.ImageClass.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Hey @GeneDer, thanks for taking a look. This is still not working for me. I made sure I am running the same version of protobuf as you, and I copy-pasted the compiled code you shared, but I am still facing serialization issues. Is there anything else you do that I might be missing? Any additional config/setup? Apparently, Ray is not able to serialize the protobuf objects.
Oh, actually, another difference: I'm running Python 3.10.12, so maybe try that. Also, my pickle version is 4.0; not sure if that makes a difference.
I tried with Python 3.10.12 and pickle 4.0, @GeneDer, but I still got the same error.
Here's what I am running; maybe I missed something obvious.
- I copy-pasted the protos defined in the docs to ./src/protos/user_defined_protos.proto:
// user_defined_protos.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.ray.examples.user_defined_protos";
option java_outer_classname = "UserDefinedProtos";
package userdefinedprotos;
message UserDefinedMessage {
  string name = 1;
  string origin = 2;
  int64 num = 3;
}

message UserDefinedResponse {
  string greeting = 1;
  int64 num = 2;
}

message UserDefinedMessage2 {}

message UserDefinedResponse2 {
  string greeting = 1;
}

message ImageData {
  string url = 1;
  string filename = 2;
}

message ImageClass {
  repeated string classes = 1;
  repeated float probabilities = 2;
}

service UserDefinedService {
  rpc __call__(UserDefinedMessage) returns (UserDefinedResponse);
  rpc Multiplexing(UserDefinedMessage2) returns (UserDefinedResponse2);
  rpc Streaming(UserDefinedMessage) returns (stream UserDefinedResponse);
}

service ImageClassificationService {
  rpc Predict(ImageData) returns (ImageClass);
}
- Compiling the protos:
python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. ./src/protos/user_defined_protos.proto
- Starting the Ray cluster, making sure gRPC is enabled:
ray start --head
serve start \
  --grpc-port 9000 \
  --grpc-servicer-functions protos.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server
That correctly starts the Ray cluster.
- Creating a simple deployment script (again, copy-pasted from the docs) in ./src/simple_deploy.py:
import time
from typing import Generator

from protos.user_defined_protos_pb2 import (
    UserDefinedMessage,
    UserDefinedMessage2,
    UserDefinedResponse,
    UserDefinedResponse2,
)

import ray
from ray import serve


@serve.deployment
class GrpcDeployment:
    def __call__(self, user_message: UserDefinedMessage) -> UserDefinedResponse:
        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message: UserDefinedMessage2) -> UserDefinedResponse2:
        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(
        self, user_message: UserDefinedMessage
    ) -> Generator[UserDefinedResponse, None, None]:
        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            time.sleep(0.1)


g = GrpcDeployment.bind()
app1 = "app1"
serve.run(target=g, name=app1, route_prefix=f"/{app1}")
- Trying to run it:
python ./src/simple_deploy.py
and I got the error from the issue description above.
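Because the servicer function is passed to `serve start` as a dotted path, one quick sanity check is to confirm that the path imports cleanly from the directory where the command is run. The helper below is a generic standard-library sketch, not how Ray Serve resolves the path internally; `resolve_dotted_path` is a hypothetical name, and standard-library paths are used so the example runs anywhere.

```python
import importlib


def resolve_dotted_path(path):
    """Import 'package.module.attr' and return the named attribute."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Standard-library stand-ins; with the setup above one would instead pass
# the same string that is given to --grpc-servicer-functions.
join = resolve_dotted_path("os.path.join")
print(callable(join))
```

If the import raises `ModuleNotFoundError`, the working directory or package layout does not match the dotted path.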
What if you don't put those in a sub-directory and instead just generate in the same root?
Nah, unfortunately that doesn't help.
Oh, I finally solved the issue. The docs are very misleading on this. The cause was that my proto imports were out of scope; moving them close to the place where they are used made a difference. So instead of importing at the top of the file like this:
import time
from typing import Generator

from protos.user_defined_protos_pb2 import (
    UserDefinedMessage,
    UserDefinedMessage2,
    UserDefinedResponse,
    UserDefinedResponse2,
)

import ray
from ray import serve

# ... ray serve logic
do this:
# file name ./src/deploy.py
import time
from typing import Generator

from ray import serve


@serve.deployment
class GrpcDeployment:
    def __call__(self, user_message):
        from user_defined_protos_pb2 import UserDefinedMessage, UserDefinedResponse

        greeting = f"Hello {user_message.name} from {user_message.origin}"
        num = user_message.num * 2
        user_response = UserDefinedResponse(
            greeting=greeting,
            num=num,
        )
        return user_response

    @serve.multiplexed(max_num_models_per_replica=1)
    async def get_model(self, model_id: str) -> str:
        return f"loading model: {model_id}"

    async def Multiplexing(self, user_message):
        from user_defined_protos_pb2 import UserDefinedMessage2, UserDefinedResponse2

        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        user_response = UserDefinedResponse2(
            greeting=f"Method2 called model, {model}",
        )
        return user_response

    def Streaming(self, user_message) -> Generator:
        from user_defined_protos_pb2 import UserDefinedMessage, UserDefinedResponse

        for i in range(10):
            greeting = f"{i}: Hello {user_message.name} from {user_message.origin}"
            num = user_message.num * 2 + i
            user_response = UserDefinedResponse(
                greeting=greeting,
                num=num,
            )
            yield user_response
            time.sleep(0.1)


g = GrpcDeployment.bind()
app1 = "app1"
serve.run(target=g, name=app1, route_prefix=f"/{app1}")
Then, assuming your Ray cluster is reachable (e.g. on localhost), deploy by running the file:
python src/deploy.py
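One way to see what the workaround changes is to list the module-level names a function loads. The trace in the issue description reports "Detected 2 global variables" for `Multiplexing`, and a function-local import removes the message class from that set. The sketch below uses only the standard library, with `decimal.Decimal` as a stand-in (assumption) for a generated message class; `global_names` is a hypothetical helper, and the thread does not confirm this is exactly how Ray decides what to serialize.

```python
import dis
from decimal import Decimal  # stand-in for a module-level proto import


def build_with_global(value):
    # `Decimal` is looked up in module globals each time this runs.
    return Decimal(value)


def build_with_local_import(value):
    # The name is bound inside the function, so no global is referenced.
    from decimal import Decimal
    return Decimal(value)


def global_names(func):
    """Sorted names that `func` loads from module globals or builtins."""
    return sorted(
        {ins.argval for ins in dis.get_instructions(func)
         if ins.opname == "LOAD_GLOBAL"}
    )


print(global_names(build_with_global))        # ['Decimal']
print(global_names(build_with_local_import))  # []
```

Both functions return the same value; only the set of globals attached to the function differs.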
@GeneDer Okay, I agree that everything works fine if I keep it in the root of Ray's working directory. The current PR #45862 does not make sense in that context. However, the issue appears when I organize the logic in subdirectories, e.g.
.
├── __init__.py
├── config.yaml
├── playground
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-311.pyc
│ │ ├── deployment.cpython-311.pyc
│ │ ├── user_defined_protos_pb2.cpython-311.pyc
│ │ └── user_defined_protos_pb2_grpc.cpython-311.pyc
│ ├── deployment.py
│ ├── user_defined_protos.proto
│ ├── user_defined_protos_pb2.py
│ └── user_defined_protos_pb2_grpc.py
├──
With the config.yaml looking like this:
grpc_options:
  port: 9000
  grpc_servicer_functions:
    - playground.user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server
applications:
  - name: app1
    route_prefix: /app1
    import_path: playground.deployment:g
    runtime_env: {}
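Since the behavior changes with where the generated modules live, one possible diagnostic is to check that a class can be found again under the module name it reports, which is how `pickle` refers to classes (by module and qualified name rather than by value). The sketch below is standard-library only; `picklable_by_reference` and `Relocated` are hypothetical, and whether this mismatch is what happens with the subdirectory layout is an assumption the thread does not establish.

```python
import importlib
from collections import OrderedDict


def picklable_by_reference(obj):
    """True if `obj` can be re-imported via its __module__ and __qualname__."""
    try:
        found = importlib.import_module(obj.__module__)
        for part in obj.__qualname__.split("."):
            found = getattr(found, part)
    except (ImportError, AttributeError):
        return False
    return found is obj


class Relocated:
    """Hypothetical class that reports a module name which cannot be imported."""


# Assumption for illustration: the recorded module name does not match any
# importable module, as could happen if a package prefix were missing.
Relocated.__module__ = "no_such_package.messages_pb2"

print(picklable_by_reference(OrderedDict))  # True
print(picklable_by_reference(Relocated))    # False
```

With the layout above, the same check could be applied to the message classes imported from `playground.user_defined_protos_pb2`.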