火山引擎 ADG 社区

火山引擎 ADG 社区 Python WebSocket编程:websockets库使用指南

Python WebSocket编程:websockets库使用指南

Python WebSocket编程:websockets库使用指南1. 安装和配置 websockets1.1 安装 websockets1.2 导入 websockets2. 创建 WebSocket 服务器2.1 简单的 WebSocket 服务器2.2 处理多个客户端3. 创建 WebSocket 客...

egzosn  ·  2025-01-25 17:09:31 发布
Python WebSocket编程:websockets库使用指南

Python WebSocket编程:websockets库使用指南_python

1. 安装和配置 websockets

1.1 安装 websockets

可以使用 pip 安装 websockets 库:

pipinstall websockets
  • 1.

1.2 导入 websockets

在 Python 脚本中,导入 websockets 库:

import websockets
import asyncio
  • 1.
  • 2.

2. 创建 WebSocket 服务器

2.1 简单的 WebSocket 服务器

以下是一个简单的 WebSocket 服务器示例,接收客户端消息并返回相同的消息(回声服务器):

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        print(f"Received message: {message}")
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

2.2 处理多个客户端

以下是一个处理多个客户端连接的 WebSocket 服务器示例:

import asyncio
import websockets

connected_clients = set()

async def handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            print(f"Received message: {message}")
            for client in connected_clients:
                if client != websocket:
                    await client.send(message)
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

3. 创建 WebSocket 客户端

3.1 简单的 WebSocket 客户端

以下是一个简单的 WebSocket 客户端示例,连接到服务器并发送消息:

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        message = "Hello, WebSocket!"
        await websocket.send(message)
        print(f"Sent message: {message}")

        response = await websocket.recv()
        print(f"Received response: {response}")

asyncio.get_event_loop().run_until_complete(hello())
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

3.2 持续通信的 WebSocket 客户端

以下是一个持续通信的 WebSocket 客户端示例,不断发送和接收消息:

import asyncio
import websockets

async def communicate():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        while True:
            message = input("Enter message to send: ")
            await websocket.send(message)
            print(f"Sent message: {message}")

            response = await websocket.recv()
            print(f"Received response: {response}")

asyncio.get_event_loop().run_until_complete(communicate())
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

4. 处理消息和错误

4.1 处理消息

在 WebSocket 服务器和客户端中,可以使用 async for 循环处理消息:

async for message in websocket:
    # 处理消息
    await websocket.send(message)
  • 1.
  • 2.
  • 3.

4.2 处理错误

在 WebSocket 服务器和客户端中,可以使用 try-except 块处理错误:

try:
    async for message in websocket:
        # 处理消息
        await websocket.send(message)
except websockets.ConnectionClosed as e:
    print(f"Connection closed: {e}")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

5. 最佳实践

5.1 使用 SSL/TLS 加密

在生产环境中,建议使用 SSL/TLS 加密 WebSocket 通信,以确保数据的安全性。

示例:使用 SSL/TLS 加密
import ssl

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile="path/to/certfile", keyfile="path/to/keyfile")

start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

5.2 设置超时和重试机制

在 WebSocket 客户端中,设置超时和重试机制,以提高连接的可靠性。

示例:设置超时和重试机制
import asyncio
import websockets

async def connect_with_retry(uri, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            async with websockets.connect(uri) as websocket:
                return websocket
        except (websockets.ConnectionClosed, OSError) as e:
            print(f"Connection failed: {e}")
            retries += 1
            await asyncio.sleep(2 ** retries)
    raise Exception("Max retries exceeded")

async def communicate():
    uri = "ws://localhost:8765"
    websocket = await connect_with_retry(uri)
    async with websocket:
        while True:
            message = input("Enter message to send: ")
            await websocket.send(message)
            print(f"Sent message: {message}")

            response = await websocket.recv()
            print(f"Received response: {response}")

asyncio.get_event_loop().run_until_complete(communicate())
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.

5.3 使用心跳机制

在 WebSocket 服务器和客户端中,使用心跳机制检测连接是否仍然活跃。

示例:使用心跳机制
async def send_heartbeat(websocket, interval=30):
    while True:
        await websocket.send("ping")
        await asyncio.sleep(interval)

async def handler(websocket, path):
    asyncio.create_task(send_heartbeat(websocket))
    async for message in websocket:
        if message != "ping":
            await websocket.send(message)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

总结

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,广泛用于实时应用。websockets 库是一个简单易用且功能强大的 Python 库,适用于构建 WebSocket 服务器和客户端。本文详细介绍了如何使用 websockets 库进行 WebSocket 编程,包括安装和配置、创建 WebSocket 服务器和客户端、处理消息和错误、以及一些最佳实践。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐

  • 浏览量 1776
  • 收藏 0
  • 0

所有评论(0)

查看更多评论 
已为社区贡献45条内容