- official Ruby SDK coming sometime this week
- among other things
- owner of the ruby printer
- they even support streaming!
client = OpenAI::Client.new

client.chat(
  parameters: {
    model: "llama3",
    messages: [{ role: "user", content: "Hello!" }],
    temperature: 0.7,
    stream: proc do |chunk, _bytesize|
      print chunk.dig("choices", 0, "delta", "content")
    end
  }
)
- Official openai/openai-ruby
- looks pretty much the same
# typed: strong
client = OpenAI::Client.new

stream = client.chat.stream_raw(
  model: "gpt-4",
  messages: [{ role: "user", content: "Hello!" }],
  temperature: 0.7
)

stream.each do |chunk|
  pp chunk.choices
end
- "typed: strong" (the # typed: strong magic comment)
- "#each" (stream.each)
- OpenAI ↔ HTTP ↔ JSON ↔ SDK ↔ Ruby
- Server ↔ Transport ↔ Data ↔ Decoding / Encoding ↔ Your Code
- community SDK
module OpenAI
  class Client
    include OpenAI::HTTP

    # ...

    def chat(parameters: {})
      json_post(path: "/chat/completions", parameters: parameters)
    end

    def embeddings(parameters: {})
      json_post(path: "/embeddings", parameters: parameters)
    end

    def completions(parameters: {})
      json_post(path: "/completions", parameters: parameters)
    end

    # ...
  end
end
- you
- and your machine
- usually yard docs are kinda meh
- vscode solargraph generics support
- typed: strong
- mistake 1: "role" → "rule"
- mistake 2: "choices.first.delta" → "choices.first&.delta"
- does anybody even use this?
- we just support it because we can
module OpenAI
  module Models
    type completion =
      {
        id: String,
        choices: ::Array[OpenAI::Models::CompletionChoice],
        created: Integer,
        model: String,
        object: :text_completion
      }

    class Completion < OpenAI::Internal::Type::BaseModel
      attr_accessor id: String

      attr_accessor choices: ::Array[OpenAI::Models::CompletionChoice]

      attr_accessor created: Integer

      attr_accessor model: String

      attr_accessor object: :text_completion

      # ...

      def to_hash: -> OpenAI::Models::completion
    end
  end
end
- every toolchain in Ruby reads nominal types, e.g. Class
- once you start writing Classes...
- do nothing, win
- do everything, lose
- i speak for every programmer
- my abstractions
- other people's abstractions
- human readable DSL for defining data containers
- SDK readable metadata for performing canonicalization during (de)serialization
- tooling readable anchor points for static analysis
- data → Class
module OpenAI
  module Models
    # @see OpenAI::Resources::Completions#create
    #
    # @see OpenAI::Resources::Completions#create_streaming
    class Completion < OpenAI::Internal::Type::BaseModel
      # @!attribute id
      #   @return [String]
      required :id, String

      # @!attribute choices
      #   @return [Array<OpenAI::Models::CompletionChoice>]
      required :choices, -> { OpenAI::Internal::Type::ArrayOf[OpenAI::Models::CompletionChoice] }

      # @!attribute created
      #   @return [Integer]
      required :created, Integer

      # @!attribute model
      #   @return [String]
      required :model, String

      # @!attribute object
      #   @return [Symbol, :text_completion]
      required :object, const: :text_completion
    end
  end
end
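- a minimal sketch (hypothetical class names, not the SDK's real implementation) of how a single required macro can serve all three readers at once:

class BaseModelSketch
  def self.fields
    @fields ||= {}
  end

  # the human-readable DSL entry point
  def self.required(name, type)
    fields[name] = type # SDK-readable metadata for (de)serialization
    attr_reader name    # tooling-readable anchor for static analysis
  end

  def initialize(data)
    self.class.fields.each_key { |k| instance_variable_set(:"@#{k}", data[k]) }
  end
end

class CompletionSketch < BaseModelSketch
  required :id, String
  required :model, String
end

c = CompletionSketch.new({id: "cmpl-123", model: "gpt-x"})
pp c.id                    # => "cmpl-123"
pp CompletionSketch.fields # => {id: String, model: String}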
- unions
- expresses a set of possible types
module Anthropic
  module Models
    module RawMessageStreamEvent
      extend Anthropic::Internal::Type::Union

      discriminator :type

      variant :message_start, -> { Anthropic::Models::RawMessageStartEvent }
      variant :message_delta, -> { Anthropic::Models::RawMessageDeltaEvent }
      variant :message_stop, -> { Anthropic::Models::RawMessageStopEvent }
      variant :content_block_start, -> { Anthropic::Models::RawContentBlockStartEvent }
      variant :content_block_delta, -> { Anthropic::Models::RawContentBlockDeltaEvent }
      variant :content_block_stop, -> { Anthropic::Models::RawContentBlockStopEvent }
    end
  end
end
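- a minimal sketch (assumed names, not the SDK's internals) of how the discriminator routes a raw payload to its variant during decoding:

module DiscriminatedUnionSketch
  def discriminator(key)
    @key = key
  end

  def variant(tag, klass)
    (@variants ||= {})[tag] = klass
  end

  # look up the concrete class by the payload's discriminator field
  def resolve(payload)
    @variants.fetch(payload.fetch(@key).to_sym)
  end
end

module StreamEventSketch
  extend DiscriminatedUnionSketch

  discriminator :type
  variant :message_start, Struct.new(:message)
  variant :message_stop, Struct.new(:reason)
end

pp StreamEventSketch.resolve({type: "message_start"}) # => the :message_start variant class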
- github repo is not public because: "😭, they forgot to give me permission"
- Class → data
- deconstruction
stream = client.completions.create_streaming(
  model: :"gpt-3.5-turbo-instruct",
  prompt: "1,2,3,",
  max_tokens: 5,
  temperature: 0.0
)

# deconstruct the first `completion` object, and pluck out the first `choice`
stream.first => { choices: [choice] }

# choice here is still a class type
pp choice.text
pp choice.finish_reason
- recursive consistency
# you can also do it recursively
stream.first => { choices: [{ text:, finish_reason: }] }

pp text
pp finish_reason
- trade static analysis for thinness
- great for static analysis
stream =
  anthropic.messages.create_streaming(
    max_tokens: 1024,
    messages: [{ role: :user, content: "Say hello there!" }],
    model: :"claude-3-7-sonnet-latest"
  )

event = stream.first

# true
puts(event in Anthropic::Models::RawMessageStreamEvent)

case event
when Anthropic::Models::RawMessageStartEvent
  pp(event.message)
when Anthropic::Models::RawMessageDeltaEvent
  pp(event.delta)
when Anthropic::Models::RawContentBlockStartEvent
  pp(event.content_block)
else
  # ...
end
- if you get some data from the SDK:
- they work with: "case ... in" and "case ... when"
- Enum, Union, BaseModel, ArrayOf, HashOf, etc.
- deconstruct any Class in the SDK
case event
in { type: :message_start, message: { content: content } }
  pp(content)
in { type: :message_delta, delta: delta }
  pp(delta)
in { type: :content_block_start, content_block: content_block }
  pp(content_block)
else
  # ...
end
- recursive consistency
class OpenAI::Test::Resources::Chat::CompletionsTest < OpenAI::Test::ResourceTest
  def test_create_required_params
    response =
      @openai.chat.completions.create(messages: [{content: "string", role: :developer}], model: :"o3-mini")

    assert_pattern do
      response => OpenAI::Models::Chat::ChatCompletion
    end

    assert_pattern do
      response => {
        id: String,
        choices: ^(OpenAI::Internal::Type::ArrayOf[OpenAI::Models::Chat::ChatCompletion::Choice]),
        created: Integer,
        model: String,
        object: Symbol,
        service_tier: OpenAI::Models::Chat::ChatCompletion::ServiceTier | nil,
        system_fingerprint: String | nil,
        usage: OpenAI::Models::CompletionUsage | nil
      }
    end
  end
end
- in fact, most tests are written via recursive pattern matching assertions
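- a hedged sketch of what such an assertion helper could look like (hypothetical Minitest helper, not necessarily the SDK's exact one; the => pattern match raises on a shape mismatch):

def assert_pattern
  yield # e.g. `response => {id: String}` raises NoMatchingPatternKeyError on mismatch
  assert(true)
rescue NoMatchingPatternError, NoMatchingPatternKeyError => e
  flunk("pattern did not match: #{e.message}")
end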
- Server ↔ Transport ↔ Data ↔ Encoding / Decoding ↔ Your Code
- Server ↔ Transport ↔ Data ↔ (Decoding Canonicalization) ↔ (De)-Nominalization ↔ (Encoding Canonicalization) ↔ Your Code
- (content-type + bytes) → decoding → primitive data → fancy data
- ("JSON" + ...) → JSON.parse → "1969-12-31" → Date.parse("1969-12-31")
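- the same two decode stages in plain Ruby (the field name here is illustrative):

require "date"
require "json"

# decoding: bytes → primitive data → fancy data
primitive = JSON.parse('{"created": "1969-12-31"}', symbolize_names: true)
fancy = Date.parse(primitive[:created])
pp fancy # => #<Date: 1969-12-31 ...>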
- nominal type (Class) → primitive data → (content-type + encode)
- OpenAI::Models::ChatCompletionMessageParam → {role: :user, content: "Hello!"} → JSON.generate({role: :user, content: "Hello!"})
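- and the encode direction in plain Ruby (the hash stands in for what a model's #to_hash might yield):

require "json"

# encoding: nominal type → primitive data → wire bytes
param = { role: :user, content: "Hello!" }
pp JSON.generate(param) # => "{\"role\":\"user\",\"content\":\"Hello!\"}"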
- we use the standard library "net-http"
- the entire HTTP pipeline is a single stream
- does not buffer the entire file into memory for either upstream or downstream
require "pathname"
file_object = openai.files.create(file: Pathname("input.jsonl"), purpose: "fine-tune")
puts(file_object.id)
- buffering
require "stringio"

file = File.read("input.jsonl")
file_object = openai.files.create(file: StringIO.new(file), purpose: "fine-tune")
puts(file_object.id)
- "#each" means that we have implemented the Enumerable protocol
- all HTTP responses in the SDK are Enumerators of strings, and we do successive stream processing
stream = client.completions.create_streaming(
  model: :"gpt-3.5-turbo-instruct",
  prompt: "1,2,3,",
  max_tokens: 5,
  temperature: 0.0
)

stream_of_choices =
  stream
    .lazy
    .select do |completion|
      completion.object == :text_completion
    end
    .flat_map do |completion|
      completion.choices
    end

# calling #each cleans up the HTTP connection
stream_of_choices.each do |choice|
  pp(choice)
end
- note the automatic HTTP connection cleanup
def decode_lines(enum)
  re = /(\r\n|\r|\n)/
  buffer = String.new.b
  cr_seen = nil

  chain_fused(enum) do |y|
    enum.each do |row|
      offset = buffer.bytesize
      buffer << row
      while (match = re.match(buffer, cr_seen&.to_i || offset))
        case [match.captures.first, cr_seen]
        in ["\r", nil]
          cr_seen = match.end(1)
          next
        in ["\r" | "\r\n", Integer]
          y << buffer.slice!(..(cr_seen.pred))
        else
          y << buffer.slice!(..(match.end(1).pred))
        end
        offset = 0
        cr_seen = nil
      end
    end

    y << buffer.slice!(..(cr_seen.pred)) unless cr_seen.nil?
    y << buffer unless buffer.empty?
  end
end
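- a simplified, self-contained sketch of the same idea (newline-only; the real decoder above also handles \r and \r\n straddling chunk boundaries):

def each_line_from(chunks)
  return enum_for(:each_line_from, chunks) unless block_given?
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # emit every complete line; the partial tail stays buffered across chunks
    while (i = buffer.index("\n"))
      yield buffer.slice!(..i)
    end
  end
  yield buffer unless buffer.empty? # trailing data without a final newline
end

pp each_line_from(["hel", "lo\nwor", "ld\n", "!"]).to_a
# => ["hello\n", "world\n", "!"]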
def decode_sse(lines)
  # rubocop:disable Metrics/BlockLength
  chain_fused(lines) do |y|
    blank = {event: nil, data: nil, id: nil, retry: nil}
    current = {}

    lines.each do |line|
      case line.sub(/\R$/, "")
      in ""
        next if current.empty?
        y << {**blank, **current}
        current = {}
      in /^:/
        next
      in /^([^:]+):\s?(.*)$/
        field, value = Regexp.last_match.captures
        case field
        in "event"
          current.merge!(event: value)
        in "data"
          (current[:data] ||= String.new.b) << (value << "\n")
        in "id" unless value.include?("\0")
          current.merge!(id: value)
        in "retry" if /^\d+$/ =~ value
          current.merge!(retry: Integer(value))
        else
        end
      else
      end
    end

    y << {**blank, **current} unless current.empty?
  end
  # rubocop:enable Metrics/BlockLength
end
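- a self-contained sketch of the framing rules above (eager split for illustration; the real decoder streams frame by frame):

def sse_events(text)
  text.split("\n\n").filter_map do |frame|
    event = {}
    frame.each_line do |line|
      case line.chomp
      in /^:/
        next # comment line
      in /^([^:]+):\s?(.*)$/
        field, value = Regexp.last_match.captures
        case field
        in "event" then event[:event] = value
        in "data" then (event[:data] ||= +"") << value << "\n"
        else
        end
      else
      end
    end
    event unless event.empty?
  end
end

pp sse_events("event: message\ndata: {\"x\":1}\n\n")
# => [{event: "message", data: "{\"x\":1}\n"}]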
private def iterator
  # rubocop:disable Metrics/BlockLength
  @iterator ||= OpenAI::Internal::Util.chain_fused(@stream) do |y|
    consume = false

    @stream.each do |msg|
      next if consume

      case msg
      in { data: String => data } if data.start_with?("[DONE]")
        consume = true
        next
      in { data: String => data }
        case JSON.parse(data, symbolize_names: true)
        in { error: error }
          message =
            case error
            in String
              error
            in { message: String => m }
              m
            else
              "An error occurred during streaming"
            end

          OpenAI::Errors::APIError.for(
            url: @url,
            status: @status,
            body: body,
            request: nil,
            response: @response,
            message: message
          )
        in decoded
          y << OpenAI::Internal::Type::Converter.coerce(@model, decoded)
        end
      else
      end
    end
  end
  # rubocop:enable Metrics/BlockLength
end
- the streaming core is implemented using a Ruby Fiber to flip the push-based iteration from "net-http"
# ... writing this up 1 hour before the talk
# sorry! it's a state machine here OK
- into an Enumerator
enum = Enumerator.new do |y|
  y << 1
  y << 2
  y << 3
end
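- the same machinery is what flips push into pull; a minimal sketch (producer name is illustrative):

# a push-style producer, like net-http yielding body chunks into a block
def push_producer(&sink)
  sink.call("chunk-1")
  sink.call("chunk-2")
end

# external iteration suspends the producer on a Fiber after each item,
# so the consumer pulls one chunk at a time
pull = Enumerator.new do |y|
  push_producer { |chunk| y << chunk }
end

pp pull.next # => "chunk-1"
pp pull.next # => "chunk-2"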
# who knew?
enum.rewind
# clean up mechanism
enum.rewind >> prev_enum.rewind >> prev_prev_enum.rewind
def fused_enum(enum, external: false, &close)
  fused = false
  iter = Enumerator.new do |y|
    next if fused
    fused = true

    if external
      loop { y << enum.next }
    else
      enum.each(&y)
    end
  ensure
    close&.call
    close = nil
  end

  iter.define_singleton_method(:rewind) do
    fused = true
    self
  end
  iter
end
def close_fused!(enum)
  enum.rewind.each { break }
end
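- a hedged usage sketch of the two helpers above:

http_body = ["chunk-1", "chunk-2"].each # stand-in for a net-http body stream
guarded = fused_enum(http_body) { puts "connection released" }

pp guarded.to_a # prints "connection released" once, then => ["chunk-1", "chunk-2"]
pp guarded.to_a # => [] (fused: the block never re-runs, the close hook never double-fires)
close_fused!(guarded) # also safe: rewind only fuses, and `close` is already cleared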
- EOF