Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解
Pipeline處理CPU密集型或阻塞型操作
Twisted框架的reactor適合于處理短的、非阻塞的操作。但是如果要處理一些復(fù)雜的、或者包含阻塞的操作又該怎么辦呢?Twisted提供了線程池來在其他的線程而不是主線程(Twisted的reactor線程)中執(zhí)行慢的操作——使用reactor.callInThread() API。這就意味著reactor在執(zhí)行計算時還能保持運行并對事件做出反應(yīng)。一定要記住線程池中的處理不是線程安全的。這就意味著當(dāng)你使用了全局的狀態(tài)之后,還要面臨所有那些傳統(tǒng)的多線程編程的同步問題。下面是一個簡單的例子:
class UsingBlocking(object):
@defer.inlineCallbacks
def process_item(self, item, spider):
price = item["price"][0]
out = defer.Deferred()
reactor.callInThread(self._do_calculation, price, out)
item["price"][0] = yield out
defer.returnValue(item)
def _do_calculation(self, price, out):
new_price = price + 1
time.sleep(0.10)
reactor.callFromThread(out.callback, new_price)在上面的Pipeline中,對于每個Item,我們提取出它的price字段,想要在_do_caculation()方法中對它進行處理。這個方法使用了time.sleep(),一個阻塞的操作。我們調(diào)用reactor.callInThread()方法使它運行在另一個線程中,該方法的第一個參數(shù)是想要調(diào)用的函數(shù),后面的參數(shù)則會全部傳遞給被調(diào)用的函數(shù)作為參數(shù)。在這里我們給被調(diào)用的函數(shù)傳遞了price,還有一個創(chuàng)建的Deferred對象out。當(dāng)_do_caculation()函數(shù)完成計算后,我們會使用out的回調(diào)函數(shù)來返回這個值。接下來,yield這個 Deferred對象并為price設(shè)置一個新的值,最后返回Item。
在_do_caculation()函數(shù)中我們把price加一,然后休眠了100ms。其實這個時間是很長的,如果在reactor的線程中調(diào)用這個函數(shù),那就意味著我們每秒只能處理不超過10個頁面。不過如果把它放在另一個線程中來調(diào)用就不會出現(xiàn)這種問題了。這些計算任務(wù)會在線程池中排隊,等待某個線程處于可用狀態(tài),然后這個線程就會執(zhí)行這個任務(wù),休眠100ms。最后一步是激活out的回調(diào)函數(shù)。通常情況下,我們可以這樣來激活:out.callback(new_price),但是既然現(xiàn)在我們處于另外一個線程中,這樣做就不安全了。如果我們執(zhí)意這樣做了,這個Deferred對象的代碼,也就是Scrapy的功能就會在別的線程中執(zhí)行,這樣會導(dǎo)致數(shù)據(jù)被損壞。所以我們調(diào)用了reactor.callFromThread()函數(shù),同樣的,它也是以一個函數(shù)作為參數(shù),并把額外的參數(shù)直接傳遞給被調(diào)用的函數(shù)。這個函數(shù)會在主線程中排隊并等待被調(diào)用,它反過來解鎖了process_item()方法中的yield語句,并恢復(fù)Scrapy對這個Item的操作。
如果我們的pipeline中含有全局狀態(tài)會怎么樣呢?比如,計數(shù)器或者平均值等,我們需要在_do_caculation()函數(shù)中使用的。例如有以下兩個變量,beta和delta:
class UsingBlocking(object):
def __init__(self):
self.beta, self.delta = 0, 0
...
def _do_calculation(self, price, out):
self.beta += 1
time.sleep(0.001)
self.delta += 1
new_price = price + self.beta - self.delta + 1
assert abs(new_price-price-1) < 0.01
time.sleep(0.10)...上面的代碼有一些問題,并且在運行的時候會給出assertion錯誤。這是因為,如果一個線程在self.beta += 1和self.delta += 1語句之間切換的話,另一個線程就會恢復(fù)執(zhí)行并使用beta和delta的值來計算price,這里線程會發(fā)現(xiàn)這兩個值處于不一致的狀態(tài)(beta比delta大),這樣,錯誤的產(chǎn)生了。中間短的sleep會讓線程切換更可能發(fā)生,不過即使沒有它,同樣也會出現(xiàn)競態(tài)條件。為了阻止競態(tài)條件的發(fā)生,我們必須使用鎖,例如Python的threading.RLock()鎖。使用了這個遞歸鎖,就能確保兩個線程不會同時執(zhí)行鎖保護的臨界區(qū)的代碼:
class UsingBlocking(object):
def __init__(self):
...
self.lock = threading.RLock()
...
def _do_calculation(self, price, out):
with self.lock:
self.beta += 1
...
new_price = price + self.beta - self.delta + 1
assert abs(new_price-price-1) < 0.01 ...現(xiàn)在的代碼就沒問題了,要注意的是,我們不需要保護整個代碼,只需要能夠覆蓋全局狀態(tài)的使用就行了。
在ITEM_PIPELINES中加上:
ITEM_PIPELINES = { ...
'properties.pipelines.computation.UsingBlocking': 500,
}運行一下會發(fā)現(xiàn),時延由于100ms的休眠的緣故變調(diào)了,不過吞吐量還是保持不變,大約每秒25個。
到此這篇關(guān)于Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解的文章就介紹到這了,更多相關(guān)Pipeline處理CPU密集型或阻塞型操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
windows系統(tǒng)上通過whl文件安裝triton模塊的簡單步驟
這篇文章主要介紹了在Windows系統(tǒng)中通過.whl文件安裝Triton的步驟,包括確認系統(tǒng)環(huán)境、下載合適的.whl文件、使用pip安裝、驗證安裝、使用Triton以及解決潛在問題,需要的朋友可以參考下2025-01-01
python pyautogui手動活動(模擬鼠標(biāo)鍵盤)自動化庫使用
這篇文章主要為大家介紹了python pyautogui手動活動(模擬鼠標(biāo)鍵盤)自動化庫使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01
Python 將字符串轉(zhuǎn)換為列表的7種方法匯總
這篇文章主要介紹了Python 將字符串轉(zhuǎn)換為列表的7種方法匯總,在本文中,我們將嘗試將給定的字符串轉(zhuǎn)換為列表,其中根據(jù)用戶的選擇,遇到空格或任何其他特殊字符,為此,我們在string中使用split()方法,需要的朋友可以參考下2023-11-11
淺談Python 字符串格式化輸出(format/printf)
下面小編就為大家?guī)硪黄獪\談Python 字符串格式化輸出(format/printf)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-07-07

