WSGI and gunicorn: A Guide (Part 2)

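pyg0 now has a rough understanding of WSGI. Next he decides to dig into gunicorn, the web server his team actually runs in production.

First, the official description: Gunicorn is a Python WSGI web server for Unix. It uses a pre-fork worker model. Gunicorn is compatible with many Python web frameworks, simply implemented, light on system resources, and fast.

Setting the self-promotion aside, let's go through these claims one at a time.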

Gunicorn runs only on Unix because it relies on modules and functions that exist only on *nix systems, such as fcntl and os.fork.
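To make the platform claim concrete, here is a small sketch that checks whether the two facilities named above are present in the running interpreter. The function name missing_unix_features is hypothetical and this is not how gunicorn itself checks anything; it only illustrates that these pieces are absent on non-Unix platforms.

```python
import importlib.util
import os


def missing_unix_features():
    """Return the names of Unix-only facilities that this interpreter lacks."""
    missing = []
    # os.fork is only defined on platforms that support fork(2).
    if not hasattr(os, "fork"):
        missing.append("os.fork")
    # The fcntl module is only shipped on Unix-like systems.
    if importlib.util.find_spec("fcntl") is None:
        missing.append("fcntl")
    return missing


if __name__ == "__main__":
    print(missing_unix_features() or "all Unix-only features are available")
```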

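The pre-fork worker model means that when gunicorn starts, the master process (called the arbiter in gunicorn) forks a configured number of workers up front, and the arbiter manages them from then on.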
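The idea can be sketched in a few lines of plain Python. This is a toy under stated assumptions, not gunicorn's code: the function name prefork_demo is hypothetical, each worker serves exactly one connection and then exits, and there is no supervision or restart logic. It only shows the core trick: the parent opens the listening socket first, and every forked child inherits it.

```python
import os
import socket


def prefork_demo(num_workers=2):
    """Fork workers that share one listening socket.

    Returns (child_pids, served_by): the pids that were forked and the
    pid that answered each test connection.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(16)
    listener.settimeout(10)  # workers give up if nobody connects
    port = listener.getsockname()[1]

    children = []
    for _ in range(num_workers):
        pid = os.fork()
        if pid == 0:
            # Child: the listening socket was inherited across fork().
            status = 0
            try:
                conn, _addr = listener.accept()
                with conn:
                    conn.sendall(str(os.getpid()).encode("ascii"))
            except Exception:
                status = 1
            finally:
                os._exit(status)  # never fall back into the parent's code
        children.append(pid)

    # Parent: act as the client, one connection per worker.
    served_by = []
    for _ in range(num_workers):
        with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
            data = client.makefile("rb").read()  # read until the worker closes
            served_by.append(int(data.decode("ascii")))

    listener.close()
    for pid in children:
        os.waitpid(pid, 0)  # reap the workers
    return children, served_by


if __name__ == "__main__":
    print(prefork_demo(2))
```

Because each worker accepts only one connection here, every forked pid answers exactly one request. A real server loops on accept and lets the kernel decide which worker gets each connection.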

When we start a WSGI app, gunicorn initializes an Application. Its base class, BaseApplication, has a run method (excerpt):

class BaseApplication(object):

    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

It mostly hands the work over to the Arbiter class. Let's see how the Arbiter starts and manages workers.

class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """
    def start(self):
        """
        Initialize the arbiter. Start listening and set pidfile if needed.
        """
        self.log.info("Starting gunicorn %s", __version__)

        if 'GUNICORN_PID' in os.environ:
            self.master_pid = int(os.environ.get('GUNICORN_PID'))
            self.proc_name = self.proc_name + ".2"
            self.master_name = "Master.2"

        self.pid = os.getpid()
        if self.cfg.pidfile is not None:
            pidname = self.cfg.pidfile
            if self.master_pid != 0:
                pidname += ".2"
            self.pidfile = Pidfile(pidname)
            self.pidfile.create(self.pid)
        self.cfg.on_starting(self)

        self.init_signals()

        if not self.LISTENERS:
            fds = None
            listen_fds = systemd.listen_fds()
            if listen_fds:
                self.systemd = True
                fds = range(systemd.SD_LISTEN_FDS_START,
                            systemd.SD_LISTEN_FDS_START + listen_fds)

            elif self.master_pid:
                fds = []
                for fd in os.environ.pop('GUNICORN_FD').split(','):
                    fds.append(int(fd))

            self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)

        listeners_str = ",".join([str(l) for l in self.LISTENERS])
        self.log.debug("Arbiter booted")
        self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
        self.log.info("Using worker: %s", self.cfg.worker_class_str)

        # check worker class requirements
        if hasattr(self.worker_class, "check_config"):
            self.worker_class.check_config(self.cfg, self.log)

        self.cfg.when_ready(self)

    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)

        try:
            self.manage_workers()

            while True:
                self.maybe_promote_master()

                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
        except StopIteration:
            self.halt()
        except KeyboardInterrupt:
            self.halt()
        except HaltServer as inst:
            self.halt(reason=inst.reason, exit_status=inst.exit_status)
        except SystemExit:
            raise
        except Exception:
            self.log.info("Unhandled exception in main loop",
                          exc_info=True)
            self.stop(False)
            if self.pidfile is not None:
                self.pidfile.unlink()
            sys.exit(-1)

    def manage_workers(self):
        """
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()

        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)

        active_worker_count = len(workers)
        if self._last_logged_active_worker_count != active_worker_count:
            self._last_logged_active_worker_count = active_worker_count
            self.log.debug("{0} workers".format(active_worker_count),
                           extra={"metric": "gunicorn.workers",
                                  "value": active_worker_count,
                                  "mtype": "gauge"})

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        # Do not inherit the temporary files of other workers
        for sibling in self.WORKERS.values():
            sibling.tmp.close()

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        except AppImportError as e:
            self.log.debug("Exception while loading the application",
                           exc_info=True)
            print("%s" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(self.APP_LOAD_ERROR)
        except:
            self.log.exception("Exception in worker process")
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            self.log.info("Worker exiting (pid: %s)", worker.pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except:
                self.log.warning("Exception during worker exit:\n%s",
                                 traceback.format_exc())

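When the Arbiter starts, it first reads the configuration (for example the number of workers and the worker type), then creates the listening sockets, spawns the workers, and enters a loop that handles signals and supervises the workers. If a worker stops responding, the Arbiter kills it and spawns a replacement. That is about all there is to the master process.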
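The signal handling in that loop follows a simple pattern: signals are queued, and each one is dispatched by name to a handle_<name> method. Below is a stripped-down sketch of just that pattern. MiniArbiter and its signal and drain methods are hypothetical names, no real signal handlers are installed, and the TTIN/TTOU behaviour (grow or shrink the worker count by one) is a simplified stand-in for what a real master does.

```python
import signal


class MiniArbiter:
    # Map signal numbers to short lowercase names, e.g. SIGTTIN -> "ttin".
    SIG_NAMES = {
        getattr(signal, name): name[3:].lower()
        for name in ("SIGHUP", "SIGTERM", "SIGTTIN", "SIGTTOU")
    }

    def __init__(self, num_workers=2):
        self.num_workers = num_workers
        self.SIG_QUEUE = []
        self.log = []

    def signal(self, sig):
        # A real master would call this from an installed signal handler.
        self.SIG_QUEUE.append(sig)

    def handle_ttin(self):
        # Ask for one more worker.
        self.num_workers += 1

    def handle_ttou(self):
        # Ask for one worker fewer, but never go below one.
        if self.num_workers > 1:
            self.num_workers -= 1

    def drain(self):
        # Process queued signals in arrival order, dispatching by name.
        while self.SIG_QUEUE:
            sig = self.SIG_QUEUE.pop(0)
            if sig not in self.SIG_NAMES:
                self.log.append("ignoring unknown signal %s" % sig)
                continue
            signame = self.SIG_NAMES[sig]
            handler = getattr(self, "handle_%s" % signame, None)
            if handler is None:
                self.log.append("unhandled signal %s" % signame)
                continue
            handler()


if __name__ == "__main__":
    arbiter = MiniArbiter(num_workers=2)
    arbiter.signal(signal.SIGTTIN)
    arbiter.signal(signal.SIGTTOU)
    arbiter.drain()
    print(arbiter.num_workers)
```

Queueing the signal and handling it later in the main loop keeps the handler itself trivial, which is the usual reason for this design.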

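In the Arbiter's spawn_worker method, the child process calls worker.init_process() to enter the worker's own main loop.

Now let's look at the worker processes. Gunicorn supports several worker types. The default is sync which, as the name suggests, is a synchronous, blocking network model. Besides sync there are ggevent, gaiohttp, gthread, and others. We will focus on the ggevent worker.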

The ggevent worker (the module gunicorn.workers.ggevent) is built on the gevent coroutine library and supports asynchronous I/O.
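For orientation, a gunicorn configuration file that selects this worker might look like the fragment below. The values are arbitrary examples, not recommendations, and the two hook bodies are illustrative. The setting and hook names are gunicorn settings; the worker is selected with the class name "gevent".

```python
# gunicorn.conf.py (example values only)
bind = "127.0.0.1:8000"
workers = 4                  # number of worker processes to pre-fork
worker_class = "gevent"      # use the gevent-based worker instead of sync
worker_connections = 1000    # max simultaneous connections per worker
timeout = 30                 # seconds of silence before a worker is killed
graceful_timeout = 30        # seconds a worker gets to finish on shutdown


def when_ready(server):
    # Called in the master once the listeners are set up.
    server.log.info("Server is ready. Spawning workers")


def post_fork(server, worker):
    # Called in the child right after fork().
    server.log.info("Worker spawned (pid: %s)", worker.pid)
```

It would typically be passed on the command line as gunicorn -c gunicorn.conf.py myapp:app, where myapp:app is a placeholder for your own module and WSGI callable.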

Here is the code (excerpt):

class GeventWorker(AsyncWorker):
    def run(self):
        servers = []
        ssl_args = {}

        if self.cfg.is_ssl:
            ssl_args = dict(server_side=True, **self.cfg.ssl_options)

        for s in self.sockets:
            s.setblocking(1)
            pool = Pool(self.worker_connections)
            if self.server_class is not None:
                environ = base_environ(self.cfg)
                environ.update({
                    "wsgi.multithread": True,
                    "SERVER_SOFTWARE": VERSION,
                })
                server = self.server_class(
                    s, application=self.wsgi, spawn=pool, log=self.log,
                    handler_class=self.wsgi_handler, environ=environ,
                    **ssl_args)
            else:
                hfun = partial(self.handle, s)
                server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)

            server.start()
            servers.append(server)

        while self.alive:
            self.notify()
            gevent.sleep(1.0)

        try:
            # Stop accepting requests
            for server in servers:
                if hasattr(server, 'close'):  # gevent 1.0
                    server.close()
                if hasattr(server, 'kill'):  # gevent < 1.0
                    server.kill()

            # Handle current requests until graceful_timeout
            ts = time.time()
            while time.time() - ts <= self.cfg.graceful_timeout:
                accepting = 0
                for server in servers:
                    if server.pool.free_count() != server.pool.size:
                        accepting += 1

                # if no server is accepting a connection, we can exit
                if not accepting:
                    return

                self.notify()
                gevent.sleep(1.0)

            # Force kill all active the handlers
            self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
            for server in servers:
                server.stop(timeout=1)
        except:
            pass

When the worker starts, it creates one server for each socket the Arbiter set up. Each server gets its own gevent Pool, and waiting for and handling connections both happen inside the server.
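The Pool(self.worker_connections) in the excerpt caps how many handlers can run at once. The sketch below shows the same bounding idea using only the standard library: asyncio.Semaphore is swapped in for gevent's Pool, and run_bounded and handler are hypothetical names. It is an analogy rather than gevent's behaviour in detail, since here all tasks are created immediately and simply wait for a free slot.

```python
import asyncio


async def run_bounded(n_tasks, limit):
    """Run n_tasks handlers with at most `limit` of them active at once.

    Returns (results, peak), where peak is the highest number of
    handlers that were inside the bounded section at the same time.
    """
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def handler(i):
        nonlocal active, peak
        async with sem:  # wait here while the "pool" is full
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated I/O wait
            active -= 1
        return i

    results = await asyncio.gather(*(handler(i) for i in range(n_tasks)))
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(run_bounded(10, 3)))
```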

The server_class and wsgi_handler used here are subclasses of gevent's WSGIServer and WSGIHandler, set on the GeventPyWSGIWorker subclass:

class GeventPyWSGIWorker(GeventWorker):
    "The Gevent StreamServer based workers."
    server_class = PyWSGIServer
    wsgi_handler = PyWSGIHandler

Let's look at how the server is created:

server = self.server_class(
    s, application=self.wsgi, spawn=pool, log=self.log,
    handler_class=self.wsgi_handler, environ=environ,
    **ssl_args)

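s is the socket created by the Arbiter; application is our WSGI app (for example a Django or Flask app); spawn is the gevent coroutine pool; and handler_class is the subclass of gevent's WSGIHandler. This is how the WSGI app is handed to the worker to run.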

while self.alive:
    self.notify()
    gevent.sleep(1.0)

Once a second the worker notifies the Arbiter that it is still alive, so that the Arbiter does not kill it.
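The article does not show how notify() or the Arbiter's liveness check are implemented. The worker.tmp references in spawn_worker suggest a temporary file is involved, so the sketch below assumes a file-timestamp heartbeat: the worker touches a file, and the supervisor compares the file's timestamp with a timeout. The Heartbeat class and its methods are hypothetical and only illustrate the idea.

```python
import os
import tempfile
import time


class Heartbeat:
    """File-based liveness signal shared by a worker and its supervisor."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp(prefix="heartbeat-")
        os.close(fd)

    def notify(self, now=None):
        # Worker side: stamp the file with the current time.
        now = time.time() if now is None else now
        os.utime(self.path, (now, now))

    def last_update(self):
        # Supervisor side: read the time of the last heartbeat.
        return os.stat(self.path).st_mtime

    def is_stale(self, timeout, now=None):
        # True when no heartbeat arrived within `timeout` seconds.
        now = time.time() if now is None else now
        return now - self.last_update() > timeout

    def close(self):
        os.unlink(self.path)


if __name__ == "__main__":
    hb = Heartbeat()
    hb.notify()
    print(hb.is_stale(timeout=30))
    hb.close()
```

A supervisor loop would call is_stale for every worker on each pass and kill the ones that have gone quiet.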

 

The actual waiting for and handling of connections happens in gevent's WSGIServer and WSGIHandler. Let's step into gevent for the details:

class BaseServer(object):
    def start_accepting(self):
        if self._watcher is None:
            # just stop watcher without creating a new one?
            self._watcher = self.loop.io(self.socket.fileno(), 1)
            self._watcher.start(self._do_read)

WSGIServer uses start_accepting to accept connections. Once the listening socket becomes readable, it calls do_handle to process the new connection:

    def do_handle(self, *args):
        spawn = self._spawn
        handle = self._handle
        close = self.do_close

        try:
            if spawn is None:
                _handle_and_close_when_done(handle, close, args)
            else:
                spawn(_handle_and_close_when_done, handle, close, args)
        except:
            close(*args)
            raise

In effect, WSGIServer starts one coroutine per connection. Inside that coroutine, self._handle uses WSGIHandler to process the requests on the connection. Here is WSGIHandler:
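The one-coroutine-per-connection model can be tried out without gevent. The sketch below swaps in asyncio from the standard library, so it is an analogy and not gevent's API: asyncio.start_server runs the handle coroutine once for every accepted connection, much as the gevent server spawns a greenlet per connection. The names handle, client, demo and run_demo are hypothetical.

```python
import asyncio


async def handle(reader, writer):
    # Runs once per accepted connection, concurrently with the others.
    line = await reader.readline()
    await asyncio.sleep(0.05)  # simulated I/O wait; other handlers run meanwhile
    writer.write(b"echo:" + line)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def client(port, msg):
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(msg + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply


async def demo(n):
    # Port 0 lets the OS pick a free port.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Open n connections at once; each gets its own handler coroutine.
        return await asyncio.gather(
            *(client(port, b"req%d" % i) for i in range(n))
        )


def run_demo(n=3):
    return asyncio.run(demo(n))


if __name__ == "__main__":
    print(run_demo(3))
```

While one handler is waiting on I/O, the event loop switches to another, which is why a single worker process can keep many connections open.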

class WSGIHandler(object):
    def handle_one_response(self):
        self.time_start = time.time()
        self.status = None
        self.headers_sent = False

        self.result = None
        self.response_use_chunked = False
        self.response_length = 0

        try:
            try:
                self.run_application()
            finally:
                try:
                    self.wsgi_input._discard()
                except (socket.error, IOError):
                    # Don't let exceptions during discarding
                    # input override any exception that may have been
                    # raised by the application, such as our own _InvalidClientInput.
                    # In the general case, these aren't even worth logging (see the comment
                    # just below)
                    pass
        except _InvalidClientInput:
            self._send_error_response_if_possible(400)
        except socket.error as ex:
            if ex.args[0] in (errno.EPIPE, errno.ECONNRESET):
                # Broken pipe, connection reset by peer.
                # Swallow these silently to avoid spewing
                # useless info on normal operating conditions,
                # bloating logfiles. See https://github.com/gevent/gevent/pull/377
                # and https://github.com/gevent/gevent/issues/136.
                if not PY3:
                    sys.exc_clear()
                self.close_connection = True
            else:
                self.handle_error(*sys.exc_info())
        except:  # pylint:disable=bare-except
            self.handle_error(*sys.exc_info())
        finally:
            self.time_finish = time.time()
            self.log_request()

    def run_application(self):
        assert self.result is None
        try:
            self.result = self.application(self.environ, self.start_response)
            self.process_result()
        finally:
            close = getattr(self.result, 'close', None)
            try:
                if close is not None:
                    close()
            finally:
                # Discard the result. If it's a generator this can
                # free a lot of hidden resources (if we failed to iterate
                # all the way through it---the frames are automatically
                # cleaned up when StopIteration is raised); but other cases
                # could still free up resources sooner than otherwise.
                close = None
                self.result = None

In WSGIHandler, once a request has been read, handle_one_response is called to process it. Inside run_application we find the familiar

    self.result = self.application(self.environ, self.start_response)

which is the standard WSGI application call.
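To see that call in isolation, here is a minimal sketch that invokes a WSGI app by hand, the way a handler does: build an environ, pass a start_response callable, iterate the result, and call close() on it if present. The names app and call_app are hypothetical. wsgiref.util.setup_testing_defaults from the standard library fills in the required environ keys.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    """A tiny WSGI application that reports the requested path."""
    body = ("Hello from %s" % environ["PATH_INFO"]).encode("utf-8")
    start_response("200 OK", [
        ("Content-Type", "text/plain; charset=utf-8"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


def call_app(application, path="/"):
    """Call a WSGI app directly and return (status, headers, body)."""
    environ = {}
    setup_testing_defaults(environ)  # fill in the keys WSGI requires
    environ["PATH_INFO"] = path

    captured = {}

    def start_response(status, headers, exc_info=None):
        # A real server would start writing the HTTP response here.
        captured["status"] = status
        captured["headers"] = headers

    result = application(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        # WSGI requires the server to call close() on the result if it exists.
        close = getattr(result, "close", None)
        if close is not None:
            close()
    return captured["status"], captured["headers"], body


if __name__ == "__main__":
    print(call_app(app, "/ping"))
```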

 

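After this long tour, pyg0 quietly summed it up for himself: at startup, gunicorn uses the Arbiter to pre-fork the configured number and type of workers, and all workers listen on the same set of sockets. Each worker creates servers to accept and handle requests. In a ggevent worker, each incoming connection gets its own coroutine, which is how asynchronous I/O is achieved. The request itself is ultimately handled by the user-supplied WSGI app.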
