GET wss://stream.api.{env}.bluefin.io/ws/market
WebSocket Market Streams URL.
Client Library
#!/usr/bin/env python
import asyncio
import logging
import time
from bluefin_pro_sdk import BluefinProSdk, Environment, RpcUrl
from crypto_helpers.signature import SuiWallet
from openapi_client import MarketSubscriptionStreams, MarketDataStreamName
# Set up logging: configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the functions in this example.
log = logging.getLogger(__name__)
def now():
    """Current wall-clock time as whole milliseconds since the Unix epoch."""
    epoch_millis = time.time() * 1000
    # int() truncates the fractional millisecond.
    return int(epoch_millis)
async def log_update(event):
    """Stream handler: log the repr of events whose class name ends in "Update".

    Any other event is ignored silently.
    """
    event_kind = type(event).__name__
    if event_kind.endswith("Update"):
        log.info(repr(event))
async def main():
    """
    Example showing how to listen to market websocket streams using the Bluefin Pro API
    using the PRODUCTION environment.

    Builds a wallet, opens a ``BluefinProSdk`` client, subscribes to several
    ETH-PERP market streams and passes ``log_update`` as the stream handler.
    After subscribing, the coroutine stays inside the listener context until it
    is cancelled (for example when the program is interrupted with Ctrl+C), so
    the subscription is not torn down right after it is created.

    Raises:
        Exception: any error raised while initialising the client or
            subscribing is logged and then re-raised.
    """
    # Create a wallet with your private key.
    # NOTE: this is an example mnemonic; substitute your own and never commit a
    # real secret to source control.
    sui_wallet = SuiWallet(
        mnemonic="dilemma salmon lake ceiling moral glide cute that ginger float area aunt vague remind cage mother concert inch dizzy present proud program time urge"
    )
    log.info(f"Using wallet address: {sui_wallet.sui_address}")

    # Initialize the Bluefin Pro SDK client with PRODUCTION environment and RPC
    async with BluefinProSdk(
        sui_wallet=sui_wallet,
        contracts=None,  # Not needed for this example
        rpc_url=RpcUrl.PROD,
        env=Environment.PRODUCTION,
        debug=False,  # Set to True for more verbose output
    ) as client:
        try:
            # Initialize the client, which will handle authentication
            await client.init()

            # ========= Listen to Market Websocket Streams ==========
            # Subscribe to WebSockets and log events as they arrive.
            async with await client.create_market_data_stream_listener(
                handler=log_update
            ) as market_data_stream:
                await market_data_stream.subscribe(
                    subscription=[
                        MarketSubscriptionStreams(
                            symbol="ETH-PERP",
                            streams=[
                                MarketDataStreamName.MARK_PRICE,
                                MarketDataStreamName.RECENT_TRADE,
                                MarketDataStreamName.DIFF_DEPTH_500_MS,
                                MarketDataStreamName.PARTIAL_DEPTH_5,
                            ],
                        )
                    ]
                )

                # Without this wait the `async with` block would be exited as
                # soon as subscribe() returns, leaving no time for events to
                # reach the handler. The Event is never set, so this parks the
                # coroutine until it is cancelled.
                await asyncio.Event().wait()
        except Exception as e:
            log.error(f"Error listening to websocket: {e}")
            raise
if __name__ == "__main__":
    # Script entry point: drive main() on a fresh event loop and treat Ctrl+C
    # as a normal way to stop the example rather than as an error.
    entry_point = main()
    try:
        asyncio.run(entry_point)
    except KeyboardInterrupt:
        log.info("Exiting due to keyboard interrupt")
