S3 / Kinesis eventstream, events, eventpayloads not handled (at all)
castaway opened this issue · 25 comments
S3::SelectObjectContent returns not a chunk of result but an eventstream (one or more of its possible members), formatted using the user-given OutputSerialization values.
Paws does not recognise or deal with "event":true style structures (nor eventstream / eventpayload structures), so we currently have no way of getting at the resulting data from a SelectObjectContent method call.
Kinesis::SubscribeToShard uses the same functionality - these seem to be the only two places in the entire aws/botocore data.
boto3 has an EventStream - https://botocore.amazonaws.com/v1/documentation/api/latest/reference/eventstream.html
This should be fixed on this branch
https://github.com/byterock/aws-sdk-perl/tree/s3ObjectTagging
hey, I was just looking at your branch / 10_responses.t test, and that doesn't look anything like what I'm getting back from calling SelectObjectContent on actual AWS.. how/where did you handle the eventstream part?
hmm maybe I didn't
If I remember correctly, that might be one of the commands I was never able to get to work.
Most likely I just got the XML to work correctly. Looking at the 09_response test, I never did make a real SQL query.
Do you have a simple example of your call and expected response? If you do, post it here and then I can play about with it.
Do you have a real-world example of an SQL query that would return real results?
I've just added a test (that passes but isn't the full story) to t/s3/selectcontent.t - https://github.com/pplu/aws-sdk-perl/pull/265/commits .. the response object returned is:
0 Paws::Net::APIResponse=HASH(0x5381ae8) 'content' => "\c@\c@\cDW\c@\c@\c@Uo�+�\cM:message-type\cG\c@\cEevent\cK:event-type\cG\c@\cGRecords\cM:content-type\cG\c@\cXapplication/octet-stream{\"_1\":\"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"};�z��\c@\c@\c@�\c@\c@\c@C�\cV�\cM:message-type\cG\c@\cEevent\cK:event-type\cG\c@\cEStats\cM:content-type\cG\c@\cHtext/xml<Stats xmlns=\"\"><BytesScanned>1000</BytesScanned><BytesProcessed>1000</BytesProcessed><BytesReturned>1010</BytesReturned></Stats>�ɬ�\c@\c@\c@8\c@\c@\c@(�Ƅ�\cM:message-type\cG\c@\cEevent\cK:event-type\cG\c@\cCEndϗӒ" 'headers' => HASH(0x5381c20) 'connection' => 'close' 'date' => 'Thu, 30 Jan 2020 11:59:48 GMT' 'server' => 'AmazonS3' 'transfer-encoding' => 'chunked' 'x-amz-id-2' => 'Rj/J45PuoiroynJe9uKkzUri8994sZ4HU5EPUXIVj/0G90MXln3Ileqfj8+Muz56Zh6nYAd2enM=' 'x-amz-request-id' => 'DDF8DE515326E08F' 'status' => 200
If you look at the content there, and the linked boto3 code, it appears "eventstreaming" needs to break up the content response by each event in the stream (using whatever the user said as a separator, mine was ";"), and return each type of "Event" response when requested (in boto3 the user needs to loop to get them)
Seems like it needs another type of Role that the EventStream (here SelectObjectContentEventStream) uses, to me.
.. I would have added failing parts to the test, but unsure as yet even how they would look..
Ok will have a look
Something to blog about ;)
Well, it seems there is a plan in place to get the latest changes up to CPAN, and because of that (I would like to keep my s3tagging branch stable) I am going to have a look at the Kinesis::SubscribeToShard
command and see if I can get boto/Paws to play nicely with each other. The only problem is that Kinesis uses one of the JSON call interfaces while SelectObjectContent uses restXml, so the patches will be different.
watch this space :)
Silly me I forgot to ask are both the
S3::SelectObjectContent
and
Kinesis::SubscribeToShard
not working?
I haven't tried both, I just made an assumption by grepping for "event":true in the json files. Since the Paws code doesn't seem to special-case events anywhere that I can tell, I assume neither works.
Got any thoughts on how you'll handle it? I find talking about the plan helps refine it before trying things out..
Hmm, was playing about with Kinesis and it does eventually return something, but it is just gibberish to me. Takes 5 minutes for the request to time out.
I was hoping to play with the other CLI and see what is returned, but the Python aws2 CLI does not have it yet!
The Java API does, so I am going to play with that and see if I can get a better response out of AWS.
We might be able to jump ahead of the Python crowd :)
boto definitely deals with it.. See: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/eventstream.html#botocore-eventstream .. looks like we just need to split the contents by whatever the separator was, then check each chunk for each type of event..
The slightly annoying part is that the separator is supplied by the user.. it would be nice if we can eventually wrap the whole thing (but that's a Paws++ level thing)
Yep, boto handles it. I was just saying the Python AWS2 CLI doesn't, though they may get it working in AWS3.
Working my way through a generic solution to 'SubscribeToShard' this afternoon or tomorrow. The EventStream is used in Pinpoint too, so I would like to get all three to work.
Will write a blog post on my notes when I am done.
Can you bung this one in a separate branch please?
Yes of course.
did a blog post on it last night
seems the event stream format is proprietary to AWS, so a little more code than usual. Might have to make a separate module to handle it.
Nice day out, so I am off into the woods, but before I go, a quick update.
I am not 100% sure that 'SubscribeToShard' is supposed to return a scalar result:
```perl
$ListObjectsV2Output = $s3->SubscribeToShard(
    ConsumerARN => 'arn:aws:kinesis:us-east-1:985173205561:stream/TestSteam5Shard/consumer/TestKinesisApp:1581111187',
    ShardId     => 'shardId-000000000000',
    StartingPosition => { Type => 'LATEST' },
);
```
This sort of streaming call requires you to set up a handler to intercept the response data, like this:
```perl
use IO::Scalar;

my $caller = FullTestMakerLWPCaller->new();
$caller->ua->add_handler(
    'response_data',
    sub {
        my ($response, $ua, $h, $data) = @_;
        my $es      = AWS::EventStream::VND->new();
        my $content = $response->content;
        # IO::Scalar->new wants a reference to the scalar
        my $ios = IO::Scalar->new(\$content);
        $es->decode($ios);
        return 1;
    },
);

my $s3 = Paws->service(
    'Kinesis',
    region => 'us-east-1',
    debug  => 1,
    caller => $caller,
);

my $ListObjectsV2Output = $s3->SubscribeToShard(
    ConsumerARN => 'arn:aws:kinesis:us-east-1:985173205561:stream/TestSteam5Shard/consumer/TestKinesisApp:1581111187',
    ShardId     => 'shardId-000000000000',
    StartingPosition => {
        Type => 'LATEST',
    },
);
```
Now the above works (no changes to Paws code are needed, methinks).
The new thing is 'AWS::EventStream::VND', a little class I wrote to translate the stream from binary, and I am getting this now:
```
{ headers => {
    ':content-type' => 'application/x-amz-json-1.1',
    ':event-type'   => 'SubscribeToShardEvent',
    ':message-type' => 'event'
  },
  message => {"ContinuationSequenceNumber":"49604106570538379893614421402055478744627446698567794690","MillisBehindLatest":0,"Records":[]}
},
{ headers => {
    ':content-type' => 'application/x-amz-json-1.1',
    ':event-type'   => 'SubscribeToShardEvent',
    ':message-type' => 'event'
  },
  message => {"ContinuationSequenceNumber":"49604106570538379893614421403057678249087974284399214594","MillisBehindLatest":0,"Records":[{"ApproximateArrivalTimestamp":1.581883045111E9,"Data":"1010101010101010101010101010101010101010101010101010101010101w==","PartitionKey":"Key010","SequenceNumber":"49604106570538379893614421403055260397448745026049802242"}]}
}
```
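For anyone following along, the wire format being decoded here is AWS's binary event-stream framing: each message carries a 12-byte prelude (total length and headers length, both 4-byte big-endian, plus a prelude CRC), then a header block, the payload, and a trailing 4-byte message CRC. Below is a minimal single-frame decoder sketch based on that documented format; CRC checks are omitted, only string-typed headers (type 7) are handled, and since 'AWS::EventStream::VND' is not published this is an illustration of what it has to do, not its actual code.

```perl
use strict;
use warnings;

# Decode one vnd.amazon.eventstream frame from a binary buffer.
sub decode_frame {
    my ($buf) = @_;

    # Prelude: total length, headers length (4-byte big-endian each),
    # followed by a 4-byte prelude CRC we skip here
    my ($total_len, $headers_len) = unpack 'N N', $buf;
    my $headers_raw = substr $buf, 12, $headers_len;
    # Payload sits between the headers and the trailing message CRC
    my $payload = substr $buf, 12 + $headers_len,
                  $total_len - 12 - $headers_len - 4;

    # Each header: 1-byte name length, name, 1-byte value type
    # (7 = string), 2-byte value length, value
    my %headers;
    my $pos = 0;
    while ($pos < length $headers_raw) {
        my $name_len = unpack 'C', substr $headers_raw, $pos, 1;
        my $name     = substr $headers_raw, $pos + 1, $name_len;
        my $type     = unpack 'C', substr $headers_raw, $pos + 1 + $name_len, 1;
        die "only string headers handled in this sketch" unless $type == 7;
        my $val_len  = unpack 'n', substr $headers_raw, $pos + 2 + $name_len, 2;
        $headers{$name} = substr $headers_raw, $pos + 4 + $name_len, $val_len;
        $pos += 4 + $name_len + $val_len;
    }
    return { headers => \%headers, payload => $payload, length => $total_len };
}
```

Splitting a whole response body into frames is then just a matter of reading each 4-byte total length and carving off that many bytes at a time.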
Now I just have to figure out some way to integrate that back into Paws.
Right now the above will just loop in the do_call sub of Paws::Net::LWPCaller (or whatever caller is in use),
so it will never get into the parts where it tries to deserialize the content until it gets disconnected after 5 minutes (for Kinesis anyway); hence when you run it in scalar context you get a dump of whatever had been buffered up, and it then returns to the normal deserialization path.
I might need to go a little deeper into the guts of Paws and come up with something like 'Paws::Net::Event::LWPCaller', which sets all the above up for us, with some sort of hook to see the data as it goes by.
Any way time to head out.
TTFN
That looks like a good start! The thing I hadn't figured out yet, was whether it was an ongoing event stream that will keep returning stuff (the S3 one doesn't look like it is), or not. If it is, I think we might need to provide something like a "get next event" that keeps returning the object representing the next event..
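If it does turn out to be an ongoing stream, the "get next event" idea could be as simple as the following sketch (plain Perl OO for brevity, where the real thing would presumably be a Moose class; every name here is invented, nothing like it exists in Paws today):

```perl
use strict;
use warnings;

# Hypothetical sketch: decoded events are queued up and handed out one
# at a time, so the caller can loop the way boto3 users do.
package EventStreamIterator;

sub new {
    my ($class, @events) = @_;
    return bless { events => \@events, pos => 0 }, $class;
}

# Returns the next decoded event, or undef once the stream is exhausted
sub next_event {
    my ($self) = @_;
    return undef if $self->{pos} >= @{ $self->{events} };
    return $self->{events}[ $self->{pos}++ ];
}

package main;

# Usage: loop until the stream runs dry
#   while (defined(my $event = $stream->next_event)) { ... }
1;
```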
(btw, github comments preview is useful for juggling with the code quotes.. annoying things)
From what I have tested on S3, it will return while it has stuff to return. So far it works the same as the other event streams; it needs the 'response_data' handler to get the chunks, otherwise in scalar context you get one big chunk at the end.
Will have a closer look today. Still have some finer points of 'AWS::EventStream::VND' to work out, e.g. large chunks.
Going way back to my early coding days, I have some very old 'RealPlayer' audio files that I am going to try and stream, as I have the notes on how to test them. I think I still have the 60-line 'C' program that will bit, nibble and byte check them on a 3.5" floppy some place around here. Hopefully I can still read the disk.
If I can't get the 'C' programs, I need to dust off my old analog oscilloscope and see if it still works in XY mode. If it does, I will likely have to check the incoming streams off my laptop's speaker output, if I can get at them without cracking the case.
Sounds like a lot of work but great fun. If I am really out of luck on the above, I do have a few 6AX7s about that I think I can use with this project [https://www.allaboutcircuits.com/textbook/experiments/chpt-5/vacuum-tube-audio-amplifier/] :) even more fun.
That, or find/borrow a digital oscilloscope some place. Might be hard to do a '.t' for it, though. :)
Ok, no need to break out the oscilloscope just yet. Did uncover a few things in the last few minutes.
Found my old 'C' code and translated it into a Perl test for streams (neat blog post in that).
Seems Pinpoint and other APIs that have EventStreams are set up to send their data to a Kinesis stream, so no need to worry about those.
The S3 'SelectObjectContent' should be made to work in scalar context: handle what is being streamed down and munge it together automagically, like the proto code linked above indicates. I think we should also set it up so we can override that and handle it with a user-supplied anonymous sub.
As for 'SubscribeToShard', I am going to try to set it up so the user has to supply an anonymous sub when using it. There are a few bits in boto I think I can use for this without hard-coding anything.
cheers
Well, it looks like none of the 'paginators' work, at least the ones that have the same name as an 'action'. I will fix all of those so the calls will have both 'scalar' and 'anonymous sub' forms. That should fix things up. We still need 'AWS::EventStream::VND' for the event stream, so no time wasted.
Paginators? context? which paginators? ;)
All of them :) If you check the data of each boto service you will find a 'paginators-1.json' that is used to generate these:
PAGINATORS
Paginator methods are helpers that repetitively call methods that return partial results
ListAllMultipartUploads(sub { }, Bucket => Str, [Delimiter => Str, EncodingType => Str, KeyMarker => Str, MaxUploads => Int, Prefix => Str, UploadIdMarker => Str])
ListAllMultipartUploads(Bucket => Str, [Delimiter => Str, EncodingType => Str, KeyMarker => Str, MaxUploads => Int, Prefix => Str, UploadIdMarker => Str])
If passed a sub as first parameter, it will call the sub for each element found in:
- Uploads, passing the object as the first parameter, and the string 'Uploads' as the second parameter
- CommonPrefixes, passing the object as the first parameter, and the string 'CommonPrefixes' as the second parameter
If not, it will return a Paws::S3::ListMultipartUploadsOutput instance with all the params from all the responses. Please take into account that this mode can potentially consume vast amounts of memory.
I have only found a few that work, and the only way to make some work is to run them through 'Paws::Net::MockCaller', which is not much use.
Getting into the real guts of auto-generation here. I can only guess, but it looks like the original programmers just translated out that block of Python code, and no one has looked at it since or even complained that they cannot be made to work.
They are documented to work this way:
```perl
$ListObjectsV2Output = $s3->ListStreamConsumers(
    sub { $calls++; $keys{ $_[0]->Key }++; },
    StreamARN => 'arn:aws:kinesis:us-east-1:985173205561:stream/TestSteam5Shard',
);
```
but they don't.
I am going to see that they do ;)
Oops, I lie. Seems I was calling them wrong.
I think they do work when set up correctly. I will have to do the same sort of thing for SubscribeToShard.
Ok, finally worked things out.
Seems there are some flaws in our pagination code.
I have just got 'ListAllShards' to work.
The API will return a 'NextToken', and your next call needs only the token and the number you want (MaxResults; if omitted it returns all the rest). It will actually croak if you supply the 'StreamName' value in that case.
Our code just re-sends the original calling params, so it will work once and then die:
`$result = $self->ListAllShards(@_, NextToken => $result->NextToken);`
Will have to study boto a little more to see how it handles this.
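The fix being described might look like the sketch below, with a tiny invented MockKinesis standing in for the real service to show why re-sending StreamName alongside NextToken dies (all names besides the ListShards/NextToken parameters are made up for illustration):

```perl
use strict;
use warnings;

package MockKinesis;   # invented stand-in for the real service

sub new { my ($class, $pages) = @_; return bless { pages => $pages }, $class }

sub ListShards {
    my ($self, %params) = @_;
    # Mimic the behaviour described above: StreamName may not be
    # combined with NextToken
    die "StreamName not allowed together with NextToken"
        if exists $params{StreamName} && exists $params{NextToken};
    my $i    = $params{NextToken} // 0;
    my $last = $#{ $self->{pages} };
    return {
        Shards    => $self->{pages}[$i],
        NextToken => ($i < $last ? $i + 1 : undef),
    };
}

package main;

# The pagination loop as it should work: original params on the first
# call only, then just the token on every subsequent call
sub list_all_shards {
    my ($svc, %params) = @_;
    my @shards;
    my $result = $svc->ListShards(%params);
    push @shards, @{ $result->{Shards} };
    while (defined $result->{NextToken}) {
        $result = $svc->ListShards(NextToken => $result->{NextToken});
        push @shards, @{ $result->{Shards} };
    }
    return \@shards;
}
```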
For the "EventStream" I will add in the same sort of sub to handle them: scalar and, I guess, a linear call?
If I'm reading this correctly: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html then per call we're getting a set of events, where each event has the listed fields in it.. so it would be enough (for the pure API level that we're currently supporting) to return an array of SubscribeToShardEvent objects, I think?
So yes, need to handle somewhere turning them into SubscribeToShardEvent objects, as you were looking at.
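Inflating each decoded JSON payload into such an object could look roughly like this sketch (the class below is invented for illustration; the field names come from the SubscribeToShardEvent API reference linked above, and the real generated Paws class would presumably carry the same fields):

```perl
use strict;
use warnings;

package SubscribeToShardEventSketch;   # invented name, not the real Paws class
use JSON::PP ();   # core JSON module

# Build an event object from the JSON payload of one decoded frame
sub new {
    my ($class, $json_payload) = @_;
    my $data = JSON::PP::decode_json($json_payload);
    return bless {
        ContinuationSequenceNumber => $data->{ContinuationSequenceNumber},
        MillisBehindLatest         => $data->{MillisBehindLatest},
        Records                    => $data->{Records} // [],
    }, $class;
}

sub ContinuationSequenceNumber { $_[0]{ContinuationSequenceNumber} }
sub MillisBehindLatest         { $_[0]{MillisBehindLatest} }
sub Records                    { $_[0]{Records} }

package main;
1;
```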