qemu线程池:一个semaphore的使用范例

qemu里面有个服务于aio的线程池:

/* Thread pool serving QEMU's AIO requests.  Workers are created lazily
 * (see spawn_thread below) and park on 'sem', which counts outstanding
 * requests, instead of polling request_list under the mutex. */
struct ThreadPool {
    AioContext *ctx;        /* owning AioContext */
    QEMUBH *completion_bh;  /* bottom half that runs request completions */
    QemuMutex lock;
    QemuCond check_cancel;
    QemuCond worker_stopped;  /* waited on in thread_pool_free until cur_threads drops to 0 */
    QemuSemaphore sem;        /* posted once per submitted request; workers wait on it */
    int max_threads;          /* upper bound on cur_threads */
    QEMUBH *new_thread_bh;    /* bottom half (spawn_thread_bh_fn) that creates workers */

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;
};

qemu在thread_pool_init_one建池子的时候注册了一个延迟执行的work:
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

这个work将在spawn_thread里面被安排调度

/* Account for one more worker and, unless a spawn bottom half is already
 * outstanding, schedule new_thread_bh so the main thread creates it.
 * Caller holds pool->lock (these counters are documented as lock-protected).
 * NOTE(review): this excerpt is truncated — the function's closing brace
 * is missing in the quoted snippet. */
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++; /* Record how many spawns were requested; this feeds the
                          * deferred creation bottom half, because not every call
                          * into here schedules the bottom half itself. */
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) { /* a spawn is already scheduled — don't schedule again */
        qemu_bh_schedule(pool->new_thread_bh);
    }

 

当work被调度到的时候,执行spawn_thread_bh_fn

/* Bottom half registered via aio_bh_new() in thread_pool_init_one;
 * runs in the main thread and creates one pending worker under the
 * pool lock (do_spawn_thread requires the lock to be held). */
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}
/* Create one worker thread from the new_threads backlog.
 * Runs with pool->lock taken.  The newly created worker re-enters this
 * function itself, so a backlog of N threads is drained one thread at a
 * time instead of looping here while holding the mutex. */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++; /* suppresses further qemu_bh_schedule in spawn_thread */

    /* NOTE(review): if qemu_thread_create could fail without aborting,
     * pending_threads would stay nonzero forever and spawn_thread would
     * never schedule the bottom half again — verify against the
     * qemu_thread_create error semantics of this QEMU version. */
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

 

这里采用了链式接力的方式来一次性创建多个worker:新创建的worker线程会再次调用do_spawn_thread来创建下一个worker,直到发现new_threads为0为止。

这里有个问题:如果qemu_thread_create创建新线程失败,后续新的线程将永远无法创建——因为pending_threads不会被扣减回0,前面的spawn_thread也就不会再安排新的下半部工作来创建线程了。

下面是aio派发任务的函数thread_pool_submit_aio,可以看到,池子中的线程数是动态增加的,如果有空闲的线程或者线程数已达上限是不会创建新的线程的,并且采用了信号量通知的方法来减少线程轮询开销,有任务的时候才放开一个额度,而不是让线程一直尝试拿后面的mutex锁再去看下request list是不是空。

qemu_mutex_lock(&pool->lock);
   if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
       spawn_thread(pool);
   }
   QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
   qemu_mutex_unlock(&pool->lock);
   qemu_sem_post(&pool->sem); //这里会增加sem的计数,让一个idle的worker获得执行机会
   return &req->common;

 

worker创建的时候以及worker在func执行完后会再次尝试获取pool->sem(等待sem>0),等待新的任务:

pool->idle_threads++;
qemu_mutex_unlock(&pool->lock);
ret = qemu_sem_timedwait(&pool->sem, 10000);//减扣sem计数
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;

池子资源释放的时候,会标记pool->stopping并给所有worker一个最后的任务(通过sem_post):释放自己:

thread_pool_free:

/* Stop new threads from spawning */
qemu_bh_delete(pool->new_thread_bh);
pool->cur_threads -= pool->new_threads;
pool->new_threads = 0;

/* Wait for worker threads to terminate */
pool->stopping = true;
while (pool->cur_threads > 0) {
qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}

Leave a Reply

Your email address will not be published. Required fields are marked *