Python Tinkoff API client for asyncio and humans.
- Tinkoff Investments (official docs)
- Clients for both REST and Streaming protocols in Tinkoff Investments
- Data classes for all interactions with the API
- Automatic reconnection and keep-alive connections
- Internal rate limiter dedicated to each resource in the REST protocol
- Friendly exceptions for API errors
Use pip to install:
$ pip install tinkoff-api
import asyncio
from datetime import datetime
from tinkoff.investments import (
TinkoffInvestmentsRESTClient, Environment,CandleResolution
)
from tinkoff.investments.client.exceptions import TinkoffInvestmentsError
async def show_apple_year_candles():
    """Print the daily candles of FIGI ``BBG000B9XRY4`` for the year 2019.

    Opens a REST client against the sandbox environment, requests
    day-resolution candles from 2019-01-01 to 2019-12-31 and prints each
    candle's time together with its ``h`` value (presumably the high
    price). A ``TinkoffInvestmentsError`` raised along the way is printed
    instead of propagating.
    """
    try:
        # The client is used as an async context manager so that it is
        # scoped to this block.
        async with TinkoffInvestmentsRESTClient(
                token='TOKEN',
                environment=Environment.SANDBOX) as client:
            candles = await client.market.candles.get(
                figi='BBG000B9XRY4',
                dt_from=datetime(2019, 1, 1),
                dt_to=datetime(2019, 12, 31),
                interval=CandleResolution.DAY
            )
            for candle in candles:
                print(f'{candle.time}: {candle.h}')
    except TinkoffInvestmentsError as e:
        print(e)
async def jackpot():
    """Create a sandbox portfolio holding the first 'AAPL' search result.

    Searches instruments for ``'AAPL'``, registers a sandbox account, sets
    the balance of the first found instrument to 100 on that account and
    prints the resulting portfolio positions. A ``TinkoffInvestmentsError``
    is printed instead of propagating.
    """
    try:
        async with TinkoffInvestmentsRESTClient(
                token='TOKEN',
                environment=Environment.SANDBOX) as client:
            instruments = await client.market.instruments.search('AAPL')
            # Guard against an empty search result: indexing it would raise
            # an IndexError that the handler below does not catch.
            if not instruments:
                print('No instruments found for AAPL')
                return
            apple = instruments[0]

            account = await client.sandbox.accounts.register()
            await client.sandbox.accounts.positions.set_balance(
                broker_account_id=account.brokerAccountId,
                figi=apple.figi,
                balance=100,
            )

            print('We created the following portfolio:')
            positions = await client.portfolio.get_positions()
            for position in positions:
                print(f'{position.name}: {position.lots} lots')
    except TinkoffInvestmentsError as e:
        print(e)


# Run the sandbox demo to completion.
asyncio.run(jackpot())
import asyncio
from datetime import datetime
from tinkoff.investments import (
TinkoffInvestmentsStreamingClient, CandleEvent, CandleResolution
)
client = TinkoffInvestmentsStreamingClient(token='TOKEN')


# Stacking the decorator attaches the same callback to the one-minute
# candle events of two different FIGIs.
@client.events.candles('BBG009S39JX6', CandleResolution.MIN_1)
@client.events.candles('BBG000B9XRY4', CandleResolution.MIN_1)
async def on_candle(candle: CandleEvent, server_time: datetime):
    """Print each received candle event together with ``server_time``."""
    print(candle, server_time)


# Run the streaming client; handlers fire as events arrive.
asyncio.run(client.run())
import asyncio
from datetime import datetime
from tinkoff.investments import (
TinkoffInvestmentsStreamingClient, CandleEvent, CandleResolution
)
client = TinkoffInvestmentsStreamingClient(token='TOKEN')


@client.events.candles('BBG000B9XRY4', CandleResolution.HOUR)
async def on_candle(candle: CandleEvent, server_time: datetime):
    """Toggle a one-minute candle subscription based on ``candle.h``.

    When ``candle.h`` is above 1000, this same callback is additionally
    subscribed to one-minute candles of the candle's FIGI; when it is below
    1000, that one-minute subscription is removed. A value of exactly 1000
    changes nothing.
    """
    if candle.h > 1000:
        # Subscriptions can be added at runtime, not only via the decorator.
        await client.events.candles.subscribe(
            callback=on_candle,
            figi=candle.figi,
            interval=CandleResolution.MIN_1
        )
    elif candle.h < 1000:
        await client.events.candles.unsubscribe(
            candle.figi, CandleResolution.MIN_1
        )


asyncio.run(client.run())
import asyncio
from datetime import datetime
from tinkoff.investments import (
TinkoffInvestmentsStreamingClient, TinkoffInvestmentsRESTClient,
CandleEvent, CandleResolution, OperationType
)
# The streaming client delivers candle events; the REST client places orders.
streaming = TinkoffInvestmentsStreamingClient('TOKEN')
rest = TinkoffInvestmentsRESTClient('TOKEN')


@streaming.events.candles('BBG000B9XRY4', CandleResolution.MIN_1)
async def buy_apple(candle: CandleEvent, server_time: datetime):
    """Place a one-lot market BUY order when ``candle.c`` exceeds 350.

    Called for every one-minute candle event of FIGI ``BBG000B9XRY4``.
    ``candle.c`` is presumably the close price; confirm against the API
    data classes.
    """
    if candle.c > 350:
        await rest.orders.create_market_order(
            figi='BBG000B9XRY4',
            lots=1,
            operation=OperationType.BUY,
            # Placeholder account id: replace with a real broker account.
            broker_account_id=123,
        )


asyncio.run(streaming.run())
import asyncio
from datetime import datetime
from tinkoff.investments import CandleResolution, TinkoffInvestmentsRESTClient
from tinkoff.investments.utils.historical_data import HistoricalData
async def get_minute_candles():
    """Print one-minute candles of FIGI ``BBG000B9XRY4`` for 2010-2020.

    Iterates asynchronously over ``HistoricalData.iter_candles`` between
    2010-01-01 and 2020-01-01 and prints every candle it yields.
    """
    # Show 1-minute candles for AAPL over a 10-year period.
    # The client is used as an async context manager so it is released
    # when iteration finishes or fails.
    async with TinkoffInvestmentsRESTClient(token='***') as client:
        historical_data = HistoricalData(client)
        async for candle in historical_data.iter_candles(
            figi='BBG000B9XRY4',
            dt_from=datetime(2010, 1, 1),
            dt_to=datetime(2020, 1, 1),
            interval=CandleResolution.MIN_1,
        ):
            print(candle)


asyncio.run(get_minute_candles())
- allow passing str constants as well as the specific enum objects
- add ability to unsubscribe by pattern
- rename some fields
- convert some fields to snake case
- generate documentation