Parallel Processing of Video Streams: Large-Scale Video Analysis with PySpark

This article shows how to use PySpark to process multiple video files in parallel and run video-analysis tasks such as face recognition on them. We use Spark's distributed compute to efficiently extract frames from videos, detect faces, and track them across frames, with detailed code and step-by-step explanations to help you apply PySpark to large-scale video processing.

Environment Setup

First, make sure the required Python libraries and FFmpeg are installed in your environment.

  1. Install the Python libraries:

    pip install ffmpeg-python
    pip install face-recognition
    conda install -c conda-forge opencv
  2. Install FFmpeg:

    Make sure FFmpeg is installed on your system; the exact steps depend on your operating system. On Ubuntu, for example, use the commands below. (A quick way to verify the whole setup is sketched after this list.)

    sudo apt-get update
    sudo apt-get install ffmpeg
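
Before moving on, it's worth a quick sanity check that FFmpeg and the Python bindings are actually reachable from the driver. A minimal sketch, assuming the packages above installed cleanly:

    import shutil
    import subprocess

    # The ffmpeg binary must be on PATH for ffmpeg-python to shell out to it.
    assert shutil.which("ffmpeg"), "ffmpeg not found on PATH"
    print(subprocess.run(["ffmpeg", "-version"],
                         capture_output=True, text=True).stdout.splitlines()[0])

    # The Python bindings should import without errors.
    import ffmpeg            # ffmpeg-python
    import face_recognition  # dlib-based face detection
    import cv2               # OpenCV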

PySpark Implementation

The following code reads video files in parallel with PySpark, extracts frames, and runs face detection and tracking.

from pyspark.sql import SparkSession

# Configure Spark. local[40] asks for 40 local worker threads; size this and
# spark.driver.memory to the machine that actually runs the job.
spark = (
    SparkSession.builder
    .appName("myApp")
    .master("local[40]")
    .config("spark.driver.memory", "30g")
    .getOrCreate()
)

sc = spark.sparkContext

import os
import uuid
import pathlib
import subprocess

import cv2
import ffmpeg
import face_recognition  # used by the face_detector UDF below
import numpy as np
from scipy.optimize import linear_sum_assignment

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, FloatType,
                               ArrayType, BinaryType,
                               MapType, DoubleType, StringType)
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer

# Directory containing the input video files
input_dir = "../data/video_files/faces/"
pathlist = list(pathlib.Path(input_dir).glob('*.mp4'))
pathlist = [Row(str(ele)) for ele in pathlist]

# Create a DataFrame with one row per video file
column_name = ["video_uri"]
df = spark.createDataFrame(data=pathlist, schema=column_name)

print("Initial dataframe")
df.show(10, truncate=False)

# Schema for video metadata
video_metadata = StructType([
    StructField("width", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("num_frames", IntegerType(), False),
    StructField("duration", FloatType(), False)
])

# Schema for shot boundaries (declared here but not used below)
shots_schema = ArrayType(
    StructType([
        StructField("start", FloatType(), False),
        StructField("end", FloatType(), False)
    ]))

# UDF: probe a video for its metadata
@F.udf(returnType=video_metadata)
def video_probe(uri):
    probe = ffmpeg.probe(uri, threads=1)
    video_stream = next(
        (
            stream
            for stream in probe["streams"]
            if stream["codec_type"] == "video"
        ),
        None,
    )
    width = int(video_stream["width"])
    height = int(video_stream["height"])
    num_frames = int(video_stream["nb_frames"])
    duration = float(video_stream["duration"])
    return (width, height, num_frames, duration)
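
# Note: ffmpeg.probe shells out to ffprobe and returns its parsed JSON output.
# Fields such as "nb_frames" and "duration" are container-dependent and can be
# missing for some formats, in which case video_probe raises a KeyError; guard
# with .get(...) if your inputs are heterogeneous.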

# UDF: extract frames from a video
@F.udf(returnType=ArrayType(BinaryType()))
def video2images(uri, width, height,
                 sample_rate: int = 5,
                 start: float = 0.0,
                 end: float = -1.0,
                 n_channels: int = 3):
    """
    Decode a time-localized segment of a video into raw RGB byte arrays,
    keeping one frame every `sample_rate` seconds between `start` and `end`.
    """
    video_data, _ = (
        ffmpeg.input(uri, threads=1)
        .output(
            "pipe:",
            format="rawvideo",
            pix_fmt="rgb24",
            ss=start,
            t=end - start,  # requires end > start
            r=1 / sample_rate,  # output rate: one frame per sample_rate seconds
        ).run(capture_stdout=True))
    # One decoded frame occupies height * width * n_channels bytes;
    # e.g. a 1920x1080 RGB frame is 1080 * 1920 * 3 = 6,220,800 bytes.
    img_size = height * width * n_channels
    return [video_data[idx:idx + img_size] for idx in range(0, len(video_data), img_size)]

# Add the metadata column
df = df.withColumn("metadata", video_probe(F.col("video_uri")))
print("With Metadata")
df.show(10, truncate=False)

# Extract frames: one frame per second from the first five seconds of each video
df = df.withColumn("frame", F.explode(
    video2images(F.col("video_uri"), F.col("metadata.width"), F.col("metadata.height"), F.lit(1), F.lit(0.0),
                 F.lit(5.0))))

# Schema and UDF for face detection
box_struct = StructType(
    [
        StructField("xmin", IntegerType(), False),
        StructField("ymin", IntegerType(), False),
        StructField("xmax", IntegerType(), False),
        StructField("ymax", IntegerType(), False)
    ]
)

def bbox_helper(bbox):
    # face_recognition returns (top, right, bottom, left); reorder to
    # (xmin, ymin, xmax, ymax) and clamp negative coordinates at zero.
    top, right, bottom, left = bbox
    bbox = [left, top, right, bottom]

    return list(map(lambda x: max(x, 0), bbox))

@F.udf(returnType=ArrayType(box_struct))
def face_detector(img_data, width=1920, height=1080, n_channels=3):
    # Frames were decoded as rgb24, which is the channel order face_recognition expects.
    img = np.frombuffer(img_data, np.uint8).reshape(height, width, n_channels)
    faces = face_recognition.face_locations(img)
    return [bbox_helper(f) for f in faces]

# Run face detection on every frame
df = df.withColumn("faces", face_detector(F.col("frame"), F.col("metadata.width"), F.col("metadata.height")))

# Schemas and UDFs for face tracking
annot_schema = ArrayType(
    StructType(
        [
            StructField("bbox", box_struct, False),
            StructField("tracker_id", StringType(), False),
        ]
    )
)

def bbox_iou(b1, b2):
    """Intersection-over-union of two (xmin, ymin, xmax, ymax) boxes."""
    L = list(zip(b1, b2))
    left, top = np.max(L, axis=1)[:2]
    right, bottom = np.min(L, axis=1)[2:]
    if right < left or bottom < top:
        return 0
    b_area = lambda b: (b[2] - b[0]) * (b[3] - b[1])
    inter_area = b_area([left, top, right, bottom])
    b1_area, b2_area = b_area(b1), b_area(b2)
    iou = inter_area / float(b1_area + b2_area - inter_area)
    return iou
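
# Worked example: bbox_iou([0, 0, 2, 2], [1, 1, 3, 3]) has intersection 1 and
# union 4 + 4 - 1 = 7, so the IoU is 1/7 ≈ 0.143.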

@F.udf(returnType=MapType(IntegerType(), IntegerType()))
def tracker_match(trackers, detections, bbox_col="bbox", threshold=0.3):
    """
    Match bounding boxes across successive frames with the Hungarian
    algorithm, maximizing total IoU. Returns a map from previous-frame
    box index to current-frame box index.
    """
    # Imported locally so the dependency resolves on the Spark executors.
    from scipy.optimize import linear_sum_assignment

    similarity = bbox_iou
    if not trackers or not detections:
        return {}
    if len(trackers) == len(detections) == 1:
        if (
                similarity(trackers[0][bbox_col], detections[0][bbox_col])
                >= threshold
        ):
            return {0: 0}
        return {}

    # Rows index `detections` (the previous frame's boxes), columns index
    # `trackers` (the current frame's boxes).
    sim_mat = np.array(
        [
            [
                similarity(tracked[bbox_col], detection[bbox_col])
                for tracked in trackers
            ]
            for detection in detections
        ],
        dtype=np.float32,
    )

    # linear_sum_assignment returns (row_indices, col_indices); stack them
    # into an N x 2 array of matched (previous, current) index pairs.
    matched_idx = np.array(linear_sum_assignment(-sim_mat)).T
    matches = [m for m in matched_idx if sim_mat[m[0], m[1]] >= threshold]
    if not matches:
        return {}
    return {int(m[0]): int(m[1]) for m in matches}

@F.udf(returnType=ArrayType(box_struct))
def OFMotionModel(frame, prev_frame, bboxes, height, width):
    """Shift the previous frame's boxes along the dense optical flow."""
    if not prev_frame:
        prev_frame = frame
    # Frames were decoded as rgb24, so convert from RGB (not BGR).
    gray = cv2.cvtColor(np.frombuffer(frame, np.uint8).reshape(height, width, 3), cv2.COLOR_RGB2GRAY)
    prev_gray = cv2.cvtColor(np.frombuffer(prev_frame, np.uint8).reshape(height, width, 3), cv2.COLOR_RGB2GRAY)

    inst = cv2.DISOpticalFlow.create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)
    inst.setUseSpatialPropagation(False)

    flow = inst.calc(prev_gray, gray, None)

    h, w = flow.shape[:2]
    shifted_boxes = []
    for box in bboxes:
        xmin, ymin, xmax, ymax = box
        # flow[..., 0] is the horizontal (x) displacement, flow[..., 1] the vertical (y).
        avg_x = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 0])
        avg_y = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 1])

        shifted_boxes.append(
            {"xmin": int(max(0, xmin + avg_x)), "ymin": int(max(0, ymin + avg_y)), "xmax": int(min(w, xmax + avg_x)),
             "ymax": int(min(h, ymax + avg_y))})
    return shifted_boxes

def match_annotations(iterator, segment_id="video_uri", id_col="tracker_id"):
    """
    Used by mapPartitions to assign stable tracker ids across the frames of
    each video; each partition holds the frames of a single video.
    """
    matched_annots = []
    for idx, data in enumerate(iterator):
        data = data[1]  # (segment_id, trackables, matched)
        if not idx:
            # Seed every trackable in the first frame with a fresh id.
            old_row = {i: str(uuid.uuid4()) for i in range(len(data[1]))}
            old_row[segment_id] = data[0]
        annots = []
        curr_row = {segment_id: data[0]}
        if old_row[segment_id] != curr_row[segment_id]:
            # A new video begins: drop the carried-over ids.
            old_row = {}
        if data[2] is not None:
            for ky, vl in data[2].items():
                detection = data[1][vl].asDict()
                # Carry the previous frame's id forward if matched, otherwise mint a new one.
                detection[id_col] = old_row.get(ky, str(uuid.uuid4()))
                curr_row[vl] = detection[id_col]
                annots.append(Row(**detection))
        matched_annots.append(annots)
        old_row = curr_row
    return matched_annots

def track_detections(df, segment_id="video_uri", frames="frame", detections="faces", optical_flow=True):
    id_col = "tracker_id"
    frame_window = Window().orderBy(frames)
    value_window = Window().orderBy("value")
    annot_window = Window.partitionBy(segment_id).orderBy(segment_id, frames)
    indexer = StringIndexer(inputCol=segment_id, outputCol="vidIndex")

    # adjust detections w/ optical flow
    if optical_flow:
        df = (
            df.withColumn("prev_frames", F.lag(F.col(frames)).over(annot_window))
            .withColumn(detections, OFMotionModel(F.col(frames), F.col("prev_frames"), F.col(detections), F.col("metadata.height"), F.col("metadata.width")))
        )

    df = (
        df.select(segment_id, frames, detections)
        .withColumn("bbox", F.explode(detections))
        .withColumn(id_col, F.lit(""))
        .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("trackables").alias("trackables"))
        .withColumn(
            "old_trackables", F.lag(F.col("trackables")).over(annot_window)
        )
        .withColumn(
            "matched",
            # current trackables go in as `trackers`, previous-frame ones as `detections`
            tracker_match(F.col("trackables"), F.col("old_trackables")),
        )
        .withColumn("frame_index", F.row_number().over(frame_window))
    )

    df = (
        indexer.fit(df)
        .transform(df)
        .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
    )
    unique_ids = df.select("vidIndex").distinct().count()
    matched = (
        df.select("vidIndex", segment_id, "trackables", "matched")
        .rdd.map(lambda x: (x[0], x[1:]))
        # vidIndex is a stringified double such as "0.0", hence int(float(...))
        .partitionBy(unique_ids, lambda x: int(float(x)))
        .mapPartitions(match_annotations)
    )
    matched_annotations = spark.createDataFrame(matched, annot_schema).withColumn(
        "value_index", F.row_number().over(value_window))

    return (
        df.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
        .withColumnRenamed("value", "trackers_matched")
        .withColumn("tracked", F.explode(F.col("trackers_matched")))
        .select(
            segment_id,
            frames,
            detections,
            F.col("tracked.{}".format("bbox")).alias("bbox"),
            F.col("tracked.{}".format(id_col)).alias(id_col),
        )
        .withColumn(id_col, F.sha2(F.concat(F.col(segment_id), F.col(id_col)), 256))
        .withColumn("tracked_detections", F.struct([F.col("bbox"), F.col(id_col)]))
        .groupBy(segment_id, frames, detections)
        .agg(F.collect_list("tracked_detections").alias("tracked_detections"))
        .orderBy(segment_id, frames, detections)
    )

# Wrap detection and tracking in a Spark ML Transformer
from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param

class DetectionTracker(Transformer, HasInputCol, HasOutputCol):
    """Detect and track."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Initialize."""
        super(DetectionTracker, self).__init__()
        self.framesCol = Param(self, "framesCol", "Column containing frames.")
        self.detectionsCol = Param(self, "detectionsCol", "Column containing detections.")
        self.optical_flow = Param(self, "optical_flow", "Use optical flow for tracker correction. Default is False")
        self._setDefault(framesCol="frame", detectionsCol="faces", optical_flow=False)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):
        """Get params."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setFramesCol(self, value):
        """Set framesCol."""
        return self._set(framesCol=value)

    def getFramesCol(self):
        """Get framesCol."""
        return self.getOrDefault(self.framesCol)

    def setDetectionsCol(self, value):
        """Set detectionsCol."""
        return self._set(detectionsCol=value)

    def getDetectionsCol(self):
        """Get detectionsCol."""
        return self.getOrDefault(self.detectionsCol)

    def setOpticalflow(self, value):
        """Set optical_flow."""
        return self._set(optical_flow=value)

    def getOpticalflow(self):
        """Get optical_flow."""
        return self.getOrDefault(self.optical_flow)

    def _transform(self, dataframe):
        """Do transformation."""
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        frames_col = self.getFramesCol()
        detections_col = self.getDetectionsCol()
        optical_flow = self.getOpticalflow()

        id_col = "tracker_id"
        frame_window = Window().orderBy(frames_col)
        value_window = Window().orderBy("value")
        annot_window = Window.partitionBy(input_col).orderBy(input_col, frames_col)
        indexer = StringIndexer(inputCol=input_col, outputCol="vidIndex")

        # adjust detections w/ optical flow
        if optical_flow:
            dataframe = (
                dataframe.withColumn("prev_frames", F.lag(F.col(frames_col)).over(annot_window))
                .withColumn(detections_col,
                            OFMotionModel(F.col(frames_col), F.col("prev_frames"), F.col(detections_col),
                                          F.col("metadata.height"), F.col("metadata.width")))
            )

        dataframe = (
            dataframe.select(input_col, frames_col, detections_col)
            .withColumn("bbox", F.explode(detections_col))
            .withColumn(id_col, F.lit(""))
            .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list("trackables").alias("trackables"))
            .withColumn(
                "old_trackables", F.lag(F.col("trackables")).over(annot_window)
            )
            .withColumn(
                "matched",
                tracker_match(F.col("trackables"), F.col("old_trackables")),
            )
            .withColumn("frame_index", F.row_number().over(frame_window))
        )

        dataframe = (
            indexer.fit(dataframe)
            .transform(dataframe)
            .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))
        )

        unique_ids = dataframe.select("vidIndex").distinct().count()
        matched = (
            dataframe.select("vidIndex", input_col, "trackables", "matched")
            .rdd.map(lambda x: (x[0], x[1:]))
            # vidIndex is a stringified double such as "0.0", hence int(float(...))
            .partitionBy(unique_ids, lambda x: int(float(x)))
            .mapPartitions(match_annotations)
        )

        matched_annotations = spark.createDataFrame(matched, annot_schema).withColumn(
            "value_index", F.row_number().over(value_window))

        return (
            dataframe.join(matched_annotations, F.col("value_index") == F.col("frame_index"))
            .withColumnRenamed("value", "trackers_matched")
            .withColumn("tracked", F.explode(F.col("trackers_matched")))
            .select(
                input_col,
                frames_col,
                detections_col,
                F.col("tracked.{}".format("bbox")).alias("bbox"),
                F.col("tracked.{}".format(id_col)).alias(id_col),
            )
            .withColumn(id_col, F.sha2(F.concat(F.col(input_col), F.col(id_col)), 256))
            .withColumn(output_col, F.struct([F.col("bbox"), F.col(id_col)]))
            .groupBy(input_col, frames_col, detections_col)
            .agg(F.collect_list(output_col).alias(output_col))
            .orderBy(input_col, frames_col, detections_col)
        )

# Run detection and tracking
detectTracker = DetectionTracker(inputCol="video_uri", outputCol="tracked_detections")
print(type(detectTracker))

tracked = detectTracker.transform(df)  # Transformer API
final = track_detections(df)           # equivalent plain-function API

print("Final dataframe")
final.select("tracked_detections").show(100, truncate=False)
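
In practice you will usually want to persist the result rather than only show() it. A minimal sketch of writing the tracked detections out; the output path is hypothetical, and Parquet is chosen because it preserves the nested struct columns:

output_path = "../data/output/tracked_faces.parquet"  # hypothetical path

(final
 .select("video_uri", "tracked_detections")  # drop the bulky binary frame data
 .write
 .mode("overwrite")
 .parquet(output_path))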

Code Walkthrough

  1. Spark configuration:

    • Build the SparkSession, setting the application name and master.
    • Raise spark.driver.memory so large decoded frames don't exhaust driver memory.
  2. Video metadata extraction:

    • FFmpeg probes each video for its width, height, frame count, and duration.
    • The video_probe UDF wraps this probe.
  3. Frame extraction:

    • FFmpeg decodes sampled frames and returns them as raw image byte arrays.
    • The video2images UDF performs the extraction.
  4. Face detection:

    • The face_recognition library locates faces in each frame.
    • The face_detector UDF returns the bounding box of every detected face.
  5. Face tracking:

    • Tracking combines optical flow with Hungarian-algorithm matching; a self-contained toy example of the matching step follows this list.
    • The OFMotionModel UDF uses optical flow to predict where each face moved in the next frame.
    • The tracker_match UDF matches faces between adjacent frames.
    • The match_annotations function assigns stable tracker ids within each partition.
    • The track_detections function ties detection and tracking together.
  6. The DetectionTracker Transformer:

    • Packages the detection and tracking logic as a Transformer so it slots into a Spark ML Pipeline.
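
To build intuition for the matching in step 5 without running Spark, here is a self-contained toy version of that step: IoU similarity plus a Hungarian assignment across two hand-made frames. Plain tuples stand in for the Spark Rows used above:

import numpy as np
from scipy.optimize import linear_sum_assignment

def iou(b1, b2):
    # Boxes are (xmin, ymin, xmax, ymax).
    left, top = max(b1[0], b2[0]), max(b1[1], b2[1])
    right, bottom = min(b1[2], b2[2]), min(b1[3], b2[3])
    if right < left or bottom < top:
        return 0.0
    inter = (right - left) * (bottom - top)
    area = lambda b: (b[2] - b[0]) * (b[3] - b[1])
    return inter / float(area(b1) + area(b2) - inter)

prev_boxes = [(10, 10, 50, 50), (200, 40, 260, 110)]  # previous frame
curr_boxes = [(205, 45, 262, 112), (14, 12, 52, 55)]  # current frame, reordered

# Rows index previous boxes, columns current ones; negate to maximize IoU.
sim = np.array([[iou(p, c) for c in curr_boxes] for p in prev_boxes])
rows, cols = linear_sum_assignment(-sim)
print({int(r): int(c) for r, c in zip(rows, cols) if sim[r, c] >= 0.3})
# -> {0: 1, 1: 0}: each face is re-identified despite the changed ordering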

Notes

  • Memory: video processing is memory-intensive. Tune spark.driver.memory and spark.executor.memory to your workload.
  • FFmpeg installation: FFmpeg must be installed correctly and reachable on the PATH of every node that runs tasks.
  • Video file paths: the paths must be valid and readable by the Spark workers, not just the driver.
  • UDF performance: Python UDFs are generally slower than Spark's built-in functions; prefer built-ins where possible, and consider vectorizing hot UDFs (see the sketch after this list).
  • Parallelism: match Spark's parallelism to the cluster size and the number and size of the video files so the cluster stays fully utilized.
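
As a concrete example of the UDF-performance point, a row-at-a-time UDF such as video_probe can be rewritten as a vectorized pandas_udf, which receives a whole batch of URIs per call. This is a sketch, assuming the video_metadata schema and the ffmpeg import from the script above are in scope, with error handling omitted:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(video_metadata)  # reuses the StructType defined earlier
def video_probe_vec(uris: pd.Series) -> pd.DataFrame:
    rows = []
    for uri in uris:
        stream = next(s for s in ffmpeg.probe(uri)["streams"]
                      if s["codec_type"] == "video")
        rows.append((int(stream["width"]), int(stream["height"]),
                     int(stream["nb_frames"]), float(stream["duration"])))
    return pd.DataFrame(rows, columns=["width", "height", "num_frames", "duration"])

# df = df.withColumn("metadata", video_probe_vec("video_uri"))

Since the probe itself is I/O-bound, the gain here is mostly reduced per-row serialization overhead; the same pattern pays off far more for CPU-bound UDFs such as face_detector.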

Summary

This article walked through a complete example of processing video files in parallel with PySpark: extracting video metadata, extracting frames, detecting faces, and tracking them. Spark's distributed compute lets us process large volumes of video efficiently and run a variety of analysis tasks on top. In practice, adapt the code to your needs, for example by swapping the face-detection algorithm or tuning the optical-flow parameters.
