Hướng Dẫn Kết Nối WebSocket API Bằng Python trên Backpack Exchange
Hướng dẫn này sẽ giúp bạn hiểu cách sử dụng Backpack Exchange WebSocket API với Python. WebSocket cung cấp các luồng dữ liệu real-time (thời gian thực) cho dữ liệu thị trường và cập nhật tài khoản.
Yêu Cầu Chuẩn Bị
Lấy API Key nếu bạn sử dụng các luồng dữ liệu riêng tư: https://backpack.exchange/portfolio/settings/api-keys
Cài Đặt Các Thư Viện Python Cần Thiết:
websockets - để kết nối WebSocket
cryptography - dùng để ký X-Signature (chỉ khi dùng luồng private)
pip install websockets cryptography
Cài đặt dotenv-python để bảo mật và quản lý key thông qua biến môi trường nếu bạn sử dụng các luồng dữ liệu riêng tư.
pip install python-dotenv
Tạo file .env và lưu key của bạn theo định dạng sau:
PUBLIC_KEY=zDIJj9qneWIY0IYZ5aXoHcNMCm+XDhVcTssiT0HyY0A=
SECRET_KEY=4odxgSUxFrC/zsKWZF4OQwYAgnNu9hnWH3NxWfLAPz4=
Tạo file .gitignore và thêm .env vào để loại trừ file này khỏi hệ thống quản lý phiên bản (version control).
.env
Import các thư viện cần thiết:
import json
import asyncio
import websockets
import base64
from time import time
import os
from cryptography.hazmat.primitives.asymmetric import ed25519
from dotenv import load_dotenv, find_dotenv
Kiến Thức Cơ Bản Về WebSocket API
WebSocket API của Backpack Exchange có thể truy cập tại: wss://ws.backpack.exchange
.
Các luồng WebSocket được đặt tên theo định dạng sau: <type>.<symbol>
Ví dụ:
depth.SOL_USDC
- Order book for SOL/USDCtrade.SOL_USDC
- Trades for SOL/USDC
Vì Sao Nên Dùng Async Khi Làm Việc Với WebSocket
WebSocket được thiết kế để duy trì kết nối lâu dài và nhận dữ liệu theo thời gian thực. Khi kết hợp với lập trình bất đồng bộ (async), bạn sẽ có nhiều lợi ích:
Non-blocking I/O: Async giúp ứng dụng xử lý nhiều kết nối cùng lúc mà không chặn luồng chính.
Tiết kiệm tài nguyên: Async sử dụng ít tài nguyên hơn so với việc tạo nhiều thread để xử lý đồng thời.
Hiệu suất cao hơn: Async có thể xử lý nhiều kết nối cùng lúc với chi phí thấp hơn so với các phương pháp đồng bộ.
Xử lý real-time hiệu quả: Async rất phù hợp khi làm việc với luồng dữ liệu real-time, nơi cần liên tục nhận và xử lý dữ liệu.
Hạn chế của phương pháp đồng bộ (Synchronous):
Quản lý thread phức tạp: Phải tự quản lý nhiều luồng.
Tốn tài nguyên: Mỗi kết nối cần một thread riêng.
Xử lý lỗi khó: Truyền lỗi giữa các thread rất phức tạp.
Khó mở rộng: Không phù hợp khi cần mở rộng nhiều kết nối.
Async là lựa chọn sạch hơn, hiệu quả hơn, dễ bảo trì hơn (như ví dụ trong hướng dẫn này).
Các Luồng Công Khai (Public Streams)
Các luồng công khai không yêu cầu xác thực. Bạn có thể đăng ký (subscribe) trực tiếp để nhận dữ liệu.
Ví dụ: Đăng Ký Một Luồng Công Khai (Public Stream)
async def subscribe_to_public_stream():
uri = "wss://ws.backpack.exchange"
async with websockets.connect(uri) as websocket:
# Subscribe to the depth stream for SOL/USDC
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["depth.SOL_USDC"]
}
await websocket.send(json.dumps(subscribe_message))
print(f"Subscribed to depth.SOL_USDC stream")
# Process incoming messages
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"Received: {data}")
# You can process the data here based on your needs
# For example, update a local order book
# To run the async function in a Jupyter notebook, use:
# await subscribe_to_public_stream()
#
# TO run in your code
# asyncio.run(subscribe_to_public_stream())
Ví dụ: Đăng Ký Nhiều Luồng Công Khai Cùng Lúc
async def subscribe_to_multiple_streams():
uri = "wss://ws.backpack.exchange"
async with websockets.connect(uri) as websocket:
# Subscribe to multiple streams
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["depth.SOL_USDC", "trade.SOL_USDC"]
}
await websocket.send(json.dumps(subscribe_message))
print(f"Subscribed to multiple streams")
# Process incoming messages
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"Received: {data}")
# Process different stream data based on the stream name
if "stream" in data and "data" in data:
stream_name = data["stream"]
stream_data = data["data"]
if stream_name.startswith("depth."):
# Process order book data
print(f"Order book update: {stream_data}")
elif stream_name.startswith("trade."):
# Process trade data
print(f"Trade update: {stream_data}")
# Run the async function in a Jupyter notebook:
# await subscribe_to_multiple_streams()
#
# To run in your code
# asyncio.run(subscribe_to_multiple_streams())
Các Luồng Riêng Tư (Private Streams)
Luồng riêng tư yêu cầu xác thực bằng API Key. Các luồng này được đặt tiền tố là account. và cung cấp các thông tin cập nhật liên quan đến tài khoản của bạn.
Xác Thực Để Sử Dụng Luồng Riêng Tư (Authentication for Private Streams)
Để xác thực khi sử dụng luồng riêng tư, bạn cần thực hiện:
Tạo chuỗi signature theo định dạng:
instruction=subscribe×tamp=1614550000000&window=5000
Ký chuỗi trên bằng Private Key của bạn.
Đưa dữ liệu chữ ký vào thông điệp subscription dưới dạng array:
"signature": ["<verifying key>", "<signature>", "<timestamp>", "<window>"]
Các luồng riêng tư luôn có tiền tố account.
và việc đăng ký (subscribe) luồng riêng tư bắt buộc phải gửi kèm signature. Verifying key và signature cần được mã hóa base64.
# Load API keys from .env file
# load_dotenv(find_dotenv())
# public_key = os.getenv("PUBLIC_KEY")
# secret_key = os.getenv("SECRET_KEY")
# For demonstration purposes only - don't hardcode keys in production
public_key = "5+yQgwU0ZdJ/9s+GXfuPFfo7yQQpl9CgvQedJXne30o="
secret_key = "TDSkv44jf/iD/QCKkyCdixO+p1sfLXxk+PZH7mW/ams="
# Create private key from secret key
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
base64.b64decode(secret_key)
)
Ví dụ: Đăng Ký Một Luồng Riêng Tư (Private Stream)
Ask ChatGPT
async def subscribe_to_private_stream():
uri = "wss://ws.backpack.exchange"
# Generate authentication parameters
timestamp = int(time() * 1e3) # Unix time in milliseconds
window = "5000" # Time window in milliseconds
# Create signature string
sign_str = f"instruction=subscribe×tamp={timestamp}&window={window}"
# Sign the string
signature_bytes = private_key.sign(sign_str.encode())
encoded_signature = base64.b64encode(signature_bytes).decode()
async with websockets.connect(uri) as websocket:
# Subscribe to the order stream with authentication
subscribe_message = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [public_key, encoded_signature, str(timestamp), window]
}
await websocket.send(json.dumps(subscribe_message))
print(f"Subscribed to account.order stream")
# Process incoming messages
while True:
response = await websocket.recv()
data = json.loads(response)
print(f"Received: {data}")
# Process order updates
if "stream" in data and data["stream"] == "account.order" and "data" in data:
order_data = data["data"]
print(f"Order update: {order_data}")
# Run the async function in a Jupyter notebook:
await subscribe_to_private_stream()
#
# If using nest_asyncio (recommended):
# asyncio.run(subscribe_to_private_stream())
WebSocket Ping/Pong
Kết nối WebSocket yêu cầu cơ chế ping-pong để duy trì kết nối luôn hoạt động. Tin tốt là thư viện T websockets
của Python sẽ tự động xử lý việc này cho bạn.
Nguồn Tham Khảo
Để tìm hiểu thêm, bạn có thể truy cập tài liệu chính thức tại: https://docs.backpack.exchange/#tag/Streams
Last updated