monarchwadia/ragged

Streaming

markwylde opened this issue · 2 comments

I see streaming on the roadmap at 0%, but I don't think it's been started yet?

Have you thought about the type of API you would like?

I was thinking it might be cool to implement something like this:

import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// call and wait for all messages
const messages = await c.chat('What is a rickroll?');

// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message)
}

This could be done by making c.chat return a custom iterator/thenable.

To achieve the desired functionality of having c.chat act as both an iterator and a promise, we could return an object that implements both the async iterator protocol and the thenable protocol, wrapping the promise-based method to provide streamed results. Here's an example of what I'm thinking. I hadn't dug too much into this part of the code, so this is kinda pseudo.

class Chat {
  constructor(provider, options) {
    this.provider = provider;
    this.options = options;
  }

  static with(provider, options) {
    // use `new this` so subclasses (like ExtendedChat below) construct the right class
    return new this(provider, options);
  }

  async chat(prompt) {
    const response = await this._getChatResponse(prompt);
    const messages = this._parseMessages(response);
    return messages;
  }

  async *_chatGenerator(prompt) {
    const response = await this._getChatResponse(prompt);
    const messages = this._parseMessages(response);
    for (const message of messages) {
      yield message;
    }
  }

  _getChatResponse(prompt) {
    // Simulate the API call to get chat response
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve(`Response for prompt: ${prompt}`);
      }, 1000);
    });
  }

  _parseMessages(response) {
    // Simulate parsing the response into individual messages
    return response.split(' ');
  }

  // note: async iteration is handled by ChatWrapper below, which knows the prompt
}

class ChatWrapper {
  constructor(chatInstance, prompt) {
    this.chatInstance = chatInstance;
    this.prompt = prompt;
    this.generator = null;
  }

  // thenable protocol: `await wrapper` resolves to the full message list
  async then(resolve, reject) {
    try {
      const messages = await this.chatInstance.chat(this.prompt);
      resolve(messages);
    } catch (error) {
      reject(error);
    }
  }

  // async iterator protocol: `for await...of wrapper` streams messages
  [Symbol.asyncIterator]() {
    if (!this.generator) {
      this.generator = this.chatInstance._chatGenerator(this.prompt);
    }
    return this.generator;
  }
}

// Extend the Chat class to use the ChatWrapper
class ExtendedChat extends Chat {
  chat(prompt) {
    return new ChatWrapper(this, prompt);
  }
}
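
With that in place, awaiting goes through the thenable path while for await...of consumes the generator. A quick usage sketch, reusing the simulated classes above:

const c = ExtendedChat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// resolves via ChatWrapper.then with the full message list
const messages = await c.chat('What is a rickroll?');

// consumes ChatWrapper's async generator instead
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message);
}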

Or is this a bit too magical? Is it better to just provide a { stream: true } option that then returns an iterator instead of a promise? (There's a typing sketch for that after the options below.)

So I guess these are the three proposals I'm thinking of:

Option 1

import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// call and wait for all messages
const messages = await c.chat('What is a rickroll?');

// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message)
}

Option 2

import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// call and wait for all messages
const messages = await c.chat('What is a rickroll?');

// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?', { stream: true })) {
  console.log(message)
}

Option 3

import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// call and wait for all messages
const messages = await c.chat('What is a rickroll?');

// stream messages as an iterator
for await (const message of c.streamChat('What is a rickroll?')) {
  console.log(message)
}
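
Regarding option 2's { stream: true } flag, the return type would have to follow the flag. In TypeScript that could be expressed with an overload pair like the following sketch (Message is a placeholder type), which works but does make the signature more complex:

type Message = { text: string }; // placeholder shape

interface ChatApi {
  // with { stream: true }, callers get an async iterable of messages...
  chat(prompt: string, options: { stream: true }): AsyncIterable<Message>;
  // ...otherwise, a promise of the full message list
  chat(prompt: string, options?: { stream?: false }): Promise<Message[]>;
}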

Do you have a better idea? What are you thinking?

We should definitely support streaming -- I did some work on this before; if you want to check it out, see the 0.3.7 tag. I based it on rxjs back then and was using the OpenAI API SDK. I ripped it out because it generated a lot of API complexity for Ragged, and I felt we should first focus on making the non-streaming use cases clean. Here is a streaming poetry example: https://github.com/monarchwadia/ragged/blob/v0.3.7/examples/comprehensive-api-usage/sub-examples/streaming-poetry.ts

Re: which option, there are a few considerations. I'm about to change the return API for the c.chat and e.embed methods in order to support things like getting the original response & request objects (including API headers), token counts and rate-limiting information, and some additional utility functions like getLastMessage, which would make life easier (instead of doing messages.at(-1)?.text, which is a mouthful). So chat is going to return something like the following:

const {
  history,
  incomingMessages,
  rawResponse,
  rawRequest,
  getLastMessage
} = await c.chat("What is a rickroll?")

Given that this is the case, I think option 3 is the sensible one. This way we can provide dedicated, fine-grained controls for streaming calls that are not relevant for non-streaming calls. Maybe something like...

import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// start a streaming call; get the stream and an abort handle
const { stream, abortStream } = await c.chatStream('What is a rickroll?');

// stream messages as an iterator with a small chance of aborting the connection randomly
for await (const message of stream) {
  if (Math.random() > 0.95) abortStream(); // just illustrating a point
  console.log(message);
}
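
Under the hood, abortStream could just wrap an AbortController whose signal is handed to the HTTP request, with stream being an async generator that yields chunks as they arrive. A rough sketch, assuming a fetch-based provider (the endpoint, payload shape, and chunk handling here are placeholders, not Ragged's actual internals):

function chatStream(prompt: string) {
  const controller = new AbortController();

  async function* streamChunks() {
    // placeholder endpoint and body; a real provider adapter goes here
    const response = await fetch('https://api.example.com/chat', {
      method: 'POST',
      body: JSON.stringify({ prompt, stream: true }),
      signal: controller.signal,
    });
    if (!response.body) return;
    const decoder = new TextDecoder();
    // in Node 18+, response.body is an async iterable of Uint8Array chunks
    for await (const chunk of response.body) {
      yield decoder.decode(chunk, { stream: true });
    }
  }

  return {
    stream: streamChunks(),
    abortStream: () => controller.abort(),
  };
}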

But I think we need to do the non-streaming API changes first.

I'll start a separate issue for that.

Here is the new issue: #18