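[Repost] memcached source code analysis: the thread pool mechanism (Part 1) – Moon_Bird – 博客园.
It has been about a month since I last wrote a long post. I recently found some time to finish the threading installment of my memcached source analysis series, and I consulted a number of online resources while reading the code.
This post focuses on two questions: (1) how memcached's thread pool is created, and (2) how the threads in the pool are scheduled. All answers come from the source code.
memcached's thread pool follows a fairly typical Master-Worker model:
(1) the main thread listens for client connection requests, accepts the connections, and places each connected socket into a connection queue;
(2) idle worker threads are then dispatched to handle read, write, and other events on the established connections.
1 Key data abstractions
(1) The wrapper for a single memcached thread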
// Wrapper structure for a memcached thread
typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* libevent handle this thread uses */
    struct event notify_event;         /* listen event for notify pipe */
    int notify_receive_fd;             /* receiving end of notify pipe */
    int notify_send_fd;                /* sending end of notify pipe */
    struct thread_stats stats;         /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;             /* suffix cache */
} LIBEVENT_THREAD;
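This is memcached's wrapper for a thread. As you can see, each thread contains a CQ queue, a notification pipe, a libevent instance (event_base), and so on.
(2) The per-thread connection queue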
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};
Each thread structure points to one CQ, and the CQ manages a singly linked list of CQ_ITEMs.
(3) The connection item structure
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};
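A CQ_ITEM is essentially a wrapper around the fd of an established connection returned by accept in the main thread. The main thread creates and initializes it and puts it on the connection list CQ for the worker threads to use.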
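To make the hand-off concrete, here is a minimal sketch of a mutex-protected singly linked queue of the kind described above. It is not memcached's own code: the `demo_` names are hypothetical, the item keeps only `sfd`, and the condition variable from the real `conn_queue` is left out, since this post does not show the actual push/pop routines.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, reduced stand-ins for CQ_ITEM and CQ. */
typedef struct demo_item {
    int sfd;                    /* descriptor of the accepted connection */
    struct demo_item *next;
} demo_item;

typedef struct {
    demo_item *head;
    demo_item *tail;
    pthread_mutex_t lock;
} demo_queue;

static void demo_cq_init(demo_queue *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append at the tail: what the main thread would do after accept(). */
static void demo_cq_push(demo_queue *cq, demo_item *item) {
    assert(item != NULL);
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Detach and return the head item, or NULL when the queue is empty. */
static demo_item *demo_cq_pop(demo_queue *cq) {
    demo_item *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}

/* Push three fake descriptors and check they come back in FIFO order.
 * Returns 0 on success, -1 otherwise. */
int demo_cq_roundtrip(void) {
    demo_queue q;
    demo_item items[3];
    int i;

    demo_cq_init(&q);
    for (i = 0; i < 3; i++) {
        items[i].sfd = 10 + i;
        demo_cq_push(&q, &items[i]);
    }
    for (i = 0; i < 3; i++) {
        demo_item *it = demo_cq_pop(&q);
        if (it == NULL || it->sfd != 10 + i)
            return -1;
    }
    return demo_cq_pop(&q) == NULL ? 0 : -1;
}
```

Keeping both `head` and `tail` lets the producer append in constant time while the consumer removes from the front, and the single lock keeps the two sides from seeing a half-updated list.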
(4) The wrapper structure for a network connection
/**
 * The structure representing a connection into memcached.
 */
// memcached's abstraction of a single conn
typedef struct conn conn;
struct conn {
    ..................
};
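Because this structure is very large, its members are omitted here. One member is crucial to the thread pool, however: state, which drives memcached's state machine (implemented by the drive_machine function).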
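As a rough illustration of what "driven by state" means, the sketch below loops over a connection's state field and switches on it until the connection is closed. The state names and transitions are invented for the example (the real conn_states enum and drive_machine are considerably larger and are not shown in this post), so treat it as a shape, not as memcached's logic.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced state set for illustration only. */
enum demo_state { DEMO_READ, DEMO_PARSE, DEMO_WRITE, DEMO_CLOSED };

struct demo_conn {
    enum demo_state state;  /* current state, the field that drives the loop */
    int steps;              /* number of transitions taken so far */
};

/* Keep dispatching on c->state until the connection reaches DEMO_CLOSED. */
void demo_drive_machine(struct demo_conn *c) {
    int stop = 0;
    while (!stop) {
        switch (c->state) {
        case DEMO_READ:             /* pretend a full request was read */
            c->state = DEMO_PARSE;
            c->steps++;
            break;
        case DEMO_PARSE:            /* pretend the request was parsed */
            c->state = DEMO_WRITE;
            c->steps++;
            break;
        case DEMO_WRITE:            /* pretend the reply was sent */
            c->state = DEMO_CLOSED;
            c->steps++;
            break;
        case DEMO_CLOSED:
            stop = 1;
            break;
        }
    }
}

/* Run one connection from DEMO_READ to DEMO_CLOSED and return the
 * number of transitions it took. */
int demo_run_once(void) {
    struct demo_conn c = { DEMO_READ, 0 };
    demo_drive_machine(&c);
    assert(c.state == DEMO_CLOSED);
    return c.steps;
}
```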
2 Thread pool initialization
In main(), the entry point for thread pool initialization is:
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
The function is defined in thread.c; its source is shown below:
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    // Allocate the array of thread descriptors for the pool
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    // Create a notify pipe (read end and write end) for each thread in the pool
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        // Fill in the thread descriptor
        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        // Create nthreads threads for the pool; worker_libevent is each thread's entry function
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
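The main thread calls this initialization function. It first initializes the mutexes, then uses calloc to allocate a block of nthreads*sizeof(LIBEVENT_THREAD) bytes to manage the pool, storing the result in the static global variable threads (of type LIBEVENT_THREAD *). It then creates an anonymous pipe for each thread (the pipe comes into play when the threads are scheduled). Next, setup_thread sets up event monitoring for the thread, attaches its CQ, and performs the remaining initialization; its source is shown below: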
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    // Watch the pipe's read end for readability; thread_libevent_process is the callback
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    // Allocate the connection queue (CQ) for the new thread
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    // Initialize the CQ inside the thread descriptor
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    // Create the suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
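The allocation step of thread_init (one zeroed descriptor array, one pipe per descriptor) can be tried in isolation. The sketch below is a reduced version under stated assumptions: `demo_thread` keeps only the two pipe descriptors, all `demo_` names are hypothetical, libevent is left out entirely, and failures are reported by returning NULL where the code above calls exit().

```c
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical, reduced descriptor: only the notify pipe is kept. */
typedef struct {
    int notify_receive_fd;  /* read end, watched by the worker */
    int notify_send_fd;     /* write end, used by the dispatcher */
} demo_thread;

/* Close the pipes of the first n descriptors and free the array. */
void demo_threads_destroy(demo_thread *t, int n) {
    int i;
    for (i = 0; i < n; i++) {
        close(t[i].notify_receive_fd);
        close(t[i].notify_send_fd);
    }
    free(t);
}

/* Allocate n descriptors and give each its own pipe; NULL on failure. */
demo_thread *demo_threads_create(int n) {
    demo_thread *t;
    int i;

    assert(n > 0);
    t = calloc((size_t)n, sizeof(demo_thread));
    if (t == NULL)
        return NULL;
    for (i = 0; i < n; i++) {
        int fds[2];
        if (pipe(fds) != 0) {
            demo_threads_destroy(t, i);  /* undo the pipes made so far */
            return NULL;
        }
        t[i].notify_receive_fd = fds[0];
        t[i].notify_send_fd = fds[1];
    }
    return t;
}

/* Create n descriptors and count how many of their descriptors are open.
 * Returns 2 * n when every pipe was created, -1 if creation failed. */
int demo_threads_check(int n) {
    demo_thread *t = demo_threads_create(n);
    int i, open_fds = 0;

    if (t == NULL)
        return -1;
    for (i = 0; i < n; i++) {
        if (fcntl(t[i].notify_receive_fd, F_GETFD) != -1) open_fds++;
        if (fcntl(t[i].notify_send_fd, F_GETFD) != -1) open_fds++;
    }
    demo_threads_destroy(t, n);
    return open_fds;
}
```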
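memcached uses libevent to implement its event loop. Readers unfamiliar with libevent can consult other material; it is not introduced here. This line in the source:
event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me);
registers a read event on me->notify_receive_fd (the read end of the anonymous pipe) with thread_libevent_process as the callback. That function is defined as follows: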
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    // The pipe became readable: read the 1 byte the main thread wrote (see dispatch_conn_new())
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    // Pop one connection item from the connection queue
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        // Use the item to create a new conn
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
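With setup_thread having filled in each thread descriptor, we return to thread_init, which next calls create_worker(worker_libevent, &threads[i]); in a loop (nthreads times) to create the threads that actually run. create_worker is a thin wrapper around pthread_create(): worker_libevent is the function each thread runs, and &threads[i] is the argument passed to it.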
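Since create_worker itself is not listed in this post, the following is only a sketch of what a thin pthread_create() wrapper of this kind might look like. The extra `tid` out-parameter, the returned error code, and every `demo_` name are assumptions made so the example can join the thread and be checked.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Start func(arg) on a new thread. Returns 0 on success or the
 * pthread_create() error code on failure. */
int demo_create_worker(pthread_t *tid, void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    int ret;

    assert(func != NULL);
    pthread_attr_init(&attr);            /* default attributes */
    ret = pthread_create(tid, &attr, func, arg);
    pthread_attr_destroy(&attr);
    if (ret != 0)
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
    return ret;
}

/* Hypothetical thread body: doubles the int it is handed. */
static void *demo_body(void *arg) {
    int *value = arg;
    *value *= 2;
    return NULL;
}

/* Spawn one worker, wait for it, and return the value it produced
 * (-1 if the thread could not be created or joined). */
int demo_spawn_and_join(void) {
    pthread_t tid;
    int value = 21;

    if (demo_create_worker(&tid, demo_body, &value) != 0)
        return -1;
    if (pthread_join(tid, NULL) != 0)
        return -1;
    return value;
}
```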
worker_libevent is the thread body; its source is as follows:
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    init_count++;                     // each newly started thread increments the global init_count
    pthread_cond_signal(&init_cond);  // signal init_cond
    pthread_mutex_unlock(&init_lock);

    // The new thread blocks here, waiting for events
    event_base_loop(me->base, 0);     // libevent's main event loop
    return NULL;
}
The reason worker_libevent increments init_count becomes clear from this part of thread_init:
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
while (init_count < nthreads) {
    pthread_cond_wait(&init_cond, &init_lock);
}
pthread_mutex_unlock(&init_lock);
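That is, the main thread blocks here, waiting for the init_cond signal sent by worker_libevent. Each time it wakes up it checks whether init_count < nthreads is false (that is, whether the required number of threads has started); if not, it keeps waiting. This completes the analysis of the thread pool creation code. Because this post is already long, the scheduling of threads within the pool is covered in a separate post.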
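The start-up handshake can be reproduced on its own. In the sketch below, which uses hypothetical `demo_` names and a fixed thread count, each worker increments a shared counter under the lock and signals, while the starting thread waits in a loop until the counter reaches the number of threads it launched. Unlike a real worker, these threads return immediately so that they can be joined.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_NTHREADS 4

static pthread_mutex_t demo_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_init_cond = PTHREAD_COND_INITIALIZER;
static int demo_init_count = 0;

/* Worker body: report readiness, then return. A real worker would go on
 * to run its event loop instead of returning. */
static void *demo_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&demo_init_lock);
    demo_init_count++;
    pthread_cond_signal(&demo_init_cond);
    pthread_mutex_unlock(&demo_init_lock);
    return NULL;
}

/* Start the workers, block until all of them have checked in, and
 * return the final value of the counter. */
int demo_start_and_wait(void) {
    pthread_t tids[DEMO_NTHREADS];
    int i, started = 0, count;

    pthread_mutex_lock(&demo_init_lock);
    demo_init_count = 0;
    pthread_mutex_unlock(&demo_init_lock);

    for (i = 0; i < DEMO_NTHREADS; i++) {
        if (pthread_create(&tids[i], NULL, demo_worker, NULL) != 0)
            break;
        started++;
    }

    /* Re-check the predicate after every wake-up: workers may have
     * signalled before we started waiting, and wake-ups can be spurious. */
    pthread_mutex_lock(&demo_init_lock);
    while (demo_init_count < started)
        pthread_cond_wait(&demo_init_cond, &demo_init_lock);
    count = demo_init_count;
    pthread_mutex_unlock(&demo_init_lock);
    assert(count == started);

    for (i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    return count;
}
```

The while loop matters more than the signal: because the counter is checked under the lock before each wait, the starter never misses a worker that finished early.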