/cryptostream

Cryptostream is gem designed to make getting a stream of blocks from arbitrary blockchains easy. Not production ready.

Primary LanguageRuby

Cryptostream

Warning: This project was created for a hackathon and is not production ready and has absolutely no tests backing it up. This can be used as a reference to build you own, but I make no guarantees about this actually working correctly if you do not evaluate it yourself.

Cryptostream is gem designed to make getting a stream of blocks from arbitrary blockchains easy. Right now it supports BTC, BCH, LTC, ETH, ETC, XLM, and XRP.

Usage

The example below shows the most basic usage of this library, leaning on defaults like an in-memory store to keep track of state. In a production implementation you will more likely be implementing a subclass of Cryptostream::Store::Base to persist data.

Streaming Blocks

This sample will log all blocks and works with every chain this library supports.

streamer = Cryptostream::Stream::Blocks.new(
  configuration: {
    ticker: :eth,
    rpc_uri: "https://mainnet.infura.io/",
    block_retention: 50, # optional, constrains memory usage of default memory store
  },
  plugins: [
    # NOTE: The class of the block received here is specific to the adapter which is chosen from the
    # currency configuration provided above. You'll need to reference further docs on each adapter
    # to know what to expect here.
    Cryptostream::Plugin::Lambda.new(
      on_block_added: ->(block, _data) { puts "Added block: #{block.number} (#{block.hsh})" },
      on_block_removed: lambda do |block, data|
        puts "Removed block (#{data[:reason]}): #{block.number} (#{block.hsh})"
      end
    ),
    Cryptostream::Plugin::StoreBlocks
  ]
)
streamer.run

Streaming Ethereum Token Transfers

This sample will log all blocks and specified ERC20 and ERC721 transfers.

streamer = Cryptostream::Stream::Blocks.new(
  configuration: {
    ticker: :eth,
    rpc_uri: "https://mainnet.infura.io/",
    starting_block_height: 6222278 # optional: only supported with default memory store
  },
  plugins: [
    Cryptostream::Currency::Ethereum::ERC20Plugin.new(
      ZRX: "0xe41d2489571d322189246dafa5ebde1f4699f498"
    ),
    Cryptostream::Currency::Ethereum::ERC721Plugin.new(
      CryptoKitties: "0x06012c8cf97bead5deae237070f9587f8e7a266d"
    ),
    Cryptostream::Plugin::Lambda.new(
      on_block_added: lambda do |block, transfers|
        puts "Added block: #{block.number} (#{block.hsh})"

        transfers[:erc20].each do |t|
          puts "  #{t.currency} Transfer | Amount: #{t.value / 1e18}, Hsh: #{t.transaction_hash}"
        end

        transfers[:erc721].each do |t|
          type = "Transfer"
          type = "Created" if t.created?
          type = "Destroyed" if t.destroyed?
          puts "  #{t.currency} #{type} | Token ID: #{t.token_id}, Hsh: #{t.transaction_hash}"
        end
      end,

      on_block_removed: lambda do |block, reason|
        puts "Removed block (#{reason}): #{block.number} (#{block.hsh})"
      end
    ),
    Cryptostream::Plugin::StoreBlocks
  ]
)
streamer.run

Streaming Bitcoin/Cash/Lite Transactions

This sample will log all blocks and transactions

streamer = Cryptostream::Stream::Blocks.new(
  configuration: {
    ticker: :btc,
    rpc_uri: "https://USERNAME:PASSWORD@BITCOIN_NODE_URL",
    starting_block_height: 1410000, # optional: only supported with default memory store
  },
  plugins: [
    Cryptostream::Currency::Bitcoin::TransactionsPlugin,

    Cryptostream::Plugin::Lambda.new(
      on_block_added: lambda do |block, data|
        puts "Added block: #{block['height']} (#{block['hash']})"
        transactions = data[:transactions]

        if transactions.length > 20
          puts "  Transactions | Too many transactions(#{transactions.length}) to list | Printing first 5..."
          transactions.first(5).each do |tx|
            puts "  Transaction  | #{tx.hash} | Inputs: #{tx.in.length} | Outputs: #{tx.out.length}"
          end
        else
          transactions.each do |tx|
            puts "  Transaction | #{tx.hash} | Inputs: #{tx.in.length} | Outputs: #{tx.out.length}"
          end
        end
      end,

      on_block_removed: lambda do |block, reason|
        puts "Removed block (#{reason}): #{block.number} (#{block.hsh})"
      end
    ),

    Cryptostream::Plugin::StoreBlocks,
  ]
)
streamer.run

Streaming Stellar Payments

This sample will stream and log all Stellar ledgers and payments

