def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, "fileno"):
set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = []
self._timeouts = []
self._running = False
self._stopped = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
class IOLoop(object):
"""A level-triggered I/O loop.
We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python
2.6+) if they are available, or else we fall back on select(). If
you are implementing a system that needs to handle thousands of
simultaneous connections, you should use a system that supports either
epoll or queue.
Example usage for a simple TCP server::
import errno
import functools
import ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error, e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
"""
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._running = False
self._stopped = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
@staticmethod
def instance():
"""Returns a global IOLoop instance.
Most single-threaded applications have a single, global IOLoop.
Use this method instead of passing around IOLoop instances
throughout your code.
A common pattern for classes that depend on IOLoops is to use
a default argument to enable programs with multiple IOLoops
but not require the argument for simpler applications::
class MyClass(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
"""
if not hasattr(IOLoop, "_instance"):
IOLoop._instance = IOLoop()
return IOLoop._instance
@staticmethod
def initialized():
"""Returns true if the singleton instance has been created."""
return hasattr(IOLoop, "_instance")
#.........这里部分代码省略.........
class IOLoop(object):
"""A level-triggered I/O loop.
We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python
2.6+) if they are available, or else we fall back on select(). If
you are implementing a system that needs to handle thousands of
simultaneous connections, you should use a system that supports either
epoll or queue.
Example usage for a simple TCP server::
import errno
import functools
import ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error, e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
"""
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._running = False
self._stopped = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
@staticmethod
def instance():
"""Returns a global IOLoop instance.
Most single-threaded applications have a single, global IOLoop.
Use this method instead of passing around IOLoop instances
throughout your code.
A common pattern for classes that depend on IOLoops is to use
a default argument to enable programs with multiple IOLoops
but not require the argument for simpler applications::
class MyClass(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
"""
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
#.........这里部分代码省略.........
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
`tornado.platform.select.SelectIOLoop` (all platforms).
"""
def initialize(self, impl, time_func=None):
super(PollIOLoop, self).initialize()
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd in self._handlers.keys():
try:
close_method = getattr(fd, 'close', None)
if close_method is not None:
close_method()
else:
os.close(fd)
except Exception:
gen_log.debug("error closing fd %s", fd, exc_info=True)
self._waker.close()
self._impl.close()
# 关键点!!
# 这里应该就是注册到大的事件循环里面的入口了
# 看看地方会调用这个add_handler
def add_handler(self, fd, handler, events):
self._handlers[fd] = stack_context.wrap(handler)
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal, "setitimer"):
gen_log.error("set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
signal.signal(signal.SIGALRM,
action if action is not None else signal.SIG_DFL) # 采用默认的系统信号处理方法,对于SIGALRM来说,默认是结束掉当前进程
def start(self):
if not logging.getLogger().handlers:
# The IOLoop catches and logs exceptions, so it's
# important that log output be visible. However, python's
# default behavior for non-root loggers (prior to python
# 3.2) is to print an unhelpful "no handlers could be
# found" message rather than the actual log entry, so we
# must explicitly configure logging if we've made it this
# far without anything.
logging.basicConfig()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident() # 获取当前启动线程的id
self._running = True
# 为什么要在这里设置wakeup_fd这个东东??
#.........这里部分代码省略.........
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
`tornado.platform.select.SelectIOLoop` (all platforms).
"""
def initialize(self, impl, time_func=None):
super(PollIOLoop, self).initialize()
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd in self._handlers.keys()[:]:
try:
os.close(fd)
except Exception:
gen_log.debug("error closing fd %s", fd, exc_info=True)
self._waker.close()
self._impl.close()
def add_handler(self, fd, handler, events):
self._handlers[fd] = stack_context.wrap(handler)
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal, "setitimer"):
gen_log.error("set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
signal.signal(signal.SIGALRM,
action if action is not None else signal.SIG_DFL)
def start(self):
if not logging.getLogger().handlers:
# The IOLoop catches and logs exceptions, so it's
# important that log output be visible. However, python's
# default behavior for non-root loggers (prior to python
# 3.2) is to print an unhelpful "no handlers could be
# found" message rather than the actual log entry, so we
# must explicitly configure logging if we've made it this
# far without anything.
logging.basicConfig()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
# signal.set_wakeup_fd closes a race condition in event loops:
# a signal may arrive at the beginning of select/poll/etc
# before it goes into its interruptible sleep, so the signal
# will be consumed without waking the select. The solution is
# for the (C, synchronous) signal handler to write to a pipe,
# which will then be seen by select.
#
# In python's signal handling semantics, this only matters on the
# main thread (fortunately, set_wakeup_fd only works on the main
# thread and will raise a ValueError otherwise).
#
# If someone has already set a wakeup fd, we don't want to
# disturb it. This is an issue for twisted, which does its
#.........这里部分代码省略.........
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
`tornado.platform.select.SelectIOLoop` (all platforms).
"""
def initialize(self, impl, time_func=None):
super(PollIOLoop, self).initialize()
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in self._handlers.values():
self.close_fd(fd)
self._waker.close()
self._impl.close()
self._callbacks = None
self._timeouts = None
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal, "setitimer"):
gen_log.error("set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
signal.signal(signal.SIGALRM,
action if action is not None else signal.SIG_DFL)
def start(self):
# tonardo 使用 _running/_stopped 两个字段组合表示3种状态:
# 1、就绪(初始化完成/已经结束):_running=False, _stopped=False;
# 2、正在运行:_running=True, _stopped=False;
# 3、正在结束:_running=False, _stopped=True;
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
# signal.set_wakeup_fd closes a race condition in event loops:
# a signal may arrive at the beginning of select/poll/etc
# before it goes into its interruptible sleep, so the signal
# will be consumed without waking the select. The solution is
# for the (C, synchronous) signal handler to write to a pipe,
# which will then be seen by select.
#
# In python's signal handling semantics, this only matters on the
# main thread (fortunately, set_wakeup_fd only works on the main
# thread and will raise a ValueError otherwise).
#.........这里部分代码省略.........
class IOLoop(object):
"""A level-triggered I/O loop.
We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python
2.6+) if they are available, or else we fall back on select(). If
you are implementing a system that needs to handle thousands of
simultaneous connections, you should use a system that supports either
epoll or queue.
Example usage for a simple TCP server::
import errno
import functools
import ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error, e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
"""
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._running = False
self._stopped = False
self._thread_ident = None
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
@staticmethod
def instance():
"""Returns a global IOLoop instance.
Most single-threaded applications have a single, global IOLoop.
Use this method instead of passing around IOLoop instances
throughout your code.
A common pattern for classes that depend on IOLoops is to use
a default argument to enable programs with multiple IOLoops
but not require the argument for simpler applications::
class MyClass(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
"""
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
#.........这里部分代码省略.........
请发表评论