TrafficServer源码初体验–2

刚刚在开篇中介绍到eventProcessor.start方法,这时trafficserver的事件处理子系统已经被启动。接下来的代码分析过程种可以看到,这个事件处理子系统会被其他子系统使用。先把开篇种proxy/Main.cc的总控制流图引过来,便于继续分析。

eventProcessor之后,下面的代码

int use_separate_thread = 0;
int num_remap_threads = 1;
TS_ReadConfigInteger(use_separate_thread, "proxy.config.remap.use_remap_processor");
TS_ReadConfigInteger(num_remap_threads, "proxy.config.remap.num_remap_threads");
if (use_separate_thread && num_remap_threads < 1)
num_remap_threads = 1;

if (use_separate_thread) {
Note("using the new remap processor system with %d threads", num_remap_threads);
remapProcessor.setUseSeparateThread();
}
remapProcessor.start(num_remap_threads);

这段代码是根据配置文件,来选择是否为remap类型的事件单独启动线程作为事件处理。如果需要,则会调用eventProcessor中的 ET_REMAP = eventProcessor.spawn_event_threads(num_threads, “ET_REMAP”);

接下来的

RecProcessStart调用没怎么看明白,从命名上感觉像是一系列的同步相关的调度,包括stat统计同步功能、conf配置同步功能、以及远程同步功能,暂时不知做什么的,细节等回过头来在仔细研究吧。

然后 init_signals2 调用,根据我的理解应该是定期的将内存中的数据dump到stderr输出中,用于日志跟踪。

接下来的这段代码用于处理命令行风格的启动,如

traffic_server -C list 用于现实cache的配置情况。注意,这里的cmd_mode是处理traffic_server的命令模式的命令,而不是traffic_server的启动参数,换句话说,就是处理traffic_server -C (或者traffic_server –command) 这个模式的命令,包含(list,clear_cache,clear_hostdb,help)等。可以通过traffic_server -C help 看到命令模式支持的命令列表。

if (command_flag) {
// pmgmt initialization moved up, needed by RecProcessInit
//pmgmt->start();
int cmd_ret = cmd_mode();
if (cmd_ret != CMD_IN_PROGRESS) {
if (cmd_ret >= 0)
_exit(0);               // everything is OK
else
_exit(1);               // in error
}
}

接下来如果没有定义过INK_NO_ACL(双重否定,换句话说就是如果需要ACL控制) 则需要调用
initCacheControl() ,这个调用读取 proxy.config.cache.control.filename 这个路径对应的配置文件,载入关于cache方面的访问控制配置。

接下来

initCongestionControl();
IpAllow::InitInstance();
ParentConfig::startup();
#ifdef SPLIT_DNS
SplitDNSConfig::startup();
#endif

分别是关于拥塞控制配置、ip allow,以及dns配置的处理。其中ParentConfig::startup()的作用暂时还没搞得很明白。

接下来的 netProcessor.start();这行调用,真正开启了网络事件的处理,包括epoll_wait的loop,是通过eventProcessor实现的event loop.

接下来进行的代码段如下:

#ifndef INK_NO_HOSTDB
dnsProcessor.start();
if (hostDBProcessor.start() < 0)
SignalWarning(MGMT_SIGNAL_SYSTEM_ERROR, "bad hostdb or storage configuration, hostdb disabled");
#endif

这段代码启动dns事件处理以及hostDB事件处理,hostDB应该是负责cache的存储。

随后:

#ifndef INK_NO_CLUSTER
    clusterProcessor.init();
#endif

启动cluster相关事件处理。

接下来的代码是载入http服务相关配置。

 // Load HTTP port data. getNumSSLThreads depends on this.
    if (!HttpProxyPort::loadValue(http_accept_port_descriptor))
      HttpProxyPort::loadConfig();
    HttpProxyPort::loadDefaultIfEmpty();

接着启动cache,udpNet,sslNetProcessor对应的事件处理。

cacheProcessor.start();
udpNet.start(num_of_udp_threads); // XXX : broken for __WIN32
sslNetProcessor.start(getNumSSLThreads());

貌似udpNet那块并没有使用epoll等高效的处理异步非阻塞处理方式,具体原因还没有分析。

然后启动 日志处理, Log::init(remote_management_flag ? 0 : Log::NO_REMOTE_MANAGEMENT);

扩展处理,处理扩展逻辑.(extension,而不是plugin,后面还会有一次对plugin的初始化)

plugin_init(system_config_directory, true); // extensions.config

start_stats_snap();启动统计快照

body_factory = NEW(new HttpBodyFactory);

eventProcessor.schedule_every(NEW(new ShowStats), HRTIME_SECONDS(show_statistics), ET_CALL); 显示当前统计信息

接下来初始化插件、启动transformProcessor。transformProcessor应该是将用户访问的内容从源站取到ts上后进行一系列的转换,用于最终传回给请求调用者。

#ifndef TS_NO_API
    plugin_init(system_config_directory, false);        // plugin.config
#else
    api_init();                 // we still need to initialize some of the data structure other module needs.
    extern void init_inkapi_stat_system();
    init_inkapi_stat_system();
    // i.e. http_global_hooks
#endif
#ifndef TS_NO_TRANSFORM
    transformProcessor.start();
#endif

接下来准备启动http的proxy server,也就是我们看到的默认8080的端口将被打开,ts将接受用户请求,实现cdn服务。

    init_HttpProxyServer();
    int http_enabled = 1;
    TS_ReadConfigInteger(http_enabled, "proxy.config.http.enabled");

    if (http_enabled) {
#ifndef INK_NO_ICP
      int icp_enabled = 0;
      TS_ReadConfigInteger(icp_enabled, "proxy.config.icp.enabled");
#endif
      start_HttpProxyServer(num_accept_threads);
#ifndef INK_NO_ICP
      if (icp_enabled)
        icpProcessor.start();
#endif
    }

    // "Task" processor, possibly with its own set of task threads
    tasksProcessor.start(num_task_threads);

    int back_door_port = NO_FD;
    TS_ReadConfigInteger(back_door_port, "proxy.config.process_manager.mgmt_port");
    if (back_door_port != NO_FD)
      start_HttpProxyServerBackDoor(back_door_port, num_accept_threads > 0 ? 1 : 0); // One accept thread is enough

上面代码提到的icpProcessor是用于处理ICP协议( Internet Cache Protocol)的处理线程。taskProcessor是用于处理任务。

随后启动管理端口,可用于http web ui的管理

 int back_door_port = NO_FD;
    TS_ReadConfigInteger(back_door_port, "proxy.config.process_manager.mgmt_port");
    if (back_door_port != NO_FD)
      start_HttpProxyServerBackDoor(back_door_port, num_accept_threads > 0 ? 1 : 0); // One accept thread is enough

接下来启动socks代理

#ifndef INK_NO_SOCKS
    if (netProcessor.socks_conf_stuff->accept_enabled) {
      start_SocksProxy(netProcessor.socks_conf_stuff->accept_port);
    }
#endif

updateManager.start();
用于启动自动更新ts配置。

run_AutoStop();
用于自动停止ts,停止后会有processmanager重启,这个机制只在系统存在环境变量PROXY_AUTO_EXIT大于0的时候生效,并且进程会在PROXY_AUTO_EXIT秒后自动退出,然后由processmanager重启这个进程。有点类似apache里面的maxchildrequest那个玩意儿,到一定程度后,进程自动退出,随后由监控进程重启,这样可以避免由于程序疏忽造成的内存泄漏问题。

最后启动主线程。
this_thread()->execute();

到此为止,proxy/Main.cc的内容已经粗略的过了一遍,接下来准备对比较重要的部分深入探讨下。

TrafficServer源码初体验–(proxy/Main.cc 事件处理子系统)

 

最近在研究TrafficServer的源码,将一些心得体会分享出来。先上一张主程序(proxy/Main.cc)流程图,用Understand反向工程出来的,比较有助于代码的理解和分析。

由于TrafficServer在2011年才刚刚由Yahoo释放出源码,所以关于TrafficServer的源码分析的文档非常有限,淘宝网的技术专家们对此研究的比较深入,有一些基础的知识可以参照淘宝CDN团队的官方博客。http://rdc.taobao.com/blog/cs/?tag=traffic-server 对TrafficServer的基本使用,以及事件系统,网络子系统有个简单的介绍。

本博是一方面是希望能够在阅读代码过程中能做个笔记,另一方面也希望能够深入的了解TrafficServer的源码实现,从而发现TrafficServer现有代码中存在的问题以及可以优化的地方。(这是因为TrafficServer这个项目经历了近10年,无数工程师都参与过开发,因此整个代码结构组成并不是非常清晰,而且存在很多已经废弃的代码。)

话不多说,开始从上面这张图入手分析:

这张图可以看到很多分支,实际上上面很大的一坨东西是在分析TrafficServer的各种配置文件,并且将配置文件的内容载入到内存中,使用高效的数据结构进行存储,以便可以在取得某个配置值的时候,可以达到较高的效率。

一直到  eventProcessor.start(num_of_net_threads); 这行之前,大部分工作都是在处理配置文件相关的工作。

eventProcessor.start(num_of_net_threads); 内部实现如下:

int
EventProcessor::start(int n_event_threads)
{
char thr_name[MAX_THREAD_NAME_LENGTH];
int i;

// do some sanity checking.
static int started = 0;
ink_release_assert(!started);
ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
started = 1;

n_ethreads = n_event_threads;
n_thread_groups = 1;

int first_thread = 1;

for (i = 0; i < n_event_threads; i++) {
EThread *t = NEW(new EThread(REGULAR, i));
if (first_thread && !i) {
ink_thread_setspecific(Thread::thread_data_key, t);
global_mutex = t->mutex;
t->cur_time = ink_get_based_hrtime_internal();
}
all_ethreads[i] = t;

eventthread[ET_CALL][i] = t;
t->set_event_type((EventType) ET_CALL);
}
n_threads_for_type[ET_CALL] = n_event_threads;
for (i = first_thread; i < n_ethreads; i++) {
snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_NET %d]", i);
all_ethreads[i]->start(thr_name);
}

Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
return 0;
}

这部分代码实现的功能是启动事件处理子系统,事件处理子系统说白了就是一组线程(EThread类型,分类别存储,将不同的类型事件的线程用eventthread存储)。这些线程分别被start起来,start种会调用this->execute方法,execute方法的实现如下:

void
EThread::execute() {
switch (tt) {

case REGULAR: {
Event *e;
Que(Event, link) NegativeQueue;
ink_hrtime next_time = 0;

// give priority to immediate events
for (;;) {
// execute all the available external events that have
// already been dequeued
cur_time = ink_get_based_hrtime_internal();
while ((e = EventQueueExternal.dequeue_local())) {
if (!e->timeout_at) { // IMMEDIATE
ink_assert(e->period == 0);
process_event(e, e->callback_event);
} else if (e->timeout_at > 0) // INTERVAL
EventQueue.enqueue(e, cur_time);
else { // NEGATIVE
Event *p = NULL;
Event *a = NegativeQueue.head;
while (a && a->timeout_at > e->timeout_at) {
p = a;
a = a->link.next;
}
if (!a)
NegativeQueue.enqueue(e);
else
NegativeQueue.insert(e, p);
}
}
bool done_one;
do {
done_one = false;
// execute all the eligible internal events
EventQueue.check_ready(cur_time, this);
while ((e = EventQueue.dequeue_ready(cur_time))) {
ink_assert(e);
ink_assert(e->timeout_at > 0);
if (e->cancelled)
free_event(e);
else {
done_one = true;
process_event(e, e->callback_event);
}
}
} while (done_one);
// execute any negative (poll) events
if (NegativeQueue.head) {
if (n_ethreads_to_be_signalled)
flush_signals(this);
// dequeue all the external events and put them in a local
// queue. If there are no external events available, don't
// do a cond_timedwait.
if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
EventQueueExternal.dequeue_timed(cur_time, next_time, false);
while ((e = EventQueueExternal.dequeue_local())) {
if (!e->timeout_at)
process_event(e, e->callback_event);
else {
if (e->cancelled)
free_event(e);
else {
// If its a negative event, it must be a result of
// a negative event, which has been turned into a
// timed-event (because of a missed lock), executed
// before the poll. So, it must
// be executed in this round (because you can't have
// more than one poll between two executions of a
// negative event)
if (e->timeout_at < 0) {
Event *p = NULL;
Event *a = NegativeQueue.head;
while (a && a->timeout_at > e->timeout_at) {
p = a;
a = a->link.next;
}
if (!a)
NegativeQueue.enqueue(e);
else
NegativeQueue.insert(e, p);
} else
EventQueue.enqueue(e, cur_time);
}
}
}
// execute poll events
while ((e = NegativeQueue.dequeue()))
process_event(e, EVENT_POLL);
if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
EventQueueExternal.dequeue_timed(cur_time, next_time, false);
} else {                // Means there are no negative events
next_time = EventQueue.earliest_timeout();
ink_hrtime sleep_time = next_time - cur_time;
if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
sleep_time = THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
}
// dequeue all the external events and put them in a local
// queue. If there are no external events available, do a
// cond_timedwait.
if (n_ethreads_to_be_signalled)
flush_signals(this);
EventQueueExternal.dequeue_timed(cur_time, next_time, true);
}
}
}

case DEDICATED: {
// coverity[lock]
if (eventsem)
ink_sem_wait(eventsem);
MUTEX_TAKE_LOCK_FOR(oneevent->mutex, this, oneevent->continuation);
oneevent->continuation->handleEvent(EVENT_IMMEDIATE, oneevent);
MUTEX_UNTAKE_LOCK(oneevent->mutex, this);
free_event(oneevent);
break;
}

default:
ink_assert(!"bad case value (execute)");
break;
}                             /* End switch */
// coverity[missing_unlock]
}

上述的execute代码是事件处理子系统中的核心 事件处理部分,因此我用Understand将此部分实现反向工程出流程图,看起来比较清晰:

上面这张图从 (;;) 发射出来的红色的no的分支箭头,应该是Understand对源码分析错误,我从源码中并没有看到会有这个走向。

实际上在REGULAR的case分支下,是一个for(;;)的永循环,专门负责循环处理常规事件。

代码中的 EventQueueExternal 是事件模型种提到的外部队列,EventQueue是内部队列。事件的处理过程是这样的:先从外部队列中取出一个事件e,查看这个事件是否需要立刻执行(通过判断e->timeout_at可以确定是否需要立刻执行),如果需要立刻执行,则调用process_event立刻执行事件(稍后会分析process_event的实现细节);如果取出的事件e,并不是一个需要立刻执行的事件,且不属于(epoll之类的网络事件),则将这个事件加入到内部队列EventQueue中;如果取出的事件e属于epoll这样的网络事件,则将其加入到NegativeQueue中,随后会有针对这种事件的处理。外部队列的事件处理完成后,接下来处理内部队列中的事件(这部分事件有刚刚在处理外部事件时加入到内部队列的事件)内部队列EventQueue的实现是用的优先级队列的方式,并且从代码上观察,应该是只要处理掉一个内部队列的事件就会再次尝试检测内部队列是否有需要处理的事件。直至一次检查过程中没有需要被处理的事件,才会完成对内部队列事件的检查。

