ton-etl

Repository contains full indexing pipeline from data extraction to building complex high-level data marts


TON-ETL indexing and decoding pipeline

The repository contains the code for the indexing and decoding pipeline for the TON blockchain. It is based on ton-index-worker by Toncenter. ton-index-worker ingests raw data into a Postgres DB and performs basic parsing for jettons and NFTs; the data then flows via Debezium to Kafka topics and is processed by parsers.

The overall architecture is shown in the diagram below:

Architecture

Postgres, Debezium and Kafka run in Docker; the deployment configs are in docker-compose.yml.

Deployment

It is recommended to run the database and the TON node with the indexer on separate servers. Consider using high-performance SSDs for both the database and the TON node.

Node + Indexer

Use the official documentation to start the node from dumps. Note that you don't need to wait for the node to sync the blockchain; you can start indexing right away. You can compile the indexer from sources and run it on the same server as the node, or use the provided docker-compose.

Database

Use docker-compose to start the Postgres DB, Debezium and Kafka. It is recommended to increase the Kafka limits, because huge account states can cause Debezium to fail. To do that, get server.properties from the Kafka container (cdc-using-debezium-kafka:/opt/bitnami/kafka/config/server.properties) and add the following settings:

buffer.memory=200000000
max.request.size=200000000
message.max.bytes=200000000
max.partition.fetch.bytes=200000000
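
One possible way to apply these settings is to copy the file out of the container, append the limits and copy it back; this is a minimal sketch assuming the container name and path mentioned above and that the edited file survives a broker restart:

# copy the broker config out of the Kafka container
docker cp cdc-using-debezium-kafka:/opt/bitnami/kafka/config/server.properties ./server.properties
# append the limits listed above
cat >> ./server.properties <<'EOF'
buffer.memory=200000000
max.request.size=200000000
message.max.bytes=200000000
max.partition.fetch.bytes=200000000
EOF
# copy the edited config back and restart the broker
docker cp ./server.properties cdc-using-debezium-kafka:/opt/bitnami/kafka/config/server.properties
docker restart cdc-using-debezium-kafka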

Also, for better observability, you can add a JMX exporter to Kafka:

  1. Download jmx_exporter
  2. Download kafka config for jmx exporter kafka-2_0_0.yml
  3. Metrics will be available at localhost:7072/metrics
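
One possible way to wire the exporter in (a sketch; the file names and in-container paths are assumptions, the agent is attached via the standard -javaagent mechanism of jmx_exporter) is to mount the downloaded jar and config into the Kafka container and extend its Java options:

# assumed locations inside the container; mount the downloaded files there via a docker-compose volume,
# then add the agent to the Kafka JVM options so metrics are served on port 7072
KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/kafka-2_0_0.yml"

Remember to publish port 7072 from the container so the metrics endpoint is reachable from the host.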

After starting the containers you can create the Debezium connector (replace the database password and, if needed, the DB name with your own values):

curl --location 'http://localhost:8083/connectors' \
   --header 'Accept: application/json' \
   --header 'Content-Type: application/json' \
   --data '{
   "name": "cdc-using-debezium-connector",
   "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "database.hostname": "postgres",
       "database.port": "5432",
       "database.user": "postgres",
       "database.password": "???????", # put your PG password here
       "database.dbname": "ton_index_v2", # your DB name
       "topic.prefix": "ton",
       "topic.creation.default.partitions": 10,
       "topic.creation.default.replication.factor": 1,
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
       "transforms.unwrap.add.headers": "db",
       "transforms.unwrap.delete.tombstone.handling.mode": "drop",
       "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "false",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "false",
       "producer.override.max.request.size": "1073741824"
   }
}'
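
You can check that the connector has been registered and is running via the Kafka Connect REST API:

curl --location 'http://localhost:8083/connectors/cdc-using-debezium-connector/status' \
   --header 'Accept: application/json'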

Parser

Before running the parsers you need to prepare a local env file parser.env in the parser directory:

PGDATABASE=ton_index_v2
PGHOST=postgres
PGUSER=postgres
PGPORT=5432
PGPASSWORD=<your password here>
KAFKA_BROKER=cdc-using-debezium-kafka:29092

To run the parsers, simply use docker-compose up -d in the parser directory. Each parser can handle multiple event types (described in the DB schema section), and you can scale the number of parser instances to handle more events per second.
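
For example, assuming the service in the parser docker-compose.yml is called parser (a hypothetical name, check the actual compose file), scaling to three instances could look like this:

# run three instances of the (hypothetical) parser service
docker-compose up -d --scale parser=3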

Datalake exporter

The datalake exporter listens to Kafka topics for specific tables and exports the data to S3 in AVRO format. After launch, the exporter discovers the schema from the Postgres DB and uses it for AVRO serialization.
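
To sanity-check the export you can pull one of the AVRO files and inspect it locally; this is a sketch with a placeholder bucket and key, using the standard Apache Avro tools jar (version shown is only an example):

# placeholder bucket/key; adjust to your datalake layout
aws s3 cp s3://<your-datalake-bucket>/<table>/<file>.avro ./sample.avro
# print the schema discovered from Postgres, then preview a few records as JSON
java -jar avro-tools-1.11.3.jar getschema ./sample.avro
java -jar avro-tools-1.11.3.jar tojson ./sample.avro | head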

DB Schema

DB contains multiple schemas:

  • public - raw data from ton-index-worker
  • parsed - parsed data
  • prices - tables related to prices, TVL and other DeFi events

public

Contains raw data from ton-index-worker:

  • blocks - block data
  • transactions - transaction events
  • messages - messages (internal messages will be present twice with direction in and out flags)
  • message_contents - raw message bodies
  • nft_items, nft_collections, nft_transfers, getgems_nft_sales, getgems_nft_auctions - NFT data
  • jetton_burns, jetton_transfers, jetton_wallets, jetton_masters - Jettons data
  • latest_accounts_states - latest account states with raw data and code bodies

parsed

Contains parsed data produced by parsers. All tables produced from messages include tx_hash, trace_id and event_time (transaction time).
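
For example, with the connection variables from parser.env exported, these common columns can be queried on any parsed table (parsed.jetton_mint is used here only for illustration):

# uses the PG* variables from parser.env
psql -c "select tx_hash, trace_id, event_time from parsed.jetton_mint order by event_time desc limit 5;"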

mc_libraries

Auxiliary table storing masterchain libraries data. Libraries are required when running the TVL emulator; to avoid costly network requests they are cached in this table.

message_comments

Contains decoded text comments. Produced by CommentsDecoder from message_contents items.

nft_history

Contains the history of NFT transfers, sales and auctions. Produced by NftHistoryParser from the nft_transfers stream; it also uses getgems_nft_sales and getgems_nft_auctions items.

jetton_mint

Contains jetton mint events. Produced from messages table stream with JettonMintParser.

dex_swap_parsed

Supported DEXs:

Each parser estimates the deal volume in TON and USD using core prices (see below). For swaps involving core assets such as TON, stablecoins or LSDs, the volume is estimated from the core asset price at the moment of the swap; otherwise (e.g. a swap between two other jettons) the volume is null.

tradoor_perp_order

Contains decoded Tradoor Perp Order events. Produced from messages table stream with TradoorPerpOrder parser.

tradoor_perp_position_change

Contains decoded Tradoor Perp Position Change events. Produced from messages table stream with TradoorPerpPositionChange parser.

tradoor_option_order

Contains decoded Tradoor Option Order events. Produced from messages table stream with TradoorOptionOrder parser.

gaspump_trade

Contains decoded GasPump trade events. Produced from messages table stream with GasPumpTrade parser.

tonfun_bcl_trade

Contains decoded TonFun trade events. Produced from messages table stream with TonFunTrade parser. Includes the following fields:

  • bcl_master - jetton master address. According to the TONFun architecture, the same jetton master is used after the token leaves the bonding curve.
  • event_type - Buy, Sell or SendLiq. SendLiq is emitted when liquidity is collected from the bonding curve and sent to the DEX (Ston.fi).
  • trader_address - address of the trader. None for the SendLiq event (although in most cases SendLiq occurs after a Buy event and has the same trace_id).
  • ton_amount - amount of TON sold/bought (zero for SendLiq)
  • bcl_amount - amount of jetton bought/sold (zero for SendLiq)
  • referral_ver - referral version. The TONFun protocol allows setting an arbitrary referral cell; the cell starts with a 32-bit version (opcode). The only supported opcode is crc32(ref_v1), see the sketch after this list.
  • partner_address, platform_tag, extra_tag - referral addresses provided by the trader
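
The numeric value of that opcode can be computed from the string itself; this is a sketch assuming the plain CRC-32 (zlib) variant:

# assumes the standard CRC-32 as implemented in zlib; verify the variant against on-chain data
python3 -c "import zlib; print(hex(zlib.crc32(b'ref_v1')))"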

evaa_supply

Contains decoded EVAA supply events. Produced from messages table stream with EvaaSupplyParser.

EVAA pools:

  • EQC8rUZqR_pWV1BylWUlPNBzyiTYVoBEmQkMIQDZXICfnuRr - EVAA main pool
  • EQBIlZX2URWkXCSg3QF2MJZU-wC5XkBoLww-hdWk2G37Jc6N - EVAA LP pool
  • EQBozwKVDya9IL3Kw4mR5AQph4yo15EuMdyX8nLljeaUxrpM - Coffin pool (EVAA fork)

evaa_withdraw

Contains decoded EVAA withdraw events. Produced from messages table stream with EvaaWithdrawAndLiquidationParser.

evaa_liquidation

Contains decoded EVAA liquidation events. Produced from messages table stream with EvaaWithdrawAndLiquidationParser.

prices

Tables related to DeFi: DEX trades, TVL, LSD prices, etc.

core

TON/USDT prices, LSD prices and other prices originating from single smart contracts.

dex_trade

Almost the same as parsed.dex_swap_parsed, but also contains prices for the base asset. Produced by the PriceDiscovery parser.

agg_price

Aggregated prices based on recent DEX swaps. Produced by the PriceDiscovery parser. The aggregation algorithm is implemented in the update_agg_prices method in db.py.

dex_pool

Unique DEX pools. Populated after each detected swap and updated by the TVLPoolStateParser parser. The parser executes get methods on pool smart contracts to fetch the pool state, estimates TVL in TON and USD, and updates the dex_pool table.
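
For a quick look at the largest pools you could run a query like the one below (the tvl_ton and tvl_usd column names are assumptions based on the description above; check the actual prices.dex_pool schema):

# uses the PG* variables from parser.env; column names are assumptions
psql -c "select pool, tvl_ton, tvl_usd from prices.dex_pool order by tvl_usd desc limit 10;"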

dex_pool_history

Contains the history of DEX pools changes. Produced by TVLPoolStateParser parser.

dex_pool_link

Helper table mapping pool tokens to pools. With this table it is possible to get the top tokens by TVL (each pool is linked to both of its tokens, hence the division by 2):

select jetton, sum(tvl_usd) / 2 as total_tvl from prices.dex_pool_link
join prices.dex_pool using (pool)
where tvl_usd > 0
group by 1 order by total_tvl desc limit 10

Other parsers

If you start indexing not from the genesis block but from some recent block, you have to recover all states for accounts, jetton wallets and NFT items. To do this, first use the ton-smc-scanner tool from ton-index-worker to populate all account states. Since this tool doesn't update jetton wallets and NFT items, you can use special parsers to recover all jetton wallets and NFT items: