日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

如何用python實現一個HTTP連接池

瀏覽:29日期:2022-06-29 18:40:36
一. 連接池的原理

首先, HTTP連接是基于TCP連接的, 與服務器之間進行HTTP通信, 本質就是與服務器之間建立了TCP連接后, 相互收發基于HTTP協議的數據包. 因此, 如果我們需要頻繁地去請求某個服務器的資源, 我們就可以一直維持與個服務器的TCP連接不斷開, 然后在需要請求資源的時候, 把連接拿出來用就行了.

如何用python實現一個HTTP連接池

一個項目可能需要與服務器之間同時保持多個連接, 比如一個爬蟲項目, 有的線程需要請求服務器的網頁資源, 有的線程需要請求服務器的圖片等資源, 而這些請求都可以建立在同一條TCP連接上.

因此, 我們使用一個管理器來對這些連接進行管理, 任何程序需要使用這些連接時, 向管理器申請就可以了, 等到用完之后再將連接返回給管理器, 以供其他程序重復使用, 這個管理器就是連接池.

如何用python實現一個HTTP連接池

二. 實現代碼1. HTTPConnectionPool類

基于上一章的分析, 連接池應該是一個收納連接的容器, 同時對這些連接有管理能力:

class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: ''' :param host: pass :param port: pass :param max_size: 同時存在的最大連接數, 默認None->連接數無限,沒了就創建 :param idle_timeout: 單個連接單次最長空閑時間,超時自動關閉,默認None->不限時 ''' self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數,包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: ... def release(self, conn: WrapperHTTPConnection) -> None: ...

因此, 我們定義這樣一個HTTPConnectionPool類, 使用一個列表來保存可用的連接. 對于外部來說, 只需要調用這個連接池對象的acquire和release方法就能取得和釋放連接.

2. 線程安全地管理連接

對于線程池內部來說, 至少需要三個關于連接的操作: 從連接池中取得連接, 將連接放回連接池, 以及創建一個連接:

def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連接從_idle_conn移動到_used_conn列表中,并返回這個連接 try: return self._pool.pop() except IndexError: raise EmptyPoolErrordef _put_connection(self, conn: WrapperHTTPConnection) -> None: self._pool.append(conn)def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))

對于連接池外部來說, 主要有申請連接和釋放連接這兩個操作, 實際上這就是個簡單的生產者消費者模型. 考慮到外部可能是多線程的環境, 我們使用threading.Condition來保證線程安全. 關于Condition的資料可以看這里.

def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創建新連接的情況下,如果沒有空閑連接,直接創建一個就行了 if self.is_pool_empty():self._put_connection(self._create_connection()) else: # 不能創建新連接的情況下,如果設置了blocking=False,沒連接就報錯 # 否則,就基于timeout進行阻塞,直到超時或者有可用連接為止 if not blocking:if self.is_pool_empty(): raise EmptyPoolError elif timeout is None:while self.is_pool_empty(): self._lock.wait() elif timeout < 0:raise ValueError('’timeout’ must be a non-negative number') else:end_time = time.time() + timeoutwhile self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection()def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連接是在連接池關閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調用鎖是為了通過notify方法通知其它正在wait的線程:現在有連接可用了 with self._lock: if not conn.is_available: # 如果這個連接不可用了,就應該創建一個新連接放進去,因為可能還有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify()

我們首先看看acquire方法, 這個方法其實就是在申請到鎖之后調用內部的_get_connection方法獲取連接, 這樣就線程安全了. 需要注意的是, 如果當前的條件無法獲取連接, 就會調用條件變量的wait方法, 及時釋放鎖并阻塞住當前線程. 然后, 當其它線程作為生產者調用release方法釋放連接時, 會觸發條件變量的notify方法, 從而喚醒一個阻塞在wait階段的線程, 即消費者. 這個消費者再從池中取出剛放回去的線程, 這樣整個生產者消費者模型就運轉起來了.

3. 上下文管理器

對于一個程序來說, 它使用連接池的形式是獲取連接->使用連接->釋放連接. 因此, 我們應該通過with語句來管理這個連接, 以免在程序的最后遺漏掉釋放連接這一步驟.

