海康威视摄像头rtsp取流;SDK取流;海康威视SDK云台控制
self.status_label.config(text=f"停止控制失败,错误码: {error_code}", foreground="red")self.status_label.config(text=f"控制失败,错误码: {error_code}", foreground="red")self.status_label = ttk.Label(status_frame, text="
一、海康威视rtsp取流
1、 "rtsp://admin:szdl2025@192.168.1.64/streaming/channels/101"
即rtsp://用户名:密码@IP地址/streaming(流模式)//channels/101(主码流)
(不常用,通过该方式取流延时较大 比直接网页登陆看预览延时还大)
2、示例代码
"""
摄像头取流方式
"""
import cv2
import json
def getConfig():
fd = open("config.json", "r")
config = fd.read()
fd.close()
return json.loads(config)
# config = getConfig()
# rtsp_url = config['rtsp_url']
# rtsp_url = "rtsp://username:password@192.168.1.100/stream" # 示例 RTSP URL
# rtsp://admin:szdl2025@192.168.1.64/streaming/channels/101
rtsp_url = "rtsp://admin:szdl2025@192.168.1.64:554/Streaming/Channels/101"
# 创建 VideoCapture 对象
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
print("无法打开摄像头")
exit()
while True:
# 逐帧捕获
ret, frame = cap.read()
if not ret:
print("无法读取视频流")
break
# 显示帧
cv2.imshow("RTSP Stream", frame)
# 按 'q' 键退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 释放资源
cap.release()
cv2.destroyAllWindows()
二 摄像头SDK取流
1、SDK下载https://open.hikvision.com/download/5cda567cf47ae80dd41a54b3

2、创建Python项目并用pycharm打开(环境我用的conda管理的)

3、新建lib文件夹

4、lib文件夹打开后新建win文件夹

5、下载解压后的SDK头文件中的内容复制无脑粘贴到win中(其实只粘贴必要的就行)

6、将HCNetSDK与PlayCtrl粘贴到项目根目录(去下载的SDK Python示例的第一个demo里找)

之后就是这样

7、用pycharm打开项目 根目录下新建一个.py文件,复制如下代码运行即可
# coding=utf-8
import os
import platform
from HCNetSDK import *
from PlayCtrl import *
import numpy as np
import time
import ctypes
import cv2
"""
这是一个取流并将流转换成图片,再一帧一帧播放的程序
"""
class HKCam(object):
def __init__(self, camIP, username, password, devport=8000):
# 登录的设备信息
self.DEV_IP = create_string_buffer(camIP.encode())
self.DEV_PORT = devport
self.DEV_USER_NAME = create_string_buffer(username.encode())
self.DEV_PASSWORD = create_string_buffer(password.encode())
self.WINDOWS_FLAG = False if platform.system() != "Windows" else True
self.funcRealDataCallBack_V30 = None
self.recent_img = None # 最新帧
self.n_stamp = None # 帧时间戳
self.last_stamp = None # 上次时间戳
# 加载库
if self.WINDOWS_FLAG:
os.chdir(r'./lib/win')
self.Objdll = ctypes.CDLL(r'./HCNetSDK.dll')
self.Playctrldll = ctypes.CDLL(r'./PlayCtrl.dll')
else:
os.chdir(r'./lib/linux')
self.Objdll = cdll.LoadLibrary(r'./libhcnetsdk.so')
self.Playctrldll = cdll.LoadLibrary(r'./libPlayCtrl.so')
# 设置组件库和SSL库加载路径
self.SetSDKInitCfg()
# 初始化DLL
self.Objdll.NET_DVR_Init()
# 启用SDK写日志
self.Objdll.NET_DVR_SetLogToFile(3, bytes('./SdkLog_Python/', encoding="utf-8"), False)
os.chdir(r'../../')
# 登录
(self.lUserId, self.device_info) = self.LoginDev()
if self.lUserId < 0:
print('Login device fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
self.Objdll.NET_DVR_Cleanup()
exit()
else:
print(f'摄像头[{camIP}]登录成功!!')
self.start_play()
time.sleep(1)
def start_play(self):
self.PlayCtrl_Port = c_long(-1)
# 获取一个播放句柄
if not self.Playctrldll.PlayM4_GetPort(byref(self.PlayCtrl_Port)):
print(u'获取播放库句柄失败')
# 定义码流回调函数
self.funcRealDataCallBack_V30 = REALDATACALLBACK(self.RealDataCallBack_V30)
# 开启预览
self.preview_info = NET_DVR_PREVIEWINFO()
self.preview_info.hPlayWnd = 0
self.preview_info.lChannel = 1
self.preview_info.dwStreamType = 0
self.preview_info.dwLinkMode = 0
self.preview_info.bBlocked = 1
# 开始预览并且设置回调函数回调获取实时流数据
self.lRealPlayHandle = self.Objdll.NET_DVR_RealPlay_V40(self.lUserId, byref(self.preview_info),
self.funcRealDataCallBack_V30, None)
if self.lRealPlayHandle < 0:
print('Open preview fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
self.Objdll.NET_DVR_Logout(self.lUserId)
self.Objdll.NET_DVR_Cleanup()
exit()
def SetSDKInitCfg(self):
if self.WINDOWS_FLAG:
strPath = os.getcwd().encode('gbk')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'\libcrypto-1_1-x64.dll'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'\libssl-1_1-x64.dll'))
else:
strPath = os.getcwd().encode('utf-8')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'/libcrypto.so.1.1'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'/libssl.so.1.1'))
def LoginDev(self):
device_info = NET_DVR_DEVICEINFO_V30()
lUserId = self.Objdll.NET_DVR_Login_V30(self.DEV_IP, self.DEV_PORT, self.DEV_USER_NAME, self.DEV_PASSWORD,
byref(device_info))
return (lUserId, device_info)
def read(self):
# 等待新帧
while self.n_stamp == self.last_stamp:
time.sleep(0.001) # 添加小延迟避免CPU占用过高
continue
self.last_stamp = self.n_stamp
return self.n_stamp, self.recent_img
def DecCBFun(self, nPort, pBuf, nSize, pFrameInfo, nUser, nReserved2):
if pFrameInfo.contents.nType == 3: # 视频帧
nWidth = pFrameInfo.contents.nWidth
nHeight = pFrameInfo.contents.nHeight
nStamp = pFrameInfo.contents.nStamp
YUV = np.frombuffer(pBuf[:nSize], dtype=np.uint8)
YUV = np.reshape(YUV, [nHeight + nHeight // 2, nWidth])
img_rgb = cv2.cvtColor(YUV, cv2.COLOR_YUV2BGR_YV12)
self.recent_img, self.n_stamp = img_rgb, nStamp
def RealDataCallBack_V30(self, lPlayHandle, dwDataType, pBuffer, dwBufSize, pUser):
if dwDataType == NET_DVR_SYSHEAD:
# 设置流播放模式
self.Playctrldll.PlayM4_SetStreamOpenMode(self.PlayCtrl_Port, 0)
# 打开码流,送入系统头数据
if self.Playctrldll.PlayM4_OpenStream(self.PlayCtrl_Port, pBuffer, dwBufSize, 1024 * 1024):
# 设置解码回调
self.FuncDecCB = DECCBFUNWIN(self.DecCBFun)
self.Playctrldll.PlayM4_SetDecCallBackExMend(self.PlayCtrl_Port, self.FuncDecCB, None, 0, None)
# 开始解码播放
if self.Playctrldll.PlayM4_Play(self.PlayCtrl_Port, None):
print(u'播放库播放成功')
else:
print(u'播放库播放失败')
else:
print(u'播放库打开流失败')
elif dwDataType == NET_DVR_STREAMDATA:
# 输入流数据
self.Playctrldll.PlayM4_InputData(self.PlayCtrl_Port, pBuffer, dwBufSize)
def release(self):
# 停止预览
self.Objdll.NET_DVR_StopRealPlay(self.lRealPlayHandle)
# 释放播放资源
if self.PlayCtrl_Port.value > -1:
self.Playctrldll.PlayM4_Stop(self.PlayCtrl_Port)
self.Playctrldll.PlayM4_CloseStream(self.PlayCtrl_Port)
self.Playctrldll.PlayM4_FreePort(self.PlayCtrl_Port)
# 登出设备
self.Objdll.NET_DVR_Logout(self.lUserId)
# 清理SDK
self.Objdll.NET_DVR_Cleanup()
print('释放资源结束')
def main():
# 摄像头配置
camIP = '192.168.1.64'
DEV_PORT = 8000
username = 'admin'
password = 'szdl2025'
# 创建摄像头对象
HIK = HKCam(camIP, username, password,devport=8000)
# 创建窗口
window_name = "q = quit;s = save"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 1920, 1080)
print("按 'q' 键退出程序")
print("按 's' 键保存当前帧")
frame_count = 0
start_time = time.time()
try:
while True:
# 读取一帧
n_stamp, img = HIK.read()
if img is not None:
# 显示帧率
frame_count += 1
elapsed_time = time.time() - start_time
fps = frame_count / elapsed_time if elapsed_time > 0 else 0
# 在图像上显示帧率
fps_text = f"FPS: {fps:.2f}"
cv2.putText(img, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# 显示图像
cv2.imshow(window_name, img)
# 检查按键
key = cv2.waitKey(1) & 0xFF
if key == ord('q'): # 按 'q' 退出
print("用户请求退出程序")
break
elif key == ord('s'): # 按 's' 保存当前帧
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"capture_{timestamp}.jpg"
cv2.imwrite(filename, img)
print(f"已保存图像: {filename}")
elif key == 27: # ESC键
print("ESC键退出程序")
break
# 检查窗口是否被关闭
try:
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
print("窗口已关闭,退出程序")
break
except:
print("窗口异常,退出程序")
break
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
# 释放资源
HIK.release()
cv2.destroyAllWindows()
print("程序结束")
if __name__ == "__main__":
main()
三、海康威视SDK云台控制
1、环境和上面sdk取流类似 搭建好环境项目根目录下新建.py文件 然后输入以下代码运行即可
"""
需要海康威视的HCNetSDK和PlayCtrl库文件
放置在lib/win或lib/linux目录下
确保网络可达摄像头设备
"""
# coding=utf-8
import os
import platform
import tkinter as tk
from tkinter import ttk
import ctypes
from ctypes import *
from HCNetSDK import *
from PlayCtrl import *
from time import sleep
"""
这是一个云台控制GUI界面 可以缩小放大旋转
"""
class PTZCameraController:
def __init__(self):
# 设备配置
self.DEV_IP = create_string_buffer(b'192.168.1.64')
self.DEV_PORT = 8000
self.DEV_USER_NAME = create_string_buffer(b'admin')
self.DEV_PASSWORD = create_string_buffer(b'szdl2025')
# 系统标志
self.WINDOWS_FLAG = platform.system() == "Windows"
# SDK相关变量
self.Objdll = None
self.Playctrldll = None
self.PlayCtrl_Port = c_long(-1)
self.lUserId = None
self.lRealPlayHandle = None
# 回调函数
self.funcRealDataCallBack_V30 = None
self.FuncDecCB = None
# GUI相关
self.win = None
self.cv = None
self.current_command = None # 当前控制命令
# 云台状态跟踪
self.ptz_limits = {
TILT_DOWN: False, # 跟踪下极限状态
TILT_UP: False, # 跟踪上极限状态
PAN_LEFT: False, # 跟踪左极限状态
PAN_RIGHT: False # 跟踪右极限状态
}
# 初始化
self.initialize_sdk()
self.setup_gui()
def setup_gui(self):
"""设置GUI界面"""
self.win = tk.Tk()
self.win.title("PTZ摄像头控制系统")
# 设置窗口大小 - 适应1920x1080屏幕
self.win.geometry("1000x700")
self.win.minsize(800, 600)
# 创建主框架
main_frame = ttk.Frame(self.win, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 视频显示区域 - 使用16:9比例
video_frame = ttk.LabelFrame(main_frame, text="视频预览", padding="5")
video_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
# 设置视频显示区域为16:9比例 (854x480)
video_width = 854
video_height = 480
self.cv = tk.Canvas(video_frame, bg='black', width=video_width, height=video_height)
self.cv.pack(fill=tk.BOTH, expand=True)
# 控制按钮区域
control_frame = ttk.LabelFrame(main_frame, text="云台控制", padding="10")
control_frame.pack(fill=tk.X, pady=5)
# 云台控制按钮布局
self.create_control_buttons(control_frame)
# 状态栏
status_frame = ttk.Frame(main_frame)
status_frame.pack(fill=tk.X, pady=5)
self.status_label = ttk.Label(status_frame, text="系统就绪", foreground="green")
self.status_label.pack(side=tk.LEFT)
ttk.Button(status_frame, text="退出", command=self.cleanup).pack(side=tk.RIGHT)
def create_control_buttons(self, parent):
"""创建云台控制按钮 - 简化布局确保显示"""
# 使用Frame来组织按钮布局
button_frame = ttk.Frame(parent)
button_frame.pack(fill=tk.X, pady=10)
# 按钮配置
button_config = {
'width': 8,
'padding': (5, 3)
}
# 创建所有按钮并放在同一行
buttons = [
("上", TILT_UP),
("下", TILT_DOWN),
("左", PAN_LEFT),
("右", PAN_RIGHT),
("放大", ZOOM_IN),
("缩小", ZOOM_OUT)
]
self.control_buttons = {} # 存储按钮引用
# 添加一个居中的容器来确保按钮居中显示
center_frame = ttk.Frame(button_frame)
center_frame.pack(expand=True)
for text, command in buttons:
btn = ttk.Button(center_frame, text=text, **button_config)
btn.pack(side=tk.LEFT, padx=5)
# 绑定事件
btn.bind('<ButtonPress-1>', lambda e, cmd=command: self.ptz_control_start(cmd))
btn.bind('<ButtonRelease-1>', lambda e, cmd=command: self.ptz_control_stop(cmd))
# 存储按钮引用
self.control_buttons[command] = btn
def ptz_control_start(self, command):
"""开始云台控制"""
# 检查极限状态
if self.ptz_limits.get(command, False):
direction_names = {
PAN_LEFT: "向左", PAN_RIGHT: "向右",
TILT_UP: "向上", TILT_DOWN: "向下",
ZOOM_IN: "放大", ZOOM_OUT: "缩小"
}
direction = direction_names.get(command, "未知")
self.status_label.config(text=f"云台已到达{direction}极限,无法继续移动", foreground="orange")
return
speed = 7 # 默认速度
self.current_command = command
# 开始控制
lRet = self.Objdll.NET_DVR_PTZControlWithSpeed(self.lRealPlayHandle, command, 0, speed)
if lRet == 0:
error_code = self.Objdll.NET_DVR_GetLastError()
self.status_label.config(text=f"控制失败,错误码: {error_code}", foreground="red")
print(f'PTZ control fail, error code is: {error_code}')
# 如果控制失败,可能是到达极限,更新状态
if error_code in [7, 8]: # 常见的极限错误码,具体根据SDK文档调整
self.ptz_limits[command] = True
direction_names = {
PAN_LEFT: "向左", PAN_RIGHT: "向右",
TILT_UP: "向上", TILT_DOWN: "向下",
ZOOM_IN: "放大", ZOOM_OUT: "缩小"
}
direction = direction_names.get(command, "未知")
self.status_label.config(text=f"云台已到达{direction}极限", foreground="orange")
else:
direction_names = {
PAN_LEFT: "向左", PAN_RIGHT: "向右",
TILT_UP: "向上", TILT_DOWN: "向下",
ZOOM_IN: "放大", ZOOM_OUT: "缩小"
}
direction = direction_names.get(command, "未知")
self.status_label.config(text=f"云台{direction}转动中...", foreground="blue")
print(f'PTZ control started: {direction}')
# 重置相反方向的极限状态
opposite_commands = {
TILT_UP: TILT_DOWN,
TILT_DOWN: TILT_UP,
PAN_LEFT: PAN_RIGHT,
PAN_RIGHT: PAN_LEFT,
ZOOM_IN: ZOOM_OUT,
ZOOM_OUT: ZOOM_IN
}
if command in opposite_commands:
self.ptz_limits[opposite_commands[command]] = False
def ptz_control_stop(self, command):
"""停止云台控制"""
speed = 7
# 如果已经到达极限,直接返回
if self.ptz_limits.get(command, False):
return
lRet = self.Objdll.NET_DVR_PTZControlWithSpeed(self.lRealPlayHandle, command, 1, speed)
if lRet:
direction_names = {
PAN_LEFT: "向左", PAN_RIGHT: "向右",
TILT_UP: "向上", TILT_DOWN: "向下",
ZOOM_IN: "放大", ZOOM_OUT: "缩小"
}
direction = direction_names.get(command, "未知")
self.status_label.config(text=f"云台停止{direction}转动", foreground="green")
print(f'PTZ control stopped: {direction}')
else:
error_code = self.Objdll.NET_DVR_GetLastError()
self.status_label.config(text=f"停止控制失败,错误码: {error_code}", foreground="red")
self.current_command = None
def initialize_sdk(self):
"""初始化SDK"""
# 加载库
if self.WINDOWS_FLAG:
os.chdir(r'./lib/win')
self.Objdll = ctypes.CDLL(r'./HCNetSDK.dll')
self.Playctrldll = ctypes.CDLL(r'./PlayCtrl.dll')
else:
os.chdir(r'./lib/linux')
self.Objdll = cdll.LoadLibrary(r'./libhcnetsdk.so')
self.Playctrldll = cdll.LoadLibrary(r'./libPlayCtrl.so')
# 设置SDK初始化配置
self.set_sdk_init_cfg()
# 初始化DLL
self.Objdll.NET_DVR_Init()
self.Objdll.NET_DVR_SetLogToFile(3, bytes('./SdkLog_Python/', encoding="utf-8"), False)
# 获取播放句柄
if not self.Playctrldll.PlayM4_GetPort(byref(self.PlayCtrl_Port)):
print('获取播放库句柄失败')
# 登录设备
if not self.login_dev():
print("设备登录失败")
return False
# 开始预览
if not self.start_preview():
print("开始预览失败")
return False
return True
def set_sdk_init_cfg(self):
"""设置SDK初始化配置"""
if self.WINDOWS_FLAG:
strPath = os.getcwd().encode('gbk')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'\libcrypto-1_1-x64.dll'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'\libssl-1_1-x64.dll'))
else:
strPath = os.getcwd().encode('utf-8')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'/libcrypto.so.1.1'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'/libssl.so.1.1'))
def login_dev(self):
"""登录设备"""
device_info = NET_DVR_DEVICEINFO_V30()
self.lUserId = self.Objdll.NET_DVR_Login_V30(
self.DEV_IP, self.DEV_PORT, self.DEV_USER_NAME, self.DEV_PASSWORD, byref(device_info))
if self.lUserId < 0:
print('Login device fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
return False
return True
def start_preview(self):
"""开始预览"""
preview_info = NET_DVR_PREVIEWINFO()
preview_info.hPlayWnd = 0
preview_info.lChannel = 1
preview_info.dwStreamType = 0
preview_info.dwLinkMode = 0
preview_info.bBlocked = 1
self.funcRealDataCallBack_V30 = REALDATACALLBACK(self.RealDataCallBack_V30)
self.lRealPlayHandle = self.Objdll.NET_DVR_RealPlay_V40(
self.lUserId, byref(preview_info), self.funcRealDataCallBack_V30, None)
if self.lRealPlayHandle < 0:
print('Open preview fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
return False
return True
def RealDataCallBack_V30(self, lPlayHandle, dwDataType, pBuffer, dwBufSize, pUser):
"""实时数据回调"""
if dwDataType == NET_DVR_SYSHEAD:
self.Playctrldll.PlayM4_SetStreamOpenMode(self.PlayCtrl_Port, 0)
if self.Playctrldll.PlayM4_OpenStream(self.PlayCtrl_Port, pBuffer, dwBufSize, 1024 * 1024):
self.FuncDecCB = DECCBFUNWIN(self.DecCBFun)
self.Playctrldll.PlayM4_SetDecCallBackExMend(self.PlayCtrl_Port, self.FuncDecCB, None, 0, None)
if self.Playctrldll.PlayM4_Play(self.PlayCtrl_Port, self.cv.winfo_id()):
print('播放库播放成功')
self.status_label.config(text="视频播放中", foreground="green")
else:
print('播放库播放失败')
else:
print('播放库打开流失败')
elif dwDataType == NET_DVR_STREAMDATA:
self.Playctrldll.PlayM4_InputData(self.PlayCtrl_Port, pBuffer, dwBufSize)
def DecCBFun(self, nPort, pBuf, nSize, pFrameInfo, nUser, nReserved2):
"""解码回调函数"""
pass # 可以在这里添加图像处理代码
def cleanup(self):
"""清理资源"""
self.status_label.config(text="正在关闭...", foreground="orange")
# 确保停止所有云台控制
if self.current_command is not None:
self.ptz_control_stop(self.current_command)
if self.lRealPlayHandle is not None:
self.Objdll.NET_DVR_StopRealPlay(self.lRealPlayHandle)
if self.PlayCtrl_Port.value > -1:
self.Playctrldll.PlayM4_Stop(self.PlayCtrl_Port)
self.Playctrldll.PlayM4_CloseStream(self.PlayCtrl_Port)
self.Playctrldll.PlayM4_FreePort(self.PlayCtrl_Port)
if self.lUserId is not None:
self.Objdll.NET_DVR_Logout(self.lUserId)
self.Objdll.NET_DVR_Cleanup()
self.win.quit()
def run(self):
"""运行应用程序"""
try:
self.win.mainloop()
except Exception as e:
print(f"运行错误: {e}")
finally:
self.cleanup()
if __name__ == '__main__':
app = PTZCameraController()
app.run()
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)