单生产者多消费者框架

单生产者多消费者框架

学会这篇文章,让你如直哉撩发一般轻松战胜多线程调度😀

前言

场景:

编码后获得一帧视频数据,现在需要网络推流和保存本地文件,那么是创建一个生产者线程和两个消费者线程吗?一个消费者线程获取到数据用掉之后是不是还得保留给下一个消费者线程?

背景:

原本只有推流场景,那么就是基础的单生产者消费者,生产者编码生成一帧数据后放入队列并通知消费者,消费者拿到数据之后释放掉数据

原本设计的队列类是:

class VIDEO_QUEUE {
  public:
    // 声明一个队列,存储指向 video_data_packet_t 的智能指针
    // 智能指针 (std::unique_ptr)
    // 可以更好地管理内存,确保在出队或队列销毁时正确释放
    std::queue<std::unique_ptr<video_data_packet_t>> video_packet_queue;
    // 线程锁
    pthread_mutex_t videoMutex;
    // 条件变量
    pthread_cond_t videoCond;

  public:
    VIDEO_QUEUE();
    ~VIDEO_QUEUE();

    // 入队函数,接受一个智能指针,所有权转移给队列
    int putVideoPacketQueue(std::unique_ptr<video_data_packet_t> video_packet);
    // 出队函数,返回一个智能指针,所有权转移给调用者
    std::unique_ptr<video_data_packet_t> getVideoPacketQueue();

    // 队列大小
    int getVideoQueueSize();

  private:
    // 队列最大容量,避免无限增长导致内存耗尽
    static const int MAX_QUEUE_SIZE = 100;  // 示例值,可根据实际情况调整
};

使用的是std::unique_ptr,也就是同一时间只能一个线程独自占有

对于多个消费者的话指针不能同时传递,那么就需要创建两个VIDEO_QUEUE类变量,这样的话对于资源会有更多占用

解决方案

最终我选择使用引用计数方案。

引用计数(Reference Counting)是一种内存管理技术,通过跟踪有多少个指针/引用指向同一块内存,来决定何时释放该内存。

核心思想

  • 每个对象维护一个计数器
  • 每增加一个引用,计数器 +1
  • 每减少一个引用,计数器 -1
  • 当计数器为 0 时,自动释放内存

将原本的std::unique_ptr替换为std::shared_ptr

基本原理:就是记录对象被引用的次数,当引用次数为 0 的时候,也就是最后一个指向该对象的共享指针析构的时候,共享指针的析构函数就把指向的内存区域释放掉。

特点:它所指向的资源具有共享性,即多个shared_ptr可以指向同一份资源,并在内部使用引用计数机制来实现这一点。

共享指针内存:每个 shared_ptr对象在内部指向两个内存位置:

  • 指向对象的指针;
  • 用于控制引用计数数据的指针。

1.当新的shared_ptr对象与指针关联时,则在其构造函数中,将与此指针关联的引用计数增加1。

2.当任何shared_ptr对象超出作用域时,则在其析构函数中,它将关联指针的引用计数减1。如果引用计数变为0,则表示没有其他 shared_ptr对象与此内存关联,在这种情况下,它使用delete函数删除该内存。

shared_ptr像普通指针一样使用,可以将*->shared_ptr 对象一起使用,也可以像其他 shared_ptr 对象一样进行比较;

具体改动如下:

class VIDEO_QUEUE {
  public:
    // 声明一个队列,存储指向 video_data_packet_t 的智能指针
    // 智能指针 (std::shared_ptr)
    //引用计数(Reference Counting)是一种内存管理技术,通过跟踪有多少个指针/引用指向同一块内存,来决定何时释放该内存。
    // 可以更好地管理内存,确保在出队或队列销毁时正确释放
    //-----------------------------------------------------------------//
    //推流队列
    std::queue<std::shared_ptr<video_data_packet_t>> stream_queue;
    // 推流线程锁
    pthread_mutex_t stream_mutex;
    // 推流条件变量
    pthread_cond_t stream_cond;