use bluefin_api::models::{
MarketDataStreamName, MarketStreamMessage, MarketStreamMessagePayload,
MarketSubscriptionMessage, MarketSubscriptionStreams, SubscriptionResponseMessage,
SubscriptionType,
};
use bluefin_pro::prelude::*;
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::Message;
/// Boxed, dynamically typed error used as the error type throughout this example.
type Error = Box<dyn std::error::Error>;
/// `Result` alias whose error type is fixed to [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Subscribes to Mark price updates for `symbol` and forwards them to `sender`.
///
/// Connects to the market WebSocket URL of `environment`, sends a subscribe
/// request for the `MarkPrice` stream of `symbol`, and then spawns a Tokio task
/// that reads from the socket. The task's join handle is dropped, so the task
/// keeps running on its own after this function returns.
///
/// The spawned task stops when any of the following happens:
/// - `shutdown_flag` is observed as `true` before waiting for the next message,
/// - no message arrives within `max_time_without_message`,
/// - the stream ends or yields an error,
/// - a `Close` frame or a message of an unhandled kind is received,
/// - `sender`'s channel no longer accepts messages.
///
/// Text messages that deserialize as [`MarketStreamMessage`] are pushed into
/// `sender`; those that deserialize as [`SubscriptionResponseMessage`] are
/// printed; any other text is ignored.
///
/// # Errors
///
/// Returns an error if the client request cannot be built, the connection
/// cannot be established, or the subscription message cannot be serialized or
/// sent. Failures inside the spawned task are only printed.
async fn listen_to_mark_price_updates(
    environment: Environment,
    symbol: &str,
    sender: Sender<MarketStreamMessage>,
    max_time_without_message: Duration,
    shutdown_flag: Arc<AtomicBool>,
) -> Result<()> {
    let request = ws::market::url(environment).into_client_request()?;

    // Establish connection with websocket URL.
    let (websocket_stream, _) = connect_async(request).await?;
    let (mut ws_sender, mut ws_receiver) = websocket_stream.split();

    // Send a subscription message to receive Mark price updates.
    let sub_message = serde_json::to_string(&MarketSubscriptionMessage::new(
        SubscriptionType::Subscribe,
        vec![MarketSubscriptionStreams::new(
            symbol.into(),
            vec![MarketDataStreamName::MarkPrice],
        )],
    ))?;
    ws_sender.send(Message::Text(sub_message)).await?;

    // Spawn a websocket listener task to listen for messages on the subscribed topic.
    tokio::spawn(async move {
        while !shutdown_flag.load(std::sync::atomic::Ordering::Relaxed) {
            let Ok(message) = timeout(max_time_without_message, ws_receiver.next()).await else {
                println!("Websocket receiver task timed out due to inactivity");
                return;
            };
            let Some(Ok(message)) = message else {
                println!("Websocket receiver task terminated");
                return;
            };
            match message {
                Message::Ping(payload) => {
                    println!("Ping received");
                    // A Pong must carry the same application data as the Ping
                    // it answers, so echo the payload instead of sending an
                    // empty one. Only report success when the send succeeded.
                    match ws_sender.send(Message::Pong(payload)).await {
                        Ok(()) => println!("Pong sent"),
                        Err(error) => eprintln!("Error sending Pong: {error}"),
                    }
                }
                Message::Pong(_) => {
                    println!("Pong received");
                }
                Message::Text(text) => {
                    // Check if it's the Mark price update.
                    if let Ok(websocket_message) =
                        serde_json::from_str::<MarketStreamMessage>(&text)
                    {
                        if let Err(error) = sender.send(websocket_message).await {
                            eprintln!("Error sending message to channel: {error}");
                            // The channel only rejects a message once the
                            // receiving side is gone, so nobody can consume
                            // further updates: stop instead of failing on
                            // every subsequent message.
                            return;
                        }
                        continue;
                    }
                    // Check if it's a subscription message.
                    if let Ok(subscription_message) =
                        serde_json::from_str::<SubscriptionResponseMessage>(&text)
                    {
                        // Do not panic inside the background task if
                        // pretty-printing fails; report it instead.
                        match serde_json::to_string_pretty(&subscription_message) {
                            Ok(pretty) => {
                                println!("Subscription response message received: {}", pretty);
                            }
                            Err(error) => {
                                eprintln!("Error formatting subscription response: {error}");
                            }
                        }
                    }
                }
                Message::Close(_) => {
                    println!("Close received");
                    return;
                }
                _ => {
                    eprintln!("Unknown message received");
                    return;
                }
            }
        }
    });
    Ok(())
}
/// Example entry point: prints every Mark price update for the ETH perpetual
/// on Testnet.
///
/// Starts the background listener with a channel of capacity 100 and a
/// 10-second inactivity timeout, then prints each `MarkPriceUpdate` payload
/// received from the channel. Once the channel yields `None`, the shutdown
/// flag is set and the function returns.
#[tokio::main]
async fn main() -> Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<MarketStreamMessage>(100);
    let stop_requested = Arc::new(AtomicBool::new(false));

    listen_to_mark_price_updates(
        Environment::Testnet,
        symbols::perps::ETH,
        tx,
        Duration::from_secs(10),
        Arc::clone(&stop_requested),
    )
    .await?;

    loop {
        // `None` means every sender has been dropped, i.e. the listener is gone.
        let Some(update) = rx.recv().await else {
            break;
        };
        // Messages of any other shape are skipped.
        if let MarketStreamMessage::MarkPriceUpdate {
            payload: MarketStreamMessagePayload::MarkPriceUpdate(mark_price),
        } = update
        {
            println!("{mark_price:#?}");
        }
    }

    stop_requested.store(true, std::sync::atomic::Ordering::SeqCst);
    Ok(())
}
HTTPS
Alternatively, call the GET /ws/market endpoint using the integrated editor on the right, or locally from any language that supports HTTPS network calls.