streamer = Cryptostream::Stream::Blocks.new(
  configuration: {
    ticker: :xlm,
    rpc_uri: "https://horizon.stellar.org",
    starting_block_height: 19733892
  },
  plugins: [
    Cryptostream::Currency::Stellar::PaymentsPlugin,

    Cryptostream::Plugin::Lambda.new(
      on_block_added: lambda do |block, data|
        puts "Added block: #{block['sequence']} (#{block['hash']})"
        payments = data[:payments]

        payments.each do |t|
          case t.type
          when "payment"
            puts "  Payment | #{t.asset_code || 'XLM'} | Type: #{t.asset_type} | Amount: #{t.amount}"
          when "create_account"
            puts "  Account Created | Starting Balance: #{t.starting_balance} | #{t.account}"
          end
        end
      end,

      on_block_removed: lambda do |block, reason|
        puts "Removed block (#{reason}): #{block['sequence']} (#{block['hash']})"
      end
    ),

    Cryptostream::Plugin::StoreBlocks
  ]
)
streamer.run

Integration

To integrate this gem into a project, you need to implement these two interfaces and provide initialized instances to the stream constructor.

Cryptostream::Store::Base

class Base
  # Retrieve main chain tip from storage
  #
  # @returns [GenericBlock]
  def chain_tip
    raise NotImplementedError
  end

  # Retrieve block from storage using it's hash
  #
  # @params hsh [String]
  # @returns [GenericBlock]
  def block_by_hash(_hsh)
    raise NotImplementedError
  end
end

Cryptostream::Plugin

The plugin is the backbone of this library. As a consumer of the library you at minimum need to implement one that stores / removes blocks from your storage engine. Outside of that, the world is your oyster.

All plugins are executed in order and they are allowed to mutate the data argument they receive so that plugins later can access that information. For example, if you implement an ERC20 plugin, you would use the RPC interface to request information about ERC20 transfers in the block, and set them on the data object like data[:erc20_transfers] = transfers. With that any plugins downstream can now access that information and react accordingly.

See lib/cryptostream/plugin or the custom plugins defined in lib/cryptostream/currencies/*/plugins for examples.

class Plugin
  # Called when a new block is added to the main chain.
  #
  # @param block [AdapterBlock] currency adapter's representation of the block
  # @param data [Hash] supplementary data provided by plugins
  def block_added(block, data); end

  # Called when a new block should be removed. A reason is provided to identify if the removal
  # was due to being :orphaned or :pruned
  #
  # @param block [AdapterBlock] adapter's representation of the block
  # @param data [Hash] supplementary data provided by plugins. Always includes reason for
  #   removal in `reason` key as :orphaned or :pruned
  def block_removed(block, data); end

  # Called when an unexpected error occurs within the steam process to allow the plugin to
  # determine how to handle it.
  #
  # @param error [StandardError] error that was raised
  # @return [Boolean] if plugin wants the process to retry. true will have the caller retry,
  #   false will reraise the error and crash the process
  def handle_error(_error)
    false
  end
end

Development

Setup

To set up the project for local development. Install dependencies with this:

make install

Running Tests

You can run the full test suite with the following command:

make test

You can lint your code using all available linters with:

make lint

Dev Testing

A local command line tool can be found at bin/cryptostream and used to run against various testnets and see how the tool behaves.

For BTC, you either need to set BTC_RPC_URI in your environment or provide the --rpc-uri option since there is not a publicly accessible bitcoin node we can include by default.

$ ./bin/cryptostream help
Commands:
  cryptostream help [COMMAND]       # Describe available commands or one specific command
  cryptostream log_bitcoin_transactions TICKER  # Stream and log transactions for bitcoin-like chains
  cryptostream log_blocks TICKER    # Stream and log blocks for TICKER
  cryptostream log_token_transfers  # Stream and log ERC20 and ERC721 transfers for Ethereum

$ ./bin/cryptostream help log_bitcoin_transactions
Usage:
  cryptostream log_bitcoin_transactions TICKER

Options:
  [--rpc-uri=RPC_URI]          # Required: URI of RPC node to communicate with
  [--starting-block-height=N]  # block to start syncing from, Defaults to chain tip.
  [--block-retention=N]        # number of blocks to retain in the store

Stream and log transactions for bitcoin-like chains

$ ./bin/cryptostream help log_blocks
Usage:
  cryptostream log_blocks TICKER

Options:
  [--rpc-uri=RPC_URI]          # URI of RPC node to communicate with. Required for BTC.
  [--starting-block-height=N]  # block to start syncing from
  [--block-retention=N]        # number of blocks to retain in the memory store

Stream and log blocks for TICKER

$ ./bin/cryptostream help log_token_transfers
Usage:
  cryptostream log_token_transfers

Options:
  [--rpc-uri=RPC_URI]          # URI of RPC node to communicate with. Defaults to Infura mainnet
  [--starting-block-height=N]  # block to start syncing from, Defaults to chain tip.
  [--block-retention=N]        # number of blocks to retain in the store

Description:
  `log_token_transfers` will log transfer events for ERC20 and ERC721 tokens on Ethereum.

  The defaults are as such:

  ERC20: ZRX, BAT, WETH, REP, OMG, MKR

  ERC721: CryptoKitties, LucidSight MLB, Gods Unchained, CB Wallet Crypto Swag