dominictarr/mux-demux

Doesn't play nice with newer Stream API

rmg opened this issue · 14 comments

rmg commented

The streams created by mux-demux only support the data event interface, not the new readable event as documented in Node 0.10.x.

Or I'm completely misunderstanding things.

yes. but new streams are backwards compatible with classic-streams, to it should still work fine.

Oh, just saw your tweet - can you describe what your problem was, so we can improve documentation?

rmg commented

I fork off a child process which chroots itself. The child process runs a bunch of commands, some concurrently. I wanted to use mux-demux to relay all the chroot's sub-processes stdio to the non-chrooted parent process.

What I encountered along the way was child_process.fork won't make pipes (even when specified in options) and child_process.fork's IPC channel doesn't expose a stream (or isn't documented to do so) that mux-demux can use.

I got things to run by switching to child_process.spawn('node', ['child.js'], {stdio: 'pipe'})

When using the old stream interface, I had to wrap the data in a Buffer. This is probably the wrong thing to do, but this is what I ended up with:

stream.on('data', function (data) {
  data = new Buffer(data).toString('utf8')
  console.log("demuxed: ", data)
})

I tried wrapping the stream like so:

mdm.on('connection', function (demux_stream) {
  var new_stream = new stream.Readable()
  new_stream.wrap(demux_stream)
  new_stream.on('readable', function () {
    console.log("demuxed: ", new_stream.read())
  })
})

But I get a bunch of decoding errors. new_stream.setEncoding() stops the errors, but the data isn't decoded properly.

Aha, it's not new streams that is the problem, but buffers.
mux-demux uses a line separated json protocol,

in coming data gets turned into json, which isn't nice for a buffer.

Are you sending buffers? or strings?

the wrapper that mux-demux uses to serialize the stream is pluggable,

you might want to try using this approach
https://github.com/dominictarr/mux-demux/blob/master/test/msg-pack.js

I have also been considering wrapping mux-demux in the redis protocol
github.com/dominictarr/redis-protocol-stream but they will require changing how some of the internals (that expect json) work.

But still, that is a good option - I've just been waiting until I have a personal need,
If you want to tackle this (it actually just requires adding JSON.parse and JSON.stringify a few places),
I'll be happy to help guide you & merge!

But try it with msg-pack stream first, that might work - if so, I'll update the docs.

rmg commented

I'm not directly sending anything myself. It's short, so here's the toy code I've been working with (not actually using chroot).

parent.js:

var spawn  = require("child_process").spawn
  , mdm    = require("mux-demux")()
  , stream = require("stream")

var child = spawn('node', ['./child.js'], {'stdio': ['ignore', 'pipe', 'ignore']})

child.stdout.pipe(mdm)

child.on('error', console.log)
child.on('exit', function() {
  console.log("Child exit")
})

function data_logger(name) {
  console.log("new logger for: " + name)
  return function (data) {
    data = new Buffer(data).toString('utf8')
    console.log("demuxed " + name + ": ", data[0], data.length)
  }
}

function read_logger(name, demuxed_stream) {
  console.log("new logger for: " + name)
  return function () {
    data = demuxed_stream.read()
    console.log("demuxed " + name + ": ", data[0], data.length)
  }
}

var use_streams2 = true
function connection(demux_stream) {
  console.log("New connection!")
  if (use_streams2) {
    var new_stream = new stream.Readable()
    new_stream.wrap(demux_stream)
    //new_stream.setEncoding('utf8') // bad data with, crash without
    new_stream.on('readable', read_logger(demux_stream.meta, new_stream))
    new_stream.on('error', console.log)
  } else {
    demux_stream.on('data', data_logger(demux_stream.meta))
  }
}

mdm.on('connection', connection)

child.js:

var spawn = require("child_process").spawn
  , mdm = require("mux-demux")()
  , ones = spawn("yes", ["1"], {"stdio": "pipe"})
  , twos = spawn("yes", ["2"], {"stdio": "pipe"})

mdm.pipe(process.stdout)
ones.stdout.pipe(mdm.createWriteStream("ones"))
twos.stdout.pipe(mdm.createWriteStream("twos"))

