/reactive-grpc

Primary LanguagePythonMIT LicenseMIT

build status

reactive-grpc

A simple gRPC bridge to reactive streams.

Example: Given the following Protocol buffers definition:

syntax = "proto3";

package rxgrpc.test;

service TestService {
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

message TestRequest {
  string message = 1;
}

message TestResponse {
  string message = 1;
}

and a simple Servicer class:

from test.proto.test_pb2_grpc import TestServiceServicer
from test.proto import test_pb2


class _Servicer(TestServiceServicer):
    def GetOneToOne(self, request: test_pb2.TestRequest, context):
        return test_pb2.TestResponse(message='response: {}'.format(request.message))

    def GetOneToStream(self, request, context):
        for i in range(3):
            yield test_pb2.TestResponse(message='response {}: {}'.format(i, request.message))

    def GetStreamToOne(self, request_iterator, context):
        return test_pb2.TestResponse(
            message='response: {}'.format(
                ', '.join(map(lambda d: d.message, request_iterator))
            )
        )

    def GetStreamToStream(self, request_iterator, context):
        yield from map(
            lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
            request_iterator
        )

A simple gRPC reactive server where request messages are transformed can be created as follows:

from test.proto import test_pb2_grpc, test_pb2
from rxgrpc import server, mappers
from rx import operators
from test.proto.test_pb2_grpc import TestServiceServicer


class _Servicer(TestServiceServicer):
    # ...
    pass


workers = 3
rx_server = server.create_server(test_pb2, workers)
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
rx_server.add_insecure_port('[::]:50051')

def _transform_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
    return test_pb2.TestRequest(message='TRANSFORMED {}'.format(m.message))

rx_server.set_grpc_observable(
    rx_server.grpc_pipe(
        operators.map(mappers.grpc_invocation_map(_transform_message)),
        method_name='/rxgrpc.test.TestService/GetOneToOne'),
    method_name='/rxgrpc.test.TestService/GetOneToOne'
)

rx_server.start()

Here it is an example of a filter for a streaming input:

from rxgrpc import operators
from test.proto import test_pb2


def _filter_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
    return bool(int(m.message[-1]) % 2)

server = ...
server.set_grpc_observable(
    server.grpc_pipe(
        operators.filter(_filter_message),
        method_name='/rxgrpc.test.TestService/GetStreamToOne'),
    method_name='/rxgrpc.test.TestService/GetStreamToOne'
)