alexgolec/tda-api

Chart History Futures

uricod opened this issue · 3 comments

The streaming API has a Chart History Futures service, which is needed to get futures history. Can that be added?

If not I will add.

I agree - without support for this, it's quite difficult to get historical data for futures.

What makes this specific "service" different from the others is that it uses a "GET" command rather than "SUBS" or "ADD". It also accepts only a single symbol at a time, unlike the rest. I believe this is why it was left out: the current implementation assumes that all services share the same interface, so the _service_op method needs some modification to handle this "special case".
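To make the difference concrete, here is a rough sketch of the two request shapes. The `build_request` helper, the placeholder account and source values, and the `keys`/`fields` parameter names for "SUBS" are assumptions based on the TDA streaming documentation, not tda-api code. The "GET" parameters mirror the ones used in the hack posted later in this thread.

```python
import json


def build_request(service, command, parameters, request_id=0,
                  account="ACCOUNT_ID", source="APP_ID"):
    """Hypothetical helper: wrap one streaming request in its envelope."""
    return {
        "service": service,
        "command": command,
        "requestid": str(request_id),
        "account": account,   # placeholder value
        "source": source,     # placeholder value
        "parameters": parameters,
    }


# Subscription-style service: several symbols at once, keeps streaming.
subs = build_request(
    "CHART_FUTURES", "SUBS",
    {"keys": "/ES,/NQ", "fields": "0,1,2,3,4,5,6"},
    request_id=1,
)

# Snapshot-style service: a single symbol and a time range, one reply.
get = build_request(
    "CHART_HISTORY_FUTURES", "GET",
    {"symbol": "/ES", "frequency": "d1",
     "START_TIME": "788918400000", "END_TIME": "1640995200000"},
    request_id=2,
)

print(json.dumps({"requests": [subs, get]}, indent=2))
```

The point is only that the "GET" request carries `symbol` plus a time range instead of `keys` and `fields`, so code that assumes the "SUBS" shape cannot build it.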

I too have written my own implementation and can share it if needed.

Thanks!

I believe a clean API for this would return the requested historical data according to the frequency and period parameters, and then hand off to the existing "CHART_FUTURES" service for the same symbol, so that the client gets live data over the websocket only AFTER all the historical data has been delivered.
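A minimal sketch of that ordering, with stand-in coroutines in place of the real network calls. Every name here (`history_then_live`, `fake_history`, `fake_live`) is hypothetical and nothing in it is tda-api code; it only illustrates "history first, then live".

```python
import asyncio


async def history_then_live(fetch_history, live_stream, handler):
    """Deliver all historical candles first, then forward live ones."""
    for candle in await fetch_history():
        handler("history", candle)
    async for candle in live_stream():
        handler("live", candle)


# Stand-ins for the real CHART_HISTORY_FUTURES / CHART_FUTURES calls.
async def fake_history():
    return [{"t": 1, "close": 100.0}, {"t": 2, "close": 101.0}]


async def fake_live():
    for t in (3, 4):
        await asyncio.sleep(0)  # yield to the event loop, like a socket read
        yield {"t": t, "close": 100.0 + t}


received = []
asyncio.run(history_then_live(
    fake_history, fake_live,
    lambda kind, candle: received.append((kind, candle["t"]))))
print(received)
```

A real implementation would also have to decide what to do with live candles that arrive while the history request is still in flight, for example by buffering them until the snapshot has been handed to the client.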

ak2k commented

@plucena24 To the extent that your implementation could be packaged as a pull request, would you be up for doing so?

Gross hack below if anyone is interested... it's fragile and any changes to how deferred message objects are defined will blow this up. Also, not sure how this code will behave if someone is exercising other pieces of the code - I was only interested in the historical data.

I see why this hasn't been implemented yet. The CHART_HISTORY_FUTURES service seems like a one-off compared to the rest. As far as I can tell, it is an asynchronous service. It is the only service in the documentation that is listed as a Snapshot: https://developer.tdameritrade.com/content/streaming-data#_Toc504640570. A bit of fiddling around in StreamClient would be needed to code this cleanly.
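Because the reply arrives as a one-shot `snapshot` message, the client has to pick it out of whatever else is on the socket. A rough sketch of that check follows, assuming the message layout the hack in this thread relies on (`'key'` holding the symbol and field `'0'` holding the request id). `is_matching_snapshot` is a hypothetical name, not part of tda-api.

```python
def is_matching_snapshot(resp, service, command, symbol, request_id):
    """Return True if resp looks like the snapshot reply to our GET."""
    snapshots = resp.get("snapshot")
    if not snapshots:
        return False  # some other kind of message; leave it for later
    for s in snapshots:
        if s.get("service") != service or s.get("command") != command:
            return False
        for c in s.get("content", []):
            # Assumed layout: 'key' is the symbol, '0' is the request id.
            if c.get("key") != symbol or int(c.get("0", -1)) != request_id:
                return False
    return True


reply = {"snapshot": [{
    "service": "CHART_HISTORY_FUTURES",
    "command": "GET",
    "content": [{"key": "/ES", "0": "7", "3": []}],
}]}
other = {"data": [{"service": "CHART_FUTURES", "command": "SUBS"}]}

args = ("CHART_HISTORY_FUTURES", "GET", "/ES", 7)
print(is_matching_snapshot(reply, *args))
print(is_matching_snapshot(other, *args))
```

Messages that fail the check would be queued and replayed afterwards rather than dropped, which is what the deferred-message handling in the hack does.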

Hoping that @plucena24 will drop the sweet codes.

import asyncio
import datetime
import json

from dotenv import dotenv_values
from selenium import webdriver
from tda.auth import easy_client
from tda.streaming import StreamClient

config = dotenv_values(".env")

# https://developer.tdameritrade.com/content/streaming-data#_Toc504640594
async def _service_op_chart_history_futures(
        handler
    ,   stream_client             
    ,   parameters
    ,   service
    ,   command
    ):

    request, request_id = stream_client._make_request(
        service=service, command=command,
        parameters=parameters)

    async with stream_client._lock:
        await stream_client._send({'requests': [request]})

        # await stream_client._await_response(request_id, service, command)
        deferred_messages = []

        # Context handler to ensure we always append the deferred messages,
        # regardless of how we exit the await loop below
        class WriteDeferredMessages:
            def __init__(self, this_client):
                self.this_client = this_client

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                self.this_client._overflow_items.extendleft(deferred_messages)

        with WriteDeferredMessages(stream_client):
            while True:
                resp = await stream_client._receive()

                if 'snapshot' not in resp:
                    deferred_messages.append(resp)
                    continue

                # Defer any snapshot that is not the reply to our request
                defer = False
                for s in resp['snapshot']:
                    if s['service'] != service or s['command'] != command:
                        defer = True
                        break

                    for c in s['content']:
                        if (c['key'] != parameters['symbol']
                                or int(c['0']) != request_id):
                            defer = True
                            break

                    if defer:
                        break

                if defer:
                    deferred_messages.append(resp)
                    continue

                for s in resp['snapshot']:
                    for c in s['content']:
                        handler(c['key'], c['3'])

                break

def test_stream_futures_data():

    start_date      = '1995-01-01'
    end_date        = '2022-01-01'
    dt_start_date   = datetime.datetime.fromisoformat(start_date)
    dt_end_date     = datetime.datetime.fromisoformat(end_date)

    symbol          = "/ES"
    end_time        = str(int(dt_end_date.timestamp() * 1000))
    start_time      = str(int(dt_start_date.timestamp() * 1000))
    frequency       = "d1"

    def get_webdriver():
        return webdriver.Chrome('./chromedriver')

    assert 'TD_CONSUMER_KEY' in config
    assert 'TD_ACCOUNT_ID' in config

    client = easy_client(
        api_key=config['TD_CONSUMER_KEY'],
        redirect_uri='https://localhost/vala',
        token_path='.token.json',
        webdriver_func=get_webdriver
    )

    stream_client = StreamClient(client, account_id=int(config['TD_ACCOUNT_ID']))

    async def read_stream():

        try:
            await stream_client.login()
            await stream_client.quality_of_service(StreamClient.QOSLevel.FAST)

            def print_message(symbol, candles):
                print(json.dumps(candles, indent=4))

            await _service_op_chart_history_futures(
                print_message
            ,   stream_client
            ,   {   'symbol':       symbol
                ,   'END_TIME':     end_time
                ,   'START_TIME':   start_time
                ,   'frequency':    frequency
            },  'CHART_HISTORY_FUTURES'
            ,   'GET'
            )

            # while True:
            #     await stream_client.handle_message()

        except asyncio.CancelledError:
            print('read_stream(): canceled')
            raise
        except Exception as e:
            print(e)
        finally:
            print('read_stream(): finished')

    async def main():
        # Wait for at most 10000 seconds
        try:
            await asyncio.wait_for(read_stream(), timeout=10000.0)
        except asyncio.TimeoutError:
            print('timeout!')

    asyncio.run(main())
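One detail worth noting in the test above: `START_TIME` and `END_TIME` are sent as epoch milliseconds in string form, and calling `.timestamp()` on a naive `datetime` interprets it in the machine's local timezone. A small sketch that pins the conversion to UTC instead. `to_epoch_ms` is a hypothetical helper, and whether the service expects UTC boundaries is an assumption that has not been verified here.

```python
import datetime


def to_epoch_ms(iso_date):
    """Convert 'YYYY-MM-DD' to epoch milliseconds as a string (UTC midnight)."""
    dt = datetime.datetime.fromisoformat(iso_date)
    dt = dt.replace(tzinfo=datetime.timezone.utc)  # assumption: UTC boundaries
    return str(int(dt.timestamp() * 1000))


print(to_epoch_ms("1995-01-01"))  # 788918400000
print(to_epoch_ms("2022-01-01"))  # 1640995200000
```

With this, the request parameters do not change depending on where the script is run.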