timpalpant/go-iex

UDP -> NewPcapScanner

qrpike opened this issue · 7 comments

I am trying to pipe a UDP stream into the pcap parser. It seems to take an io.Reader, which the UDP connection type in Go's net package implements.

conn, err := net.ListenMulticastUDP("udp", iface, addr)
if err != nil {
    panic(err)
}
packetDataSource, err := iex.NewPacketDataSource(conn)
if err != nil {
    panic(err)
}
pcapScanner := iex.NewPcapScanner(packetDataSource)
// decoding stuff here

However I get the following error:
panic: Unknown magic 80040001

Any insights would be greatly appreciated.

Thanks,

I started to look into this last night but need to get a UDP replay set up. I suspect the net package already strips the link/IP/UDP headers and hands us only the datagram payload, so we might only need to unmarshal that payload (untested):

import (
    "net"

    "github.com/timpalpant/go-iex/iextp"
    _ "github.com/timpalpant/go-iex/iextp/deep"
    _ "github.com/timpalpant/go-iex/iextp/tops"
)

const maxUDPPacketSize = 65535

conn, err := net.ListenMulticastUDP("udp", iface, addr)
if err != nil {
    panic(err)
}
buf := make([]byte, maxUDPPacketSize)

for {
    // Read the next UDP packet.
    n, _, err := conn.ReadFrom(buf)
    if err != nil {
        panic(err)
    }
    payload := buf[:n]

    // Unmarshal it according to the IEX Transport Protocol.
    segment := iextp.Segment{}
    if err := segment.Unmarshal(payload); err != nil {
        panic(err)
    }

    // range yields (index, value); discard the index.
    for _, msg := range segment.Messages {
        _ = msg // Do something with the iextp.Message.
    }
}

We should rename NewPacketDataSource to indicate that it only supports pcap(ng) files, and add a function for creating a PcapScanner from a net.PacketConn. I'll look into it further tonight.

I was also working on this last night, and made some progress:

package main

import (
	"fmt"
	"log"
	"net"

	iextp "github.com/timpalpant/go-iex/iextp"
)

const (
	maxDatagramSize  = 65535
	multicastAddress = "233.215.21.4:10378"
)

func main() {
	fmt.Printf("Listening on %s\n", multicastAddress)
	Listen(multicastAddress, msgHandler)
}

func msgHandler(src *net.UDPAddr, n int, b []byte) {
	log.Println(n, "bytes read from", src)
	// log.Println(hex.Dump(b[:n]))

	// This part works fine:
	header := iextp.SegmentHeader{}
	if err := header.Unmarshal(b[:40]); err != nil {
		println("Decoding header error")
		panic(err)
	}
	fmt.Println(header)

	// This fails with: "panic: unknown message protocol: 32772"
	segment := iextp.Segment{}
	if err := segment.Unmarshal(b[:n]); err != nil {
		println("Decoding segment error")
		panic(err)
	}
	println("Successfully decoded message")
}



func Listen(address string, handler func(*net.UDPAddr, int, []byte)) {
	// Parse the string address
	addr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatal(err)
	}
	// Open up a connection
	conn, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	conn.SetReadBuffer(maxDatagramSize)
	// Loop forever reading from the socket
	for {
		buffer := make([]byte, maxDatagramSize)
		numBytes, src, err := conn.ReadFromUDP(buffer)
		if err != nil {
			log.Fatal("ReadFromUDP failed:", err)
		}
		handler(src, numBytes, buffer)
	}
}

The header parsing works, but parsing the entire segment seems to fail.

To test, run that code and use tcpreplay to replay a pcap file onto the network: install it with brew install tcpreplay, then run sudo tcpreplay -i en0 sample.pcap

Nice. Can you try adding an import _ "github.com/timpalpant/go-iex/iextp/deep"? It will register the DEEP protocol as a side effect, which I think is protocol 32772.
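
For anyone unfamiliar with the blank-import trick, the general pattern looks roughly like the sketch below. This is not go-iex's actual code: registry, Register, Decode and the decoder signature are all made up for illustration. In a real layout the Register call would sit in an init() function of the protocol package, which runs when that package is imported, even with a blank identifier.

```go
package main

import "fmt"

// decoder turns raw message bytes into a description.
// Hypothetical signature, for illustration only.
type decoder func(msg []byte) (string, error)

// registry maps a message protocol ID to its decoder.
var registry = map[uint16]decoder{}

// Register associates a protocol ID with a decoder. A protocol package
// would normally call this from its init() function.
func Register(protocolID uint16, d decoder) {
	registry[protocolID] = d
}

// Decode looks up the decoder for protocolID and applies it to msg.
func Decode(protocolID uint16, msg []byte) (string, error) {
	d, ok := registry[protocolID]
	if !ok {
		return "", fmt.Errorf("unknown message protocol: %d", protocolID)
	}
	return d(msg)
}

func main() {
	// Nothing registered yet: the lookup fails.
	_, err := Decode(0x8004, nil)
	fmt.Println("before:", err)

	// Stand-in for what a blank import of a protocol package would trigger.
	Register(0x8004, func(msg []byte) (string, error) {
		return "DEEP message", nil
	})

	out, err := Decode(0x8004, nil)
	fmt.Println("after:", out, err)
}
```

One consequence of this design: registration only helps if the registering package and the decoding code share the same copy of the registry package. Two different import paths give two separate registries.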

Hmm, I still get the same error.

Hmm, here is the verbatim program I am running (I just added the import to yours):

package main

import (
	"fmt"
	"log"
	"net"

	iextp "github.com/timpalpant/go-iex/iextp"
	_ "github.com/timpalpant/go-iex/iextp/deep"
)

const (
	maxDatagramSize  = 65535
	multicastAddress = "233.215.21.4:10378"
)

func main() {
	fmt.Printf("Listening on %s\n", multicastAddress)
	Listen(multicastAddress, msgHandler)
}

func msgHandler(src *net.UDPAddr, n int, b []byte) {
	log.Println(n, "bytes read from", src)
	// log.Println(hex.Dump(b[:n]))

	// This part works fine:
	header := iextp.SegmentHeader{}
	if err := header.Unmarshal(b[:40]); err != nil {
		println("Decoding header error")
		panic(err)
	}
	fmt.Println(header)

	// This fails with: "panic: unknown message protocol: 32772"
	segment := iextp.Segment{}
	if err := segment.Unmarshal(b[:n]); err != nil {
		println("Decoding segment error")
		panic(err)
	}
	println("Successfully decoded message")
}

func Listen(address string, handler func(*net.UDPAddr, int, []byte)) {
	// Parse the string address
	addr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatal(err)
	}
	// Open up a connection
	conn, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	conn.SetReadBuffer(maxDatagramSize)
	// Loop forever reading from the socket
	for {
		buffer := make([]byte, maxDatagramSize)
		numBytes, src, err := conn.ReadFromUDP(buffer)
		if err != nil {
			log.Fatal("ReadFromUDP failed:", err)
		}
		handler(src, numBytes, buffer)
	}
}

and I'm testing by running

$ go build
$ ./test_udp &
$ sudo tcpreplay -i en0 ~/Downloads/data-feeds-20180127-20180127_IEXTP1_DEEP1.0.pcap

against the DEEP sample file from here: https://www.googleapis.com/download/storage/v1/b/iex/o/data%2Ffeeds%2F20180127%2F20180127_IEXTP1_DEEP1.0.pcap.gz?generation=1517101215560431&alt=media

In the stdout I get:

Listening on 233.215.21.4:10378
2018/10/25 12:18:20 1470 bytes read from 23.226.155.132:10378
{1 0 32772 1 1150681088 1430 66 2848 133 2018-01-27 13:00:16.623722248 +0000 UTC}
Successfully decoded message
2018/10/25 12:18:20 1470 bytes read from 23.226.155.132:10378
{1 0 32772 1 1150681088 1430 66 44318 2047 2018-01-27 13:00:16.624837389 +0000 UTC}
Successfully decoded message
...

Strangest thing ever. I had forked the repo to try to use PCAP and was importing iextp from one repo and deep from the other, so deep was registering itself with a different copy of iextp than the one I was decoding with. Seems to be working now that both imports come from the same repo. It would be amazing to have this built into the project, but it's not required. It would help users who have a cross connect directly to IEX and need to parse the incoming UDP multicast stream.

Yep, it's a good addition, and I think your initial snippet seems like a good interface to implement. I can take a shot at adding it this weekend.