geduldig/TwitterAPI

Stream stuck

mathrb opened this issue · 10 comments

Hello,

I'm using the Twitter v2 streaming API, and I'm experiencing an issue.
Managing the rules via request("tweets/search/stream/rules", params) works fine.
Starting the stream and gathering items also works, via:

```python
self._response = self._api.request('tweets/search/stream', params={
    "tweet.fields": "lang,entities",
    "media.fields": "url,preview_image_url",
    "user.fields": "id"
})
for item in self._response:
    ...  # process item
```

The issue happens later, when the stream gets "stuck": no more tweets are fetched from it.
No exception is raised by this library; it just stays in the `while` loop, as if everything read from the stream were empty.
Restarting the whole process fixes the issue, meaning that it's not due to inactivity on the topics I'm listening to.
In my opinion, the biggest problem is that I have no way to interrupt this `while` loop. I tried closing the response (`def close(self):`), but that call blocks forever.

Any idea how to work around this?
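One possible workaround while the root cause is unknown: consume the stream in a background thread and let the calling thread notice when no item has arrived for a while, then open a fresh connection instead of calling `close()`. This is a minimal sketch that works on any blocking iterable (such as the response object above); `iterate_with_stall_timeout` and `StreamStalled` are hypothetical names, not part of TwitterAPI. The stalled worker thread cannot be interrupted, so it is simply abandoned as a daemon thread.

```python
import queue
import threading
from typing import Any, Iterable, Iterator


class StreamStalled(Exception):
    """Hypothetical exception: no item arrived within the allowed time."""


def iterate_with_stall_timeout(source: Iterable[Any], timeout: float) -> Iterator[Any]:
    """Yield items from `source`; raise StreamStalled if the gap between items exceeds `timeout` s.

    The blocking iteration runs in a daemon thread. If it hangs, that thread is
    abandoned (Python threads cannot be killed); the caller should reconnect.
    """
    q: "queue.Queue[tuple]" = queue.Queue()

    def pump() -> None:
        try:
            for item in source:
                q.put(("item", item))
        except Exception as exc:  # forward errors to the consuming thread
            q.put(("error", exc))
        else:
            q.put(("done", None))

    threading.Thread(target=pump, daemon=True).start()
    while True:
        try:
            kind, payload = q.get(timeout=timeout)
        except queue.Empty:
            raise StreamStalled(f"no item received for {timeout} s") from None
        if kind == "done":
            return
        if kind == "error":
            raise payload
        yield payload
```

In the setup from this issue you would wrap `self._response` and, on `StreamStalled`, issue a new `tweets/search/stream` request. Because the library's iterator does not hand empty reads to the caller, quiet rules can also trigger the timeout, so pick a value well above the normal gap between matching tweets.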

Kind regards

Can you post the requests you are using to set up the stream and read from it?

```python
# Replace any existing rules with the ones built from self._track
rules = [{'value': t + self._base_rule, 'tag': t} for t in self._track]
response = self._api.request(
    "tweets/search/stream/rules", method_override='GET')
j_response = response.json()
if "data" in j_response:
    delete_rules = [r["id"] for r in j_response["data"]]
    self._api.request("tweets/search/stream/rules",
                      {"delete": {"ids": delete_rules}})
response = self._api.request(
    "tweets/search/stream/rules", {"add": rules})
if response.status_code > 201:
    self._logger.error("failed on setting rules",
                       code=response.status_code, text=response.text)
    raise Exception("Unable to set twitter rules")
self._logger.info("rules added", rules=response.text)

# Open the stream and consume items
logger = create_logger()
self._response = self._api.request('tweets/search/stream', params={
    "tweet.fields": "lang,entities",
    "media.fields": "url,preview_image_url",
    "user.fields": "id"
})
for item in self._response:
    ...  # work on item
```

I just need to know what you are filtering for so I can reproduce what you are seeing. Something simple that triggers the issue, please.

This is the JSON response after adding the rules; it contains the filters:

```json
{"data":[
  {"value":"epic games free -giveaway has:links (lang:en OR lang:fr)","tag":"epic games free -giveaway","id":"1384947157963264020"},
  {"value":"raspberry pi has:links (lang:en OR lang:fr)","tag":"raspberry pi","id":"1384947157963264013"},
  {"value":"visual studio code python has:links (lang:en OR lang:fr)","tag":"visual studio code python","id":"1384947157963264021"},
  {"value":"bitwarden has:links (lang:en OR lang:fr)","tag":"bitwarden","id":"1384947157963264004"},
  {"value":"wsl linux has:links (lang:en OR lang:fr)","tag":"wsl linux","id":"1384947157963264015"},
  {"value":"raspberry kid game has:links (lang:en OR lang:fr)","tag":"raspberry kid game","id":"1384947157963264012"},
  {"value":"lille fives has:links (lang:en OR lang:fr)","tag":"lille fives","id":"1384947157963264025"},
  {"value":"diy nas (raspberry OR arm) has:links (lang:en OR lang:fr)","tag":"diy nas (raspberry OR arm)","id":"1384947157963264017"},
  {"value":"owncloud has:links (lang:en OR lang:fr)","tag":"owncloud","id":"1384947157963264022"},
  {"value":"tony hawk nintento has:links (lang:en OR lang:fr)","tag":"tony hawk nintento","id":"1384947157963264008"},
  {"value":"insurance contract pdf has:links (lang:en OR lang:fr)","tag":"insurance contract pdf","id":"1384947157963264018"},
  {"value":"diablo 4 has:links (lang:en OR lang:fr)","tag":"diablo 4","id":"1384947157963264003"},
  {"value":"nintendo switch -giveaway -concours has:links (lang:en OR lang:fr)","tag":"nintendo switch -giveaway -concours","id":"1384947157963264014"},
  {"value":"steam free game -giveaway has:links (lang:en OR lang:fr)","tag":"steam free game -giveaway","id":"1384947157963264016"},
  {"value":"raspberry (email OR home) server has:links (lang:en OR lang:fr)","tag":"raspberry (email OR home) server","id":"1384947157963264026"},
  {"value":"github actions arm python has:links (lang:en OR lang:fr)","tag":"github actions arm python","id":"1384947157963264011"},
  {"value":"visual studio code has:links (lang:en OR lang:fr)","tag":"visual studio code","id":"1384947157963264006"},
  {"value":"gog free -giveaway has:links (lang:en OR lang:fr)","tag":"gog free -giveaway","id":"1384947157963264027"},
  {"value":"fives cail has:links (lang:en OR lang:fr)","tag":"fives cail","id":"1384947157963264007"},
  {"value":"raspberry nas has:links (lang:en OR lang:fr)","tag":"raspberry nas","id":"1384947157963264009"},
  {"value":"augustus ai has:links (lang:en OR lang:fr)","tag":"augustus ai","id":"1384947157963264019"},
  {"value":"batman has:links (lang:en OR lang:fr)","tag":"batman","id":"1384947157963264023"},
  {"value":"net core has:links (lang:en OR lang:fr)","tag":"net core","id":"1384947157963264005"},
  {"value":"torch model serv has:links (lang:en OR lang:fr)","tag":"torch model serv","id":"1384947157963264024"},
  {"value":"data pipeline has:links (lang:en OR lang:fr)","tag":"data pipeline","id":"1384947157963264010"}
 ],
 "meta":{"sent":"2021-04-21T19:08:29.576Z","summary":{"created":25,"not_created":0,"valid":25,"invalid":0}}}
```
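Besides the HTTP status code, this response carries a `meta.summary` object with `created`, `not_created`, `valid` and `invalid` counts. A small sketch of checking it, assuming the body has the shape shown above and that rejected rules are counted in `not_created`/`invalid`; `check_rules_summary` is a hypothetical helper, not part of TwitterAPI.

```python
import json
from typing import Dict


def check_rules_summary(body: str) -> Dict[str, str]:
    """Return {tag: id} for the added rules; raise ValueError if any rule was rejected.

    Assumes the shape {"data": [...], "meta": {"summary": {...}}} seen in this issue.
    """
    j = json.loads(body)
    summary = j.get("meta", {}).get("summary", {})
    if summary.get("not_created", 0) or summary.get("invalid", 0):
        raise ValueError(f"some rules were rejected: {summary}")
    return {rule["tag"]: rule["id"] for rule in j.get("data", [])}
```

In the rule-setup code from this issue, `check_rules_summary(response.text)` could complement the `response.status_code > 201` check.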

Will I see the problem if I filter for just this "epic games free -giveaway has:links (lang:en OR lang:fr)"?

Please help me out. I don't have much time, so if you can give me something simple that reproduces the issue, that would make things easier.

@geduldig, since the issue can appear anywhere from a few minutes to many hours after starting and cannot be reproduced easily, I think it would be easier if you could give me some hints on which part of the code I should debug. Since `item` is always `None` when the issue happens, maybe something is left in the buffer? I can debug it, but I'd be glad to have some leads, and possibly open a PR afterwards.

Everything happens inside the `while` loop of `_StreamingIterable`. I would first double-check that the issue does not occur with v1 streams. If it is isolated to v2 streams, it is probably due to the new "hydrate" code, this part:

```python
if self.options['api_version'] == '2':
    h_type = self.options['hydrate_type']
    if h_type != HydrateType.NONE:
        if 'data' in item and 'includes' in item:
            field_suffix = '' if h_type == HydrateType.REPLACE else '_hydrate'
            item = _hydrate_tweets(item['data'], item['includes'], field_suffix)
```

Thanks, I will check that part.

I think I found the problem. I will keep testing.

If you are curious, the modified code I am testing is:

```python
if item:
    item = json.loads(item.decode('utf8'))
    if self.options['api_version'] == '2':
        h_type = self.options['hydrate_type']
        if h_type != HydrateType.NONE:
            if 'data' in item and 'includes' in item:
                field_suffix = '' if h_type == HydrateType.REPLACE else '_hydrate'
                item = { 'data': _hydrate_tweets(item['data'], item['includes'], field_suffix) }
```
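The modified code does two things: it decodes and hydrates only when the read actually returned data (empty reads, presumably keep-alive newlines, are skipped), and it keeps the hydrated result inside a `{'data': ...}` envelope so callers can still read `item['data']` after hydration. A standalone sketch of that pattern follows. `parse_line` and `fake_hydrate` are hypothetical names; `fake_hydrate` is a toy stand-in that only resolves `author_id` from `includes['users']` and is not the library's private `_hydrate_tweets`. Stripping whitespace before the emptiness check is also an assumption about how keep-alive lines arrive.

```python
import json
from typing import Any, Dict, Optional


def fake_hydrate(data: Dict[str, Any], includes: Dict[str, Any], suffix: str) -> Dict[str, Any]:
    """Toy stand-in for hydration: attach the expanded author object (hypothetical)."""
    users = {u["id"]: u for u in includes.get("users", [])}
    out = dict(data)
    if out.get("author_id") in users:
        out["author_id" + suffix] = users[out["author_id"]]
    return out


def parse_line(raw: bytes, hydrate: bool = True, replace: bool = False) -> Optional[Dict[str, Any]]:
    """Decode one line of a v2 stream; return None for empty (keep-alive) lines."""
    if not raw.strip():
        return None  # nothing to decode or hydrate
    item = json.loads(raw.decode("utf8"))
    if hydrate and "data" in item and "includes" in item:
        suffix = "" if replace else "_hydrate"
        # Keep the {'data': ...} envelope so callers can still use item['data']
        item = {"data": fake_hydrate(item["data"], item["includes"], suffix)}
    return item
```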

Fixed in v2.7.2.
Thank you for spotting this.
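To confirm that an environment actually has the fix, one option is to compare the installed distribution's version with 2.7.2. A minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); `has_stream_fix` is a hypothetical helper, and its naive parser assumes plain numeric `X.Y.Z` version strings.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def _as_tuple(v: str) -> Tuple[int, ...]:
    """Turn '2.7.2' into (2, 7, 2); assumes purely numeric dot-separated parts."""
    return tuple(int(part) for part in v.split("."))


def has_stream_fix(installed: Optional[str] = None, fixed_in: str = "2.7.2") -> bool:
    """Return True if the TwitterAPI version is at least `fixed_in`."""
    if installed is None:
        try:
            installed = version("TwitterAPI")  # version of the installed distribution
        except PackageNotFoundError:
            return False
    return _as_tuple(installed) >= _as_tuple(fixed_in)
```

Comparing tuples rather than strings keeps `2.10.0` correctly above `2.7.2`.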