node-red-contrib-ray-serve

Ray Serve deployments from within Node-RED


Deploy Ray Serve applications with Node-RED.

This is a prototype - I look forward to hearing your experience, feedback, and ideas for improvements.

Ray is a powerful distributed computing framework for Python that allows you to easily scale your applications. By pairing Ray with Node-RED, you can take advantage of Ray's distributed capabilities to handle large workloads and parallelize tasks, while leaving Node-RED to do what it does best: I/O and lightweight data plumbing.

Docs:

Install

npm i node-red-contrib-ray-serve

Test Install

For full end-to-end testing (including a local Ray cluster and Grafana), you can use:

docker-compose up -d --build

You can access:

  • the Node-RED dashboard here
  • the Ray dashboard here
  • Grafana here

Screenshots

Deploy Python functions to Ray Serve directly from Node-RED.


Configure your Python functions


Add Python dependencies and environment variables


Manage Ray servers


The Ray dashboard could potentially be accessed through Node-RED by proxying it, but I haven't been able to make the proxy require permissions, so this is currently disabled.


NOTE: Deploying will overwrite existing apps on Ray Serve (Serve removes any application that is not listed in the new config). This could be fixed with #42974.

You can query the available routes at:

http://ray:8000/-/routes
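A minimal standard-library sketch of querying that endpoint from Python. It assumes the Serve address `http://ray:8000` from the docker-compose setup and that `/-/routes` returns a JSON object mapping route prefixes to names (application names in recent Ray versions; the exact format may vary between releases). `serve_url`, `parse_routes` and `list_routes` are hypothetical helper names; `serve_url` joins the base address and the route so the URL never ends up with a double slash.

```python
import json
import urllib.request
from typing import Dict


def serve_url(base: str, route: str) -> str:
    """Join a Serve base address and a route with exactly one slash (hypothetical helper)."""
    return base.rstrip("/") + "/" + route.lstrip("/")


def parse_routes(body: bytes) -> Dict[str, str]:
    """Decode a /-/routes response body (assumed: JSON object of route prefix -> name)."""
    return json.loads(body.decode("utf-8"))


def list_routes(base: str = "http://ray:8000", timeout: float = 5.0) -> Dict[str, str]:
    """Fetch the routes Ray Serve is currently serving (needs a reachable cluster)."""
    with urllib.request.urlopen(serve_url(base, "/-/routes"), timeout=timeout) as resp:
        return parse_routes(resp.read())


# Usage, against a running cluster:
#   for prefix, name in list_routes().items():
#       print(prefix, "->", name)
```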

Examples

Speed of running the ML model on CPU (fastest to slowest): replicas >> batching > vanilla
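If you want to reproduce a comparison like this on your own hardware, a rough timing harness could look like the sketch below. It is hypothetical and not part of this module: `send` is any callable that performs one request, and the thread pool stands in for many Node-RED messages arriving at once. Replicas and batching only help when requests actually overlap, so `concurrency` matters as much as the deployment type.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


def time_requests(
    send: Callable[[str], str], texts: Sequence[str], concurrency: int = 8
) -> Tuple[float, List[str]]:
    """Run `send` over all texts using `concurrency` worker threads.

    Returns (elapsed seconds, responses in input order).
    """
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # pool.map preserves input order; list() waits for every request to finish
        responses = list(pool.map(send, texts))
    return time.perf_counter() - start, responses
```

For a real measurement, `send` would POST one JSON-encoded string to a route such as `/translate`, `/batched_translate` or `/translate-replicas` from the example flow. With `concurrency=1` you only measure per-request latency, which is where batching and replicas give no benefit.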

Vanilla English-to-French translator

from ray import serve
from starlette.requests import Request
from transformers import pipeline

@serve.deployment
class Translator:
    def __init__(self):
        # Load the model once per replica
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference and return the translated text
        return self.model(text)[0]["translation_text"]

    async def __call__(self, http_request: Request) -> str:
        # The request body is a JSON-encoded string, e.g. "Hello"
        english_text: str = await http_request.json()
        return self.translate(english_text)

app = Translator.bind()
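Because `__call__` decodes the body with `http_request.json()` and treats the result as a string, a client calling the deployment directly (for example, to test it without Node-RED) has to send a JSON-encoded string such as `"Hello"`, not plain text. A standard-library sketch, assuming the app is deployed at the `/translate` route prefix used in this README's example flow and that Serve listens on `http://ray:8000`; `build_translate_request` and `translate` are hypothetical names. The deployment returns a plain string, which comes back as the response body.

```python
import json
import urllib.request


def build_translate_request(
    text: str, base: str = "http://ray:8000", route: str = "/translate"
) -> urllib.request.Request:
    """Build a POST whose body is the JSON-encoded string the Translator expects."""
    return urllib.request.Request(
        base.rstrip("/") + route,
        data=json.dumps(text).encode("utf-8"),  # "Hello" -> b'"Hello"'
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def translate(text: str, timeout: float = 30.0) -> str:
    """Send the request and return the response body (the French translation)."""
    with urllib.request.urlopen(build_translate_request(text), timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```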

Using batched inference (up to 8 requests at a time)

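import asyncio
from typing import List
from ray import serve
from transformers import pipeline

@serve.deployment
class Translator:
    def __init__(self):
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, texts: List[str]) -> List[str]:
        # Given a list, the pipeline returns one result dict per input
        return [t["translation_text"] for t in self.model(texts)]

    # Serve groups up to 8 concurrent requests (waiting at most 0.1 s) into one call
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def __call__(self, requests) -> List[str]:
        # Decode every request body in the batch; return one translation per request
        texts: List[str] = await asyncio.gather(*(r.json() for r in requests))
        return self.translate(texts)

app = Translator.bind()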
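To make the batching behaviour concrete: with `@serve.batch`, each caller passes a single request, Serve queues them and invokes the method once with a list of up to `max_batch_size` items, waiting at most `batch_wait_timeout_s` for the batch to fill, and the method must return one result per input in the same order. The sketch below is a simplified, hypothetical re-implementation of that idea in plain `asyncio` (`MiniBatcher` is not part of Ray and is not how Ray implements it), useful for reasoning about the two parameters.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class MiniBatcher:
    """Hypothetical, simplified illustration of what @serve.batch does (not Ray's code)."""

    def __init__(
        self,
        handler: Callable[[List[Any]], Awaitable[List[Any]]],
        max_batch_size: int = 8,
        batch_wait_timeout_s: float = 0.1,
    ) -> None:
        self.handler = handler
        self.max_batch_size = max_batch_size
        self.batch_wait_timeout_s = batch_wait_timeout_s
        self.queue: asyncio.Queue = asyncio.Queue()  # create inside a running event loop
        self.batch_sizes: List[int] = []  # recorded so you can see how requests were grouped
        self._worker: Optional[asyncio.Task] = None

    async def submit(self, item: Any) -> Any:
        """Called once per request; resolves when that item's batch has been processed."""
        if self._worker is None:
            self._worker = asyncio.create_task(self._run())
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((item, fut))
        return await fut

    async def _run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until at least one request arrives
            deadline = loop.time() + self.batch_wait_timeout_s
            # Keep collecting until the batch is full or the wait timeout expires.
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            self.batch_sizes.append(len(batch))
            results = await self.handler([item for item, _ in batch])  # one call per batch
            for (_, fut), result in zip(batch, results):
                fut.set_result(result)
```

Submitting 10 concurrent items with `max_batch_size=4` runs the handler three times, with batches of 4, 4 and 2; the last batch only starts after the wait timeout, which is why batching can add latency when traffic is light.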

Using multiple replicas

from ray import serve
from transformers import pipeline

# Three replicas, each holding its own copy of the model; Serve spreads requests across them
@serve.deployment(num_replicas=3)
class Translator:
    def __init__(self):
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        return self.model(text)[0]["translation_text"]

    async def __call__(self, http_request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)
         
app = Translator.bind()

Using replica autoscaling

from ray import serve
from transformers import pipeline

@serve.deployment(
    autoscaling_config=dict(
        min_replicas=1,
        initial_replicas=2,
        max_replicas=4,
        # A "gain" below 1 moderates each scale-up decision, so replicas are added more gradually
        upscaling_factor=0.5,
    )
)
class Translator:
    def __init__(self):
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        return self.model(text)[0]["translation_text"]

    async def __call__(self, http_request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)

app = Translator.bind()
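Roughly, Ray Serve's autoscaler compares the number of ongoing requests per replica with a per-replica target (called `target_ongoing_requests` in recent Ray versions) and scales proportionally, with `upscaling_factor` acting as a gain on each scale-up decision. The function below is a simplified, hypothetical model of that rule for building intuition, not Ray's code: the real autoscaler also averages metrics over a look-back window and applies upscale/downscale delays, and the default target differs between Ray versions, so it is a parameter here.

```python
import math


def desired_replicas(
    total_ongoing_requests: float,
    running_replicas: int,
    target_per_replica: float = 2.0,
    min_replicas: int = 1,
    max_replicas: int = 4,
    upscaling_factor: float = 0.5,
    downscaling_factor: float = 1.0,
) -> int:
    """Simplified model of proportional autoscaling (hypothetical; not Ray's implementation)."""
    if running_replicas == 0:
        # Scale from zero to at least one replica as soon as there is traffic.
        return max(min_replicas, 1 if total_ongoing_requests > 0 else 0)
    # > 1 means replicas are overloaded relative to the target, < 1 means underloaded.
    error_ratio = (total_ongoing_requests / running_replicas) / target_per_replica
    factor = upscaling_factor if error_ratio >= 1 else downscaling_factor
    # Move only `factor` of the way from "no change" (ratio 1) toward the raw estimate.
    smoothed_ratio = 1 + (error_ratio - 1) * factor
    desired = math.ceil(running_replicas * smoothed_ratio)
    return max(min_replicas, min(max_replicas, desired))
```

Under this model, starting from the README's `initial_replicas=2` with 8 ongoing requests and a target of 2, `desired_replicas(8, 2)` returns 3 rather than the raw estimate of 4, because of the 0.5 factor; with no traffic it falls back to `min_replicas=1`.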

How it Works

This module offers two nodes:

  • Ray Serve node: stores a Ray Serve application
  • Ray Config node: basically just a wrapper that stores the Ray dashboard URL and the Ray Serve URL

When the flow is deployed, this module gathers all of the Ray Serve applications and, for each Ray server, it will:

  • Create a zip file of each application's code and upload it to Ray's GCS storage
    • Repeat uploads are skipped based on a hash of the zip's content
  • Create a Serve application payload
  • Deploy the applications by sending PUT /api/serve/applications
    • Serve docs: The endpoint declaratively deploys a list of Serve applications. If Serve is already running on the Ray cluster, removes all applications not listed in the new config. If Serve is not running on the Ray cluster, starts Serve. See multi-app config schema for the request’s JSON schema.
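The deploy steps above can be sketched in Python (the module itself is JavaScript, so this is only an illustration). Assumptions, all hypothetical: each node's code is stored as `main.py` in its zip, so the import path is `main:<variable_name>`; the package URI follows Ray's `gcs://_ray_pkg_<hash>.zip` naming with a hash of our choosing; and the node fields (`name`, `route_prefix`, `variable_name`, `code`, `dependencies`) mirror those visible in the example flow's JSON. Zip entries get a fixed timestamp so identical code always produces identical bytes, which is what makes skipping repeat uploads by hash possible. The actual upload and the PUT request are left out.

```python
import hashlib
import io
import zipfile
from typing import Any, Dict, List, Set, Tuple


def zip_code(code: str, filename: str = "main.py") -> bytes:
    """Zip one application's source deterministically: same code -> same bytes -> same hash."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        # Fixed timestamp and permissions so the archive does not change between deploys.
        info = zipfile.ZipInfo(filename, date_time=(1980, 1, 1, 0, 0, 0))
        info.external_attr = 0o644 << 16
        zf.writestr(info, code)
    return buf.getvalue()


def package_uri(zip_bytes: bytes) -> str:
    """Content-addressed URI in Ray's gcs://_ray_pkg_<hash>.zip style (hash choice is an assumption)."""
    return f"gcs://_ray_pkg_{hashlib.sha256(zip_bytes).hexdigest()[:32]}.zip"


def build_serve_payload(
    nodes: List[Dict[str, Any]], already_uploaded: Set[str]
) -> Tuple[Dict[str, Any], Dict[str, bytes]]:
    """Return (body for PUT /api/serve/applications, packages that still need uploading)."""
    applications = []
    to_upload: Dict[str, bytes] = {}
    for node in nodes:
        zip_bytes = zip_code(node["code"])
        uri = package_uri(zip_bytes)
        if uri not in already_uploaded:
            to_upload[uri] = zip_bytes  # unchanged code is skipped on later deploys
        applications.append(
            {
                "name": node["name"],
                "route_prefix": node["route_prefix"],
                "import_path": f"main:{node['variable_name']}",
                "runtime_env": {
                    "working_dir": uri,
                    "pip": [dep["name"] for dep in node.get("dependencies", [])],
                },
            }
        )
    return {"applications": applications}, to_upload
```

The resulting dict would be serialized to JSON and sent with PUT /api/serve/applications to the Ray dashboard address (`http://ray:8265` in the example flow's config node). Because the payload lists every application, anything missing from it is removed by Serve.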

When a message arrives at a Ray Serve node, it is sent to that application's deployed Ray Serve HTTP endpoint.

The advantage of this approach is that you can keep the Ray Serve endpoints private to your cluster, while Node-RED still reaches them over the private network.

Example flow

[{"id":"1b282ad2a8f263c6","type":"tab","label":"Flow 1","disabled":false,"info":"","env":[]},{"id":"4aa8c146e14b5457","type":"inject","z":"1b282ad2a8f263c6","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"hello! How are you today?","payloadType":"str","x":90,"y":420,"wires":[["fab2d147bd24ee29"]]},{"id":"8a353f104266b6b7","type":"debug","z":"1b282ad2a8f263c6","name":"debug 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":660,"y":400,"wires":[]},{"id":"0ac04ebf60711eea","type":"ray serve","z":"1b282ad2a8f263c6","server":"67985cdbc834cbb3","name":"hello-worlda","route_prefix":"/hello-world","variable_name":"app","package_manager":"pip","dependencies":[],"env_vars":[],"code":"from ray import serve\n\n@serve.deployment\nclass HelloWorld:\n    async def __init__(self):\n        pass\n    async def __call__(self, http_request) -> str:\n        msg: dict = await http_request.json()\n        return {\"payload\": \"Hello from ray!\", \"received\": msg}\n                \napp = HelloWorld.bind()\nserve.run(app)","deployments":[{"name":"HelloWorld","DETECTED_FROM_PYTHON":{}}],"deploymentArgs":{"HelloWorld":{"DETECTED_FROM_PYTHON":{}}},"x":470,"y":380,"wires":[["8a353f104266b6b7"]]},{"id":"71d33ea259873f69","type":"ray serve","z":"1b282ad2a8f263c6","server":"67985cdbc834cbb3","name":"hello-world 2","route_prefix":"/asdf","variable_name":"app","package_manager":"pip","dependencies":[],"env_vars":[],"code":"from ray import serve\n\n@serve.deployment\nclass HelloWorld2:\n    async def __call__(self, http_request) -> str:\n        msg: dict = await http_request.json()\n        return {\"payload\": \"Hello from ray 2!\", \"received\": msg}\n                \napp = HelloWorld2.bind()","deployments":[{"0":"{","1":"}","name":"HelloWorld2","DETECTED_FROM_PYTHON":{}}],"deploymentArgs":{"HelloWorld2":{"0":"{","1":"}","DETECTED_FROM_PYTHON":{}}},"x":470,"y":440,"wires":[["8a353f104266b6b7"]]},{"id":"304b2fc1ecd56de4","type":"http request","z":"1b282ad2a8f263c6","name":"query ray deployment","method":"GET","ret":"txt","paytoqs":"body","url":"http://ray:8000//asdf","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":340,"y":600,"wires":[["4a28b7320d2cf247"]]},{"id":"65cc7d67882dff25","type":"inject","z":"1b282ad2a8f263c6","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{}","payloadType":"json","x":130,"y":600,"wires":[["304b2fc1ecd56de4"]]},{"id":"4a28b7320d2cf247","type":"debug","z":"1b282ad2a8f263c6","name":"debug 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":540,"y":600,"wires":[]},{"id":"f296ec25d329b1f4","type":"ray serve","z":"1b282ad2a8f263c6","server":"67985cdbc834cbb3","name":"translation","route_prefix":"/translate","variable_name":"app","package_manager":"pip","dependencies":[{"name":"transformers"},{"name":"torch"}],"env_vars":[],"code":"from starlette.requests import Request\nfrom ray import serve\nfrom transformers import pipeline\n\n@serve.deployment\nclass Translator:\n    def __init__(self):\n        # Load model\n        self.model = pipeline(\"translation_en_to_fr\", model=\"t5-small\")\n\n    def translate(self, text: str) -> str:\n        # Run inference and return the translation text\n        translation = self.model(text)[0][\"translation_text\"]\n        return translation\n\n    async def __call__(self, http_request: Request) -> str:\n        english_text: str = await http_request.json()\n        return self.translate(english_text)\n         \napp = Translator.bind()","deployments":[{"name":"Translator","DETECTED_FROM_PYTHON":{}}],"deploymentArgs":{"HelloWorld":{},"Translator34":{"something":5,"DETECTED_FROM_PYTHON":{"num_replicas":3}},"Translator":{"DETECTED_FROM_PYTHON":{}}},"x":470,"y":280,"wires":[["fdd77c7ad943a72f"]]},{"id":"fdd77c7ad943a72f","type":"debug","z":"1b282ad2a8f263c6","name":"debug 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":700,"y":260,"wires":[]},{"id":"3dd4cc4be799ba10","type":"ray serve","z":"1b282ad2a8f263c6","server":"67985cdbc834cbb3","name":"batched translation","route_prefix":"/batched_translate","variable_name":"app","package_manager":"pip","dependencies":[{"name":"transformers"},{"name":"torch"}],"env_vars":[],"code":"import asyncio\nfrom starlette.requests import Request\nfrom ray import serve\nfrom transformers import pipeline\n\n@serve.deployment\nclass BatchedTranslator:\n    def __init__(self):\n        # Load model\n        self.model = pipeline(\"translation_en_to_fr\", model=\"t5-small\")\n\n    def translate(self, text) -> str:\n        # Run inference and return the translation text\n        translation = [t[\"translation_text\"] for t in self.model(text)]\n        return translation\n\n    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)\n    async def __call__(self, requests) -> str:\n        text: str = await asyncio.gather(*(r.json() for r in requests))\n        return self.translate(text)\n                \napp = BatchedTranslator.bind()","deployments":[{"name":"BatchedTranslator","DETECTED_FROM_PYTHON":{}}],"deploymentArgs":{"HelloWorld":{},"Translator34":{"something":5,"DETECTED_FROM_PYTHON":{"num_replicas":3}},"Translator":{"DETECTED_FROM_PYTHON":{}},"BatchedTranslator":{"DETECTED_FROM_PYTHON":{}}},"x":470,"y":220,"wires":[["fdd77c7ad943a72f"]]},{"id":"2bd11c464750ae60","type":"inject","z":"1b282ad2a8f263c6","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{}","payloadType":"json","x":130,"y":540,"wires":[["52cd0ec8808ab2d7"]]},{"id":"52cd0ec8808ab2d7","type":"http request","z":"1b282ad2a8f263c6","name":"query ray deployment","method":"GET","ret":"txt","paytoqs":"body","url":"http://ray:8000//hello-world","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":340,"y":540,"wires":[["9b98464332e6fa0b"]]},{"id":"9b98464332e6fa0b","type":"debug","z":"1b282ad2a8f263c6","name":"debug 4","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":540,"y":540,"wires":[]},{"id":"03e0fef603eb1235","type":"inject","z":"1b282ad2a8f263c6","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{}","payloadType":"json","x":130,"y":660,"wires":[["ffb7cf6a4151efae"]]},{"id":"ffb7cf6a4151efae","type":"http request","z":"1b282ad2a8f263c6","name":"query ray deployment","method":"GET","ret":"txt","paytoqs":"body","url":"http://ray:8000//-/routes","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":340,"y":660,"wires":[["e7489e5f80477c88"]]},{"id":"e7489e5f80477c88","type":"debug","z":"1b282ad2a8f263c6","name":"debug 5","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":540,"y":660,"wires":[]},{"id":"2ef41ea6e4554760","type":"ray serve","z":"1b282ad2a8f263c6","server":"67985cdbc834cbb3","name":"translation replicas","route_prefix":"/translate-replicas","variable_name":"app","package_manager":"pip","dependencies":[{"name":"transformers"},{"name":"torch"}],"env_vars":[],"code":"from starlette.requests import Request\nfrom ray import serve\nfrom transformers import pipeline\n\n@serve.deployment(num_replicas=3)\nclass TranslatorReplicas:\n    def __init__(self):\n        # Load model\n        self.model = pipeline(\"translation_en_to_fr\", model=\"t5-small\")\n\n    def translate(self, text: str) -> str:\n        # Run inference and return the translation text\n        translation = self.model(text)[0][\"translation_text\"]\n        return translation\n\n    async def __call__(self, http_request: Request) -> str:\n        english_text: str = await http_request.json()\n        return \"asdfasdf \" + self.translate(english_text)\n         \napp = TranslatorReplicas.bind()","deployments":[{"name":"TranslatorReplicas","DETECTED_FROM_PYTHON":{"num_replicas":3}}],"deploymentArgs":{"HelloWorld":{},"Translator34":{"something":5,"DETECTED_FROM_PYTHON":{"num_replicas":3}},"Translator":{"DETECTED_FROM_PYTHON":{"num_replicas":3}},"TranslatorReplicas":{"DETECTED_FROM_PYTHON":{"num_replicas":3}}},"x":470,"y":160,"wires":[["fdd77c7ad943a72f"]]},{"id":"dc7375473b416612","type":"function","z":"1b282ad2a8f263c6","name":"","func":"\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":325,"y":220,"wires":[["f296ec25d329b1f4","3dd4cc4be799ba10","2ef41ea6e4554760"]],"icon":"node-red/cog.svg","l":false},{"id":"0e1151c76dc10b6b","type":"function","z":"1b282ad2a8f263c6","name":"","func":"\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":355,"y":420,"wires":[["71d33ea259873f69","0ac04ebf60711eea"]],"icon":"node-red/cog.svg","l":false},{"id":"fab2d147bd24ee29","type":"function","z":"1b282ad2a8f263c6","name":"","func":"\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":275,"y":340,"wires":[["dc7375473b416612","0e1151c76dc10b6b"]],"icon":"node-red/cog.svg","l":false},{"id":"f052829ab83e1841","type":"inject","z":"1b282ad2a8f263c6","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"May I take your hat, sir?","payloadType":"str","x":170,"y":220,"wires":[["dc7375473b416612"]]},{"id":"67985cdbc834cbb3","type":"rayConfig","rayAddress":"http://ray:8265","rayAddressType":"str","serveAddress":"http://ray:8000","serveAddressType":"str"}]