function done() {
  ones.kill()
  twos.kill()
  setTimeout(process.exit, 1000)
}

setTimeout(done, 500)
rmg commented

Before looking at mux-demux I was using my own JSON based approach that used process.send({name: 'ones', data: data}) and child.on('message') just fired data events on the corresponding object. I was looking for something more stream-y. I saw the discussion in #14. My first thought was protocol buffers, but then I looked at what that took and decided it wasn't as easy to do as to say.

rmg commented

I ended up throwing together my own muxer/demuxer. It's not done yet, but I managed to get it functional enough to sort out what an API I would expect to use for such a thing https://github.com/rmg/unfunnel

Cool! thats great! since you are just getting started, can I recommend you don't use JSON?
it has heavy limitations when you start sending binary data.

you are length delimiting the json packets here https://github.com/rmg/unfunnel/blob/master/index.js#L34
I recommend you use another level of length delimiters, for each field - that will work great with binary data!

rmg commented

JSON was just the make-things-go solution. The packet format is entirely encapsulated in the receive and send methods for the purpose of changing it later :-)

I think what I'm going to do is use len:16, type:8, id:32, payload:(len-6). If type is data, ship payload directly to the stream identified by id (derived from the given name, mapped locally). If type is not data, then payload will be sent to the handler for that message type. So far I'm imagining create and close as message types. Might bang that out over lunch if I get a chance..

You definately need an end message, 'close' was more significant in classic-streams, so I put that in as a message in mux-demux. I regret making a special message for createStream though.

Although It is handy to be able to create a new stream, and send some metadata over, it became awkward because it could only go one way, if the child process (or what ever) KNOWS it's gonna be talking another process it's nicer to have it symmetrical:

//server
cp.stdout.pipe(mx.createStream('rpc')).pipe(cp.stdin)
//client

process.stdin.pipe(mx.createStream('rpc')).pipe(process.stdout)

Other times, you don't know how many connections will come through, and one end is the 'server' and the other the 'client', but looking back I realize that it would have been better to have a simpler stream, and allow the first message to be the header, instead of having a special header message.

you do need an error message, though - but maybe that can be mixed into the end message?

rmg commented

Ya, I guess end is more in line with Node than close.

I was thinking about error and decided that I'd add it once I could think of a situation where there's an error that wasn't mutually exclusive with the ability to send such a message. I certainly need to propagate errors up to the endpoints, though.

For create the only reason I can think of having a message for it in the transport is for propagating the user-friendly name of the endpoint. If I separate name and id and make id simply a hash of name and have the lookups all based on id, then I can probably get away without it.

I've implemented the non-JSON protocol now, btw.

Well, if you just use the name as the "data", then when you see a new name, you assume "create".

@Raynos encountered the first need for errors with mux-demux. if a new stream came in,
and there was not a thing to handle it, you need to send a 404-like error back to requester.

If you are using a multiplexer to tunnel some other stream, like http/tcp you definitely need errors.

rmg commented

Currently the endpoints are auto-created on first reference by either the
user creating a stream or the multiplexer receiving a packet for an
endpoint that doesn't exist yet. Because they are auto-created, there isn't
currently a 404 case. I should be firing at least an advisory
connection event from the multiplexor though.

I'll definitely be adding error message support, I just don't know yet
where to put the code that would actually use it. I might add 404 case
just so I have somewhere to hang it if I can get over the YAGNI.. I should
probably make the auto-creation behaviour a flag.

On Tue, Apr 16, 2013 at 9:49 AM, Dominic Tarr notifications@github.comwrote:

Well, if you just use the name as the "data", then when you see a new
name, you assume "create".

@Raynos https://github.com/Raynos encountered the first need for errors
with mux-demux. if a new stream came in,
and there was not a thing to handle it, you need to send a 404-like error
back to requester.

If you are using a multiplexer to tunnel some other stream, like http/tcp
you definitely need errors.


Reply to this email directly or view it on GitHubhttps://github.com//issues/24#issuecomment-16456644
.

http://twitter.com/rmgraham

No rush. The 404-like error works by having a default handler, that just emits an error - since we wanted an error on the other end - mux-demux has an stream.error(err) method, that sends an error to the other end.