基于這個原因, 我們通過一個WrapperHTTPConnection類來對HTTPConnection進行封裝, 以實現上下文管理器的功能. HTTPConnection的代碼可以看《用python實現一個HTTP客戶端》這篇文章.

class WrapperHTTPConnection: def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.is_available = True def __enter__(self) -> ’WrapperHTTPConnection’: return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復用,就棄用這個連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False

同樣的, 連接池可能也需要關閉, 因此我們給連接池也加上上下文管理器的功能:

class HTTPConnectionPool: ... def close(self) -> None: if self.is_closed: return self.is_closed = True pool, self._pool = self._pool, None for conn in pool: conn.close() def __enter__(self) -> ’HTTPConnectionPool’: return self def __exit__(self, *exit_info: Any) -> None: self.close()

這樣, 我們就可以通過with語句優雅地管理連接池了:

with HTTPConnectionPool(**kwargs) as pool: with pool.acquire() as conn: res = conn.request(’GET’, ’/’) ...4. 定時清理連接

如果一個連接池的所需連接數是隨時間變化的, 那么就會出現一種情況: 在高峰期, 我們創建了非常多的連接, 然后進入低谷期之后, 連接過剩, 大量的連接處于空閑狀態, 浪費資源. 因此, 我們可以設置一個定時任務, 定期清理空閑時間過長的連接, 減少連接池的資源占用.

首先, 我們需要為連接對象添加一個last_time屬性, 每當連接釋放進入連接池后, 就修改這個屬性的值為當前時間, 這樣我們就能明確知道, 連接池內的每個空閑連接空閑了多久:

class WrapperHTTPConnection: ... def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: ... self.last_time = Noneclass HTTPConnectionPool: ... def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn)

然后, 我們通過threading.Timer來實現一個定時任務:

def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時時間為無限,那么就不應該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start()def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel()

threading.Timer只會執行一次定時任務, 因此, 我們需要在start_clear_conn中不斷地把自己設置為定時任務. 這其實等同于新開了一個線程來執行start_clear_conn方法, 因此并不會出現遞歸過深問題. 不過需要注意的是, threading.Timer雖然不會阻塞當前線程, 但是卻會阻止當前線程結束, 就算把它設置為守護線程都不行, 唯一可行的辦法就是調用stop_clear_conn方法取消這個定時任務.

最后, 我們定義clear_idle_conn方法來清理閑置時間超時的連接:

def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個新線程來清理空閑連接,避免了阻塞主線程導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start()def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout:left = mid + 1 else:right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release()

由于我們獲取和釋放連接都是從self._pool的尾部開始操作的, 因此self._pool這個容器是一個先進后出隊列, 它里面放著的連接, 一定是越靠近頭部的閑置時間越長, 從頭到尾閑置時間依次遞減. 基于這個原因, 我們使用二分法來找出列表中第一個沒有閑置超時的連接, 然后把在它之前的連接一次性刪除, 這樣就能達到O(logN)的時間復雜度, 算是一種比較高效的方法. 需要注意的是, 如果連接池內所有的連接都是超時的, 那么這種方法是刪不干凈的, 需要對這種邊界情況單獨處理.

三. 總結1. 完整代碼及分析

這個連接池的完整代碼如下:

import threadingimport timefrom typing import Anyfrom client import HTTPConnection, HTTPResponseclass WrapperHTTPConnection: def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.last_time = time.time() self.is_available = True def __enter__(self) -> ’WrapperHTTPConnection’: return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復用,就棄用這個連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = Falseclass HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: ''' :param host: pass :param port: pass :param max_size: 同時存在的最大連接數, 默認None->連接數無限,沒了就創建 :param idle_timeout: 單個連接單次最長空閑時間,超時自動關閉,默認None->不限時 ''' self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數,包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False self._clearer = None self.start_clear_conn() def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full():# 在還能創建新連接的情況下,如果沒有空閑連接,直接創建一個就行了if self.is_pool_empty(): self._put_connection(self._create_connection()) else:# 不能創建新連接的情況下,如果設置了blocking=False,沒連接就報錯# 否則,就基于timeout進行阻塞,直到超時或者有可用連接為止if not blocking: if self.is_pool_empty(): raise EmptyPoolErrorelif timeout is None: while self.is_pool_empty(): self._lock.wait()elif timeout < 0: raise ValueError('’timeout’ must be a non-negative number')else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連接是在連接池關閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調用鎖是為了通過notify方法通知其它正在wait的線程:現在有連接可用了 with self._lock: if not conn.is_available:# 如果這個連接不可用了,就應該創建一個新連接放進去,因為可能還有其它線程在等著連接用conn.close()self.conn_num -= 1conn = self._create_connection() self._put_connection(conn) self._lock.notify() def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連接從_idle_conn移動到_used_conn列表中,并返回這個連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port)) def is_pool_empty(self) -> bool: # 這里指的是,空閑可用的連接是否為空 return len(self._pool) == 0 def is_full(self) -> bool: if self.max_size is None: return False return self.conn_num >= self.max_size def close(self) -> None: if self.is_closed: return self.is_closed = True self.stop_clear_conn() pool, self._pool = self._pool, None for conn in pool: conn.close() def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個新線程來清理空閑連接,避免了阻塞主線程導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連接的指針 left, right = 0, len(self._pool) - 1 while left < right:mid = (left + right) // 2if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release() def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時時間為無限,那么就不應該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel() def __enter__(self) -> ’HTTPConnectionPool’: return self def __exit__(self, *exit_info: Any) -> None: self.close()class EmptyPoolError(Exception): passclass ConnectionPoolClosed(Exception): pass

首先, 這個連接池的核心就是對連接進行管理, 而這包含取出連接和釋放連接兩個過程. 因此這東西的本質就是一個生產者消費者模型, 取出線程時是消費者, 放入線程時是生產者, 使用threading自帶的Condition對象就能完美解決線程安全問題, 使二者協同合作.

解決獲取連接和釋放連接這個問題之后, 其實這個連接池就已經能用了. 但是如果涉及到更多細節方面的東西, 比如判斷連接是否可用, 自動釋放連接, 清理閑置連接等等, 就需要對這個連接進行封裝, 為它添加更多的屬性和方法, 這就引入了WrapperHTTPConnection這個類. 實現它的__enter___和__exit__方法之后, 就能使用上下文管理器來自動釋放連接. 至于清理閑置連接, 通過last_time屬性記錄每個連接的最后釋放時間, 然后在連接池中添加一個定時任務就行了.

以上就是如何用python實現一個HTTP連接池的詳細內容,更多關于python 實現一個HTTP連接池的資料請關注好吧啦網其它相關文章!

