使用PYAV的前提
PyAV 是FFmpeg函式庫的 Pythonic 綁定。目標是提供底層庫的所有功能和控制,但盡可能管理細節。
PyAV 可透過容器、串流、封包、編解碼器和影格直接、精確地存取您的媒體。它公開了該資料的一些轉換,並幫助您將資料傳入/傳出其他套件(例如 Numpy 和 Pillow)。
這種權力確實伴隨著一些責任,因為與媒體合作非常複雜,PyAV 無法將其抽象化或為您做出所有最佳決策。如果該ffmpeg
命令無需您竭盡全力即可完成工作,那麼 PyAV 可能會成為障礙而不是幫助。
所以如果我們想要在推流前處理串流、或者是拉流後處理串流內容,或者是更改串流編碼的相關資訊等【較難直接使用ffmpeg指令達到的事情時】,才較適合使用PYAV。
用RTMP的方式推流
範例程式碼如下:
import av
import asyncio
import cv2
import numpy as np
import threading
import time
from av import VideoFrame
from av.codec import CodecContext
# Define the RTMP server URL
rtmp_url = 'rtmp://127.0.0.1/live/test1' # Replace with your RTMP server URL
# Function to capture webcam frames and push to RTMP server
def capture_and_push():
# Open the video capture device (webcam)
cap = cv2.VideoCapture(0)
# Create an output container for the RTMP stream
output_container = av.open(rtmp_url, 'w', format='flv')
# Set up video stream parameters
video_stream = output_container.add_stream('h264', rate=30)
video_stream.width = 640
video_stream.height = 480
# Create a codec context for H.264 encoding
codecContext = CodecContext.create('h264', 'w')
# Create a thread for encoding and pushing frames
def encode_and_push_frames():
while True:
ret, frame = cap.read()
if not ret:
break
# Convert the frame to a VideoFrame
frame = VideoFrame.from_ndarray(frame, format='bgr24')
frame.pts = frame.pts
frame.time_base = frame.time_base
# Encode the frame and write it to the output container
packet = video_stream.encode(frame)
output_container.mux(packet)
encode_thread = threading.Thread(target=encode_and_push_frames)
encode_thread.start()
# Wait for the encode thread to finish
encode_thread.join()
# Release the video capture device and close the output container
cap.release()
output_container.close()
if __name__ == "__main__":
capture_and_push()
輸出透明背景的影片
若要使用 pyav
(Python 的 ffmpeg/avconv 綁定) 來輸出具有透明背景的影片,你通常會使用像 ProRes 4444
或者 VP9
(與 WebM 容器一同使用) 這類的編碼器,因為它們支持 alpha 通道 (透明度)。
import av
import numpy as np
# 定義影片參數
width, height = 640, 480
duration = 5 # seconds
fps = 30
total_frames = duration * fps
# 創建輸出容器和流
container = av.open('output.webm', mode='w')
stream = container.add_stream('libvpx-vp9', rate=fps)
stream.width = width
stream.height = height
stream.pix_fmt = 'yuv420p'
# 產生影片框,這裡僅作為範例用透明背景
for frame_idx in range(total_frames):
# 創建一個全透明的框
img = np.zeros((height, width, 4), dtype=np.uint8)
# 只是一個範例,所以在畫面中央畫一個半透明的紅色圓
center_x, center_y = width // 2, height // 2
radius = min(width, height) // 4
y, x = np.ogrid[-center_y:height-center_y, -center_x:width-center_x]
mask = x*x + y*y <= radius*radius
img[mask] = [255, 0, 0, 128] # Semi-transparent red
# 轉換成 AVFrame 和寫入流
frame = av.VideoFrame.from_ndarray(img, format='rgba')
for packet in stream.encode(frame):
container.mux(packet)
# 結束編碼
for packet in stream.encode():
container.mux(packet)
container.close()
在串流裡增加自定義的變數
pyav
裡面的frame.side_data
是一個屬性,它通常包含與該幀(幀)相關的附加資訊。這些資訊可能包括但不限於:
- 動態範圍資訊
- 運動向量資訊(對於某些視訊編解碼器)
- 其他 FFmpeg 內部為特定編解碼器或格式定義的元數據
舉個例子,如果你正在解碼一個使用 HEVC(或稱為 H.265)編碼的視訊串流,frame.side_data
可能會包含關於這一幀的 HDR 元資料(如果該視訊支援 HDR)。
通常情況下,大多數應用程式可能不需要直接讀取side_data
。但是,對於需要細節控製或分析的應用程序,這是一個非常有用的屬性。
import av
import pyav
import threading
# Define the source RTMP URL and destination RTMP URL
source_url = 'rtmp://127.0.0.1/live/test1'
destination_url = 'rtmp://127.0.0.1/live/test2'
# Create an input container for the source RTMP stream
input_container = av.open(source_url, mode='r')
# Create an output container for the destination RTMP stream
output_container = av.open(destination_url, mode='w')
# Set up a video encoder for H.264
video_stream = output_container.add_stream('h264', rate=30)
video_stream.options['x264opts'] = 'nal-hrd=cbr'
video_stream.options['c:v'] = 'libx264'
# Define a SEI message to add to the video frames (modify this as needed)
sei_data = b'Some SEI Data'
# Function to add SEI data to frames and write to the output
def process_frames():
for packet in input_container.demux():
if packet.stream.type == 'video':
for frame in packet.decode():
# Add SEI data to the frame
frame.side_data['sei'] = sei_data
# Encode and write the frame to the output
output_container.mux(packet)
# Create a thread to process and write frames
frame_thread = threading.Thread(target=process_frames)
try:
# Start the frame processing thread
frame_thread.start()
# Run the main loop to write the output to the destination RTMP stream
while True:
output_container.mux(output_container.recv())
except (KeyboardInterrupt, pyav.AVError):
pass
finally:
# Clean up resources and close the containers
frame_thread.join()
input_container.close()
output_container.close()