aws/amazon-kinesis-video-streams-parser-library

[QUESTION] Properly restart the pipeline when token is expired

Opened this issue · 21 comments

Hello everyone,

I'm using the KVS producer to send a video feed from an RTSP camera to KVS (using Python on a Raspberry Pi), and this works perfectly. On the consumer side, I'm using the parser library to extract images and run some tasks on them.

Your documentation says that the token for GetMedia expires every 50 minutes, so to extract images 24/7 we need to refresh the token every 45 minutes (see my previous issue on this subject: #108).

So I tried to automatically restart the process every 45 minutes (my code is in Kotlin):

val getMediaWorker = GetMediaWorker.create(
    ........
)

if (getMediaWorker != null) {
    try {
        val future = executorService.submit(getMediaWorker)
        future.get(45, TimeUnit.MINUTES)
    } catch (te: TimeoutException) {
        logger.error("Will restart to avoid auto-stop due to 50 minutes of KVS Session")
        executorService.shutdownNow()
        kvsSessionExpiredOrNeedToRestartConsumer = true
    }
}

This works: my session restarts every 45 minutes, and I no longer get a ClientLimitException or any similar error. But this solution causes a NullPointerException after a few restarts, and I haven't found out why (I added a lot of catch blocks to locate the NullPointerException, but none of them is triggered).

The error is the following (screenshot):
[Screenshot from 2021-04-13 07-52-31]

Do you know another way to work around the KVS maximum session duration, something other than my Future-based approach?

Don't hesitate to ask if you need more information about my use case.

Kind regards,
Florian

Hello there,

I need an update on this issue. Is anybody from the KVS team here?

Thanks for your help

Hey @FlorianRuen,

What kind of credentials provider are you using?

@unicornss

I'm using something like this in GetMediaWorker:

fun generateCredentials(): AWSStaticCredentialsProvider {
    return AWSStaticCredentialsProvider(
        BasicAWSCredentials(
            kinesisProperties.accessKey,
            kinesisProperties.secretKey
        )
    )
}

So to answer your question, I think I'm using a static credentials provider (AWSStaticCredentialsProvider).

Thanks @FlorianRuen.

The documentation refers to the GetMedia connection, which needs to be refreshed every 50 minutes. With your approach, GetMedia may get interrupted in the middle of the stream (because the thread times out at 45 minutes); ideally, it should be stopped at a fragment boundary and not in the middle of a frame or on partial frame data. It seems that in your framecreatorvisitor (Line 73) the frame might be null, which causes that exception. You can try adding a null check there to avoid the exception, if that works for your use case.
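As an illustration of "stop at a fragment boundary", here is a minimal sketch in plain Java. It does not use the parser library API: fragments are modelled as lists of frame labels, and `BoundaryStopDemo`, `consume` and the stop flag are hypothetical names. In a real consumer the boundary check would have to live wherever the parser reports the end of a fragment. The point is only that a stop request raised mid-fragment is honoured after the current fragment is complete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch: fragments are plain lists of frame labels, not real MKV data.
class BoundaryStopDemo {

    /**
     * Handles every frame of each fragment and returns the frames handled.
     * The stop flag is consulted only between fragments, never inside one.
     */
    static List<String> consume(List<List<String>> fragments,
                                AtomicBoolean stopRequested,
                                Consumer<String> frameHandler) {
        List<String> handled = new ArrayList<>();
        for (List<String> fragment : fragments) {
            if (stopRequested.get()) {
                break; // fragment boundary: a safe place to end the session
            }
            for (String frame : fragment) {
                frameHandler.accept(frame);
                handled.add(frame);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<List<String>> fragments = Arrays.asList(
                Arrays.asList("f1-a", "f1-b"),
                Arrays.asList("f2-a", "f2-b"),
                Arrays.asList("f3-a"));
        AtomicBoolean stop = new AtomicBoolean(false);
        // Simulate the 45-minute timer firing while fragment 2 is being processed.
        Consumer<String> handler = frame -> {
            if (frame.equals("f2-a")) {
                stop.set(true);
            }
        };
        System.out.println(consume(fragments, stop, handler)); // [f1-a, f1-b, f2-a, f2-b]
    }
}
```

Compared with `shutdownNow()`, which interrupts the worker wherever it happens to be, a cooperative flag like this trades a slightly later stop for never leaving a frame half-decoded.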

There are a few other ways to implement a long-running consumer.

One approach you could try is to have the application query the media for a finite duration of, say, 30 minutes. This way, GetMedia and image generation finish at a fragment boundary, and your restart can fetch the media for the next 30 minutes, and so on.
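The windowing part of that idea can be sketched as follows. This is only an assumption-laden illustration: `TimeWindows`, `Window` and `split` are hypothetical names, and the code computes consecutive 30-minute ranges without calling any KVS API. How each range is then passed to the service is not shown. Note that a window can only return media that has already been ingested, which matters for real-time use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: only computes the windows, it does not call any KVS API.
class TimeWindows {

    /** A half-open time range [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return start + " -> " + end;
        }
    }

    /** Splits [from, to) into consecutive windows of at most `size`. */
    static List<Window> split(Instant from, Instant to, Duration size) {
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<Window> windows = new ArrayList<>();
        Instant cursor = from;
        while (cursor.isBefore(to)) {
            Instant end = cursor.plus(size);
            if (end.isAfter(to)) {
                end = to; // the last window may be shorter
            }
            windows.add(new Window(cursor, end));
            cursor = end; // the next window starts exactly where this one ended
        }
        return windows;
    }

    public static void main(String[] args) {
        Instant from = Instant.parse("2021-05-12T09:00:00Z");
        Instant to = Instant.parse("2021-05-12T10:15:00Z");
        for (Window w : split(from, to, Duration.ofMinutes(30))) {
            System.out.println(w);
        }
    }
}
```

Making each window start exactly where the previous one ended avoids both gaps and duplicated fragments between restarts.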

@unicornss I tried a lot of things to make it work:

  • I tried to handle the NullPointerException, but it is never caught and it causes my program to break
  • I tried removing the Future, but that causes the ClientLimit error mentioned above
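On the first point, one possible explanation (an assumption, since the stack trace is only available as a screenshot) is that the exception is thrown inside the task running on the executor thread. In that case a try/catch around `submit` never sees it: `java.util.concurrent` stores the exception in the `Future` and rethrows it from `get()` wrapped in an `ExecutionException`. The sketch below uses only the JDK; `FutureFailureDemo` and `runAndDescribe` are hypothetical names and the `Runnable` stands in for the real worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: the Runnable stands in for the real GetMediaWorker.
class FutureFailureDemo {

    /** Submits the task, waits up to the timeout, and reports how the run ended. */
    static String runAndDescribe(Runnable task, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            future.get(timeout, unit);
            return "completed";
        } catch (TimeoutException te) {
            return "timed out";
        } catch (ExecutionException ee) {
            // The exception thrown inside the task is available as the cause.
            return "failed: " + ee.getCause().getClass().getSimpleName();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new NullPointerException("frame was null"); };
        System.out.println(runAndDescribe(failing, 1, TimeUnit.SECONDS)); // failed: NullPointerException
    }
}
```

If the exception comes from a thread that the worker itself starts, even this will not catch it, and the handling has to go inside the code that runs on that thread.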

Do you have sample code for this: fetch for a finite duration of 30 minutes and restart in a loop?

Hi @unicornss,

I tried to implement something using your references, but I got an error: my fragment list is always empty ([ ]), even though the video stream is available and my access key, stream name and region are valid.

EDIT: See my edit below (I understand what causes this error, but the approach doesn't match my needs).

**_EDIT: This works. My list was empty because I want to build a real-time application, so I set the timestampRange from now to now + 30 minutes, but no video has been sent for that range yet.

I tried with a past time range and it seems to work. But this doesn't match my needs. I want to extract each frame as soon as it is available because, after saving, the image is analysed and a result needs to be returned in real time.

With this behavior, I would need to extract images from now - 30 minutes to now, which would create a significant delay (it can extract a lot of images in just 2 or 3 minutes and my analysis tool can't ingest that many, or, while I'm saving the image from now - 15 minutes, the video feed continues and the delay grows).

I'm pretty sure it's better to fix my initial problem (that approach works pretty well except for this error with the Future and the token refresh)._**

> I tried with a past time range and it seems to work. But this doesn't match my needs. I want to extract each frame as soon as it is available because, after saving, the image is analysed and a result needs to be returned in real time.

It seems your use case needs both continuous media retrieval and real-time processing. Your approach of restarting the GetMedia worker helps to create a new GetMedia connection, but you need to stop that thread at the end of the segment to avoid the exception. Were you able to check whether ContinuousGetMediaWorker works for your use case?

I will check this snippet, but I don't really understand how it works. Can you tell me more about where I need to put getMediaProcessingArguments (FrameRenderer) to specify that I want to save each frame to disk?

@unicornss

I tried to implement the ContinuousGetMediaWorker, and I get an error:

companion object {
    fun create(
        processingVisitor: MkvElementVisitor,
        callback: FragmentMetadataCallback?
    ): FragmentProgressTracker {
        val metadataVisitor = FragmentMetadataVisitor.create()
        return FragmentProgressTracker(
            processingVisitor,
            metadataVisitor,
            // The call below is the one that fails: create is not found on CountVisitor.
            CountVisitor.create(
                MkvTypeInfos.CLUSTER,
                MkvTypeInfos.SEGMENT,
                MkvTypeInfos.SIMPLEBLOCK,
                MkvTypeInfos.TAG
            ),
            EndOfSegmentVisitor(metadataVisitor, callback)
        )
    }
}

The create method isn't found on CountVisitor. I double-checked my versions and they seem to be the right ones.

    implementation("com.amazonaws:aws-java-sdk-kinesis:1.11.487")
    implementation("com.amazonaws:amazon-kinesis-client:1.9.3")
    implementation("software.amazon.awssdk:aws-sdk-java:2.5.45")

Hi @FlorianRuen ,

I was referring to the approach in the ContinuousGetMediaWorker, which uses a long-running loop (until stopped) and renews the connection in case of errors:

@Override
public void run() {
    log.info("Start ContinuousGetMedia worker for stream {}", streamName);
    while (!shouldStop.get()) {
            :
            getMediaResult = videoMedia.getMedia(new GetMediaRequest().withStreamName(streamName).withStartSelector(selectorToUse));
            :
        } catch (FrameProcessException e) {
            log.error("FrameProcessException in ContinuousGetMedia worker for stream: " + streamName, e);
            break;
        } catch (IOException | MkvElementVisitException e) {
            log.error("Failure in ContinuousGetMedia worker for stream: " + streamName, e);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } catch (Throwable t) {
            log.error("Throwable", t);
        } finally {
            closeGetMediaResponse(getMediaResult);
            log.info("Exit processing GetMedia called for stream {}", streamName);
        }

Could this approach work for your solution?
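The shape of such a loop, independent of the KVS classes, might look like the sketch below. It is not the library's implementation: `RenewingLoop`, `run` and `pauseMillis` are hypothetical, and the `Callable` stands in for "open a GetMedia connection and parse it until it ends". The key property is that each session blocks inside the loop, so a new connection is only opened after the previous one has finished or failed.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: `session` stands in for "open GetMedia and parse until it ends".
class RenewingLoop {

    /** Runs sessions back to back until stop is requested; returns how many were started. */
    static int run(Callable<Void> session, AtomicBoolean shouldStop, long pauseMillis) {
        int started = 0;
        while (!shouldStop.get()) {
            started++;
            try {
                session.call(); // blocks for the lifetime of one connection
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break; // shutting down
            } catch (Exception e) {
                // Treated as recoverable here: report it, then open a new session.
                System.err.println("Session failed, reconnecting: " + e);
            }
            if (shouldStop.get()) {
                break;
            }
            try {
                Thread.sleep(pauseMillis); // avoid a tight reconnect loop
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return started;
    }

    public static void main(String[] args) {
        AtomicBoolean shouldStop = new AtomicBoolean(false);
        AtomicInteger calls = new AtomicInteger();
        Callable<Void> session = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("simulated connection failure");
            }
            shouldStop.set(true); // the third session ends normally and requests stop
            return null;
        };
        System.out.println(run(session, shouldStop, 10) + " sessions started"); // 3 sessions started
    }
}
```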

@unicornss

I tried to use something like what you suggest: a long-running loop. But I can't really use it with GetMediaResult, because there is no way to specify a FrameRendererVisitor, which is what I use to extract frames and save them to the hard disk.

But I can use a loop that runs until stopped, catches all exceptions and restarts every time, something like this:

    private fun process(amazonKinesisVideo: AmazonKinesisVideo, streamName: String) {
        val framesFolder ="PATH TO SAVE MY FRAMES"

        while (!shouldStop.get()) {
            val getMediaProcessingArguments = FrameRendererVisitor(framesFolder, FragmentMetadataVisitor.create())
            val executorService = Executors.newFixedThreadPool(4)

            try {
                val getMediaWorker = GetMediaWorker.create(
                        Regions.EU_CENTRAL_1,
                        awsService.generateCredentials(),
                        streamName,
                        StartSelector().withStartSelectorType(StartSelectorType.NOW),
                        amazonKinesisVideo,
                        getMediaProcessingArguments
                )

                executorService.execute(getMediaWorker)

            } catch (e : FrameProcessException) {
                logger.error("FrameProcessException in ContinuousGetMedia worker for stream: $streamName", e);
                break;
            } catch (e : IOException) {
                logger.error("Failure in ContinuousGetMedia worker for stream: $streamName", e);
            } catch (e : MkvElementVisitException) {
                logger.error("Failure in ContinuousGetMedia worker for stream: $streamName", e);
            } catch (ie : InterruptedException) {
                Thread.currentThread().interrupt();
                throw RuntimeException (ie);
            } catch (t : Throwable) {
                logger.error("Throwable", t);
            } finally {
                logger.info("Exit processing GetMedia called for stream {}", streamName);
            }
        }
    }

If I want to use an approach like the one you suggest:

  • I need to fix the undefined method CountVisitor.create (used in FragmentProgressTracker, which is used by GetMediaResponseStreamConsumer)

  • I need to find a way to specify a FrameRendererVisitor, or something else, to write every frame to the hard disk

In the meantime, I will run the code above to see what happens.

@unicornss

Update: the code above more or less works, but it throws a lot of exceptions. Because of the while loop it restarts immediately, and it doesn't seem to break. Here is the result of my test.

Here is a video from the terminal output for this code : https://eyesr-files.s3.eu-central-1.amazonaws.com/2021-05-12+09-19-06.mkv

@FlorianRuen

Need some time to go through the video and understand how often it restarts and the selector logic.

In the meantime, could you try adding closeGetMediaResponse(getMediaResult); in the finally block as below:

finally {
    closeGetMediaResponse(getMediaResult);
    log.info("Exit processing GetMedia called for stream {}", streamName);
}

Thanks.

@unicornss I'm using something of type GetMediaResult so I can't use the closeGetMediaResponse (if you see my code above, it's a GetMediaWorker)

@unicornss To update this thread: I ran the above code for more than 4 hours. There are a lot of exceptions at the beginning, and after that exceptions only occur occasionally.

But another problem has appeared: the while loop seems to cause "GC overhead limit exceeded" / "Java heap space" errors, and the application becomes very slow (this problem did not occur with the older version).
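One likely contributor to the memory growth, judging only from the loop posted earlier in the thread, is that `ExecutorService.execute` returns immediately and a new thread pool is created on every iteration. The loop therefore keeps starting workers without waiting for the previous one, and the pools are never shut down. The sketch below shows the alternative with JDK classes only; `SequentialRestartDemo` and `runSequentially` are hypothetical names and the `Runnable` stands in for one worker run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: `worker` stands in for one GetMediaWorker run.
class SequentialRestartDemo {

    /** Runs the worker `runs` times, one after another, on a single executor. */
    static int runSequentially(Runnable worker, int runs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(); // created once, outside the loop
        int finished = 0;
        try {
            for (int i = 0; i < runs; i++) {
                Future<?> future = executor.submit(worker);
                try {
                    future.get(); // wait for this run to end before starting the next one
                } catch (ExecutionException e) {
                    System.err.println("Worker failed, restarting: " + e.getCause());
                }
                finished++;
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown(); // release the thread when the loop is done
        }
        return finished;
    }

    public static void main(String[] args) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        Runnable worker = () -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
        };
        int finished = runSequentially(worker, 3);
        System.out.println(finished + " runs, max concurrent = " + maxActive.get()); // 3 runs, max concurrent = 1
    }
}
```

With the executor created once and `get()` blocking inside the loop, at most one worker (and so at most one connection) exists at a time.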

There are also other exceptions, such as the one in this screenshot: https://i.ibb.co/rbgyBxt/Capture-d-cran-de-2021-05-13-21-19-28.png

@FlorianRuen

> I'm using something of type GetMediaResult so I can't use the closeGetMediaResponse (if you see my code above, it's a GetMediaWorker)

Could you please email the following details to kinesis-video-support@amazon.com:

  • Use case
  • Media format: resolution, fps
  • Latency requirements
  • Selector type
  • Issues with your initial approach and impact to the application.
  • Number of streams
  • Sample stream name(s)

This will help us understand your initial approach and any constraints in using the ContinuousGetMediaWorker, or how to stop at the right time to avoid the frame decoding exception.

Thanks

@unicornss My e-mail is on its way. I hope we can find a quick solution, because this is blocking me from deploying to more customers.

Hi @unicornss, I just wanted to check whether you received my e-mail and whether you and the KVS team have had time to look at my use case.

Hello, apologies for the late response, is this still an issue we can assist you with?