Could not get rid of a blocking ReadFrame.
Mr3h1v opened this issue · 5 comments
Even i have set inputFormatCtx.SetInterruptCallback() and called Interrupt method of the result.
Can you share some code? It's hard to help you without having more information
Here is example code. udp ts stream will end after about 30s, and call Interrupt at 35th second, but app still blocking.
package main
import (
"errors"
"fmt"
"log"
"net"
"time"
"github.com/asticode/go-astiav"
)
var (
cb astiav.IOInterrupter
input = "0.0.0.0:10001"
output = "output.flv"
)
func main() {
time.AfterFunc(35*time.Second, func() {
log.Println("try to interrupt")
cb.Interrupt()
})
addr, _ := net.ResolveUDPAddr("udp", input)
conn, _ := net.ListenUDP("udp", addr)
defer conn.Close()
// Alloc packet
pkt := astiav.AllocPacket()
defer pkt.Free()
// Alloc input format context
inputFormatContext := astiav.AllocFormatContext()
if inputFormatContext == nil {
log.Fatal(errors.New("main: input format context is nil"))
}
defer inputFormatContext.Free()
// Set interrupt callback
cb = inputFormatContext.SetInterruptCallback()
// Alloc io context
ioContext, err := astiav.AllocIOContext(4096, false, conn.Read, nil, nil)
if err != nil {
log.Fatal(fmt.Errorf("main: allocating io context failed: %w", err))
}
defer ioContext.Free()
// Store io context
inputFormatContext.SetPb(ioContext)
// Open input
if err := inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil); err != nil {
log.Fatal(fmt.Errorf("main: opening input failed: %w", err))
}
defer inputFormatContext.CloseInput()
// Find stream info
if err := inputFormatContext.FindStreamInfo(nil); err != nil {
log.Fatal(fmt.Errorf("main: finding stream info failed: %w", err))
}
// Alloc output format context
outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "flv", output)
if err != nil {
log.Fatal(fmt.Errorf("main: allocating output format context failed: %w", err))
}
if outputFormatContext == nil {
log.Fatal(errors.New("main: output format context is nil"))
}
defer outputFormatContext.Free()
// Loop through streams
inputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
outputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
for _, is := range inputFormatContext.Streams() {
// Only process audio or video
if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
continue
}
// Add input stream
inputStreams[is.Index()] = is
// Add stream to output format context
os := outputFormatContext.NewStream(nil)
if os == nil {
log.Fatal(errors.New("main: output stream is nil"))
}
// Copy codec parameters
if err = is.CodecParameters().Copy(os.CodecParameters()); err != nil {
log.Fatal(fmt.Errorf("main: copying codec parameters failed: %w", err))
}
// Reset codec tag
os.CodecParameters().SetCodecTag(0)
// Add output stream
outputStreams[is.Index()] = os
}
// If this is a file, we need to use an io context
if !outputFormatContext.OutputFormat().Flags().Has(astiav.IOFormatFlagNofile) {
// Open io context
ioContext, err := astiav.OpenIOContext(output, astiav.NewIOContextFlags(astiav.IOContextFlagWrite))
if err != nil {
log.Fatal(fmt.Errorf("main: opening io context failed: %w", err))
}
defer ioContext.Close() //nolint:errcheck
// Update output format context
outputFormatContext.SetPb(ioContext)
}
// Write header
if err = outputFormatContext.WriteHeader(nil); err != nil {
log.Fatal(fmt.Errorf("main: writing header failed: %w", err))
}
// Loop through packets
for {
// Read frame
if err = inputFormatContext.ReadFrame(pkt); err != nil {
if errors.Is(err, astiav.ErrEof) {
break
}
log.Fatal(fmt.Errorf("main: reading frame failed: %w", err))
}
// Get input stream
inputStream, ok := inputStreams[pkt.StreamIndex()]
if !ok {
pkt.Unref()
continue
}
// Get output stream
outputStream, ok := outputStreams[pkt.StreamIndex()]
if !ok {
pkt.Unref()
continue
}
// Update packet
pkt.SetStreamIndex(outputStream.Index())
pkt.RescaleTs(inputStream.TimeBase(), outputStream.TimeBase())
pkt.SetPos(-1)
// Write frame
if err = outputFormatContext.WriteInterleavedFrame(pkt); err != nil {
log.Fatal(fmt.Errorf("main: writing interleaved frame failed: %w", err))
}
}
// Write trailer
if err = outputFormatContext.WriteTrailer(); err != nil {
log.Fatal(fmt.Errorf("main: writing trailer failed: %w", err))
}
// Success
log.Println("success")
}
FYI the very same code (minus the udp binding) works properly if you replace inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil)
with inputFormatContext.OpenInput("udp://"+input, astiav.FindInputFormat("mpegts"), nil)
and remove the custom io context. Therefore if that fits your original use case, I'd recommend using this instead.
However, if you can't, the reason why this is blocking is because the conn.Read
method is blocking and therefore the interrupt callback is never called (the interrupt callback is not called in parallel of the read function). To fix this problem, you need to find a way to provide a read function to AllocIOContext
that doesn't block when the stream has stopped.
For instance replacing cb.Interrupt()
with conn.Close()
in your example actually unblocks everything 👍 Needless to say that you can remove the IO interrupter logic in that case
Thanks a lot. That is just a simplified example above.
I must to use AllocIOContext because i only got an io.Reader In my real problem.
I will find a way to close it.