    //保存文件队列
    std::queue<std::shared_ptr<video_data_packet_t>> file_queue;
    //保存文件线程锁
    pthread_mutex_t file_mutex;
    //保存文件条件变量
    pthread_cond_t file_cond;


  public:
    VIDEO_QUEUE();
    ~VIDEO_QUEUE();

    // 入队函数,接受一个智能指针,引用计数+1
    int putVideoPacketQueue(std::shared_ptr<video_data_packet_t> video_packet);
    // 出队函数,返回一个智能指针,引用计数-1
    std::shared_ptr<video_data_packet_t> getVideoPacketQueue(PACKET_TYPE pack_type);

    // 队列大小
    int getVideoQueueSize(PACKET_TYPE pack_type);

    // 通知所有等待线程退出(程序关闭时调用)
    void shutdown();

  private:
    // 队列最大容量,避免无限增长导致内存耗尽
    static const int MAX_QUEUE_SIZE = 100;  // 示例值,可根据实际情况调整
    volatile bool is_shutdown;  // 关闭标志,通知等待线程退出
};

目前有两个消费者线程:推流线程和保存文件线程,所以需要给各自的线程创建一个shared_ptr队列,以及各自的条件变量和互斥锁和原有逻辑一样

类仍然提供入队出队接口供外部调用,除此以外新增一个is_shutdown标志用来表示生产者线程结束,从而去给消费者触发提醒,哪怕我已经有全局sig_atomic_t标志(后续讲原因)

入队出队函数

入队函数:

// 入队函数
int VIDEO_QUEUE::putVideoPacketQueue(
    std::shared_ptr<video_data_packet_t> video_packet)
{
    // stream_queue: 满了就丢弃最旧的帧(丢帧策略),避免阻塞生产者
    pthread_mutex_lock(&stream_mutex);
    if (stream_queue.size() >= MAX_QUEUE_SIZE) {
        stream_queue.pop();  // 丢弃最旧的帧
    }
    stream_queue.push(video_packet);
    pthread_cond_signal(&stream_cond);
    pthread_mutex_unlock(&stream_mutex);

    // file_queue: 满了就丢弃最旧的帧
    pthread_mutex_lock(&file_mutex);
    if (file_queue.size() >= MAX_QUEUE_SIZE) {
        file_queue.pop();  // 丢弃最旧的帧
    }
    file_queue.push(video_packet);
    pthread_cond_signal(&file_cond);
    pthread_mutex_unlock(&file_mutex);

    return 0;
}

传入参数是std::shared_ptr<video_data_packet_t>类型,当参数传入后,引用计数+1,知道离开作用域才引用计数-1.入队函数供消费者调用所以需要把参数放入两个队列中,每放入一个队列引用计数都会+1

出队函数:

// 出队函数
std::shared_ptr<video_data_packet_t> VIDEO_QUEUE::getVideoPacketQueue(PACKET_TYPE pack_type)
{
    std::shared_ptr<video_data_packet_t> packet = nullptr;

    if(pack_type == STREAM_VIDEO)
    {
        pthread_mutex_lock(&stream_mutex);
        while (stream_queue.empty() && !is_shutdown) {
            pthread_cond_wait(&stream_cond, &stream_mutex);
        }
        if (!stream_queue.empty()) {
            packet = stream_queue.front();
            stream_queue.pop();
        }
        pthread_mutex_unlock(&stream_mutex);
    }
    else if(pack_type == FILE_VIDEO)
    {
        pthread_mutex_lock(&file_mutex);
        while (file_queue.empty() && !is_shutdown) {
            pthread_cond_wait(&file_cond, &file_mutex);
        }
        if (!file_queue.empty()) {
            packet = file_queue.front();
            file_queue.pop();
        }
        pthread_mutex_unlock(&file_mutex);
    }

    return packet;
}

出队函数供两个消费者线程调用,要根据不同线程获取不同队列的内容,所以我提前定义了一个枚举体

typedef enum {
    STREAM_VIDEO = 0,
    FILE_VIDEO   = 1,
    STREAM_AUDIO = 2,
    FILE_AUDIO   = 3
}PACKET_TYPE;

根据推流和保存文件以及视频音频分成4类。

注意条件变量的等待是stream_queue.empty() && !is_shutdown队列空并且标志位不置位才等待

通知所有线程退出

// 通知所有等待线程退出
void VIDEO_QUEUE::shutdown()
{
    pthread_mutex_lock(&stream_mutex);
    is_shutdown = true;
    pthread_cond_broadcast(&stream_cond);
    pthread_mutex_unlock(&stream_mutex);

    pthread_mutex_lock(&file_mutex);
    pthread_cond_broadcast(&file_cond);
    pthread_mutex_unlock(&file_mutex);
}

生产者退出后通知所有线程退出

线程调度

在原本保存文件线程做修改,成为生产者线程(因为在这里获取VENC的编码数据),然后在获取到一帧数据之后入队。再单独创建两个消费者线程

首先是生产者线程:

static hi_s32 get_stream()
{
    hi_s32             ret;
    hi_venc_stream     stream;
    hi_venc_chn_status stat;
    td_u32 i = 0;

    if (memset_s(&stream, sizeof(stream), 0, sizeof(stream)) != EOK) {
        printf("call memset_s error\n");
    }
    //查询编码通道状态
    ret = hi_mpi_venc_query_status(0, &stat);
    if (ret != HI_SUCCESS) {
        sample_print("hi_mpi_venc_query chn[0] failed with %#x!\n",ret);
        return 2;
    }
    //确保包大于0
    if (stat.cur_packs == 0) {
        sample_print("NOTE: current frame of chn[0] is HI_NULL!\n");
        return 1;
    }
    //分配cur_packs个包信息结构体
    stream.pack = (hi_venc_pack*)malloc(sizeof(hi_venc_pack) * stat.cur_packs);
    if (stream.pack == HI_NULL) {
        sample_print("malloc stream pack failed!\n");
        return 2;
    }

    stream.pack_cnt = stat.cur_packs;
    ret = hi_mpi_venc_get_stream(0, &stream, HI_TRUE);
    if (ret != HI_SUCCESS) {
        free(stream.pack);
        stream.pack = HI_NULL;
        sample_print("hi_mpi_venc_get_stream failed with %#x!\n", ret);
        return 2;
    }

    td_u32 total_len = 0;
    for (td_u32 j = 0; j < stream.pack_cnt; j++) {
        total_len += stream.pack[j].len;
    }

    // 使用 std::make_shared 创建 video_data_packet_t 的智能指针
    std::shared_ptr<video_data_packet_t> video_data_packet =
        std::make_shared<video_data_packet_t>();
    if (video_data_packet == nullptr) {  // 检查分配是否成功
        sample_print("Error: Failed to allocate shared_ptr for "
                        "video_data_packet.\n");
        // 智能指针分配失败,无需手动释放
        // goto release_mpi_stream; // 跳转到统一的释放逻辑
    }
    // 预留 buffer 空间
        try {
            video_data_packet->buffer.resize(total_len);
        }
        catch (const std::bad_alloc& e) {
            sample_print("Error: Failed to allocate buffer for "
                         "video_data_packet->buffer: %s\n",
                         e.what());
            // std::shared_ptr 会自动清理,无需手动释放
            // goto release_mpi_stream; // 跳转到统一的释放逻辑
            free(stream.pack);
            stream.pack = HI_NULL;
            return 2;  // 或者其他错误处理
        }

        td_u32 offset = 0;
        for (td_u32 j = 0; j < stream.pack_cnt; j++) {
            // 使用 memcpy 将每个 NALU 数据复制到 video_data_packet->buffer 中
            memcpy(video_data_packet->buffer.data() + offset,
                   stream.pack[j].addr, stream.pack[j].len);
            offset += stream.pack[j].len;
        }

        // **根据海思的 NALU 类型判断是否为关键帧**
        // 只要 stream.pack 中包含 IDR_SLICE,就认为是关键帧
        video_data_packet->is_key_frame = 0;  // 默认不是关键帧
        for (td_u32 j = 0; j < stream.pack_cnt; j++) {
            if (stream.pack[j].data_type.h264_type ==
                OT_VENC_H264_NALU_IDR_SLICE) {
                video_data_packet->is_key_frame =
                    1;  // 发现 IDR_SLICE,标记为关键帧
                break;  // 找到一个 IDR 就足够了
            }
        }
        // 对于非关键帧,stream.pack_cnt 应该为 1,且 data_type 为
        // OT_VENC_H264_NALU_P_SLICE

        // **设置时间戳 (使用 stream.pack[0].pts)**
        video_data_packet->timestamp = (int64_t)stream.pack[0].pts;

        // 将智能指针入队,所有权从当前作用域转移到队列
        int put_ret =
            high_video_queue->putVideoPacketQueue(video_data_packet);
        if (put_ret !=
            0) {  // Check if putVideoPacketQueue failed (e.g., queue full)
            sample_print("Error: Failed to push video packet to queue for "
                         "channel 0, queue full or other error.\n");
            // 如果入队失败,video_data_packet
            // 的所有权没有被转移,它会在当前作用域结束时自动释放
        }

        // 必须调用 release_stream 归还 VENC ring buffer 空间,否则 ring buffer 耗尽后 VENC 停止输出
        ret = hi_mpi_venc_release_stream(0, &stream);
        if (ret != HI_SUCCESS) {
            sample_print("hi_mpi_venc_release_stream chn[0] failed with %#x!\n", ret);
        }

        free(stream.pack);
        stream.pack = HI_NULL;

    return 0;
}

void *get_video(void *param)
{
    fd_set read_fds;
    struct timeval timeout_val;
    fd = hi_mpi_venc_get_fd(0);
    printf("[VENC] get_video thread started, fd=%d\r\n", fd);

    int loop_count = 0;
    while(keep_running)
    {
        loop_count++;

        FD_ZERO(&read_fds);
        FD_SET(fd, &read_fds);

        int nfds = fd + 1;
        timeout_val.tv_sec  = 2; /* 2 is a number */
        timeout_val.tv_usec = 0;

        int ret = select(nfds, &read_fds,NULL, NULL, &timeout_val);
        if(ret < 0)
        {
            sample_print("[VENC] select failed!\n");
            break;
        }
        else if (ret == 0) {
            if (loop_count <= 10) {
                sample_print("[VENC] get venc stream time out, continue...\n");
            }
            continue;
        }
        else {
            ret = get_stream();
            if(ret != 0)
            {
                sample_print("[VENC] get venc stream fail\n");
            }
        }
    }

    printf("[VENC] Thread exiting...\n");

    // 生产者退出,通知消费者(save_file)队列不会再有新帧
    if (high_video_queue != NULL) {
        high_video_queue->shutdown();
    }

    return NULL;
}

在全局定义线程运行标志位:volatile sig_atomic_t keep_running = 1;

生产者通过select获取编码好的数据帧,然后放入队列

再单独创建两个消费者线程

void *save_file(void *param)
{
    while(true)
    {
        //后续还有音频流也需要在这里面添加
        std::shared_ptr<video_data_packet_t> video_data_packet = 
                high_video_queue->getVideoPacketQueue(FILE_VIDEO);

        if (video_data_packet == nullptr) {
            // shutdown 后队列已排空,安全退出
            break;
        }

        if (video_data_packet->buffer.empty()) {
            fprintf(stderr, "save file:error:video_data_packet is empty\r\n");
            continue;
        }
        size_t written = fwrite(video_data_packet->buffer.data(),1,video_data_packet->buffer.size(),output_fd);
        if (written != video_data_packet->buffer.size()) {
            // 处理错误
            fprintf(stderr, "save file:error:write to file failed,writed : %zu   to be write :%zu\r\n",written,video_data_packet->buffer.size());
        }
        fflush(output_fd);
    }
    return NULL;
}

void *network_stream(void *param)
{
    hi_s32 ret;
    char network_address[] = "rtmp://47.119.145.176/live/stream";
    printf("network_address: %s\n", network_address);

    // 初始化 FFmpeg 配置main_ffmpeg_config_ptr为全局变量
    ret = init_ffmpeg_config(network_address, &main_ffmpeg_config_ptr);
    if (ret != 0) {
        fprintf(stderr, "主程序: FFmpeg 配置初始化失败!\n");
        delete high_video_queue;
        return NULL;
    }
    printf("主程序: init_ffmpeg_config 成功。\n");

    // 确保 FFmpeg 上下文已经初始化
    if (!main_ffmpeg_config_ptr || !main_ffmpeg_config_ptr->oc ||
        !main_ffmpeg_config_ptr->video_stream.stream) {
        fprintf(stderr, "Stream consumer thread: FFmpeg config not initialized "
                        "properly. Exiting.\n");
        return NULL;
    }

    while(true)
    {
        //后续还有音频流也需要在这里面添加
        std::shared_ptr<video_data_packet_t> video_data_packet = 
                high_video_queue->getVideoPacketQueue(STREAM_VIDEO);

        if (video_data_packet == nullptr) {
            // shutdown 后队列已排空,安全退出
            break;
        }

        if (video_data_packet->buffer.empty()) {
            fprintf(stderr, "network_stream:error:video_data_packet is empty\r\n");
            continue;
        }

        // 处理视频 AVPacket
        ret = deal_video_avpacket(main_ffmpeg_config_ptr->oc,
                                      &main_ffmpeg_config_ptr->video_stream,
                                      video_data_packet, main_ffmpeg_config_ptr);
        if (ret == -1) {
            // 如果没有数据,短暂休眠,避免空转和过度占用 CPU
            usleep(1000 * 10);  // 休眠 10 毫秒
            continue;
        }
        // 对于 ret == 0 (丢弃空帧) 或 ret < 0 (错误),也继续循环,等待下一帧
    }

    return NULL;
}

消费者线程的循环判断并不是全局的volatile sig_atomic_t keep_running = 1;

而是一个死循环,逻辑是:

  1. 线程正常运行,消费者线程首先调用出队函数获取一帧数据,在出队函数中会去判断(见上文),如果队列空(此时is_shutdown为0)则条件变量等待,如果不空直接取出数据
  2. 当主程序收到终止信号后在信号处理函数将keep_running置为0,生产者线程退出循环并调用shutdown()接口置is_shutdown为1
  3. 消费者线程新一轮循环进入出队函数,如果这个时候队列还有未处理数据,即队列非空,但是is_shutdown是1,所以直接跳过条件变量等待(如果不这样会死锁!!!)处理队列内部数据
  4. 如果队列数据处理完为空,那么跳过条件变量后返回得到的是空数据,那么就可以直接判断处理完了可以退出循环

通过以上代码一对多消费者线程调度就能正常运行,当shared_ptr引用计数为0后会自动delete

整体框架图

================================================================================
                     单生产者多消费者视频处理框架架构图
================================================================================

+----------------------+
|   硬 件 编 码 源 (VENC)  |
|  (hi_mpi_venc_get_stream)|
+----------------------+
           |
           | 原始编码数据 (NALU)
           v
+------------------------------------------------------------------+
|                      生产者线程 (Producer)                         |
|  函数:get_video() -> get_stream()                                |
|                                                                  |
|  1. select 监听 VENC fd                                           |
|  2. 获取 stream 包                                                |
|  3. 组装数据 -> std::make_shared<video_data_packet_t>             |
|  4. 调用:high_video_queue->putVideoPacketQueue(shared_ptr)       |
|  5. 退出时调用:high_video_queue->shutdown()                      |
+------------------------------------------------------------------+
           |
           | std::shared_ptr (引用计数 +1)
           | 所有权转移给队列
           v
