deepinss

Deep in shadowsocks.

View on GitHub
23 December 2017

eventloop ♺

by

Introduce to eventloop

在请求 google.com 的时候总是不断的建立 TCP 连接, TCP 连接会创建 socket 进行通讯, 和浏览器的请求通讯需要和 socket 通讯, 接受浏览器的请求需要创建 socket, 和 SS server 通讯需要创建 socket, 所以监听 socket 的事件将是通讯的基础

TL;DR

导入模块

from __future__ import absolute_import, division, print_function, \
    with_statement

import os
import time
import socket
import select
import traceback
import errno
import logging
from collections import defaultdict

from shadowsocks import shell

__all__ 导出

__all__ = ['EventLoop', 'POLL_NULL', 'POLL_IN', 'POLL_OUT', 'POLL_ERR',
           'POLL_HUP', 'POLL_NVAL', 'EVENT_NAMES']

__all__ 声明该模块暴露出去的接口

模块常量

POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08
POLL_HUP = 0x10
POLL_NVAL = 0x20

POLLERR 意味着 socket 生成了一个异步错误. 在 TCP 连接中, 这特别指 RST 消息被接收到或者是被发送, 如果这个文件描述符不是一个 socket, POLLERR 可能意味着该设备不支持轮询.

POLLHUP 意味着这个 socket 不再可以连接. 在 TCP 连接中, 这意味着 FIN 消息被接收或者是被发送

POLLNVAL 意味着 socket 文件描述符 没有打开, 这将是一个错误在尝试关闭这个 socket 的时候

引至 https://stackoverflow.com/questions/24791625/how-to-handle-the-linux-socket-revents-pollerr-pollhup-and-pollnval

EVENT_NAMES = {
    POLL_NULL: 'POLL_NULL',
    POLL_IN: 'POLL_IN',
    POLL_OUT: 'POLL_OUT',
    POLL_ERR: 'POLL_ERR',
    POLL_HUP: 'POLL_HUP',
    POLL_NVAL: 'POLL_NVAL',
}

辅助变量, 将标志位转换成字符串

TIMEOUT_PRECISION = 10000

超时的时间精度, 我们将在 TIMEOUT_PRECISION 秒之后检查是否有请求超时, 如果有超时则销毁该请求, 释放关于该请求的系统资源, 否则有可能造成内存溢出. 在调试的时候将该值调大, 可以将请求的生命周期增长, 方便调试.

事件轮询机制

class KqueueLoop(object):
    ...
# class SelectLoop(object):
#     ...

这里有两种 loop 的实现, 在我的PC上面只会用到 KqueueLoop, 先行只解释 KqueueLoop 的实现.

NAME kqueue, kevent – kernel event notification mechanism

LIBRARY Standard C Library (libc, -lc)

引至: freebsd-kqueue

Kqueue 是一种内核事件通知机制, 是操作系统提供的, 这里用来监听 socket 的[可读/可写]事件.

常量

MAX_EVENTS = 1024

类常量, 定义一次轮询取出来的事件数目的上限.

构造函数

def __init__(self):
        self._kqueue = select.kqueue()
        self._fds = {} # file descriptor 和 其对应的监听的 mode (POLL_IN or POLL_OUT)

KqueueLoop 的构造器, 创建一个 kqueue 内核事件监听队列和一个用来 key 为 file descriptor, value 为 其注册时的监听模式的对象, 下面是一个例子:

{
    4: 1 # 00000001
}

这个例子表明, KqueueLoop 里面监听了一个 文件描述符为 4, 监听模式为 POLL_IN(00000001) 的事件

Why? 为什么需要这个 _fds, 这个是用来在移除 注册该文件描述符事件 的时候使用的, 因为在移除的时候需要传递相同的监听模式, 这样在移除的时候只需要传递文件描述符给 KqueueEvent 实例, 而不需要传递监听模式.

_control 私有方法

