eventloop.py
by
eventloop.py 事件轮询器
** 2018.2.5 完结 **
该模块负责监听所有的 socket, 获取这些 socket 发生的事件, 然后将其分发给注册的监听器
import
from __future__ import absolute_import, division, print_function, \
with_statement
import os
import time
import socket
import select
import traceback
import errno
import logging
from collections import defaultdict
from shadowsocks import shell
导出
__all__ = ['EventLoop', 'POLL_NULL', 'POLL_IN', 'POLL_OUT', 'POLL_ERR',
'POLL_HUP', 'POLL_NVAL', 'EVENT_NAMES']
__all__ 声明从该模块导出的变量, 该变量在 from xxx import *
生效. 详细见这里.
模块常量
POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08
POLL_HUP = 0x10
POLL_NVAL = 0x20
EVENT_NAMES = {
POLL_NULL: 'POLL_NULL',
POLL_IN: 'POLL_IN',
POLL_OUT: 'POLL_OUT',
POLL_ERR: 'POLL_ERR',
POLL_HUP: 'POLL_HUP',
POLL_NVAL: 'POLL_NVAL',
}
TIMEOUT_PRECISION = 10
POLL_*
变量标识发生的一个事件的数据状态
POLL_NULL
00000000
用来初始化位运算POLL_IN
00000001
数据可以流入(可读)POLL_OUT
00000100
数据可以流出(可写)POLL_ERR
00001000
事件发生了错误A POLLERR means the socket got an asynchronous error. In TCP, this typically means a RST has been received or sent. If the file descriptor is not a socket, POLLERR might mean the device does not support polling.
POLLERR 意味着该 socket 获得了一个异步错误. 在 TCP 里, 这尤其意味着一个 RST 包被收到或者是被发送了. 如果这个文件描述符不是 socket, POLLERR 可能意味着该设备不支持轮询
POLL_HUP
00001010
事件发生了错误A POLLHUP means the socket is no longer connected. In TCP, this means FIN has been received and sent.
POLLHUP 意味着该 socket 不再连接了. 在 TCP 里, 这意味着 FIN 包被收到或者是被发送了
POLL_NVAL
00010000
事件发生了错误A POLLNVAL means the socket file descriptor is not open. It would be an error to close() it.
POLLNVAL 意味着该 socket 文件描述符没有打开. 在调用 close() 的时候会抛出这个错误.
-
EVENT_NAMES
变量方便从二进制的数值获得关于该二进制数值的文字描述(其实就是变量的字符串形式, 方便打印日志) TIMEOUT_PRECISION
超时精度, 如果一个请求超过了该时间没有活跃, 会清除关于该请求的资源, 在调试的时候将该值调大会方便单步调试.
class KqueueLoop
简介
Kqueue 形式的轮询, 获取发生的事件. 支持的平台有 NetBSD, OpenBSD, DragonflyBSD, and OS X(数据来源自维基百科)
类常量
MAX_EVENTS = 1024
一次循环中取出来的最多的事件数量
类构造器 __init__
def __init__(self):
self._kqueue = select.kqueue()
self._fds = {}
select.kqueue()
创建一个 kqueue 事件获取方式
_fds
存储着 file descriptor(文件描述符) -> 对应的监听的 mode(POLL_IN 或 POLL_OUT)
_control
简介
监听或者是移除监听一个文件描述符的状态, 该函数为私有函数
def _control(self, fd, mode, flags):
events = []
if mode & POLL_IN:
events.append(select.kevent(fd, select.KQ_FILTER_READ, flags))
if mode & POLL_OUT:
events.append(select.kevent(fd, select.KQ_FILTER_WRITE, flags))
for e in events:
self._kqueue.control([e], 0)
接收参数
- self 实例自身
- fd 文件描述符
- mode 模式, POLL_IN 或 POLL_OUT
- flags KQ_EV_ADD 是添加监听, KQ_EV_DELETE 是移除监听(移除监听传入的 mode 需要和添加监听时候的一致)
交互式程序流
def _control(self, fd, mode, flags):
events = []
if mode & POLL_IN:
events.append(select.kevent(fd, select.KQ_FILTER_READ, flags))
if mode & POLL_OUT:
events.append(select.kevent(fd, select.KQ_FILTER_WRITE, flags))
for e in events:
self._kqueue.control([e], 0)
跳转函数列表 |
---|
poll
简介
取出来发生的事件, 一次取出来的最多的事件数量为 KqueueLoop.MAX_EVENTS
, 超时时间为 timeout
, 如果超过了超时时间依旧没有事件发生, 程序继续往下走
def poll(self, timeout):
if timeout < 0:
timeout = None # kqueue behaviour
events = self._kqueue.control(
None, KqueueLoop.MAX_EVENTS, timeout
)
results = defaultdict(lambda: POLL_NULL)
for e in events:
fd = e.ident
if e.filter == select.KQ_FILTER_READ:
results[fd] |= POLL_IN
elif e.filter == select.KQ_FILTER_WRITE:
results[fd] |= POLL_OUT
return results.items()
接收参数
- self 实例本身
- timeout 超时时间
注意点
defaultdict
类 在给一个 key 赋值的时候, 该 key 不存在, 就会调用 lambda: POLL_NULL
, 这就是 POLL_NULL 常量的用处.
交互式程序流
def poll(self, timeout):
if timeout < 0:
timeout = None # kqueue behaviour
events = self._kqueue.control(
None, KqueueLoop.MAX_EVENTS, timeout
)
results = defaultdict(lambda: POLL_NULL)
for e in events:
fd = e.ident
if e.filter == select.KQ_FILTER_READ:
results[fd] |= POLL_IN
elif e.filter == select.KQ_FILTER_WRITE:
results[fd] |= POLL_OUT
return results.items()
跳转函数列表 |
---|
register
简介
注册一个 fd(文件描述符) 进入监听序列, 监听模式为 mode
def register(self, fd, mode):
self._fds[fd] = mode # 文件描述符和其监听模式的对应
self._control(fd, mode, select.KQ_EV_ADD) # 添加注册 传入 Kqueue 的添加常量
接收参数
- self 实例本身
- fd 文件描述符
- mode 监听模式
注意点
self._fds[fd] = mode
将监听的文件描述符和其监听的模式存储在 _fd
dict 中, 方便在移除的时候取出来该文件描述符在监听时候的 mode
交互式程序流
def register(self, fd, mode):
self._fds[fd] = mode # 文件描述符和其监听模式的对应
self._control(fd, mode, select.KQ_EV_ADD) # 添加注册 传入 Kqueue 的添加常量
跳转函数列表 |
---|
unregister
简介
移除注册在监听序列中的 fd(文件描述符), 其监听模式已经存储在 self._fds
中
def unregister(self, fd):
self._control(fd, self._fds[fd], select.KQ_EV_DELETE) # KQ_EN_DELETE 删除注册 self._fds 拿出来注册该文件描述符的时候的模式
del self._fds[fd] # 移除 KqueueLopp 关于该文件描述符的数据(mode)
接收参数
- self 实例本身
- fd 要移除的文件描述符
注意点
移除注册时候不需要传递 mode, 因为在监听的时候已经保存在 self._fds
中
交互式程序流
def unregister(self, fd):
self._control(fd, self._fds[fd], select.KQ_EV_DELETE) # KQ_EN_DELETE 删除注册 self._fds 拿出来注册该文件描述符的时候的模式
del self._fds[fd] # 移除 KqueueLopp 关于该文件描述符的数据(mode)
跳转函数列表 |
---|
modify
简介
修改一个文件描述符的监听模式, 从 POLL_IN 到 POLL_OUT 或者是反过来
def modify(self, fd, mode):
self.unregister(fd) # 删除注册 file descriptor => mode
self.register(fd, mode) # 重新注册 file descriptor => mode
接收参数
- self 实例本身
- fd 文件描述符
- mode 修改成的监听模式
交互式程序流
def modify(self, fd, mode):
self.unregister(fd) # 删除注册 file descriptor => mode
self.register(fd, mode) # 重新注册 file descriptor => mode
跳转函数列表 |
---|
close
简介
关闭该 kqueue 事件监听器
def close(self):
self._kqueue.close()
接收参数
无
class EventLoop
事件轮询器, 根据不同的平台创建不同的时间获取方式 epoll, kqueue(https://en.wikipedia.org/wiki/Kqueue), select(https://en.wikipedia.org/wiki/Select_(Unix)). 处理周期性的函数, 比如: 过期请求的销毁.将发生事件的文件描述符分发给相应的处理器.
__init__ 构造函数
简介
处理不同的平台获取事件的不同方式, 初始化一些变量
def __init__(self):
if hasattr(select, 'epoll'):
self._impl = select.epoll()
model = 'epoll'
elif hasattr(select, 'kqueue'):
self._impl = KqueueLoop()
model = 'kqueue'
elif hasattr(select, 'select'):
self._impl = SelectLoop()
model = 'select'
else:
raise Exception('can not find any available functions in select '
'package')
self._fdmap = {} # (f, handler)
self._last_time = time.time()
self._periodic_callbacks = []
self._stopping = False
logging.debug('using event model: %s', model)
接收参数
- self 实例本身
注意点
- _fdmap 数据格式为
{ fd: (f, handler) # 文件描述: (文件, 该文件发生事件时的处理器) }
交互式程序流
def __init__(self):
if hasattr(select, 'epoll'):
self._impl = select.epoll()
model = 'epoll'
elif hasattr(select, 'kqueue'):
self._impl = KqueueLoop()
model = 'kqueue'
elif hasattr(select, 'select'):
self._impl = SelectLoop()
model = 'select'
else:
raise Exception('can not find any available functions in select '
'package')
self._fdmap = {} # (f, handler)
self._last_time = time.time()
self._periodic_callbacks = []
self._stopping = False
logging.debug('using event model: %s', model)
跳转函数列表 |
---|
poll
简介
获取事件
def poll(self, timeout=None):
events = self._impl.poll(timeout)
return [(self._fdmap[fd][0], fd, event) for fd, event in events]
接收参数
- self 实例本身
- timeout 超时时间
解读
调用 selv._impl.poll
获取事件, events 为获取出来的事件列表, 格式为
[(
fd, # 文件按描述
mode # 发生的事件类型
), (
fd, # 文件描述符
mode # 发生的事件类型
)]
数组推导式来获取关于该文件描述符的文件, 最后返回的数据格式为:
[(
f, # 文件
fd, # 文件描述符
mode # 发生的事件类型
), (
f, # 文件
fd, # 文件描述符
mode # 发生的事件类型
)]
add
将文件加入监听序列
def add(self, f, mode, handler):
fd = f.fileno()
self._fdmap[fd] = (f, handler)
self._impl.register(fd, mode)
接收参数
- self 实例本身
- f 文件, 可为 socket 文件
- mode 监听的事件类型
- handler 当 f 发生了 mode 事件时的处理器
交互式程序流
def add(self, f, mode, handler):
fd = f.fileno()
self._fdmap[fd] = (f, handler)
self._impl.register(fd, mode)
跳转函数列表 |
---|
remove
移除 f 的监听
def remove(self, f):
fd = f.fileno()
del self._fdmap[fd]
self._impl.unregister(fd)
接收参数
- self 实例本身
- f 文件, 可为 socket 文件
程序执行流
def remove(self, f):
fd = f.fileno()
del self._fdmap[fd]
self._impl.unregister(fd)
跳转函数列表 |
---|
add_periodic
添加周期性函数
def add_periodic(self, callback):
self._periodic_callbacks.append(callback)
接收参数
- self 实例本身
- callback 回调函数
remove_periodic
移除周期性函数
def remove_periodic(self, callback):
self._periodic_callbacks.remove(callback)
接收参数
- self 实例本身
- callback 回调函数
modify
修改一个 f 的监听模式
def modify(self, f, mode):
fd = f.fileno()
self._impl.modify(fd, mode)
接收参数
- self 实例本身
- mode 修改为的监听模式
交互式程序流
def modify(self, f, mode):
fd = f.fileno()
self._impl.modify(fd, mode)
跳转函数列表 |
---|
stop
标识 self._stopping
为 True, 停止轮询器, 不再获取事件.
def stop(self):
self._stopping = True
run
简介
启动事件轮询器, 获取事件并将其分发给相应的处理器, 调用周期性的函数
def run(self):
events = []
while not self._stopping:
asap = False
try:
events = self.poll(TIMEOUT_PRECISION)
except (OSError, IOError) as e:
if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
# EPIPE: Happens when the client closes the connection
# EINTR: Happens when received a signal
# handles them as soon as possible
asap = True
logging.debug('poll:%s', e)
else:
logging.error('poll:%s', e)
traceback.print_exc()
continue
for sock, fd, event in events:
handler = self._fdmap.get(fd, None)
if handler is not None:
handler = handler[1]
try:
handler.handle_event(sock, fd, event)
except (OSError, IOError) as e:
shell.print_exception(e)
now = time.time()
if asap or now - self._last_time >= TIMEOUT_PRECISION:
for callback in self._periodic_callbacks:
callback()
self._last_time = now
接收参数
- self 实例本身
注意点
- errno.EPIPE https://groups.google.com/forum/#!topic/nodejs/8VBd36dPbnU 有这么一句话
EPIPE means that writing of (presumably) the HTTP request failed because the other end closed the connection.
EPIPE 意味着(大概)向 http 请求写入数据的时候, 对方已经关闭了连接
- errno.EINTR http://man7.org/linux/man-pages/man7/signal.7.html 官方文档, 大概意思就是说如果有一个阻塞操作, 但是此时收到了系统信号(比如 Ctrl + C, 或者是其他信号), 此时如果注册了该信号的处理器, 应该立即去执行该处理器, 所以此阻塞操作会抛出一个异常, errno 为 error.EINTR, 在 local.py 里面有这么一句代码
signal.signal(getattr(signal, 'SIGQUIT', signal.SIGTERM), handler)
(收到该系统信号之后, 优雅的退出程序, 也就是处理完已创建的连接才退出),所以需要在这里处理一下这个异常.
交互式程序流
def run(self):
events = []
while not self._stopping:
asap = False
try:
events = self.poll(TIMEOUT_PRECISION)
except (OSError, IOError) as e:
if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
# EPIPE: Happens when the client closes the connection
# EINTR: Happens when received a signal
# handles them as soon as possible
asap = True
logging.debug('poll:%s', e)
else:
logging.error('poll:%s', e)
traceback.print_exc()
continue
for sock, fd, event in events:
handler = self._fdmap.get(fd, None)
if handler is not None:
handler = handler[1]
try:
handler.handle_event(sock, fd, event)
except (OSError, IOError) as e:
shell.print_exception(e)
now = time.time()
if asap or now - self._last_time >= TIMEOUT_PRECISION:
for callback in self._periodic_callbacks:
callback()
self._last_time = now
跳转函数列表 |
---|
del
在该变量被垃圾回收机制回收的时候做的事情
def __del__(self):
self._impl.close()
调用 self._impl.close
关闭获取事件的实例, KqueueLoop.close
errno_from_exception
从异常对象里获取异常的 errno.
来源于 tornado
# from tornado
def errno_from_exception(e):
"""Provides the errno from an Exception object.
There are cases that the errno attribute was not set so we pull
the errno out of the args but if someone instatiates an Exception
without any args you will get a tuple error. So this function
abstracts all that behavior to give you a safe way to get the
errno.
"""
if hasattr(e, 'errno'):
return e.errno
elif e.args:
return e.args[0]
else:
return None
接收参数
- e Exception
get_sock_error
获取 socket 文件的错误, 在异步处理一个 socket 文件的时候, 如果该 socket 出现了错误, 需要通过这个函数来获取错误信息.
# from tornado
def get_sock_error(sock):
error_number = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
return socket.error(error_number, os.strerror(error_number))
接受参数
- sock socket 句柄
参考连接: https://stackoverflow.com/questions/21031717/so-error-vs-errno