UDP -> NewPcapScanner
qrpike opened this issue · 7 comments
I am trying to pipe a UDP stream into the pcap parser. It seems to take an io.Reader, which the UDP connection type in Go's net package implements.
conn, _ := net.ListenMulticastUDP("udp", iface, addr)
packetDataSource, err := iex.NewPacketDataSource(conn)
if err != nil {
	panic(err)
}
pcapScanner := iex.NewPcapScanner(packetDataSource)
// decoding stuff here
However I get the following error:
panic: Unknown magic 80040001
Any insights would be greatly appreciated.
Thanks,
I started to look into this last night but need to get a UDP replay set up. I suspect the net package already strips the IP/UDP framing and hands us only the datagram payload, so we might only need to unmarshal the payload (untested):
import (
	"net"

	"github.com/timpalpant/go-iex/iextp"
	_ "github.com/timpalpant/go-iex/iextp/deep"
	_ "github.com/timpalpant/go-iex/iextp/tops"
)

const maxUDPPacketSize = 65535

conn, err := net.ListenMulticastUDP("udp", iface, addr)
if err != nil {
	panic(err)
}
buf := make([]byte, maxUDPPacketSize)
for {
	// Read next UDP packet.
	n, _, err := conn.ReadFrom(buf)
	if err != nil {
		panic(err)
	}
	payload := buf[:n]
	// Unmarshal it according to IEX Transport Protocol.
	segment := iextp.Segment{}
	if err := segment.Unmarshal(payload); err != nil {
		panic(err)
	}
	// Range yields (index, value); discard the index to get each message.
	for _, msg := range segment.Messages {
		// Do something with the iextp.Message
		_ = msg
	}
}
We should rename NewPacketDataSource to indicate that it only supports pcap(ng) files, and add a function for creating a PcapScanner from a net.PacketConn. I'll look into it further tonight.
I was also working on this last night, and made some progress:
package main

import (
	"fmt"
	"log"
	"net"

	iextp "github.com/timpalpant/go-iex/iextp"
)

const (
	maxDatagramSize  = 65535
	multicastAddress = "233.215.21.4:10378"
)

func main() {
	fmt.Printf("Listening on %s\n", multicastAddress)
	Listen(multicastAddress, msgHandler)
}

func msgHandler(src *net.UDPAddr, n int, b []byte) {
	log.Println(n, "bytes read from", src)
	// log.Println(hex.Dump(b[:n]))

	// This part works fine:
	header := iextp.SegmentHeader{}
	if err := header.Unmarshal(b[:40]); err != nil {
		println("Decoding header error")
		panic(err)
	}
	fmt.Println(header)

	// This fails with: "panic: unknown message protocol: 32772"
	segment := iextp.Segment{}
	if err := segment.Unmarshal(b[:n]); err != nil {
		println("Decoding segment error")
		panic(err)
	}
	println("Successfully decoded message")
}

func Listen(address string, handler func(*net.UDPAddr, int, []byte)) {
	// Parse the string address
	addr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatal(err)
	}

	// Open up a connection
	conn, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	conn.SetReadBuffer(maxDatagramSize)

	// Loop forever reading from the socket
	for {
		buffer := make([]byte, maxDatagramSize)
		numBytes, src, err := conn.ReadFromUDP(buffer)
		if err != nil {
			log.Fatal("ReadFromUDP failed:", err)
		}
		handler(src, numBytes, buffer)
	}
}
The header parsing works, but parsing the entire segment seems to fail.
To test, run that code and replay a pcap file with tcpreplay: install it with brew install tcpreplay, then run sudo tcpreplay -i en0 sample.pcap
Nice. Can you try adding an import _ "github.com/timpalpant/go-iex/iextp/deep"? It will register the DEEP protocol as a side effect, which I think is protocol 32772.
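For readers unfamiliar with side-effect imports, here is a generic sketch of the pattern, not the actual go-iex code: a package keeps a registry keyed by protocol ID, and a subpackage's init function adds an entry when it is imported, even through a blank import. Everything is collapsed into one file so it runs on its own, and the names registry, register, and decode are made up for illustration.

```go
package main

import "fmt"

// decoder turns a raw message into a printable value (illustrative only).
type decoder func(raw []byte) string

// registry maps protocol IDs to decoders. In a real library this would be
// a package-level variable inside the transport package.
var registry = map[uint16]decoder{}

// register adds a decoder for a protocol ID.
func register(id uint16, d decoder) { registry[id] = d }

// init stands in for the init function of a blank-imported subpackage:
// it runs before main and registers its protocol as a side effect.
func init() {
	register(32772, func(raw []byte) string {
		return fmt.Sprintf("protocol 32772 message, %d bytes", len(raw))
	})
}

// decode looks up the decoder and fails for unregistered protocols.
func decode(id uint16, raw []byte) (string, error) {
	d, ok := registry[id]
	if !ok {
		return "", fmt.Errorf("unknown message protocol: %d", id)
	}
	return d(raw), nil
}

func main() {
	msg, err := decode(32772, []byte{0x01})
	fmt.Println(msg, err)

	_, err = decode(1, nil)
	fmt.Println(err)
}
```

Because such a registry is a package-level variable, a copy of the package under a different import path has its own separate registry, and registrations made against one copy are invisible to the other.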
Hmm, still gets the same error.
Hmm, here is the verbatim program I am running (I just added the import to yours):
package main

import (
	"fmt"
	"log"
	"net"

	iextp "github.com/timpalpant/go-iex/iextp"
	_ "github.com/timpalpant/go-iex/iextp/deep"
)

const (
	maxDatagramSize  = 65535
	multicastAddress = "233.215.21.4:10378"
)

func main() {
	fmt.Printf("Listening on %s\n", multicastAddress)
	Listen(multicastAddress, msgHandler)
}

func msgHandler(src *net.UDPAddr, n int, b []byte) {
	log.Println(n, "bytes read from", src)
	// log.Println(hex.Dump(b[:n]))

	// This part works fine:
	header := iextp.SegmentHeader{}
	if err := header.Unmarshal(b[:40]); err != nil {
		println("Decoding header error")
		panic(err)
	}
	fmt.Println(header)

	// This fails with: "panic: unknown message protocol: 32772"
	segment := iextp.Segment{}
	if err := segment.Unmarshal(b[:n]); err != nil {
		println("Decoding segment error")
		panic(err)
	}
	println("Successfully decoded message")
}

func Listen(address string, handler func(*net.UDPAddr, int, []byte)) {
	// Parse the string address
	addr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatal(err)
	}

	// Open up a connection
	conn, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	conn.SetReadBuffer(maxDatagramSize)

	// Loop forever reading from the socket
	for {
		buffer := make([]byte, maxDatagramSize)
		numBytes, src, err := conn.ReadFromUDP(buffer)
		if err != nil {
			log.Fatal("ReadFromUDP failed:", err)
		}
		handler(src, numBytes, buffer)
	}
}
and I'm testing by running
$ go build
$ ./test_udp &
$ sudo tcpreplay -i en0 ~/Downloads/data-feeds-20180127-20180127_IEXTP1_DEEP1.0.pcap
against the DEEP sample file from here: https://www.googleapis.com/download/storage/v1/b/iex/o/data%2Ffeeds%2F20180127%2F20180127_IEXTP1_DEEP1.0.pcap.gz?generation=1517101215560431&alt=media
In the stdout I get:
Listening on 233.215.21.4:10378
2018/10/25 12:18:20 1470 bytes read from 23.226.155.132:10378
{1 0 32772 1 1150681088 1430 66 2848 133 2018-01-27 13:00:16.623722248 +0000 UTC}
Successfully decoded message
2018/10/25 12:18:20 1470 bytes read from 23.226.155.132:10378
{1 0 32772 1 1150681088 1430 66 44318 2047 2018-01-27 13:00:16.624837389 +0000 UTC}
Successfully decoded message
...
Strangest thing ever. I had forked the repo to try to use PCAP, and was importing iextp from one repo and deep from this repo, so the DEEP protocol was presumably being registered with a different copy of iextp than the one doing the decoding. Seems to be working now. Would be amazing to have this built into the project, but not required. This is for users who have a cross connect directly to IEX and need to parse the incoming UDP multicast stream.
Yep, it's a good addition, and I think your initial snippet seems like a good interface to implement. I can take a shot at adding it this weekend.