def _control(self, fd, mode, flags):
        events = []
        if mode & POLL_IN: 
            events.append(
                select.kevent(fd, select.KQ_FILTER_READ, flags)
            )
        if mode & POLL_OUT:
            events.append(select.kevent(fd, select.KQ_FILTER_WRITE, flags))
        for e in events:
            self._kqueue.control([e], 0)

私有方法, 不为外部调用!

_control 函数作用是向 _kqueue 里面添加新的 kevent 事件, 下划线开头.

’’’ :param fd: 要监听的 文件描述符 :param mode: 监听模式 POLL\_IN 对应 select.KQ\_FILTER\_READ POLL_OUT 对应 select.KQ_FILTER_WRITE :param flags: 传递给 kevent 的 flags, 这个参数会表明是删除事件还是添加事件, 如果删除事件那么mode要和添加时候的一致, 会用到 _fds 变量 ‘’’

这里要强调说明一点的是 self._kqueue.control 方法, 该方法不是系统调用提供的, 而是 Python 提供的, 深入解释篇幅会用的很多, 为了不打断流程, 将放在一个单独的文章内解释, 前往, 这里传递的参数 0 标识是非阻塞调用.

poll 方法

def poll(self, timeout):
    if timeout < 0:
        timeout = None  # kqueue behaviour
    events = self._kqueue.control(
        None, KqueueLoop.MAX_EVENTS, timeout
    )  # 在 timeout 时间内是否有事件发生, 如果没有, 在timeout之后会返回一个空数组, 我们就可以进行自己内部程序的处理, 如果有事件发生, 会将事件返回, 我们在接下来处理发生的一系列事件 <https://docs.python.org/2/library/select.html#select.kqueue.control>
    results = defaultdict(lambda: POLL_NULL)
    for e in events:
        fd = e.ident
        if e.filter == select.KQ_FILTER_READ:
            results[fd] |= POLL_IN
        elif e.filter == select.KQ_FILTER_WRITE:
            logging.info('发生 POLL_OUT 事件')
            results[fd] |= POLL_OUT
    return results.items()

该方法是轮询, 获取事件, POLL_NULL 是在这里用到的, 用来生成该文件描述符的数据流方向. 生成的事件是 kevent 对象数组, 在 C 里是 kevent 结构体. Shadowsocks 用的是自己定义的 POLL_IN, POLL_OUT 来标识数据流的方向, 而不是 select 模块的 filter 里定义的 KQ_FILTER_READKQ_FILTER_WRITE. 这也不是 C 原生的变量名, C 原生的变量名分别为 EVFILT_READ , EVFILT_WRITE. 另见: L2540

在这里会格式化发生事件的对象为 fd -> mode(文件描述符 -> 事件模式或者称之为数据流方向), return 回一个数组 [(fd, mode), (anotherFd, mode)]

register 方法

def register(self, fd, mode):
    self._fds[fd] = mode
    self._control(fd, mode, select.KQ_EV_ADD)

用来注册 socket 的方法, 所有进入 Kqueue 事件队列的都要从这里通过, _fds 变量存储了该文件描述符的模式(数据流方向). 传递 select.KQ_EV_ADD flag 来标识是添加事件

unregister 方法

def unregister(self, fd):
    self._control(fd, self._fds[fd], select.KQ_EV_DELETE)
    del self._fds[fd]

用来取消注册 socket 的方法, 所有想要删除事件的 socket 都要经过这里, 从 _fds 拿出来添加事件当时的模式(数据流方向), 传递 select.KQ_EV_DELETE flag 来标识是删除事件. 从 Kqueue 事件队列中移除事件之后, 删除 _fds 中关于该文件描述符的模式数据

modify 方法

def modify(self, fd, mode):
    self.unregister(fd) # 删除注册 file descriptor => mode
    self.register(fd, mode) # 重新注册 file descriptor => mode

用来修改文件描述符的监听模式, 首先需要删除已经注册的事件, 然后重新添加.

close 方法

def close(self):
    self._kqueue.close()

