一文帶你探索Python中的eventlet通信機制
一、源碼解析
對python原生文件打補丁:
import eventlet eventlet.monkey_patch()
跟蹤進入該模塊方法:eventlet.patcher#monkey_patch
def monkey_patch(**on):
......
modules_to_patch = []
for name, modules_function in [
('os', _green_os_modules),
('select', _green_select_modules),
('socket', _green_socket_modules),
('thread', _green_thread_modules),
('time', _green_time_modules),
('MySQLdb', _green_MySQLdb),
('builtins', _green_builtins),
('subprocess', _green_subprocess_modules),
]:
if on[name] and not already_patched.get(name):
modules_to_patch += modules_function()
already_patched[name] = True
......該方法對某些系統(tǒng)模塊進行全局打補丁,使其對Greenthread友好。關(guān)鍵字參數(shù)用于指定哪些模塊需要打補丁,如果未提供關(guān)鍵字參數(shù),則會對所有默認的模塊(如代碼所示)打補丁,例如: monkey_patch(socket = True,select = True) 僅對socket和select模塊打補丁。大多數(shù)參數(shù)都是對同名的單個模塊進行打補丁,比如操作系統(tǒng),時間,選擇。但是socket例外,它也會對ssl模塊(如果存在)打補丁,thread用于對threading、thread、Queue打補丁。說明:多次調(diào)用monkey_patch是安全的。
以socket為例:('socket', _green_socket_modules),進入該方法:
def _green_socket_modules():
from eventlet.green import socket
try:
from eventlet.green import ssl
return [('socket', socket), ('ssl', ssl)]
except ImportError:
return [('socket', socket)]進入socket模塊:eventlet.green.socket
__import__('eventlet.green._socket_nodns')
__socket = sys.modules['eventlet.green._socket_nodns']
__all__ = __socket.__all__
__patched__ = __socket.__patched__ + [
'create_connection',
'getaddrinfo',
'gethostbyname',
'gethostbyname_ex',
'getnameinfo',
]在進入eventlet.green._socket_nodns:
__socket = __import__('socket')
__all__ = __socket.__all__
__patched__ = ['fromfd', 'socketpair', 'ssl', 'socket', 'timeout']可以看到是對python的原生socket模塊進行了打補?。簆ythonx.x/Lib/socket.py 以socket類為例:python原生的socket.socket()類并替換為了eventlet.greenio.base#GreenSocket類 該補丁類完全兼容原生socket類的API,它還可以識別關(guān)鍵字參數(shù)set_nonblocking = True。用來設(shè)置socket為非阻塞模式。
class GreenSocket(object):
# This placeholder is to prevent __getattr__ from creating an infinite call loop
fd = None
def __init__(self, family=socket.AF_INET, *args, **kwargs):
should_set_nonblocking = kwargs.pop('set_nonblocking', True)
if isinstance(family, six.integer_types):
fd = _original_socket(family, *args, **kwargs)
# Notify the hub that this is a newly-opened socket.
notify_opened(fd.fileno())
else:
fd = family
# import timeout from other socket, if it was there
try:
self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
except AttributeError:
self._timeout = socket.getdefaulttimeout()
# Filter fd.fileno() != -1 so that won't call set non-blocking on
# closed socket
if should_set_nonblocking and fd.fileno() != -1:
set_nonblocking(fd)
self.fd = fd
# when client calls setblocking(0) or settimeout(0) the socket must
# act non-blocking
self.act_non_blocking = False
......我們再來看下ssl模塊。python原生的ssl模塊被替換為了evenlet.green.ssl模塊 該模塊提供了一個方法用來包裝socket:
def wrap_socket(sock, *a, **kw):
return GreenSSLSocket(sock, *a, **kw)直接進入GreenSSLSocket類:
class GreenSSLSocket(_original_sslsocket): ......
可以看出該補丁模塊繼承了原生socket,將原生socket的api都重寫了,但是基本都是直接調(diào)用原生api。注:Python3.x版本中,如果socket的另一端已關(guān)閉時,非阻塞模式的sslsocket對象不會再拋出錯誤(雖然它們會在另一端關(guān)閉時發(fā)出通知)。如果另一端的socket已經(jīng)關(guān)閉,任何的寫/讀操作都會被簡單地掛起。這個問題目前沒有好的解決方案。它看起來是Python的sslsocket對象實現(xiàn)的一個限制。一個解決方法是使用命令settimeout()在socket上設(shè)置合理的超時時間,并在超時時關(guān)閉/重新打開連接。
下面看下原生ssl模塊:pythonx.x/Lib/ssl.py
def wrap_socket(sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=CERT_NONE,
ssl_version=PROTOCOL_TLS, ca_certs=None,
do_handshake_on_connect=True,
suppress_ragged_eofs=True,
ciphers=None):
if server_side and not certfile:
raise ValueError("certfile must be specified for server-side "
"operations")
if keyfile and not certfile:
raise ValueError("certfile must be specified")
context = SSLContext(ssl_version)
context.verify_mode = cert_reqs
if ca_certs:
context.load_verify_locations(ca_certs)
if certfile:
context.load_cert_chain(certfile, keyfile)
if ciphers:
context.set_ciphers(ciphers)
return context.wrap_socket(
sock=sock, server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
suppress_ragged_eofs=suppress_ragged_eofs
)可以看到該調(diào)用了SSLContext.wrap_socket方法,進入該方法:
class SSLContext(_SSLContext):
......
sslsocket_class = None # SSLSocket is assigned later.
sslobject_class = None # SSLObject is assigned later.
......
def wrap_socket(self, sock, server_side=False,
do_handshake_on_connect=True,
suppress_ragged_eofs=True,
server_hostname=None, session=None):
# SSLSocket class handles server_hostname encoding before it calls
# ctx._wrap_socket()
return self.sslsocket_class._create(
sock=sock,
server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
suppress_ragged_eofs=suppress_ragged_eofs,
server_hostname=server_hostname,
context=self,
session=session
)該類中類屬性sslobject_class定義如下:
# Python does not support forward declaration of types. SSLContext.sslsocket_class = SSLSocket SSLContext.sslobject_class = SSLObject
進入SSLSocket類:
class SSLSocket(socket):
......
@classmethod
def _create(cls, sock, server_side=False, do_handshake_on_connect=True,
suppress_ragged_eofs=True, server_hostname=None,
context=None, session=None):
......
if connected:
# create the SSL object
try:
self._sslobj = self._context._wrap_socket(
self, server_side, self.server_hostname,
owner=self, session=self._session,
)
if do_handshake_on_connect:
timeout = self.gettimeout()
if timeout == 0.0:
# non-blocking
raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
self.do_handshake()
except (OSError, ValueError):
self.close()
raise
return self最終該self._sslobj實例就是cpython中定義的對象,所有后續(xù)的所有操作都是調(diào)用的cpython方法。
二、遺留問題
問題堆棧:
Traceback (most recent call last):
File "test.py", line 40, in <module>
main()
File "test.py", line 35, in main
srv(listener)
File "test.py", line 10, in srv
r.readline(1<<10)
File "/usr/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 241, in recv_into
return self._base_recv(nbytes, flags, into=True, buffer_=buffer)
File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 256, in _base_recv
read = self.read(nbytes, buffer_)
File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 176, in read
super(GreenSSLSocket, self).read, *args, **kwargs)
File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 146, in _call_trampolining
return func(*a, **kw)
File "/usr/lib/python3.7/ssl.py", line 911, in read
return self._sslobj.read(len, buffer)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2488)從這里我們可以看到系統(tǒng)調(diào)用的入口是python3.7/socket.py中的readinto方法,進入該方法:
def readinto(self, b):
self._checkClosed()
self._checkReadable()
if self._timeout_occurred:
raise OSError("cannot read from timed out object")
while True:
try:
return self._sock.recv_into(b)
except timeout:
self._timeout_occurred = True
raise
except error as e:
if e.args[0] in _blocking_errnos:
return None
raise最多將len(b)個字節(jié)讀入可寫緩沖區(qū)* b 并返回讀取的字節(jié)數(shù)。如果套接字是非阻塞的并且沒有字節(jié)可用,則返回None。如果 b *為非空,則返回值為0表示該連接在另一端被關(guān)閉。注:如果未設(shè)置默認超時并且偵聽套接字具有(非零)超時,請強制新套接字處于阻塞模式,以覆蓋特定于平臺的套接字標(biāo)志繼承。
我們根據(jù)堆棧一步步進入最終報錯的地方:self._sslobj.read(len, buffer) 根據(jù)我們上面說的,self._sslobj實際上是cpython對象,那read方法是怎么就進入到了cpython實際的方法里面的呢?通過python代用C代碼的機制可以找到如下代碼:
#define _SSL__SSLSOCKET_READ_METHODDEF \
{"read", (PyCFunction)_ssl__SSLSocket_read, METH_VARARGS, _ssl__SSLSocket_read__doc__},
static PyObject *
_ssl__SSLSocket_read_impl(PySSLSocket *self, int len, int group_right_1,
Py_buffer *buffer);
static PyObject *
_ssl__SSLSocket_read(PySSLSocket *self, PyObject *args)
{
PyObject *return_value = NULL;
int len;
int group_right_1 = 0;
Py_buffer buffer = {NULL, NULL};
switch (PyTuple_GET_SIZE(args)) {
case 1:
if (!PyArg_ParseTuple(args, "i:read", &len)) {
goto exit;
}
break;
case 2:
if (!PyArg_ParseTuple(args, "iw*:read", &len, &buffer)) {
goto exit;
}
group_right_1 = 1;
break;
default:
PyErr_SetString(PyExc_TypeError, "_ssl._SSLSocket.read requires 1 to 2 arguments");
goto exit;
}
return_value = _ssl__SSLSocket_read_impl(self, len, group_right_1, &buffer);
exit:
/* Cleanup for buffer */
if (buffer.obj) {
PyBuffer_Release(&buffer);
}
return return_value;
}可以看出,read是映射到了_ssl__SSLSocket_read方法,而_ssl__SSLSocket_read則調(diào)用了_ssl__SSLSocket_read_impl方法。我們進入_ssl__SSLSocket_read_impl的實現(xiàn):
static PyObject *
_ssl__SSLSocket_read_impl(PySSLSocket *self, int len, int group_right_1,
Py_buffer *buffer)
/*[clinic end generated code: output=00097776cec2a0af input=ff157eb918d0905b]*/
{
......
do {
PySSL_BEGIN_ALLOW_THREADS
count = SSL_read(self->ssl, mem, len);
err = _PySSL_errno(count <= 0, self->ssl, count);
PySSL_END_ALLOW_THREADS
self->err = err;
if (PyErr_CheckSignals())
goto error;
if (has_timeout)
timeout = deadline - _PyTime_GetMonotonicClock();
if (err.ssl == SSL_ERROR_WANT_READ) {
sockstate = PySSL_select(sock, 0, timeout);
} else if (err.ssl == SSL_ERROR_WANT_WRITE) {
sockstate = PySSL_select(sock, 1, timeout);
} else if (err.ssl == SSL_ERROR_ZERO_RETURN &&
SSL_get_shutdown(self->ssl) == SSL_RECEIVED_SHUTDOWN)
{
count = 0;
goto done;
}
else
sockstate = SOCKET_OPERATION_OK;
if (sockstate == SOCKET_HAS_TIMED_OUT) {
PyErr_SetString(PySocketModule.timeout_error,
"The read operation timed out");
goto error;
} else if (sockstate == SOCKET_IS_NONBLOCKING) {
break;
}
} while (err.ssl == SSL_ERROR_WANT_READ ||
err.ssl == SSL_ERROR_WANT_WRITE);
if (count <= 0) {
PySSL_SetError(self, count, __FILE__, __LINE__);
goto error;
}
if (self->exc_type != NULL)
goto error;
......
}從該模塊的include也可以看出,該模塊就是調(diào)用了系統(tǒng)的openssl庫進行ssl通信
/* Include OpenSSL header files */ #include "openssl/rsa.h" #include "openssl/crypto.h" #include "openssl/x509.h" #include "openssl/x509v3.h" #include "openssl/pem.h" #include "openssl/ssl.h" #include "openssl/err.h" #include "openssl/rand.h" #include "openssl/bio.h" #include "openssl/dh.h"
進入openssl頭文件,可以看到確實有定義這個錯誤碼SSL_ERROR_WANT_READ:
在openssl源碼中我們可以找到這個定義include.openssl.ssl.h
# define SSL_AD_NO_APPLICATION_PROTOCOL TLS1_AD_NO_APPLICATION_PROTOCOL # define SSL_ERROR_NONE 0 # define SSL_ERROR_SSL 1 # define SSL_ERROR_WANT_READ 2 # define SSL_ERROR_WANT_WRITE 3 # define SSL_ERROR_WANT_X509_LOOKUP 4 # define SSL_ERROR_SYSCALL 5/* look at error stack/return
下面我們來看下PySSL_SetError方法:cpython->modules._ssl.c
static PyObject *
PySSL_SetError(PySSLSocket *sslsock, int ret, const char *filename, int lineno)
{
PyObject *type = PySSLErrorObject;
char *errstr = NULL;
_PySSLError err;
enum py_ssl_error p = PY_SSL_ERROR_NONE;
unsigned long e = 0;
assert(ret <= 0);
e = ERR_peek_last_error();
if (sslsock->ssl != NULL) {
err = sslsock->err;
switch (err.ssl) {
case SSL_ERROR_ZERO_RETURN:
errstr = "TLS/SSL connection has been closed (EOF)";
type = PySSLZeroReturnErrorObject;
p = PY_SSL_ERROR_ZERO_RETURN;
break;
case SSL_ERROR_WANT_READ:
errstr = "The operation did not complete (read)";
type = PySSLWantReadErrorObject;
p = PY_SSL_ERROR_WANT_READ;
break;
case SSL_ERROR_WANT_WRITE:
p = PY_SSL_ERROR_WANT_WRITE;
type = PySSLWantWriteErrorObject;
errstr = "The operation did not complete (write)";
break;經(jīng)過一步步跟進去,確實會發(fā)現(xiàn)返回了一個SSLError類型的錯誤。
到此這篇關(guān)于一文帶你探索Python中的eventlet通信機制的文章就介紹到這了,更多相關(guān)Python eventlet通信機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python的Scrapy框架中的CrawlSpider介紹和使用
這篇文章主要介紹了Python的Scrapy框架中的CrawlSpider介紹和使用,CrawlSpider其實是Spider的一個子類,除了繼承到Spider的特性和功能外,還派生除了其自己獨有的更加強大的特性和功能,其中最顯著的功能就是"LinkExtractors鏈接提取器",需要的朋友可以參考下2023-12-12
python中plt.imshow與cv2.imshow顯示顏色問題
這篇文章主要介紹了plt.imshow與cv2.imshow顯示顏色問題,本文給大家介紹的非常詳細,同時給大家提到了cv2.imshow()和plt.imshow()的區(qū)別講解,需要的朋友可以參考下2020-07-07
新手該如何學(xué)python怎么學(xué)好python?
怎么學(xué)好python?怎么靈活應(yīng)用python?2008-10-10
Python虛擬環(huán)境virtualenv是如何使用的
今天給大家?guī)淼氖顷P(guān)于Python虛擬環(huán)境的相關(guān)知識,文章圍繞著Python虛擬環(huán)境virtualenv是如何使用的展開,文中有非常詳細的解釋及代碼示例,需要的朋友可以參考下2021-06-06
tensorflow轉(zhuǎn)onnx的實現(xiàn)方法
本文主要介紹了tensorflow轉(zhuǎn)onnx的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
使用python實現(xiàn)兩數(shù)之和的畫解算法
這篇文章主要介紹了使用python實現(xiàn)兩數(shù)之和的畫解算法,采用實例問題的描述來進行問題分析,并給出用暴力求解和哈希表兩種方法解決方案,有需要的朋友可以參考下2021-08-08