bool done_one;
do {
done_one = false;
// execute all the eligible internal events
EventQueue.check_ready(cur_time, this);
while ((e = EventQueue.dequeue_ready(cur_time))) {
ink_assert(e);
ink_assert(e->timeout_at > 0);
if (e->cancelled)
free_event(e);
else {
done_one = true;
process_event(e, e->callback_event);
}
}
} while (done_one);

完成内部队列中的事件检查后,会检查刚刚提到的NegativeQueue队列中的事件(在处理NegativeQueue事件前,貌似源码种可以看到对于外部队列中的事件又做了一次检查,基本流程和上面的差不多,只不过加入了一些阻塞方法,例如EventQueueExternal.dequeue_timed(cur_time, next_time, false);EventQueueExternal的是ProtectedQueue类型,内部有一个 InkAtomicList al和  Que(Event, link) localQueue。而EventQueueExternal.dequeue_timed() 方法则是将al中的内容pop出来,并且放入到localQueue中。随后使用EventQueueExternal.dequeue_local将localQueue中的事件取出。 然后进入到对poll事件的处理(NegativeQueue),代码如下:

// execute poll events
while ((e = NegativeQueue.dequeue()))
process_event(e, EVENT_POLL);
if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
EventQueueExternal.dequeue_timed(cur_time, next_time, false);
} else {                // Means there are no negative events
next_time = EventQueue.earliest_timeout();
ink_hrtime sleep_time = next_time - cur_time;
if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
sleep_time = THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
}

上面提到的是在REGULAR的case分支下,是一个for(;;)的永循环,专门负责循环处理常规事件。

接下来是DEDICATED的case分支下,顾名思义,这是一个专用事件处理线程,事件处理完成后就结束线程。事件处理的方法是       oneevent->continuation->handleEvent(EVENT_IMMEDIATE, oneevent);

说到这里,引出了一个经典的设计模式Continuation,这个是一个古老的却又精湛的设计模式,可以实现”协程”从而达到较高的执行效率。

 

TrafficServer项目代码比较多,文章还是分开写吧,上面谈到的这坨文字,是从proxy/Main.cc作为入口,进行分析,并且完成了对事件处理子系统的分析,即eventProcessor.start这个调用。

scrapy结合webkit抓取js生成的页面问题(fake headless)

1 scedule

scrapy 作为抓取框架,包括了spider,pipeline基础设施

2 webkit

scrapy 本身不能作为js engine,这就导致很多js生成的页面的数据会无法抓取到,因此,一些通用做法是webkit或者xmi_runner(firefox)。通过这个手段可以对于js生成的数据进行抓取。需要安装的包有

python-webkit (相关依赖自行解决)

Xvfb (用于非Xwindow环境)

3 开发downloader middleware

from scrapy.http import Request, FormRequest, HtmlResponse

import gtk
import webkit
import jswebkit
import settings

class WebkitDownloader( object ):
    def process_request( self, request, spider ):
        if spider.name in settings.WEBKIT_DOWNLOADER:
            if( type(request) is not FormRequest ):
                webview = webkit.WebView()
                webview.connect( 'load-finished', lambda v,f: gtk.main_quit() )
                webview.load_uri( request.url )
                gtk.main()
                js = jswebkit.JSContext( webview.get_main_frame().get_global_context() )
                renderedBody = str( js.EvaluateScript( 'document.body.innerHTML' ) )
                return HtmlResponse( request.url, body=renderedBody )

4 配置

在scrapy的settings.py中加入:

#which spider should use WEBKIT
WEBKIT_DOWNLOADER=['ccb']

DOWNLOADER_MIDDLEWARES = {
    'rate_crawler.dowloader.WebkitDownloader': 543,
}   

import os
os.environ["DISPLAY"] = ":0"

5 使用

启动 Xvfb (假设DISPLAY=:0)

要与settings.py中的DISPLAY对应(本例中是:0)。

scrapy crawl xxx

即可

Twisted10.0.0 无法以Daemon方式运行 kqreactor的分析与解决

背景:

最近基于Twisted开发了一个网络应用,该应用运行于FreeBSD 8.1  amd64系统之上,考虑到Twisted 默认的ractor模式使用的是select模型,不如BSD系统中提倡的kqueue模型的效率高,因此考虑启用Twisted 10.0.0中的kqreactor模式。

问题:

1 Twisted 10.0.0为了兼容python2.5,kqreactor的实现是基于PyKQueue1.3开发,然而测试后发现PyKqueue1.3对于FreeBSD的kqueue进行的封装,并不能稳定的运行于Twisted上,经常会有程序阻塞无响应的情况发生。

2 Twisted 的Daemon方式是采用python的os模块中的fork实现,而fork对于kqueue来讲是会有问题的,这是因为kqueue建立的queue是无法被fork之后的进程所共享的。

目前的方案:

1 由于我的应用是运行于python2.6之上,因此考虑使用python中自带的select.kqueue来实现twisted中的kqreactor,这个已经有人进行了实现,替换掉twisted中位于internet目录中的对应文件即可(请注意备份)。 kqreactor

2 对于Twisted的Daemon问题,我首先想到的最简单的方式是使用rfork来替换os.fork。但我没有找到python中对于rfork的支持。因此用c自己包装了一个。源码如下:

rforktar

3 将2中的模块编译安装后,需要对twisted的源文件做一个patch,如下:

_twistd

至此,就可以以Twisted的Daemon方式运行kqreactor了。

vim笔记

跳转到函数、变量定义处:
[+ctrl+i 跳转到函数、变量和#define
[+ctrl+d 跳转到#define处
ctrl+i 向前跳到前几次光标位置
ctrl+o 向后跳到后几次光标位置

函数体跳转:
[[
]]
[]
][
{}

2009-07-07 17:31 补充

如何选中括号中的内容 进行:
将光标移至括号内,按shift+v进入行选模式,然后

i{ - 选小括号中内容,不包括小括号
a{ - 选小括号中内容,包括小括号
ib - 选中括号中内容,不包括括号
ab - 选中括号中内容,包括括号
i{ - 选大括号中内容,不包括大括号
a{ - 选大括号中内容,包括大括号

:h text-objects

下面还有些相关的tips,未全部验证:
vi{ 选中{}中间的内容,不包括{}
va{ 选中{}中间内容,包括{}
vi( 选中()中间内容
vi< 选中<>中间内容
vi[ 选中[]中间内容
vit 选中中间的内容
vi” 选中””中间内容
vi’ 选中”中间的内容
vis 选中一个句子
vib 选中一个block
viw选中一个单词
vip 选中一个段落

笔记

Hello,

I think you can try putting the following entries in front of the “\sources\setup.exe /wds /wdsdiscover /WdsServer:wds-server.ddpo.local” line:

Ping <IP address>

Pause

Ping <IP address>

Pause

You need to interactive with the commands in Windows PE in this scenario. If you want the process to be achieved more automatically, you can try the command “ping <IP address> -n [count]” in front of the setup.exe line. You should adjust the [count] to a large number so that there is enough time for the network to be initialized. In this way, the winpeshl.ini file should look like:

Ping <IP address> -n 100

\sources\setup.exe /wds /wdsdiscover /WdsServer:wds-server.ddpo.local

I hope this helps.

Best regards,

Something about Soaplib 0.8.1 problem

in soaplib/wsgi_soap.py

there are somethings like

if payload:

and this will cause a futurewarning

FutureWarning: The behavior of this method will change in future versions. Use specific ‘len(elem)’ or ‘elem is not None’ test instead.

and the invoke from java client with axis 1.4 will be failed,so we need to change it as it describes:

change

if payload:

to

if payload is not None:

then it goes well.