线程池

Catalogue
  1. 1. demo
  2. 2. @construct 创建线程数量
  3. 3. @add 投递执行任务
  4. 4. @线程池销毁 与释放流程
  5. 5. @Lib\Thread\Pool\Future
    1. 5.1. @furture->get() 等待该异步任务处理结束
    2. 5.2. @异步任务demo
    3. 5.3. @同步任务demo

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?php

$pool = new Lib\Thread\Pool(4);

$ref = [1,2,3,4];

//future = Lib\Thread\Pool\Future
//future->get() 可以阻塞返回结果
$future = $pool->add(function()use(&$ref){
sleep(1);
var_dump($ref);
});
$future->get();
;

@construct 创建线程数量

构造函数需要传入线程创建的参数,在初始化就默认创建固定的线程数量

@add 投递执行任务

add() 函数接受一个php闭包函数,可通过引用的方式附加传入参数

投递后如果有线程空闲,立即执行该任务

@线程池销毁 与释放流程

1
2
3
4
5
6
<?php
test();
function test(){
$pool = new Lib\Thread\Pool(4);
$pool->add(function(){});
}

$pool 的生命周期在test函数内,如果函数调用结束,那么触发$pool垃圾回收,且该类为自定义类型,所以回收会触发如下线程回收流程:

  • php : $pool->destruct(); php对象释放
  • php : $pool->free_object(); php对象底层扩展进行释放内存处理
  • c++ : pool->~pool() 调用c++线程池对象析构函数
  • c++ : 加锁改变线程状态 并唤醒所有线程,等待回收线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ThreadPool::~ThreadPool(){
    {
    unique_lock<mutex> lock(queue_mu);
    stop = true;
    }
    cond.notify_all();
    for(thread &w : workers){
    w.join();
    }
    }
  • 所以线程池的生命周期,依赖php对象实例,如果需要常驻运行,则需要将$pool 设置为全局或者静态变量

@Lib\Thread\Pool\Future

投递任务后立即返回一个future包装器,可用于阻塞等待任务结束获取返回值,转换为同步阻塞程序

1
$future = $pool->add(function(){});

@furture->get() 等待该异步任务处理结束

该函数用于等待,当前线程执行的任务结束,并获取返回值

1
2
3
4
5
$future = $pool->add(function(){
return [1,3,4,5];
});
$res = $future->get();
//res = [1,3,4,5]

@异步任务demo

将100个任务全部投递到线程排队处理,然后当前继续执行其他任务

1
2
3
4
5
6
for($i = 0;$i < 100 ;$i ++){
$pool->add(function(){
sleep(1);
});
}
//这里继续执行其他任务

@同步任务demo

将100个任务投递到线程处理,并逐一等待每一任务执行完毕

1
2
3
4
5
6
7
8
for($i = 0;$i < 100 ;$i ++){
$future = $pool->add(function()use($i){
sleep(1);
return $i;
});
var_dump($future->get());
}
//这里继续阻塞,直到上面100s过期后任务处理完毕