Streaming
markwylde opened this issue · 2 comments
I see streaming on the roadmap at 0%, but I don't think it's been started?
Have you thought about the type of API you would like?
I was thinking it might be cool to implement something like this:
import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });
// call and wait for all messages
const messages = await c.chat('What is a rickroll?');
// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message)
}
This could be done by making c.chat return a custom iterator/thenable.
To have c.chat act as both an async iterator and a promise, we could create a class that implements both the async iterator protocol and the thenable protocol, wrapping the promise-based method to provide streamed results. Here's an example of what I'm thinking. I hadn't dug too much into this part of the code, so this is kinda pseudo.
class Chat {
  constructor(provider, options) {
    this.provider = provider;
    this.options = options;
  }

  static with(provider, options) {
    // `new this` so that subclasses (like ExtendedChat below) get the right constructor
    return new this(provider, options);
  }

  // Promise path: resolve with all parsed messages at once
  async chat(prompt) {
    const response = await this._getChatResponse(prompt);
    const messages = this._parseMessages(response);
    return messages;
  }

  // Streaming path: yield parsed messages one at a time
  async *_chatGenerator(prompt) {
    const response = await this._getChatResponse(prompt);
    const messages = this._parseMessages(response);
    for (const message of messages) {
      yield message;
    }
  }

  _getChatResponse(prompt) {
    // Simulate the API call to get a chat response
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve(`Response for prompt: ${prompt}`);
      }, 1000);
    });
  }

  _parseMessages(response) {
    // Simulate parsing the response into individual messages
    return response.split(' ');
  }
}
// Thenable + async-iterable wrapper around a single chat call
class ChatWrapper {
  constructor(chatInstance, prompt) {
    this.chatInstance = chatInstance;
    this.prompt = prompt;
    this.generator = null;
  }

  // `await wrapper` drains the stream and resolves with all messages
  async then(resolve, reject) {
    try {
      const messages = [];
      for await (const message of this) {
        messages.push(message);
      }
      resolve(messages);
    } catch (error) {
      reject(error);
    }
  }

  // `for await (... of wrapper)` streams messages as they arrive
  [Symbol.asyncIterator]() {
    if (!this.generator) {
      this.generator = this.chatInstance._chatGenerator(this.prompt);
    }
    return this.generator;
  }
}
// Extend the Chat class to use the ChatWrapper
class ExtendedChat extends Chat {
  chat(prompt) {
    return new ChatWrapper(this, prompt);
  }
}
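To make the dual behaviour concrete, here's how the wrapper would be used. This is against the pseudo-classes above, not Ragged's real API:

const c = ExtendedChat.with('openai', { apiKey: process.env.OPENAI_API_KEY });

// Awaiting goes through ChatWrapper.then and resolves with every message
const messages = await c.chat('What is a rickroll?');

// Iterating goes through ChatWrapper[Symbol.asyncIterator] and streams them
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message);
}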
Or is this a bit too magical? Is it better to just provide a { stream: true } option that switches the return from a promise to an iterator?
So I guess these are the three proposals I'm thinking of:
Option 1
import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });
// call and wait for all messages
const messages = await c.chat('What is a rickroll?');
// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?')) {
  console.log(message)
}
Option 2
import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });
// call and wait for all messages
const messages = await c.chat('What is a rickroll?');
// stream messages as an iterator
for await (const message of c.chat('What is a rickroll?', { stream: true })) {
  console.log(message)
}
Option 3
import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });
// call and wait for all messages
const messages = await c.chat('What is a rickroll?');
// stream messages as an iterator
for await (const message of c.streamChat('What is a rickroll?')) {
  console.log(message)
}
Do you have a better idea? What are you thinking?
We should definitely support streaming -- I had done some work on this before; if you want to check it out, see the 0.3.7 tag. I based it on rxjs back then and was using the OpenAI API SDK. I ripped it out because it generated a lot of API complexity for Ragged; I felt we should first focus on making the non-streaming use cases clean. Here is a streaming poetry example: https://github.com/monarchwadia/ragged/blob/v0.3.7/examples/comprehensive-api-usage/sub-examples/streaming-poetry.ts
Re: which option, there are a few considerations. I'm about to change the return API for the c.chat and e.embed methods in order to support things like getting the original response & request objects (including API headers), token counts and rate-limiting information, and some additional utility functions like getLastMessage, which would make life easier (instead of doing messages.at(-1)?.text, which is a mouthful). So chat is going to turn into something like the following:
const {
  history,
  incomingMessages,
  rawResponse,
  rawRequest,
  getLastMessage
} = await c.chat("What is a rickroll?")
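Roughly speaking, the shape would be something like the following TypeScript sketch. The property names are the ones from the destructuring above; every type here is illustrative, not the final design:

// Illustrative types only -- the final shape may differ
type Message = {
  type: string; // e.g. "user" | "bot"; guessing at the variants here
  text: string;
};

type ChatResult = {
  history: Message[];           // full conversation history, including new messages
  incomingMessages: Message[];  // just the messages returned by this call
  rawResponse: unknown;         // original provider response, incl. API headers
  rawRequest: unknown;          // original request object that was sent
  getLastMessage: () => Message | undefined; // sugar for messages.at(-1)
};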
Given that this is the case, I think option 3 is the sensible one. This way we can provide dedicated fine-grained controls for streaming calls that aren't relevant for non-streaming calls. Maybe something like...
import { Chat } from "ragged"
const c = Chat.with('openai', { apiKey: process.env.OPENAI_API_KEY });
// call and wait for all messages
const { stream, abortStream } = await c.chatStream('What is a rickroll?');
// stream messages as an iterator, with a small chance of aborting the connection randomly
for await (const message of stream) {
  if (Math.random() > 0.95) abortStream(); // just illustrating a point
  console.log(message);
}
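Under the hood, chatStream could pair an async generator with an AbortController. This is just a sketch, and fetchProviderStream is a hypothetical stand-in for whatever the real provider call ends up being:

async function chatStream(prompt) {
  const controller = new AbortController();

  async function* generate() {
    // fetchProviderStream is hypothetical; the signal lets abortStream()
    // cancel the underlying connection mid-flight
    const chunks = await fetchProviderStream(prompt, { signal: controller.signal });
    for await (const chunk of chunks) {
      yield chunk;
    }
  }

  return {
    stream: generate(),
    abortStream: () => controller.abort(),
  };
}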
But I think we need to do the non-streaming API changes first.
I'll start a separate issue for that.
Here is the new issue: #18