Chart History Futures
uricod opened this issue · 3 comments
I agree - without support for this, it's quite difficult to get historical data for futures.
What makes this specific "service" different from the others is that it uses a "GET" command rather than "SUBS" or "ADD". It also accepts only a single symbol at a time, unlike the rest. I believe this is why it got left out: the current implementation assumes that all services share the same interface, so the _service_op method needs some modification to handle this "special case".
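To make the difference concrete, here is a rough sketch of the two request shapes. The `build_request` helper is hypothetical (it is not part of tda-api), the `account` and `source` fields that real requests carry are left out, and the parameter names (`keys`, `fields`, `symbol`, `frequency`, `period`) are my reading of the TD Ameritrade streaming docs, so treat them as assumptions.

```python
import json


def build_request(service, command, request_id, parameters):
    """Build one streaming request dict (hypothetical helper, not tda-api)."""
    return {
        "service": service,
        "command": command,
        "requestid": str(request_id),
        "parameters": parameters,
    }


# Subscription-style service: several symbols joined into one "keys" string.
subs = build_request(
    "CHART_FUTURES", "SUBS", 1,
    {"keys": ",".join(["/ES", "/NQ"]), "fields": "0,1,2,3,4,5,6"},
)

# Snapshot-style service: a GET command with exactly one "symbol".
get = build_request(
    "CHART_HISTORY_FUTURES", "GET", 2,
    {"symbol": "/ES", "frequency": "d1", "period": "y1"},
)

print(json.dumps({"requests": [subs, get]}, indent=2))
```

The point is only that a shared code path which always builds `keys`/`fields` cannot produce the second request without a special case.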
I too have written my own implementation and can share it if needed.
Thanks!
I believe a clean API for this would be one which would return any historical data requested as per the frequency and period parameters, followed by a callback to the existing "CHART_FUTURES" for the same symbol, which would yield an active websocket with live data AFTER all the historical data has been sent to the client.
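A minimal sketch of that flow, assuming the history fetch and the live subscription are passed in as coroutines. `history_then_live` and the two `fake_*` stand-ins are hypothetical names used only for illustration; in a real client the subscribe step would presumably be the existing chart futures subscription.

```python
import asyncio


async def history_then_live(fetch_history, subscribe_live, symbol, handler,
                            **history_params):
    """Deliver all historical candles first, then start the live feed."""
    candles = await fetch_history(symbol, **history_params)
    for candle in candles:
        handler(symbol, candle)
    # Only subscribe once every historical candle has been handed over.
    await subscribe_live([symbol])


events = []


async def fake_fetch_history(symbol, frequency, period):
    # Stand-in for the CHART_HISTORY_FUTURES GET request.
    return [{"close": 1.0}, {"close": 2.0}]


async def fake_subscribe_live(symbols):
    # Stand-in for the CHART_FUTURES subscription.
    events.append(("subscribed", tuple(symbols)))


def record(symbol, candle):
    events.append((symbol, candle["close"]))


asyncio.run(history_then_live(
    fake_fetch_history, fake_subscribe_live, "/ES", record,
    frequency="d1", period="y1",
))
print(events)
```

Passing the two operations in as arguments keeps the ordering logic testable without a live connection.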
@plucena24 To the extent that your implementation could be packaged as a pull request, would you be up for doing so?
Gross hack below if anyone is interested... it's fragile and any changes to how deferred message objects are defined will blow this up. Also, not sure how this code will behave if someone is exercising other pieces of the code - I was only interested in the historical data.
I see why this hasn't been implemented yet. The CHART_HISTORY_FUTURES service seems like a one-off compared to the rest. As far as I can tell, it is an asynchronous service, and it is the only service in the documentation that is listed as a Snapshot: https://developer.tdameritrade.com/content/streaming-data#_Toc504640570 . A bit of fiddling around in StreamClient would be needed to code this cleanly.
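For what it's worth, the core of the problem is telling the one snapshot that answers your GET apart from everything else arriving on the socket. A rough sketch of that check as a pure function follows; `is_matching_snapshot` is a hypothetical name, and the message layout (`snapshot`, `content`, `key`, and `'0'` holding the request id) is an assumption taken from the field names used in the hack in this thread.

```python
def is_matching_snapshot(resp, service, command, symbol, request_id):
    """Return True if resp is a snapshot answering exactly our GET request."""
    snapshots = resp.get("snapshot")
    if not snapshots:
        # Regular data/notify/response messages carry no snapshot key.
        return False
    for s in snapshots:
        if s.get("service") != service or s.get("command") != command:
            return False
        for c in s.get("content", []):
            if c.get("key") != symbol or int(c.get("0", -1)) != request_id:
                return False
    return True


# Illustrative messages only; the real payloads may carry more fields.
live_update = {"data": [{"service": "CHART_FUTURES", "content": []}]}
history = {
    "snapshot": [{
        "service": "CHART_HISTORY_FUTURES",
        "command": "GET",
        "content": [{"key": "/ES", "0": "2", "3": []}],
    }]
}

print(is_matching_snapshot(live_update, "CHART_HISTORY_FUTURES", "GET", "/ES", 2))
print(is_matching_snapshot(history, "CHART_HISTORY_FUTURES", "GET", "/ES", 2))
print(is_matching_snapshot(history, "CHART_HISTORY_FUTURES", "GET", "/NQ", 2))
```

Anything that fails the check would need to be queued for the normal handlers rather than dropped.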
Hoping that @plucena24 will drop the sweet codes.
import asyncio
import datetime
import json

from dotenv import dotenv_values
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from tda.auth import easy_client
from tda.streaming import StreamClient

config = dotenv_values(".env")

# https://developer.tdameritrade.com/content/streaming-data#_Toc504640594
async def _service_op_chart_history_futures(
      handler
    , stream_client
    , parameters
    , service
    , command
    ):
    request, request_id = stream_client._make_request(
        service=service, command=command,
        parameters=parameters)

    async with stream_client._lock:
        await stream_client._send({'requests': [request]})
        # await stream_client._await_response(request_id, service, command)
        deferred_messages = []

        # Context handler to ensure we always append the deferred messages,
        # regardless of how we exit the await loop below
        class WriteDeferredMessages:
            def __init__(self, this_client):
                self.this_client = this_client

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                self.this_client._overflow_items.extendleft(deferred_messages)

        with WriteDeferredMessages(stream_client):
            while True:
                resp = await stream_client._receive()

                if 'snapshot' not in resp:
                    deferred_messages.append(resp)
                    continue

                deferr = False
                for s in resp['snapshot']:
                    if (   s['service'] != service
                        or s['command'] != command
                       ):
                        deferr = True
                        break
                    for c in s['content']:
                        if (   c['key'] != parameters['symbol']
                            or int(c['0']) != request_id
                           ):
                            deferr = True
                            break

                if deferr:
                    deferred_messages.append(resp)
                    continue

                for s in resp['snapshot']:
                    for c in s['content']:
                        handler(c['key'], c['3'])
                break

def test_stream_futures_data():
    start_date = '1995-01-01'
    end_date = '2022-01-01'
    dt_start_date = datetime.datetime.fromisoformat(start_date)
    dt_end_date = datetime.datetime.fromisoformat(end_date)
    symbol = "/ES"
    # The request takes its time range as epoch milliseconds, sent as strings
    end_time = str(int(dt_end_date.timestamp() * 1000))
    start_time = str(int(dt_start_date.timestamp() * 1000))
    frequency = "d1"

    def get_webdriver():
        return webdriver.Chrome('./chromedriver')

    assert 'TD_CONSUMER_KEY' in config
    assert 'TD_ACCOUNT_ID' in config
    client = easy_client(
        api_key=config['TD_CONSUMER_KEY'],
        redirect_uri='https://localhost/vala',
        token_path='.token.json',
        webdriver_func=get_webdriver
    )
    stream_client = StreamClient(client, account_id=int(config['TD_ACCOUNT_ID']))

    async def read_stream():
        try:
            await stream_client.login()
            await stream_client.quality_of_service(StreamClient.QOSLevel.FAST)

            def print_message(symbol, candles):
                chart_fields = StreamClient.ChartFuturesFields.all_fields()
                print(json.dumps(candles, indent=4))

            await _service_op_chart_history_futures(
                  print_message
                , stream_client
                , { 'symbol': symbol
                  , 'END_TIME': end_time
                  , 'START_TIME': start_time
                  , 'frequency': frequency
                  }
                , 'CHART_HISTORY_FUTURES'
                , 'GET'
                )
            # while True:
            #     await stream_client.handle_message()
        except asyncio.CancelledError:
            print('read_stream(): canceled')
            raise
        except Exception as e:
            print(e)
        finally:
            # Runs on every exit path, not only on cancellation
            print('read_stream(): finished')

    async def main():
        # Wait for at most 10000 seconds
        try:
            await asyncio.wait_for(read_stream(), timeout=10000.0)
        except asyncio.TimeoutError:
            print('timeout!')

    asyncio.run(main())
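
One thing to watch in the snippet above: calling `.timestamp()` on a naive `datetime` interprets it in the machine's local timezone, so START_TIME and END_TIME shift depending on where the script runs. Below is a small sketch that pins the conversion to an explicit timezone. `to_epoch_millis` is a hypothetical helper, and defaulting to UTC is my assumption; I have not confirmed which timezone the service expects.

```python
from datetime import datetime, timezone


def to_epoch_millis(iso_date, tz=timezone.utc):
    """Convert an ISO date string to epoch milliseconds in an explicit timezone."""
    dt = datetime.fromisoformat(iso_date)
    if dt.tzinfo is None:
        # Attach the timezone explicitly instead of relying on local time.
        dt = dt.replace(tzinfo=tz)
    return int(dt.timestamp() * 1000)


print(to_epoch_millis("1995-01-01"))  # 788918400000
print(to_epoch_millis("2022-01-01"))  # 1640995200000
```

The results would still need to be wrapped in `str(...)` before being passed as request parameters, as in the snippet above.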