標簽: Python 編程
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
午夜电影一区| 国产一区白浆| 国产精品v一区二区三区| 亚洲精品麻豆| 日韩精品亚洲aⅴ在线影院| 日韩不卡免费视频| 久久99视频| 91av亚洲| 在线亚洲激情| 国产欧美一区二区色老头| 精品中文字幕一区二区三区| 91亚洲自偷观看高清| 一区二区三区四区在线看| 国产亚洲永久域名| 久久国产日韩欧美精品| 成人在线免费观看网站| 亚洲免费激情| 久久国内精品| 黄色网一区二区| 国产精品99免费看| 色综合视频一区二区三区日韩| 国产极品嫩模在线观看91精品| 深夜视频一区二区| 亚洲精品少妇| 国际精品欧美精品| 国产精品美女久久久浪潮软件| 日韩和欧美一区二区| 国产一区二区三区不卡av | 国产精品日本一区二区不卡视频 | 日韩国产专区| 综合视频一区| 国产在视频一区二区三区吞精| 91成人精品| 国产女人18毛片水真多18精品| 国产精品v一区二区三区| 久久精品播放| 久久狠狠亚洲综合| 亚洲少妇自拍| jizzjizz中国精品麻豆| 亚洲精品精选| 在线一区视频观看| 国产视频一区二| re久久精品视频| 国产精品成人国产| 尹人成人综合网| 国产成人免费精品| 日韩美女国产精品| 亚洲国产专区校园欧美| 国产精品最新| 日韩午夜黄色| 欧美aⅴ一区二区三区视频| 亚洲少妇在线| 色爱av综合网| 另类综合日韩欧美亚洲| 亚洲一区国产一区| 理论片午夜视频在线观看| 日本a级不卡| 亚洲欧美综合| 精品视频在线你懂得| 日韩欧美中文字幕一区二区三区 | 在线国产精品一区| 久久国产直播| 精品一区二区三区免费看| 亚洲精品视频一二三区| 日韩视频中文| 久久精品国产99久久| 成人午夜毛片| 精品黄色一级片| 视频一区日韩精品| 视频一区中文字幕国产| 秋霞影视一区二区三区| 国产66精品| 久久99偷拍| 国产精品久久久久久久免费软件| 免费在线观看一区二区三区| 亚洲四虎影院| 日韩三区免费| 日韩在线看片| 久久黄色影院| 日本韩国欧美超级黄在线观看| 精品国产乱码久久久久久樱花| 91嫩草精品| 日本久久二区| 蜜臀91精品一区二区三区| 免费日韩av片| 爽好久久久欧美精品| 国产亚洲永久域名| 国产亚洲在线| 老鸭窝毛片一区二区三区| 国产精品毛片在线| 欧美一级专区| 深夜福利一区| 青青国产91久久久久久| 国产美女久久| 免费一级欧美在线观看视频| 国产精选久久| 久久一区精品| 亚洲伊人av| 天堂中文在线播放| 色爱av综合网| 午夜国产精品视频免费体验区| 激情婷婷久久| 欧美专区18| 日韩精品视频网站| 国产精品国码视频| 老牛影视精品| 欧美在线亚洲综合一区| 噜噜噜久久亚洲精品国产品小说| 免费观看在线综合色| 欧美永久精品| 国产一区二区三区国产精品| 99久久九九| 丝袜脚交一区二区| 日本不卡在线视频| 久久一区国产| 在线日韩视频| 婷婷精品在线| 国产在视频一区二区三区吞精| 久久青草久久| 中文不卡在线| 久久精品资源| 亚洲二区免费| 天堂av一区| 免费视频一区二区三区在线观看| 人在线成免费视频| 黄色成人精品网站| 97久久精品| 亚洲精品一区三区三区在线观看| 国精品一区二区三区| 蜜臀av在线播放一区二区三区| 国产精品流白浆在线观看| 日韩在线二区| 中文字幕一区二区三区四区久久| 国产精品天天看天天狠| 亚洲黄色免费看| 日本欧美在线看| 精品一区二区三区亚洲| 亚洲激情社区| 国产精品一区二区99| 亚洲成av人片一区二区密柚| 石原莉奈在线亚洲二区| 国产精品成人国产| 午夜欧美视频| 久久国产精品色av免费看| 久久天堂成人| 国产精品视频一区二区三区| 婷婷中文字幕一区| 老司机精品视频网| 免费在线观看不卡| 日韩黄色大片| 国产精品羞羞答答在线观看| 久久在线电影| 国产精品videossex久久发布| 精品一区三区| 国产精品66| 亚洲一区二区小说| 久久天堂成人| 精品一区二区三区免费看| 欧美福利专区| 国产成人精品福利| 日韩福利视频导航| 在线一区电影| 久久影院午夜精品| 欧美日韩一视频区二区| 精品一区三区| 国产精品久久久久久久免费观看 | 国产精品丝袜在线播放| 羞羞答答国产精品www一本| 久久精品午夜| 亚洲伊人精品酒店| 五月天久久久| 不卡一二三区| 麻豆国产91在线播放| 亚洲精品麻豆| 精品91久久久久| 亚洲女同av| 精品美女在线视频| 国产美女久久| 日本国产亚洲| 在线日韩成人| 亚洲免费在线| 亚洲黑丝一区二区| 免费福利视频一区二区三区| 国产精品亚洲成在人线| 亚洲欧美日韩专区| 久久男女视频| 日韩在线综合| 天堂中文av在线资源库| 粉嫩av一区二区三区四区五区 | 久久午夜精品| 国产一区亚洲| 鲁鲁在线中文| 国产精品伦理久久久久久| 欧美精品不卡| 麻豆极品一区二区三区| 欧美精品三级在线| 日韩高清不卡一区| 青草久久视频| 国产亚洲欧美日韩精品一区二区三区 | 欧美日韩国产一区二区三区不卡|