Python WebSocket 指南

适用于 Backpack Exchange API

本指南演示了如何使用 Python 连接 Backpack Exchange 的 WebSocket API。 WebSocket 提供用于市场数据和账户更新的实时数据流。

前提条件

若要使用私有数据流,请先获取您的 API 密钥: https://backpack.exchange/portfolio/settings/api-keys

安装所需的 Python 库:

• websockets - 用于建立 WebSocket 连接 • cryptography - 用于生成 X-Signature(仅用于私有数据流)

pip install websockets cryptography

如果使用私有数据流,建议安装 dotenv-python,通过环境变量安全管理密钥

pip install python-dotenv

创建 .env 文件并如下存储您的密钥:

PUBLIC_KEY=zDIJj9qneWIY0IYZ5aXoHcNMCm+XDhVcTssiT0HyY0A=
SECRET_KEY=4odxgSUxFrC/zsKWZF4OQwYAgnNu9hnWH3NxWfLAPz4=

创建 .gitignore 文件,并添加 .env,以避免其被纳入版本控制。

.env

导入所需库:

import json
import asyncio
import websockets
import base64
from time import time
import os
from cryptography.hazmat.primitives.asymmetric import ed25519
from dotenv import load_dotenv, find_dotenv

WebSocket API 基础

Backpack Exchange WebSocket API 地址为:wss://ws.backpack.exchange

WebSocket 流的命名格式为:<类型>.<交易对>

例如:

depth.SOL_USDC - SOL/USDC 的订单簿 • trade.SOL_USDC - SOL/USDC 的成交信息

为什么使用异步方式处理 WebSocket?

WebSocket 适用于持续连接并实时接收数据的场景。使用异步(async)方式编程具有以下优势:

  1. 非阻塞 I/O:异步可处理多个连接而不阻塞主线程。

  2. 资源高效:相比多线程,异步使用更少的资源。

  3. 性能更佳:处理大量连接时,异步开销更小。

  4. 实时处理:适用于需持续接收和处理数据的场景。

传统同步方式存在诸多缺点:

  1. 线程管理复杂:需要手动管理多个线程

  2. 资源占用高:每个连接都需独立线程

  3. 错误处理困难:线程间错误传播复杂

  4. 扩展性差:不易扩展以支持更多连接

而异步方式(本指南中的示例)更简洁、高效且更易维护。

公共数据流

无需身份验证即可订阅公共数据流,直接连接即可。

示例:订阅公共数据流

async def subscribe_to_public_stream():
    uri = "wss://ws.backpack.exchange"

    async with websockets.connect(uri) as websocket:
        # 订阅 SOL/USDC 的订单簿流
        subscribe_message = {
            "method": "SUBSCRIBE",
            "params": ["depth.SOL_USDC"]
        }

        await websocket.send(json.dumps(subscribe_message))
        print(f"已订阅 depth.SOL_USDC 数据流")

        # 处理收到的消息
        while True:
            response = await websocket.recv()
            data = json.loads(response)
            print(f"收到消息:{data}")

            # 可在此根据业务需求处理数据
            # 例如更新本地订单簿
#要在Jupyter Notebook中运行异步函数,请使用:
#await subscribe_to_public_stream()
#
#在您的代码中运行:
# asyncio.run(subscribe_to_public_stream())  

示例:订阅多个公共数据流

async def subscribe_to_multiple_streams():
    uri = "wss://ws.backpack.exchange"

    async with websockets.connect(uri) as websocket:
        # 订阅多个数据流
        subscribe_message = {
            "method": "SUBSCRIBE",
            "params": ["depth.SOL_USDC", "trade.SOL_USDC"]
        }

        await websocket.send(json.dumps(subscribe_message))
        print(f"已订阅多个数据流")

        while True:
            response = await websocket.recv()
            data = json.loads(response)
            print(f"收到消息:{data}")

            # 根据流名处理不同类型数据
            if "stream" in data and "data" in data:
                stream_name = data["stream"]
                stream_data = data["data"]

                if stream_name.startswith("depth."):
                    print(f"订单簿更新:{stream_data}")
                elif stream_name.startswith("trade."):
                    print(f"成交信息更新:{stream_data}")
#在 Jupyter notebook 中运行:
# await subscribe_to_multiple_streams()
#
#在代码中运行:
#asyncio.run(subscribe_to_multiple_streams())

私有数据流

私有数据流以 account. 开头,需要 API 密钥认证,可获取账户相关更新信息。

私有流的身份认证步骤:

  1. 构造签名字符串,格式如下: instruction=subscribe&timestamp=1614550000000&window=5000

  2. 使用私钥进行签名

  3. 在订阅消息中添加如下签名字段(数组形式): "signature": ["<验证公钥>", "<签名>", "<时间戳>", "<时间窗>"]

验证公钥和签名需使用 base64 编码。

# 从 .env 文件加载 API 密钥
# load_dotenv(find_dotenv())
# public_key = os.getenv("PUBLIC_KEY")
# secret_key = os.getenv("SECRET_KEY")

# 仅用于演示,请勿在生产环境中硬编码密钥
public_key = "5+yQgwU0ZdJ/9s+GXfuPFfo7yQQpl9CgvQedJXne30o="
secret_key = "TDSkv44jf/iD/QCKkyCdixO+p1sfLXxk+PZH7mW/ams="

# 使用密钥创建私钥对象
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
    base64.b64decode(secret_key)
)

示例:订阅私有数据流

async def subscribe_to_private_stream():
    uri = "wss://ws.backpack.exchange"

    # 生成认证参数
    timestamp = int(time() * 1e3)  # 毫秒时间戳
    window = "5000"  # 有效时间窗(毫秒)

    # 创建签名字符串
    sign_str = f"instruction=subscribe&timestamp={timestamp}&window={window}"

    # 生成签名
    signature_bytes = private_key.sign(sign_str.encode())
    encoded_signature = base64.b64encode(signature_bytes).decode()

    async with websockets.connect(uri) as websocket:
        # 带身份验证订阅订单流
        subscribe_message = {
            "method": "SUBSCRIBE",
            "params": ["account.orderUpdate"],
            "signature": [public_key, encoded_signature, str(timestamp), window]
        }

        await websocket.send(json.dumps(subscribe_message))
        print(f"已订阅 account.order 数据流")

        while True:
            response = await websocket.recv()
            data = json.loads(response)
            print(f"收到消息:{data}")

            # 处理订单更新
            if "stream" in data and data["stream"] == "account.order" and "data" in data:
                order_data = data["data"]
                print(f"订单更新:{order_data}")
#在 Jupyter notebook 中运行该函数:
#await subscribe_to_private_stream()
#若使用 nest_asyncio(推荐):
#asyncio.run(subscribe_to_private_stream())

WebSocket 保活机制:Ping/Pong

WebSocket 连接需通过 Ping-Pong 保持连接活跃。好消息是:Python 的 websockets 库已自动处理该机制。

参考资料

有关更多信息,请访问官方文档:https://docs.backpack.exchange/#tag/Streams

Last updated