Python WebSocket 指南
适用于 Backpack Exchange API
本指南演示了如何使用 Python 连接 Backpack Exchange 的 WebSocket API。 WebSocket 提供用于市场数据和账户更新的实时数据流。
前提条件
若要使用私有数据流,请先获取您的 API 密钥: https://backpack.exchange/portfolio/settings/api-keys
安装所需的 Python 库:
• websockets - 用于建立 WebSocket 连接 • cryptography - 用于生成 X-Signature(仅用于私有数据流)
pip install websockets cryptography
如果使用私有数据流,建议安装 dotenv-python,通过环境变量安全管理密钥
pip install python-dotenv
创建 .env
文件并如下存储您的密钥:
PUBLIC_KEY=zDIJj9qneWIY0IYZ5aXoHcNMCm+XDhVcTssiT0HyY0A=
SECRET_KEY=4odxgSUxFrC/zsKWZF4OQwYAgnNu9hnWH3NxWfLAPz4=
创建 .gitignore
文件,并添加 .env
,以避免其被纳入版本控制。
.env
导入所需库:
import json
import asyncio
import websockets
import base64
from time import time
import os
from cryptography.hazmat.primitives.asymmetric import ed25519
from dotenv import load_dotenv, find_dotenv
WebSocket API 基础
Backpack Exchange WebSocket API 地址为:wss://ws.backpack.exchange
WebSocket 流的命名格式为:<类型>.<交易对>
例如:
• depth.SOL_USDC
- SOL/USDC 的订单簿
• trade.SOL_USDC
- SOL/USDC 的成交信息
为什么使用异步方式处理 WebSocket?
WebSocket 适用于持续连接并实时接收数据的场景。使用异步(async)方式编程具有以下优势:
非阻塞 I/O:异步可处理多个连接而不阻塞主线程。
资源高效:相比多线程,异步使用更少的资源。
性能更佳:处理大量连接时,异步开销更小。
实时处理:适用于需持续接收和处理数据的场景。
传统同步方式存在诸多缺点:
线程管理复杂:需要手动管理多个线程
资源占用高:每个连接都需独立线程
错误处理困难:线程间错误传播复杂
扩展性差:不易扩展以支持更多连接
而异步方式(本指南中的示例)更简洁、高效且更易维护。
公共数据流
无需身份验证即可订阅公共数据流,直接连接即可。
示例:订阅公共数据流
async def subscribe_to_public_stream():
uri = "wss://ws.backpack.exchange"
async with websockets.connect(uri) as websocket:
# 订阅 SOL/USDC 的订单簿流
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["depth.SOL_USDC"]
}
await websocket.send(json.dumps(subscribe_message))
print(f"已订阅 depth.SOL_USDC 数据流")
# 处理收到的消息
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"收到消息:{data}")
# 可在此根据业务需求处理数据
# 例如更新本地订单簿
#要在Jupyter Notebook中运行异步函数,请使用:
#await subscribe_to_public_stream()
#
#在您的代码中运行:
# asyncio.run(subscribe_to_public_stream())
示例:订阅多个公共数据流
async def subscribe_to_multiple_streams():
uri = "wss://ws.backpack.exchange"
async with websockets.connect(uri) as websocket:
# 订阅多个数据流
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["depth.SOL_USDC", "trade.SOL_USDC"]
}
await websocket.send(json.dumps(subscribe_message))
print(f"已订阅多个数据流")
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"收到消息:{data}")
# 根据流名处理不同类型数据
if "stream" in data and "data" in data:
stream_name = data["stream"]
stream_data = data["data"]
if stream_name.startswith("depth."):
print(f"订单簿更新:{stream_data}")
elif stream_name.startswith("trade."):
print(f"成交信息更新:{stream_data}")
#在 Jupyter notebook 中运行:
# await subscribe_to_multiple_streams()
#
#在代码中运行:
#asyncio.run(subscribe_to_multiple_streams())
私有数据流
私有数据流以 account.
开头,需要 API 密钥认证,可获取账户相关更新信息。
私有流的身份认证步骤:
构造签名字符串,格式如下:
instruction=subscribe×tamp=1614550000000&window=5000
使用私钥进行签名
在订阅消息中添加如下签名字段(数组形式):
"signature": ["<验证公钥>", "<签名>", "<时间戳>", "<时间窗>"]
验证公钥和签名需使用 base64 编码。
# 从 .env 文件加载 API 密钥
# load_dotenv(find_dotenv())
# public_key = os.getenv("PUBLIC_KEY")
# secret_key = os.getenv("SECRET_KEY")
# 仅用于演示,请勿在生产环境中硬编码密钥
public_key = "5+yQgwU0ZdJ/9s+GXfuPFfo7yQQpl9CgvQedJXne30o="
secret_key = "TDSkv44jf/iD/QCKkyCdixO+p1sfLXxk+PZH7mW/ams="
# 使用密钥创建私钥对象
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
base64.b64decode(secret_key)
)
示例:订阅私有数据流
async def subscribe_to_private_stream():
uri = "wss://ws.backpack.exchange"
# 生成认证参数
timestamp = int(time() * 1e3) # 毫秒时间戳
window = "5000" # 有效时间窗(毫秒)
# 创建签名字符串
sign_str = f"instruction=subscribe×tamp={timestamp}&window={window}"
# 生成签名
signature_bytes = private_key.sign(sign_str.encode())
encoded_signature = base64.b64encode(signature_bytes).decode()
async with websockets.connect(uri) as websocket:
# 带身份验证订阅订单流
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [public_key, encoded_signature, str(timestamp), window]
}
await websocket.send(json.dumps(subscribe_message))
print(f"已订阅 account.order 数据流")
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"收到消息:{data}")
# 处理订单更新
if "stream" in data and data["stream"] == "account.order" and "data" in data:
order_data = data["data"]
print(f"订单更新:{order_data}")
#在 Jupyter notebook 中运行该函数:
#await subscribe_to_private_stream()
#若使用 nest_asyncio(推荐):
#asyncio.run(subscribe_to_private_stream())
WebSocket 保活机制:Ping/Pong
WebSocket 连接需通过 Ping-Pong 保持连接活跃。好消息是:Python 的 websockets
库已自动处理该机制。
参考资料
有关更多信息,请访问官方文档:https://docs.backpack.exchange/#tag/Streams
Last updated