asticode/go-astiav

Could not get rid of a blocking ReadFrame.

Mr3h1v opened this issue · 5 comments

Even i have set inputFormatCtx.SetInterruptCallback() and called Interrupt method of the result.

Can you share some code? It's hard to help you without having more information

Here is example code. udp ts stream will end after about 30s, and call Interrupt at 35th second, but app still blocking.

package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"time"

	"github.com/asticode/go-astiav"
)

var (
	cb     astiav.IOInterrupter
	input  = "0.0.0.0:10001"
	output = "output.flv"
)

func main() {
	time.AfterFunc(35*time.Second, func() {
		log.Println("try to interrupt")
		cb.Interrupt()
	})

	addr, _ := net.ResolveUDPAddr("udp", input)
	conn, _ := net.ListenUDP("udp", addr)
	defer conn.Close()

	// Alloc packet
	pkt := astiav.AllocPacket()
	defer pkt.Free()

	// Alloc input format context
	inputFormatContext := astiav.AllocFormatContext()
	if inputFormatContext == nil {
		log.Fatal(errors.New("main: input format context is nil"))
	}
	defer inputFormatContext.Free()

	// Set interrupt callback
	cb = inputFormatContext.SetInterruptCallback()

	// Alloc io context
	ioContext, err := astiav.AllocIOContext(4096, false, conn.Read, nil, nil)
	if err != nil {
		log.Fatal(fmt.Errorf("main: allocating io context failed: %w", err))
	}
	defer ioContext.Free()

	// Store io context
	inputFormatContext.SetPb(ioContext)

	// Open input
	if err := inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil); err != nil {
		log.Fatal(fmt.Errorf("main: opening input failed: %w", err))
	}
	defer inputFormatContext.CloseInput()

	// Find stream info
	if err := inputFormatContext.FindStreamInfo(nil); err != nil {
		log.Fatal(fmt.Errorf("main: finding stream info failed: %w", err))
	}

	// Alloc output format context
	outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "flv", output)
	if err != nil {
		log.Fatal(fmt.Errorf("main: allocating output format context failed: %w", err))
	}
	if outputFormatContext == nil {
		log.Fatal(errors.New("main: output format context is nil"))
	}
	defer outputFormatContext.Free()

	// Loop through streams
	inputStreams := make(map[int]*astiav.Stream)  // Indexed by input stream index
	outputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
	for _, is := range inputFormatContext.Streams() {
		// Only process audio or video
		if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
			is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
			continue
		}

		// Add input stream
		inputStreams[is.Index()] = is

		// Add stream to output format context
		os := outputFormatContext.NewStream(nil)
		if os == nil {
			log.Fatal(errors.New("main: output stream is nil"))
		}

		// Copy codec parameters
		if err = is.CodecParameters().Copy(os.CodecParameters()); err != nil {
			log.Fatal(fmt.Errorf("main: copying codec parameters failed: %w", err))
		}

		// Reset codec tag
		os.CodecParameters().SetCodecTag(0)

		// Add output stream
		outputStreams[is.Index()] = os
	}

	// If this is a file, we need to use an io context
	if !outputFormatContext.OutputFormat().Flags().Has(astiav.IOFormatFlagNofile) {
		// Open io context
		ioContext, err := astiav.OpenIOContext(output, astiav.NewIOContextFlags(astiav.IOContextFlagWrite))
		if err != nil {
			log.Fatal(fmt.Errorf("main: opening io context failed: %w", err))
		}
		defer ioContext.Close() //nolint:errcheck

		// Update output format context
		outputFormatContext.SetPb(ioContext)
	}

	// Write header
	if err = outputFormatContext.WriteHeader(nil); err != nil {
		log.Fatal(fmt.Errorf("main: writing header failed: %w", err))
	}

	// Loop through packets
	for {
		// Read frame
		if err = inputFormatContext.ReadFrame(pkt); err != nil {
			if errors.Is(err, astiav.ErrEof) {
				break
			}
			log.Fatal(fmt.Errorf("main: reading frame failed: %w", err))
		}

		// Get input stream
		inputStream, ok := inputStreams[pkt.StreamIndex()]
		if !ok {
			pkt.Unref()
			continue
		}

		// Get output stream
		outputStream, ok := outputStreams[pkt.StreamIndex()]
		if !ok {
			pkt.Unref()
			continue
		}

		// Update packet
		pkt.SetStreamIndex(outputStream.Index())
		pkt.RescaleTs(inputStream.TimeBase(), outputStream.TimeBase())
		pkt.SetPos(-1)

		// Write frame
		if err = outputFormatContext.WriteInterleavedFrame(pkt); err != nil {
			log.Fatal(fmt.Errorf("main: writing interleaved frame failed: %w", err))
		}
	}

	// Write trailer
	if err = outputFormatContext.WriteTrailer(); err != nil {
		log.Fatal(fmt.Errorf("main: writing trailer failed: %w", err))
	}

	// Success
	log.Println("success")
}

FYI the very same code (minus the udp binding) works properly if you replace inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil) with inputFormatContext.OpenInput("udp://"+input, astiav.FindInputFormat("mpegts"), nil) and remove the custom io context. Therefore if that fits your original use case, I'd recommend using this instead.

However, if you can't, the reason why this is blocking is because the conn.Read method is blocking and therefore the interrupt callback is never called (the interrupt callback is not called in parallel of the read function). To fix this problem, you need to find a way to provide a read function to AllocIOContext that doesn't block when the stream has stopped.

For instance replacing cb.Interrupt() with conn.Close() in your example actually unblocks everything 👍 Needless to say that you can remove the IO interrupter logic in that case

Thanks a lot. That is just a simplified example above.
I must to use AllocIOContext because i only got an io.Reader In my real problem.
I will find a way to close it.