+------------------------------------------------------------------+
|                   共享队列管理类 (VIDEO_QUEUE)                     |
|                                                                  |
|  +------------------------------------------------------------+  |
|  |  入队接口:putVideoPacketQueue(shared_ptr)                 |  |
|  |  逻辑:                                                    |  |
|  |    1. lock(stream_mutex) -> push(stream_queue) -> signal   |  |
|  |    2. lock(file_mutex)   -> push(file_queue)   -> signal   |  |
|  |    (同一份数据放入两个队列,引用计数再 +2)                   |  |
|  +------------------------------------------------------------+  |
|                                                                  |
|  +---------------------------+  +---------------------------+   |
|  |   推流子队列 (stream_queue) |  |   存文件子队列 (file_queue) |   |
|  |   类型:STREAM_VIDEO      |  |   类型:FILE_VIDEO        |   |
|  |   锁:stream_mutex        |  |   锁:file_mutex          |   |
|  |   条件变量:stream_cond   |  |   条件变量:file_cond     |   |
|  +---------------------------+  +---------------------------+   |
|                                                                  |
|  全局状态:volatile bool is_shutdown                             |
|  内存策略:std::shared_ptr (引用计数自动管理内存)                  |
+------------------------------------------------------------------+
           |                           |
           | getVideoPacketQueue       | getVideoPacketQueue
           | (STREAM_VIDEO)            | (FILE_VIDEO)
           | (引用计数 -1)             | (引用计数 -1)
           v                           v
+---------------------------+ +---------------------------+
|   消费者线程 1 (Consumer)   | |   消费者线程 2 (Consumer)   |
|   函数:network_stream()    | |   函数:save_file()       |
|                           | |                           |
|  1. 循环调用出队接口        | |  1. 循环调用出队接口        |
|  2. 检查 nullptr (退出标志) | |  2. 检查 nullptr (退出标志) |
|  3. FFmpeg 推流处理         | |  3. fwrite 写入本地文件     |
|  4. shared_ptr 离开作用域   | |  4. shared_ptr 离开作用域   |
+---------------------------+ +---------------------------+
           |                           |
           +-------------+-------------+
                         |
                         | 当两个消费者都处理完毕
                         | (引用计数归 0)
                         v
                 [ 内存自动释放 (delete) ]


================================================================================
                           线程退出与通知机制流程
================================================================================

[ 全局信号标志 ]
volatile sig_atomic_t keep_running = 1
           |
           | (收到终止信号 SIGINT 等)
           v
      keep_running = 0
           |
           v
+---------------------------+
|      生产者线程检测到      |
|      (while 循环结束)      |
+---------------------------+
           |
           | 调用 shutdown()
           v
+---------------------------+
|    VIDEO_QUEUE::shutdown  |
|                           |
|  1. is_shutdown = true    |
|  2. pthread_cond_broadcast|  (唤醒所有等待的消费者)
|     (stream_cond &        |
|      file_cond)           |
+---------------------------+
           |
           +------------------+------------------+
           |                  |                  |
           v                  v                  v
    [消费者线程 1]       [消费者线程 2]       (后续其他消费者)
           |                  |
           | 条件变量唤醒     | 条件变量唤醒
           |                  |
           v                  v
    检查队列状态:           检查队列状态:
    if (empty &&             if (empty &&
        is_shutdown)             is_shutdown)
           |                  |
           v                  v
      返回 nullptr         返回 nullptr
           |                  |
           v                  v
      break 循环           break 循环
           |                  |
           v                  v
      线程安全退出         线程安全退出

================================================================================

核心设计要点说明:

  1. 内存管理 (std::shared_ptr)
    • 生产者创建对象时引用计数为 1。
    • 入队时,数据被分别放入 stream_queuefile_queue,每个队列持有一个 shared_ptr,引用计数增加。
    • 消费者出队后,各自持有指针进行处理,处理完毕离开作用域后引用计数减少。
    • 当所有消费者都处理完(引用计数归 0),内存自动释放,无需手动 delete
  2. 队列设计 (VIDEO_QUEUE)
    • 内部维护两个独立队列(推流队列、文件队列),各自拥有独立的锁 (mutex) 和条件变量 (cond),避免消费者之间相互阻塞。
    • 提供统一的入队接口,自动广播到两个子队列。
  3. 退出机制 (shutdown)
    • 生产者退出前调用 shutdown(),将 is_shutdown 置为 true 并广播条件变量。
    • 消费者唤醒后,若发现队列为空且 is_shutdown 为真,则返回 nullptr 并安全退出循环,防止死锁。
  4. 丢帧策略
    • 入队时若队列满 (MAX_QUEUE_SIZE),主动 pop 掉最旧的一帧,防止生产者阻塞,保证实时性。
上一篇
下一篇