Rumah >pembangunan bahagian belakang >Tutorial Python >Bagaimanakah Python menyiasat pengekstrakan data lengkap dari perpustakaan panggilan?
Untuk mengira proses pelaksanaan, anda perlu mengetahui kedudukan mula dan akhir proses pelaksanaan, jadi kaedah paling mudah dan paling kasar adalah berdasarkan keperluan Kaedah panggilan dikapsulkan, dan lapisan perantaraan dilaksanakan antara rangka kerja yang memanggil perpustakaan MySQL dan perpustakaan MySQL, dan statistik yang memakan masa dilengkapkan dalam lapisan perantaraan, seperti:
# 伪代码 def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with MyTracer(conn, sql, param): # 以下为正常使用MySql库的代码 with conn.cursor as cursor: cursor.execute(sql, param) ...
Nampaknya sangat bagus untuk dilaksanakan, Dan perubahan itu sangat mudah, tetapi kerana ia diubah suai pada API peringkat atas, ia sebenarnya sangat tidak fleksibel Pada masa yang sama, beberapa pra-operasi dilakukan dalam kursor .execute, seperti splicing sql dan param, dan memanggil nextset untuk mengosongkan kursor semasa dan banyak lagi. Data yang akhirnya kami perolehi, seperti masa dan penggunaan, juga tidak tepat dan tiada cara untuk mendapatkan beberapa metadata terperinci, seperti kod ralat, dsb.
Jika anda ingin mendapatkan yang paling langsung dan berguna data, hanya Anda boleh menukar kod sumber dan kemudian memanggil kod sumber, tetapi jika setiap perpustakaan perlu menukar kod sumber untuk membuat statistik, ia akan menjadi terlalu menyusahkan, Python juga menyediakan beberapa antara muka yang serupa dengan probe, yang boleh digunakan untuk mengira statistik. Gantikan kod sumber perpustakaan untuk melengkapkan kod kami.
Dalam Python, fungsi cangkuk import boleh dilaksanakan melalui sys.meta_path operasi berkaitan import, Perpustakaan berkaitan import akan diubah berdasarkan objek yang ditakrifkan oleh sys.meta_path Objek dalam sys.meta_path perlu melaksanakan kaedah find_module ini mengembalikan Tiada atau objek yang melaksanakan kaedah load_module. Kita boleh menggunakan objek ini untuk menyasarkan beberapa perpustakaan Apabila mengimport, menggantikan kaedah yang berkaitan Penggunaan mudah adalah seperti berikut
import importlib import sys from functools import wraps def func_wrapper(func): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func) def wrapper(*args, **kwargs): # 记录开始时间 start = time.time() result = func(*args, **kwargs) # 统计消耗时间 end = time.time() print(f"speed time:{end - start}") return result return wrapper class MetaPathFinder: def find_module(self, fullname, path=None): # 执行时可以看出来在import哪些模块 print(f'find module:{path}:{fullname}') return MetaPathLoader() class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module sys.meta_path.insert(0, MetaPathFinder()) if __name__ == '__main__': import time time.sleep(1) # 输出示例: # find module:datetime # find module:time # load module:time # find module:math # find module:_datetime # speed time:1.00073385238647468
Melihat kod sumber cursor.fetchone (kursor .fetchall adalah serupa), kami mendapati bahawa data sebenarnya diperoleh daripada cache,
Data ini telah diperolehi semasa pelaksanaan cursor.execute:async def execute(self, query, args=None): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
Berdasarkan analisis di atas, kita hanya perlu membebankan kaedah teras self._query untuk mendapatkan data yang kita inginkan, daripada Kita boleh tahu daripada kod sumber bahawa kita boleh mendapatkan parameter diri dan sql yang dihantar ke diri._query, dan kita boleh dapatkan hasil pertanyaan berdasarkan diri Pada masa yang sama, kita boleh mendapatkan masa berjalan melalui penghias, dan semua data yang diperlukan pada asasnya tersedia,
Kod diubah suai mengikut ideanya adalah seperti berikut:def fetchone(self): """Fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is None or self._rownumber >= len(self._rows): fut.set_result(None) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
Contoh ini kelihatan sangat bagus, tetapi logiknya perlu dipanggil secara jelas di pintu masuk panggilan Biasanya Projek mungkin mempunyai beberapa entri logik ini akan menjadi sangat menyusahkan, dan logik cangkuk kami mesti dipanggil terlebih dahulu sebelum mengimport Dengan cara ini, spesifikasi pengenalan mesti ditetapkan, jika tidak cangkuk mungkin tidak berjaya di beberapa tempat Jika logik memperkenalkan cangkuk boleh diatur untuk dilaksanakan serta-merta selepas parser dimulakan, masalah ini boleh diselesaikan dengan sempurna Selepas menyemak maklumat, saya mendapati bahawa apabila penterjemah python dimulakan, ia secara automatik akan mengimport modul sitecustomize dan usercustomize yang wujud di bawah PYTHONPATH , kami hanya perlukan untuk mencipta modul dan menulis fungsi gantian kami dalam modul. import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
import aiomysql
def func_wrapper(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start: float = time.time()
func_result: Any = await func(*args, **kwargs)
end: float = time.time()
# 根据_query可以知道, 第一格参数是self, 第二个参数是sql
self: aiomysql.Cursor = args[0]
sql: str = args[1]
# 通过self,我们可以拿到其他的数据
db: str = self._connection.db
user: str = self._connection.user
host: str = self._connection.host
port: str = self._connection.port
execute_result: Tuple[Tuple] = self._rows
# 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,
# 这里只是打印一部分数据出来
print({
"sql": sql,
"db": db,
"user": user,
"host": host,
"port": port,
"result": execute_result,
"speed time": end - start
})
return func_result
return cast(Callable, wrapper)
class MetaPathFinder:
@staticmethod
def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
if fullname == 'aiomysql':
# 只有aiomysql才进行hook
return MetaPathLoader()
else:
return None
class MetaPathLoader:
@staticmethod
def load_module(fullname: str):
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder: "MetaPathFinder" = sys.meta_path.pop(0)
# 导入 module
module: ModuleType = importlib.import_module(fullname)
# 针对_query进行hook
module.Cursor._query = func_wrapper(module.Cursor._query)
sys.meta_path.insert(0, finder)
return module
async def test_mysql() -> None:
import aiomysql
pool: aiomysql.Pool = await aiomysql.create_pool(
host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
if __name__ == '__main__':
sys.meta_path.insert(0, MetaPathFinder())
import asyncio
asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
hook_aiomysql.py ialah kod kami untuk membuat probe sebagai contoh, dan kod yang disimpan dalam sitecustomize.py adalah seperti berikut, cuma perkenalkan kod probe kami dan masukkan ke dalam sys.meta_path:
. ├── __init__.py ├── hook_aiomysql.py ├── sitecustomize.py └── test_auto_hook.pytest_auto_hook.py ialah kod ujian:
import sys from hook_aiomysql import MetaPathFinder sys.meta_path.insert(0, MetaPathFinder())
Seterusnya, cuma tetapkan PYTHONPATH dan jalankan kod kami (jika ia projek, ia biasanya dimulakan oleh penyelia, maka anda boleh Tetapkan PYTHONPATH dalam fail konfigurasi): import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())
4 Kaedah penggantian langsung
Anda dapat melihat bahawa kaedah di atas berjalan dengan baik, dan boleh disematkan dengan mudah ke dalam kami. project , tetapi ia bergantung pada fail sitecustomize.py dan sukar untuk mengekstraknya ke pustaka pihak ketiga Jika anda ingin mengekstraknya ke pustaka pihak ketiga, anda perlu mempertimbangkan sama ada terdapat kaedah lain. Apabila memperkenalkan MetaPathLoader di atas, saya menyebut sys.module, di mana sys.modules digunakan untuk mengurangkan pengenalan berulang:
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
import time from functools import wraps from typing import Any, Callable, Tuple, cast import aiomysql def func_wrapper(func: Callable): """和上面一样的封装函数, 这里简单略过""" # 判断是否hook过 _IS_HOOK: bool = False # 存放原来的_query _query: Callable = aiomysql.Cursor._query # hook函数 def install_hook() -> None: _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True # 还原到原来的函数方法 def reset_hook() -> None: aiomysql.Cursor._query = _query _IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
import asyncio import aiomysql from demo import install_hook, reset_hook async def test_mysql() -> None: pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook") install_hook() asyncio.run(test_mysql()) print("reset hook") reset_hook() asyncio.run(test_mysql()) print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875} reset hook end
Atas ialah kandungan terperinci Bagaimanakah Python menyiasat pengekstrakan data lengkap dari perpustakaan panggilan?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!