[Repost] memcached Source Code Analysis: The Thread Pool Mechanism (Part 1)

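Reposted from: memcached Source Code Analysis: The Thread Pool Mechanism (Part 1) – Moon_Bird – 博客园 (cnblogs).

It has been about a month since my last long post. I recently found some time to write up the threading installment of my memcached source code analysis series. While reading the source I consulted a number of resources found online.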

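This post focuses on two questions: (1) how the memcached thread pool is created, and (2) how the threads in the pool are scheduled. All answers come from the source code.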

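memcached's thread pool follows a fairly typical Master-Worker model:

(1) The main thread listens for incoming client connection requests, accepts them, and places each connected socket into a connection queue;

(2) It then hands each established connection to one of the worker threads, which handles the read/write and other events on it.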

1 Key data abstractions

(1) The per-thread structure

```c
// Wrapper structure for a memcached thread
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;
```

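This is memcached's wrapper around a thread. As you can see, each thread holds a CQ connection queue, a notification pipe, a libevent instance (event_base), and so on.

(2) The per-thread connection queue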

```c
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};
```

Each thread structure points to one CQ, and the CQ manages a singly linked list of CQ_ITEMs.

(3) The connection item structure

```c
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};
```

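A CQ_ITEM is essentially a wrapper around the fd of an established connection returned by accept in the main thread. The main thread creates and initializes it and places it on a CQ list for the worker threads to consume.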
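The hand-off described above (the main thread appends an item, a worker later removes it) can be illustrated with a small locked FIFO. The following is a minimal sketch, not memcached's own queue code: every name prefixed with `demo_` is hypothetical, the item carries only a descriptor, and the condition variable from `struct conn_queue` is left out because the sketch never blocks on an empty queue.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical item type for this sketch; the real CQ_ITEM has more fields. */
typedef struct demo_item {
    int sfd;                 /* descriptor of the accepted connection */
    struct demo_item *next;  /* next item in the singly linked list */
} demo_item;

/* Hypothetical queue type: head/tail pointers guarded by one mutex. */
typedef struct demo_queue {
    demo_item *head;
    demo_item *tail;
    pthread_mutex_t lock;
} demo_queue;

/* Start with an empty list and an initialized lock. */
void demo_cq_init(demo_queue *cq) {
    assert(cq != NULL);
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Producer side (main thread in the article): append at the tail. */
void demo_cq_push(demo_queue *cq, demo_item *item) {
    assert(cq != NULL && item != NULL);
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;        /* queue was empty */
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Consumer side (worker thread): detach the head, or return NULL if empty. */
demo_item *demo_cq_pop(demo_queue *cq) {
    demo_item *item;
    assert(cq != NULL);
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;    /* last item removed */
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

Items come out in the order they were pushed, and popping an empty queue returns NULL instead of blocking, which fits a consumer that is woken separately when work arrives.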

(4) The network connection structure

```c
/**
 * The structure representing a connection into memcached.
 */
// memcached's abstraction of a connection
typedef struct conn conn;
struct conn {
    ..................
};
```

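The structure is too large to show in full, so its members are omitted here. One member is critical to the thread pool, though: state, which drives memcached's state machine (implemented by the drive_machine function).

2 Thread pool initialization

The entry point for thread pool initialization in main() is: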

```c
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
```

The function is defined in thread.c; its source is as follows:

```c
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    // allocate the array of thread descriptors for the pool
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    // create a notify pipe (read end and write end) for each thread in the pool
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        // fill in the thread structure
        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        // create nthreads threads for the pool; worker_libevent is each thread's start routine
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
```

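The thread pool initialization function is called by the main thread. It first initializes the mutexes, then uses calloc to allocate a block of nthreads*sizeof(LIBEVENT_THREAD) bytes to manage the pool and stores the result in threads, a file-scope static variable of type LIBEVENT_THREAD *. It then creates an anonymous pipe for each thread (this pipe plays a role in thread scheduling). Next, setup_thread sets up event monitoring for the thread, attaches its CQ list, and performs the remaining initialization. Its source is as follows: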

```c
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    // watch the pipe's read end for readability; thread_libevent_process is the callback
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    // allocate the connection queue (CQ) for the new thread
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    // initialize the CQ inside the thread descriptor
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    // create the suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
```

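memcached uses libevent to implement its event loop. Readers unfamiliar with libevent can consult other material; it is not covered here. This line in the source:

```c
event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me);
```

registers a persistent read event on me->notify_receive_fd (the read end of the anonymous pipe), with thread_libevent_process as the callback. The function is defined as follows: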

```c
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    // respond to the pipe becoming readable: read the 1 byte the main thread
    // wrote into the pipe (see dispatch_conn_new())
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    // pop one item from the connection queue
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        // create a new conn from the item
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
```
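The callback relies on a simple wake-up protocol: the dispatcher publishes work and then writes a single byte to the pipe, and the worker consumes exactly one byte per wake-up before taking the work. The sketch below illustrates only that protocol, under loudly stated assumptions: it uses `poll()` in place of libevent so that it stays self-contained, it replaces the locked CQ with a single pending slot, and every `demo_` name is hypothetical. It is meant to be driven from one thread for illustration.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical stand-in for LIBEVENT_THREAD: the two pipe ends plus a
 * one-slot "queue" (an assumption; the article's code uses a locked CQ). */
typedef struct demo_worker {
    int notify_receive_fd;  /* read end of the notify pipe */
    int notify_send_fd;     /* write end of the notify pipe */
    int pending_sfd;        /* descriptor waiting to be picked up, -1 if none */
} demo_worker;

/* Create the notify pipe; returns 0 on success, -1 on failure. */
int demo_worker_init(demo_worker *w) {
    int fds[2];
    assert(w != NULL);
    if (pipe(fds) != 0)
        return -1;
    w->notify_receive_fd = fds[0];
    w->notify_send_fd = fds[1];
    w->pending_sfd = -1;
    return 0;
}

/* Dispatcher side: publish the descriptor first, then wake the worker
 * by writing one byte (the byte's value carries no meaning). */
int demo_dispatch(demo_worker *w, int sfd) {
    w->pending_sfd = sfd;
    return write(w->notify_send_fd, "", 1) == 1 ? 0 : -1;
}

/* Worker side: wait up to timeout_ms for the pipe to become readable,
 * consume one byte, and take the pending descriptor.
 * Returns the descriptor, or -1 if nothing arrived. */
int demo_wait_and_take(demo_worker *w, int timeout_ms) {
    struct pollfd pfd = { .fd = w->notify_receive_fd, .events = POLLIN };
    char buf[1];
    int sfd;

    if (poll(&pfd, 1, timeout_ms) <= 0)
        return -1;              /* timeout or error: nothing to do */
    if (read(w->notify_receive_fd, buf, 1) != 1)
        return -1;
    sfd = w->pending_sfd;
    w->pending_sfd = -1;
    return sfd;
}
```

Publishing before writing the byte matters: by the time the reader sees the pipe become readable, the work it is about to fetch is already in place.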

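With setup_thread having filled in each thread structure, we return to thread_init. It next calls create_worker(worker_libevent, &threads[i]); in a loop (nthreads times) to create the threads that actually run. create_worker is a thin wrapper around pthread_create(): worker_libevent is the start routine of each thread, and &threads[i] is the argument passed to it.

The source of the thread body, worker_libevent, is as follows: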

```c
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    init_count++;     // each new thread increments the global init_count
    pthread_cond_signal(&init_cond);  // signal init_cond
    pthread_mutex_unlock(&init_lock);

    // the new thread blocks here, waiting for events
    event_base_loop(me->base, 0); // libevent's main event loop
    return NULL;
}
```

The purpose of incrementing init_count in worker_libevent becomes clear from this part of thread_init:

```c
    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
```

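The main thread blocks here, waiting for the init_cond signal sent by worker_libevent. Each time it wakes up, it checks whether init_count < nthreads is false (that is, whether the required number of threads have started); if it is still true, the main thread keeps waiting.

That completes the analysis of the thread pool creation code. Because this post is already long, the scheduling flow for the threads in the pool is covered in a separate post.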
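The startup handshake can be tried in isolation with a minimal sketch like the one below. It is an illustration under assumptions, not memcached's code: all `demo_` names are hypothetical, the workers return right after checking in instead of entering an event loop, and the threads are joined at the end so that the sketch cleans up after itself.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_MAX_THREADS 64   /* arbitrary cap chosen for this sketch */

static pthread_mutex_t demo_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_init_cond = PTHREAD_COND_INITIALIZER;
static int demo_init_count = 0;

/* Worker body: report readiness under the lock, then return.
 * A real worker would enter its event loop at this point. */
static void *demo_worker_main(void *arg) {
    (void)arg;
    pthread_mutex_lock(&demo_init_lock);
    demo_init_count++;
    pthread_cond_signal(&demo_init_cond);
    pthread_mutex_unlock(&demo_init_lock);
    return NULL;
}

/* Spawn nthreads workers and block until every one has checked in.
 * Returns the number of workers that reported ready, or -1 if a
 * thread could not be created. */
int demo_start_workers(int nthreads) {
    pthread_t tids[DEMO_MAX_THREADS];
    int i, ready;

    assert(nthreads > 0 && nthreads <= DEMO_MAX_THREADS);

    pthread_mutex_lock(&demo_init_lock);
    demo_init_count = 0;
    pthread_mutex_unlock(&demo_init_lock);

    for (i = 0; i < nthreads; i++) {
        if (pthread_create(&tids[i], NULL, demo_worker_main, NULL) != 0) {
            while (i-- > 0)               /* reap the threads already started */
                pthread_join(tids[i], NULL);
            return -1;
        }
    }

    /* Re-check the predicate after every wake-up: one signal only means
     * that at least one more worker is ready, and wake-ups may be spurious. */
    pthread_mutex_lock(&demo_init_lock);
    while (demo_init_count < nthreads)
        pthread_cond_wait(&demo_init_cond, &demo_init_lock);
    ready = demo_init_count;
    pthread_mutex_unlock(&demo_init_lock);

    for (i = 0; i < nthreads; i++)
        pthread_join(tids[i], NULL);
    return ready;
}
```

Calling `demo_start_workers(4)` returns only after four workers have incremented the counter. The `while` loop, rather than a single `if`, is what makes the wait correct when signals arrive one at a time.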