关闭该内核事件队列.

事件轮询器 EventLoop

class EventLoop(object):
    ...

该类责任:

  1. 负责创建事件轮询机制, run 起来事件轮询, 存储着每个 socket 的处理器(TCPReply 和 UDPReply 或者是其他类)
  2. 根据 (f, mode, handler) 创建相应的监听, 根据 fd 移除监听
  3. 获取发生的事件, 然后处理每一个事件, 将其分发到对应的处理类中
  4. 负责调用被注册的周期性的函数

构造函数

def __init__(self):
    if hasattr(select, 'epoll'):
        self._impl = select.epoll()
        model = 'epoll'
    elif hasattr(select, 'kqueue'):
        self._impl = KqueueLoop()
        model = 'kqueue'
    elif hasattr(select, 'select'):
        self._impl = SelectLoop()
        model = 'select'
    else:
        raise Exception('can not find any available functions in select '
                        'package')
    self._fdmap = {}
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False

__init__ 构建函数, 判断 select 模块支持的事件机制, 这里只讨论 kqueue 事件队列机制, 执行 self._impl = KqueueLoop().

_impl 全称为 implement

_fdmap 是一个对象, 例子如下:

{
    fd: (f, handler) # 文件描述符: (socket文件, socket文件的处理器)
}

在其他的方法中会向 _fdmap 中添加数据.

_last_time 为时间戳

_periodic_callbacks 为周期性的回调函数

这是一个数组, 存储着在一次事件从获取到处理完成的周期之后的 处理函数, 比如超时 socket 的处理, 系统资源的释放等

_stopping 是否是停止状态的标志变量

poll 方法

def poll(self, timeout=None):
    events = self._impl.poll(timeout)
    return [(self._fdmap[fd][0], fd, event) for fd, event in events]

传入超时时间, 调用事件轮询机制获取发生的事件, 然后根据发生的事件数组来构造新的数组, 根据文件描述符取出来 socket, 这里返回的数组举例:

[
    (f, fd, event), # (socket 文件. 该 socket 文件的文件描述符, 发生的事件模式(POLL_IN, POLL_OUT)亦称数据流方向)
    (f, fd, event)
]

add 方法

def add(self, f, mode, handler): 
    fd = f.fileno()
    self._fdmap[fd] = (f, handler)
    self._impl.register(fd, mode)

参数:

  1. f 要监听的 socket 文件
  2. 要监听的模式 (POLL_IN, POLL_OUT) 数据流方向
  3. socket 的处理器

存储下来 fd -> (f, handler), 调用 _impl 注册该文件描述符.

感觉这里就是一段很棒👍的代码, 代码也许并不多高深, 但是将数据存储到了其该存储的地方, sockethandler 这两个变量是 事件轮询机制(kevent) 并不关心的, 它需要一个文件描述符和一个 mode(数据流方向) 就可以, 至于是添加还是移除则是由不同的方法确定的. 该类需要的数据是 socket 和其的处理器 handler, 所以这两个变量存放在了 EventLoop 类中.

remove

def remove(self, f):
    fd = f.fileno() # 获取该 socket 的 file dscriptor
    del self._fdmap[fd]  # 删除关于该文件描述符 在 _fdmap 中的(f, handler)引用
    self._impl.unregister(fd) # 解除在 监听器 kqueue(或者是其他) 的注册

根据 socket 来移除事件监听

add_periodic

def add_periodic(self, callback): # 添加周期性函数
    self._periodic_callbacks.append(callback)

添加周期性的函数 callback

remove_periodic

def remove_periodic(self, callback):
    self._periodic_callbacks.remove(callback)

移除周期性的函数 callback

modify

def modify(self, f, mode):
    fd = f.fileno()
    self._impl.modify(fd, mode)

根据 socket 和传递过来的 mode 来修改该 socket 的监听模式

stop

def stop(self):
    self._stopping = True

_stopping 标志位置为 True, 用来 grace(优雅) 停止.

run

