CoreCast Python Client

A Python gRPC client for streaming Solana blockchain data from Bitquery's CoreCast service.

Installation

Install dependencies:

pip install -r requirements.txt

This installs the required dependencies, including the bitquery-corecast-proto package, which provides the protobuf definitions for CoreCast services.

Authentication

The application requires a Bitquery CoreCast authorization token.

Set your token directly in the YAML config

Edit config.yaml and set server.authorization to your token (it starts with ory_at_...). Example:

server:
  address: "corecast.bitquery.io"
  insecure: false
  authorization: "ory_at_your_actual_token_here"

Notes:

  • Your token will be sent as authorization: Bearer <token> metadata.
  • Do not commit real tokens to version control.
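
As a minimal sketch of the notes above, the metadata pair could be built as follows. The helper names `build_auth_metadata` and `resolve_token` and the `CORECAST_TOKEN` environment variable are illustrative assumptions, not part of this project. Reading the token from the environment is one way to keep it out of a committed config.yaml.

```python
import os


def build_auth_metadata(token: str) -> tuple:
    """Return call metadata carrying the token as a Bearer credential."""
    token = token.strip()
    if not token:
        raise ValueError("authorization token is empty")
    # gRPC metadata keys are lowercase; the value follows the Bearer scheme.
    return (("authorization", f"Bearer {token}"),)


def resolve_token(config: dict, env_var: str = "CORECAST_TOKEN") -> str:
    """Prefer a token from the environment (hypothetical variable name),
    falling back to server.authorization from the loaded config."""
    from_file = config.get("server", {}).get("authorization", "")
    return os.environ.get(env_var) or from_file
```

With grpcio, a tuple of this shape is typically passed through the `metadata=` argument of a stub call. How this client wires it internally is not shown here.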

Run

Using default configuration (dex_trades):

python3 main.py

Filters

⚠️ Important: At least one filter must be specified for each stream type. Subscriptions without filters will be rejected.

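Filter logic: values within one filter list are alternatives (OR), and the different filter lists are combined with AND: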

(program IN filter.programs) AND (pool IN filter.pools) AND (token IN filter.tokens)
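
The rule above can be illustrated locally with a small predicate. This is only a sketch of the logic: filtering is presumably applied by the service, the event field names (`program`, `pool`, `token`) are hypothetical, and treating a missing or empty filter list as "not applied" is an assumption.

```python
def matches(event: dict, filters: dict) -> bool:
    """Evaluate the AND-of-INs rule against a single event.

    Assumption: a filter list that is missing or empty is not applied.
    """
    # Map each filter list (config key) to the event field it constrains.
    # The event field names are hypothetical.
    field_for_filter = {"programs": "program", "pools": "pool", "tokens": "token"}
    for filter_key, field in field_for_filter.items():
        allowed = filters.get(filter_key)
        if allowed and event.get(field) not in allowed:
            return False  # one failed IN check fails the whole AND
    return True
```

For example, with two programs and one token configured, an event passes if its program is either of the two and its token is the configured one.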

Additional Options

Log Level

Control the verbosity of logging output:

python3 main.py --log-level DEBUG

Available log levels: DEBUG, INFO, WARNING, ERROR
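
A flag like this is commonly implemented with argparse and the standard logging module. The sketch below is an assumption about how such a flag could be parsed, not the project's actual main.py. The function name `parse_log_level` and the INFO default are illustrative.

```python
import argparse
import logging

LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")


def parse_log_level(argv=None) -> int:
    """Parse --log-level and return the matching logging constant."""
    parser = argparse.ArgumentParser()
    # choices= makes argparse reject anything outside the four documented levels.
    parser.add_argument("--log-level", default="INFO", choices=LOG_LEVELS)
    args = parser.parse_args(argv)
    return getattr(logging, args.log_level)
```

A script could then call `logging.basicConfig(level=parse_log_level())` once at startup.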

Configuration

All parameters are loaded from config.yaml in the project root.

Available stream types:

  • dex_trades - DEX trades. Filters: programs, tokens, pools, traders
  • dex_orders - DEX orders. Filters: programs, tokens, pools, traders
  • dex_pools - DEX pools. Filters: programs, tokens, pools
  • transfers - Transfers. Filters: senders, receivers, tokens
  • balances - Balance updates. Filters: addresses, tokens
  • transactions - Parsed transactions. Filters: programs, signers
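
The list above, together with the rule that every stream needs at least one filter, can be checked before connecting. The following is a minimal sketch using the plural key names from config.yaml. The function `validate_filters` is hypothetical, and rejecting unsupported keys is a choice made for this sketch rather than documented client behaviour.

```python
ALLOWED_FILTERS = {
    "dex_trades": {"programs", "tokens", "pools", "traders"},
    "dex_orders": {"programs", "tokens", "pools", "traders"},
    "dex_pools": {"programs", "tokens", "pools"},
    "transfers": {"senders", "receivers", "tokens"},
    "balances": {"addresses", "tokens"},
    "transactions": {"programs", "signers"},
}


def validate_filters(stream_type: str, filters: dict) -> list:
    """Return the sorted filter keys in use, or raise ValueError."""
    if stream_type not in ALLOWED_FILTERS:
        raise ValueError(f"unknown stream type: {stream_type!r}")
    # Only keys with a non-empty list count as active filters.
    active = sorted(key for key, values in filters.items() if values)
    unsupported = [k for k in active if k not in ALLOWED_FILTERS[stream_type]]
    if unsupported:
        raise ValueError(f"{stream_type} does not support filters: {unsupported}")
    if not active:
        raise ValueError(f"{stream_type} needs at least one filter")
    return active
```

Failing early with a clear message is usually easier to debug than a subscription rejected by the server.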

Configuration format

All configuration files follow this structure:

server:
  address: "corecast.bitquery.io"
  insecure: false # if false, TLS will be used
  authorization: "ory_at_..." # your CoreCast token; sent as metadata 'authorization'

stream:
  type: "dex_trades" # or dex_orders, dex_pools, transactions, transfers, balances

filters:
  # DEX filters (for dex_trades, dex_orders, dex_pools); programs also applies to transactions
  programs:
    - "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"
  pools:
    - "Hf6c2L9H8iQy2f5uF5eBkQK2A2c6R7FZ8pCkU9D1ABCD"
  tokens:
    - "So11111111111111111111111111111111111111112" # SOL
  traders: # not for dex_pools
    - "7GJz9X7b1G9Nf1d5uQq2Z3B4nPq6F8d9LmNoPQrsTUV"

  # Transfer filters (for transfers)
  senders:
    - "DSqMPMsMAbEJVNuPKv1ZFdzt6YvJaDPDddfeW7ajtqds"
  receivers:
    - "ReceiverAddressHere..."

  # Balance filters (for balances)
  addresses:
    - "DSqMPMsMAbEJVNuPKv1ZFdzt6YvJaDPDddfeW7ajtqds"

  # Transaction filters (for transactions)
  signers:
    - "ETcW7iuVraMKLMJayNCCsr9bLvKrJPDczy1CMVMPmXTc"

Note: See config.yaml for complete examples with all stream types. Simply uncomment the stream type and filters you want to use.

Examples

DEX Trades with multiple programs:

stream:
  type: "dex_trades"
filters:
  programs:
    - "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA" # Pump.fun AMM (PumpSwap)
    - "SoLFiHG9TfgtdUXUjWAxi3LtvYuFyDLVhBWxdMZxyCe" # SolFi
  tokens:
    - "So11111111111111111111111111111111111111112" # SOL

Transfers from specific sender:

stream:
  type: "transfers"
filters:
  senders:
    - "DSqMPMsMAbEJVNuPKv1ZFdzt6YvJaDPDddfeW7ajtqds"
  tokens:
    - "So11111111111111111111111111111111111111112" # SOL
    - "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # USDC

Balance updates for specific address:

stream:
  type: "balances"
filters:
  addresses:
    - "DSqMPMsMAbEJVNuPKv1ZFdzt6YvJaDPDddfeW7ajtqds"
  tokens:
    - "So11111111111111111111111111111111111111112" # SOL

Transactions from specific signer:

stream:
  type: "transactions"
filters:
  signers:
    - "ETcW7iuVraMKLMJayNCCsr9bLvKrJPDczy1CMVMPmXTc"

Programmatic Usage

from config import load_config
from client import CoreCastClient

# Load configuration
config = load_config('config.yaml')
# Create and connect client
client = CoreCastClient(config)
client.connect()

# Stream data
client.stream_dex_trades()

# Close connection
client.close()
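
If streaming raises, the snippet above never reaches close(). One way to guarantee cleanup is a small context manager around the connect() and close() calls shown above. The helper name `connected` is hypothetical and not part of the client.

```python
from contextlib import contextmanager


@contextmanager
def connected(client):
    """Connect on entry and always close on exit, even if streaming raises."""
    client.connect()
    try:
        yield client
    finally:
        client.close()


# Usage with the client from this project (not executed here):
# with connected(CoreCastClient(load_config("config.yaml"))) as client:
#     client.stream_dex_trades()
```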

Debugging Utilities

The project includes utility functions for debugging protobuf messages:

protobuf_utils.py

Contains helper functions for working with protobuf messages:

  • print_protobuf_message(msg, indent=0, encoding="base58") - Pretty print any protobuf message
  • format_protobuf_message(msg, encoding="base58") - Format message as string instead of printing
  • get_protobuf_field_value(msg, field_path) - Extract specific field values using dot notation
  • extract_bytes_fields(msg, encoding="base58") - Extract all bytes fields as a dictionary
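
To make "dot notation" concrete, here is a rough sketch of how a dotted path can be resolved by walking nested attributes. It is an illustration only, not the implementation in protobuf_utils.py. The function name `get_field_by_path` and the field names `Trade.Buy.Amount` are hypothetical, and a SimpleNamespace stands in for a decoded message.

```python
from types import SimpleNamespace


def get_field_by_path(msg, field_path: str, default=None):
    """Follow a dotted path through nested attributes, one segment at a time."""
    current = msg
    for name in field_path.split("."):
        if not hasattr(current, name):
            return default  # stop as soon as a segment is missing
        current = getattr(current, name)
    return current


# Stand-in for a decoded message; real messages come from bitquery-corecast-proto.
message = SimpleNamespace(Trade=SimpleNamespace(Buy=SimpleNamespace(Amount=42)))
print(get_field_by_path(message, "Trade.Buy.Amount"))   # 42
print(get_field_by_path(message, "Trade.Sell.Amount"))  # None
```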

Usage Example

from protobuf_utils import print_protobuf_message

# In your message handler
def on_message_received(message):
    print("Received message:")
    print_protobuf_message(message)

    # Or with hex encoding
    print_protobuf_message(message, encoding="hex")
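
The two encodings differ only in how raw bytes fields are rendered as text. The project depends on the base58 package for this. The dependency-free sketch below is an illustration of the idea, and the function name `encode_bytes` is hypothetical.

```python
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def encode_bytes(data: bytes, encoding: str = "base58") -> str:
    """Render raw bytes as a base58 or hex string."""
    if encoding == "hex":
        return data.hex()
    if encoding != "base58":
        raise ValueError(f"unsupported encoding: {encoding!r}")
    # Treat the bytes as one big-endian integer and convert it to base 58.
    number = int.from_bytes(data, "big")
    digits = []
    while number > 0:
        number, remainder = divmod(number, 58)
        digits.append(B58_ALPHABET[remainder])
    # Each leading zero byte is represented by a leading "1".
    leading_zeros = len(data) - len(data.lstrip(b"\x00"))
    return "1" * leading_zeros + "".join(reversed(digits))
```

Base58 is the form in which Solana addresses are normally written, so it is the more readable choice for keys and signatures. Hex is convenient when comparing against raw byte dumps.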

Example Script

Run the example script to see usage patterns:

python3 example_protobuf_debug.py

Requirements

  • Python 3.7+
  • gRPC Python libraries
  • PyYAML for configuration
  • base58 for address encoding
  • bitquery-corecast-proto for CoreCast protobuf definitions

Proto Files

This project uses protobuf definitions from the bitquery-corecast-proto PyPI package. This package contains all the necessary .proto files and generated Python classes for communicating with Bitquery's CoreCast service.

The package includes:

  • CoreCast service definitions
  • Request and response message types
  • Stream message definitions
  • Solana-specific message types (blocks, DEX trades, transfers, etc.)

Schema Reference

To explore the complete protobuf schema definitions and message structures, check out the official schema repository:

Bitquery Streaming Protobuf Schema