/ws/market

WebSocket Market Streams URL.

Client Library

#!/usr/bin/env python

import asyncio
import logging
import time

from bluefin_pro_sdk import BluefinProSdk, Environment, RpcUrl
from crypto_helpers.signature import SuiWallet
from openapi_client import MarketSubscriptionStreams, MarketDataStreamName

# Set up logging: configure the root logger at INFO level and create a
# module-level logger used by every function in this example.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def now():
    """Return the current Unix time as a whole number of milliseconds."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)


async def log_update(event):
    """Log an event whose class name ends in ``Update``; ignore any other event."""
    event_kind = type(event).__name__
    if event_kind.endswith("Update"):
        log.info(f"{event!r}")


async def main():
    """
    Example showing how to listen to market websocket streams using the Bluefin Pro API
    using the PRODUCTION environment.

    Subscribes to several ETH-PERP market data streams and then keeps the
    listener open, so ``log_update`` keeps receiving events until the task is
    cancelled (for example by Ctrl+C).

    Raises:
        Exception: any error raised while initializing the client or while
            listening is logged and then re-raised.
    """
    # Create a wallet with your private key
    # NOTE(review): avoid hard-coding a real mnemonic in source; load it from a
    # secure location instead.
    sui_wallet = SuiWallet(
        mnemonic="dilemma salmon lake ceiling moral glide cute that ginger float area aunt vague remind cage mother concert inch dizzy present proud program time urge"
    )

    log.info(f"Using wallet address: {sui_wallet.sui_address}")

    # Initialize the Bluefin Pro SDK client with PRODUCTION environment and RPC
    async with BluefinProSdk(
            sui_wallet=sui_wallet,
            contracts=None,  # Not needed for this example
            rpc_url=RpcUrl.PROD,
            env=Environment.PRODUCTION,
            debug=False  # Set to True for more verbose output
    ) as client:
        try:
            # Initialize the client, which will handle authentication
            await client.init()

            # ========= Listen to Market Websocket Streams ==========
            # Subscribe to WebSockets and log events as they arrive.
            async with await client.create_market_data_stream_listener(
                    handler=log_update
            ) as market_data_stream:
                await market_data_stream.subscribe(
                    subscription=[
                        MarketSubscriptionStreams(
                            symbol="ETH-PERP",
                            streams=[
                                MarketDataStreamName.MARK_PRICE,
                                MarketDataStreamName.RECENT_TRADE,
                                MarketDataStreamName.DIFF_DEPTH_500_MS,
                                MarketDataStreamName.PARTIAL_DEPTH_5,
                            ],
                        )
                    ]
                )

                # Stay inside the listener context: leaving the `async with`
                # block right after subscribing would exit the listener before
                # the handler had a chance to receive any event. This wait
                # never completes on its own; it ends when the task is
                # cancelled (e.g. Ctrl+C under asyncio.run).
                await asyncio.Event().wait()

        except Exception as e:
            log.error(f"Error listening to websocket: {e}")
            raise


if __name__ == "__main__":
    # Run the example only when executed as a script. A Ctrl+C is reported as
    # a normal exit through the logger rather than as a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Exiting due to keyboard interrupt")

use bluefin_api::models::{
    MarketDataStreamName, MarketStreamMessage, MarketStreamMessagePayload,
    MarketSubscriptionMessage, MarketSubscriptionStreams, SubscriptionResponseMessage,
    SubscriptionType,
};
use bluefin_pro::prelude::*;
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::Message;

/// Boxed, type-erased error used by every fallible function in this example.
type Error = Box<dyn std::error::Error>;
/// `Result` alias whose error type is this example's [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// Connects to the market websocket of `environment`, subscribes to Mark price updates for
/// `symbol`, and spawns a background task that forwards every parsed [`MarketStreamMessage`]
/// to `sender`.
///
/// The function returns as soon as the subscription request has been sent; the spawned task
/// keeps running detached and stops when any of the following happens:
///
/// * `shutdown_flag` is observed as `true` before waiting for the next message,
/// * no message arrives within `max_time_without_message`,
/// * the websocket stream ends, yields an error, or delivers a Close frame,
/// * a message of an unhandled kind is received,
/// * the receiving half of `sender`'s channel is gone.
///
/// # Errors
///
/// Returns an error if the client request cannot be built, the websocket connection cannot
/// be established, or the subscription message cannot be serialized or sent.
async fn listen_to_mark_price_updates(
    environment: Environment,
    symbol: &str,
    sender: Sender<MarketStreamMessage>,
    max_time_without_message: Duration,
    shutdown_flag: Arc<AtomicBool>,
) -> Result<()> {
    let request = ws::market::url(environment).into_client_request()?;

    // Establish connection with websocket URL.
    let (websocket_stream, _) = connect_async(request).await?;
    let (mut ws_sender, mut ws_receiver) = websocket_stream.split();

    // Send a subscription message to receive Mark price updates.
    let sub_message = serde_json::to_string(&MarketSubscriptionMessage::new(
        SubscriptionType::Subscribe,
        vec![MarketSubscriptionStreams::new(
            symbol.into(),
            vec![MarketDataStreamName::MarkPrice],
        )],
    ))?;
    ws_sender.send(Message::Text(sub_message)).await?;

    // Spawn a websocket listener task to listen for messages on the subscribed topic.
    tokio::spawn(async move {
        while !shutdown_flag.load(std::sync::atomic::Ordering::Relaxed) {
            let Ok(message) = timeout(max_time_without_message, ws_receiver.next()).await else {
                println!("Websocket receiver task timed out due to inactivity");
                return;
            };
            let Some(Ok(message)) = message else {
                println!("Websocket receiver task terminated");
                return;
            };
            match message {
                Message::Ping(payload) => {
                    println!("Ping received");
                    // A Pong must carry the same application data as the Ping it answers
                    // (RFC 6455, section 5.5.3), so echo the payload back.
                    match ws_sender.send(Message::Pong(payload)).await {
                        Ok(()) => println!("Pong sent"),
                        Err(error) => eprintln!("Error sending Pong: {error}"),
                    }
                }
                Message::Pong(_) => {
                    println!("Pong received");
                }
                Message::Text(text) => {
                    // Check if it's a market stream message (e.g. the Mark price update).
                    if let Ok(websocket_message) =
                        serde_json::from_str::<MarketStreamMessage>(&text)
                    {
                        if let Err(error) = sender.send(websocket_message).await {
                            eprintln!("Error sending message to channel: {error}");
                            // The channel only rejects a message once its receiver is gone,
                            // so nobody can consume further updates: stop the task.
                            return;
                        }
                        continue;
                    }

                    // Check if it's a subscription message.
                    if let Ok(subscription_message) =
                        serde_json::from_str::<SubscriptionResponseMessage>(&text)
                    {
                        // Report a formatting failure instead of panicking inside the task.
                        match serde_json::to_string_pretty(&subscription_message) {
                            Ok(pretty) => {
                                println!("Subscription response message received: {pretty}");
                            }
                            Err(error) => {
                                eprintln!("Error formatting subscription response: {error}");
                            }
                        }
                    }
                }
                Message::Close(_) => {
                    println!("Close received");
                    return;
                }
                _ => {
                    eprintln!("Unknown message received");
                    return;
                }
            }
        }
    });

    Ok(())
}

/// Entry point: starts the Mark price listener for ETH perps on Testnet and pretty-prints
/// every Mark price update until the channel is closed.
#[tokio::main]
async fn main() -> Result<()> {
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let max_idle = Duration::from_secs(10);
    let (tx, mut rx) = tokio::sync::mpsc::channel::<MarketStreamMessage>(100);

    let listener_flag = Arc::clone(&shutdown_flag);
    listen_to_mark_price_updates(
        Environment::Testnet,
        symbols::perps::ETH,
        tx,
        max_idle,
        listener_flag,
    )
    .await?;

    // Drain the channel; `recv` yields `None` once every sender has been dropped.
    loop {
        let Some(incoming) = rx.recv().await else {
            break;
        };
        match incoming {
            MarketStreamMessage::MarkPriceUpdate {
                payload: MarketStreamMessagePayload::MarkPriceUpdate(mark_price),
            } => {
                println!("{mark_price:#?}");
            }
            // Anything that is not a Mark price update is ignored.
            _ => {}
        }
    }

    // Signal the listener task to stop once the receive loop has ended.
    shutdown_flag.store(true, std::sync::atomic::Ordering::SeqCst);

    Ok(())
}

HTTPS

Alternatively, call the GET /ws/market endpoint using the integrated editor on the right, or locally from any language that supports HTTPS network calls.

Request & Response

Language
URL
Click Try It! to start a request and see the response here!