def run(self):
    events = []
    while not self._stopping:
        asap = False
        try:
            events = self.poll(TIMEOUT_PRECISION) # 取出来所有发生事件的 socket
        except (OSError, IOError) as e:
            if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                # EPIPE: Happens when the client closes the connection
                # EINTR: Happens when received a signal
                # handles them as soon as possible
                asap = True
                logging.debug('poll:%s', e)
            else:
                logging.error('poll:%s', e)
                traceback.print_exc()
                continue

        for sock, fd, event in events: # 事件 socket 事件文件描述符 事件模式(POLL_IN POLL_OUT)
            handler = self._fdmap.get(fd, None) # 根据 file descriptor 获取 处理器 shadowsocks.tcpreply.TCPReply
            if handler is not None:
                handler = handler[1]
                try:
                    handler.handle_event(sock, fd, event)
                except (OSError, IOError) as e:
                    shell.print_exception(e)
        now = time.time()
        if asap or now - self._last_time >= TIMEOUT_PRECISION:
            for callback in self._periodic_callbacks:
                callback()
            self._last_time = now

终于迎来了 run 函数, 注入能量🎆, Power!

events 是发生的事件的数组, while 循环, 只要 _stopping 标志位为 False, 就继续循环.

asap 是 as soon as possible 的缩写, 标志着是否应该尽快处理有问题的 socket, 因为获取事件的时候有可能出错.

调用 eventloop.poll 并且传入时间精度来控制获取事件的最大超时时间, 一旦有数据流入或者是流出或者是其他的事件发生, 在这里就能拿到发生事件的数据 (sock, fd, event).

接下来处理每一个事件, 从 _fdmap 里面拿出来关于该文件描述符的数据(f, handler), 然后取出 handler, 然后将数据传递给处理器处理.

获取当前的时间戳, 如果获取事件的时候出错或者是现在距离上次调用周期性的函数超过了时间精度, 则调用周期性的函数然后更新调用的时间戳.

close

def __del__(self):
    self._impl.close()

销毁函数, 在销毁该对象的时候调用该方法.

来点图形吧

都是在介绍代码🤔, 好像违背了我刚开始说的图形化😣, 所以现在来画一张图形吧, 来看看 run 之后的获取事件的这一个流程, 关于 TCPReply 的会在接下来的文章中继续深入(deep)

           |-not _stopping ----------|
run -------|                         |
       / \ |-_stopping 直接返回       |
        |                            |-> events = EventLoop.poll(TIMEOUT_PRECISION)-----|
        |                                      |  / \                                   |
        |开始下一轮循环                          |   |                                    | 
        |                                      |   | [(socket, fd, event)]              |
        |                                     \ /  |                                    |
        |                    |------->results = KqueueLoop.poll(timeout)                | events
        |                    |   然后循环results, 将|filter 标志位 对应为内部 POLL_IN 等变量. |
  →-----↑                    |   接下来是数据的第一步|处理, 从 `_fdmap` 拿出来 socket,        |
  ↑                          |            返回给 Ev|entLoop.poll调用                      |
  |                          |-----------------|  |                                     |
  |                             [(fd, e)]      |  |                                     |
  |                                            | \ /                                    |
  |                         _kqueue.control 拿出来内核事件队列里发生的事件, 然后返回           |
  |                                                                                     |
  |                               \ | /------------------------------------------------<|
  |                                \|/                events
  |                                 |
  |                                 |
  |                                \ /
  | 在这里处理每一个发生的事件, 拿出来每一个事件对应的处理器 <-----> 走向处理器处理流程, 处理完成, 流程返回, ¬
  | 后面介绍这一段流程
  |                                 |
  |                                 |
  |                                \ /
  | 事件处理完毕, 如果事件出现错误, 或者是距离上次调用周期性函数超过时间精度, 就调用所有的周期函数
  |                                 |
  |                                 |
  |         走向下一个循环            ↓
  ↑←--------------------------------←

接下来深究 TCPReply

tags: