OpenSearch Python Client : Scroll API는 전체 데이터 검색을 구현합니다
引言:OpenSearch查询的限制与Scroll API
OpenSearch(或Elasticsearch)的search API默认情况下为了性能考虑,限制了单次查询返回的最大结果数,通常为10,000条。这意味着当您的查询匹配的数据量远超此限制时,简单的search请求无法获取所有数据。为了解决这一问题,OpenSearch提供了Scroll API。
Scroll API允许您创建一个“快照”式搜索上下文,并分批次地检索大量查询结果,而无需担心深度分页带来的性能问题。它非常适合需要全量数据导出的场景,例如数据分析、报告生成或数据迁移。
理解Scroll API的工作原理
Scroll API的工作流程分为两步:
- 初始化滚动查询: 第一次调用client.search()方法时,除了提供查询体外,还需要指定scroll参数(例如scroll='1m'表示滚动上下文保持1分钟)。OpenSearch会返回第一批结果和一个_scroll_id。
- 迭代获取后续结果: 随后的请求不再是search,而是调用client.scroll()方法,并传入上一次获得的_scroll_id和新的scroll时间。OpenSearch会返回下一批结果,并可能更新_scroll_id。这个过程会持续进行,直到不再有匹配的结果返回。
使用opensearch-py实现全量数据检索
以下是使用opensearch-py库通过Scroll API获取OpenSearch全量查询结果的详细步骤和示例代码。
1. 环境准备与客户端初始化
首先,确保您已安装opensearch-py库。如果未安装,请使用pip进行安装:
pip install opensearch-py
接下来,初始化OpenSearch客户端。根据您的OpenSearch集群配置,可能需要使用不同的认证方式(例如AWS签名认证或基本认证)。
import csv from opensearchpy import OpenSearch, RequestsHttpConnection # 如果使用AWS认证,需要安装requests-aws4auth并导入 from requests_aws4auth import AWS4Auth # OpenSearch集群配置 (请替换为您的实际值) host = 'your-opensearch-host.amazonaws.com' # 例如: 'your-domain.us-east-1.es.amazonaws.com' port = 443 # 默认端口 use_ssl = True verify_certs = True timeout = 300 # 请求超时时间 pool_maxsize = 20 # 连接池大小 # 认证方式示例 # 1. AWS签名认证 (适用于AWS OpenSearch Service) region = 'your-aws-region' # 例如: 'us-east-1' service = 'es' # 请替换为您的AWS凭证获取方式,例如从环境变量、IAM角色或配置文件 # from botocore.session import get_session # session = get_session() # credentials = session.get_credentials() # auth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) # 2. 基本认证 (适用于用户名/密码认证) auth = ('your_username', 'your_password') # 初始化OpenSearch客户端 client = OpenSearch( hosts=[{"host": host, "port": port}], http_auth=auth, use_ssl=use_ssl, timeout=timeout, verify_certs=verify_certs, connection_class=RequestsHttpConnection, pool_maxsize=pool_maxsize, )
2. 构建查询体
定义您的查询条件。为了提高性能,建议使用fields参数只返回您需要的字段,并设置_source: False以避免返回整个_source文档。
query_body = { "size": 10000, # 每次滚动请求返回的最大文档数,通常设为10000 "timeout": "300s", # 查询超时时间 "query": { "bool": { "must": [ {"match": {"type": "req"}}, # 匹配 'type' 字段为 'req' 的文档 {"range": {"@timestamp": {"gte": "now-7d/d", "lte": "now/d"}}}, # 查询过去7天的数据 {"wildcard": {"req_h_user_agent": {"value": "*googlebot*"}}}, # 用户代理包含 'googlebot' ] } }, "fields": [ # 指定需要返回的字段 "@timestamp", "resp_status", "resp_bytes", "req_h_referer", "req_h_user_agent", "req_h_host", "req_uri", "total_response_time", ], "_source": False, # 不返回完整的 _source 文档,只返回指定 fields }
3. 执行滚动查询并处理结果
以下代码展示了如何发起初始滚动请求,然后循环迭代获取所有匹配的文档,并将它们写入CSV文件。
csv_file_path = "opensearch_report.csv" # 输出CSV文件名 try: # 1. 发起初始滚动查询 # 'scroll' 参数指定了滚动上下文的有效期,例如 '1m' 表示1分钟 print("正在发起初始滚动查询...") response = client.search( index="fastly-*", # 您的索引模式 body=query_body, scroll='1m', # 保持滚动上下文1分钟 ) scroll_id = response.get("_scroll_id") if not scroll_id: print("没有找到匹配的结果或无法获取滚动ID。") exit() total_hits = response["hits"]["total"]["value"] print(f"查询到总计 {total_hits} 条匹配结果。") processed_hits_count = 0 with open(csv_file_path, "w", newline='', encoding='utf-8') as f: writer = csv.writer(f) # 定义CSV文件头,与query_body中的 'fields' 对应 writer.writerow([ "timestamp", "url", "response_code", "bytes", "response_time", "referer", "user_agent" ]) # 处理第一批结果 hits = response["hits"]["hits"] for hit in hits: fields = hit.get("fields", {}) # 注意:从 'fields' 获取的值通常是列表,需要取第一个元素 [0] writer.writerow([ fields.get("@timestamp", [""])[0], fields.get("req_h_host", [""])[0] + fields.get("req_uri", [""])[0], fields.get("resp_status", [""])[0], fields.get("resp_bytes", [""])[0], fields.get("total_response_time", [""])[0], fields.get("req_h_referer", [""])[0], fields.get("req_h_user_agent", [""])[0], ]) processed_hits_count += 1 print(f"已处理 {processed_hits_count} 条结果。") # 2. 循环迭代获取后续结果 while hits: # 当 hits 列表不为空时继续循环 print(f"正在获取下一批结果 (已处理: {processed_hits_count}/{total_hits})...") response = client.scroll( scroll_id=scroll_id, scroll='1m', # 每次滚动请求都刷新滚动上下文的有效期 ) # 更新滚动ID,以防它发生变化 (虽然通常不会变) scroll_id = response.get("_scroll_id") hits = response["hits"]["hits"] if not hits: # 如果没有更多结果,跳出循环 break for hit in hits: fields = hit.get("fields", {}) writer.writerow([ fields.get("@timestamp", [""])[0], fields.get("req_h_host", [""])[0] + fields.get("req_uri", [""])[0], fields.get("resp_status", [""])[0], fields.get("resp_bytes", [""])[0], fields.get("total_response_time", [""])[0], fields.get("req_h_referer", [""])[0], fields.get("req_h_user_agent", [""])[0], ]) processed_hits_count += 1 print(f"已处理 {processed_hits_count} 条结果。") except Exception as e: print(f"在获取数据过程中发生错误: {e}") finally: # 3. 清除滚动上下文 (可选但推荐) # 即使不手动清除,滚动上下文也会在有效期过后自动失效 if scroll_id: try: client.clear_scroll(scroll_id=scroll_id) print(f"滚动上下文 {scroll_id} 已清除。") except Exception as e: print(f"清除滚动上下文失败: {e}") print(f"所有匹配结果已检索完毕并保存到 {csv_file_path}。")
重要注意事项
- 滚动上下文的生命周期: scroll参数(例如'1m')定义了滚动上下文在OpenSearch服务器上保持活动的时长。每次调用client.scroll()时,这个计时器都会被重置。确保您的处理速度足够快,以免滚动上下文过期。
- 资源消耗: 滚动查询会占用OpenSearch集群的内存和CPU资源,因为它需要维护搜索上下文。在处理超大规模数据时,应谨慎使用,并考虑集群的负载能力。
- 清除滚动上下文: 虽然滚动上下文会在过期后自动清除,但为了及时释放资源,在完成数据检索后,强烈建议显式调用client.clear_scroll(scroll_id=scroll_id)来手动清除它。在上述代码的finally块中已经包含了这一操作。
- 数据一致性: 滚动查询提供的是一个“时间点”快照。这意味着在滚动查询开始后,即使索引中的数据发生变化,滚动查询返回的结果集也不会受影响,它会返回查询开始那一刻的数据状态。
- 替代方案:search_after: 对于需要实时或近实时数据,且每次查询只需要获取下一页数据而不是全部数据的场景,search_after API可能是更好的选择。它不维护搜索上下文,因此对资源消耗更小,但需要您手动管理上次查询的最后一个文档的排序值。然而,对于获取全量数据,Scroll API通常更简单直接。
- 错误处理: 在实际应用中,务必添加健壮的错误处理机制,例如网络中断、OpenSearch集群故障等。
总结
通过opensearch-py库结合OpenSearch的Scroll API,您可以轻松地克服10,000条结果的限制,高效地检索和处理大规模数据集。理解其工作原理并正确实现迭代逻辑是关键。始终记住在完成操作后清除滚动上下文,以优化集群资源利用。
위 내용은 OpenSearch Python Client : Scroll API는 전체 데이터 검색을 구현합니다의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undress AI Tool
무료로 이미지를 벗다

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

이 기사는 여러 상위 Python "완성 된"프로젝트 웹 사이트 및 고급 "블록버스터"학습 리소스 포털을 선택했습니다. 개발 영감, 마스터 레벨 소스 코드 관찰 및 학습 또는 실제 기능을 체계적으로 개선하든, 이러한 플랫폼은 놓치지 않아야하며 파이썬 마스터로 빠르게 성장할 수 있도록 도울 수 있습니다.

Subprocess.run ()을 사용하여 쉘 명령을 안전하게 실행하고 출력을 캡처하십시오. 주입 위험을 피하기 위해 목록에 매개 변수를 전달하는 것이 좋습니다. 2. 쉘 특성이 필요한 경우, shell = true를 설정할 수 있지만 명령 주입을 조심하십시오. 3. 하위 프로세스를 사용하여 실시간 출력 처리를 실현하십시오. 4. SET Check = 명령이 실패 할 때 예외를 던지기 위해 true; 5. 간단한 시나리오에서 체인을 직접 호출하여 출력을 얻을 수 있습니다. OS.System () 또는 더 이상 사용되지 않은 모듈을 사용하지 않으려면 일상 생활에서 Subprocess.run ()에 우선 순위를 부여해야합니다. 위의 방법은 파이썬에서 쉘 명령을 실행하는 핵심 사용을 무시합니다.

QUML (Quantum Machine Learning)을 시작하려면 선호되는 도구는 Python이며 Pennylane, Qiskit, Tensorflowquantum 또는 Pytorchquantum과 같은 라이브러리를 설치해야합니다. 그런 다음 Pennylane을 사용하여 양자 신경망을 구축하는 것과 같은 예제를 실행하여 프로세스에 익숙해 지십시오. 그런 다음 데이터 세트 준비, 데이터 인코딩, 구축 파라 메트릭 양자 회로 구축, 클래식 옵티마이 저 트레이닝 등의 단계에 따라 모델을 구현하십시오. 실제 전투에서는 처음부터 복잡한 모델을 추구하지 않고 하드웨어 제한에주의를 기울이고, 하이브리드 모델 구조를 채택하며, 최신 문서와 공식 문서를 지속적으로 언급하여 개발에 대한 후속 조치를 취해야합니다.

Python을 사용하여 WebApi를 호출하여 데이터를 얻는 것의 핵심은 기본 프로세스와 공통 도구를 마스터하는 것입니다. 1. 요청을 사용하여 HTTP 요청을 시작하는 것이 가장 직접적인 방법입니다. Get 메소드를 사용하여 응답을 얻고 JSON ()을 사용하여 데이터를 구문 분석하십시오. 2. 인증이 필요한 API의 경우 헤더를 통해 토큰 또는 키를 추가 할 수 있습니다. 3. 응답 상태 코드를 확인해야합니다. 예외를 자동으로 처리하려면 response.raise_for_status ()를 사용하는 것이 좋습니다. 4. 페이징 인터페이스에 직면하여 다른 페이지를 차례로 요청하고 주파수 제한을 피하기 위해 지연을 추가 할 수 있습니다. 5. 반환 된 JSON 데이터를 처리 할 때 구조에 따라 정보를 추출해야하며 복잡한 데이터를 데이터로 변환 할 수 있습니다.

Seaborn 's Loctplot을 사용하여 두 변수 간의 관계와 분포를 신속하게 시각화합니다. 2. 기본 산점도는 sns.jointPlot (data = tips, x = "total_bill", y = "tip", 종류 = "scatter")에 의해 구현됩니다. 중심은 산점도이며 히스토그램은 상단과 하단에 표시됩니다. 3. 회귀선과 밀도 정보를 친절한 = "reg"에 추가하고 marginal_kws를 결합하여 에지 플롯 스타일을 설정합니다. 4. 데이터 볼륨이 클 경우 "Hex"를 사용하는 것이 좋습니다.

Python에서는 join () 메소드를 사용하여 문자열을 병합 할 때 다음 점에 기록되어야합니다. 2. 목록의 요소가 모두 문자열인지 확인하고 스트링이 아닌 유형을 포함하는 경우 먼저 변환해야합니다. 3. 중첩 목록을 처리 할 때 연결하기 전에 구조를 평평하게해야합니다.

문자열 목록은 ".join (Words)과 같은 join () 메소드와 병합 될 수 있습니다. 2. 숫자 목록은 결합하기 전에 MAP (str, 숫자) 또는 [str (x) forxinnumbers]가있는 문자열로 변환해야합니다. 3. 모든 유형 목록은 디버깅에 적합한 괄호와 따옴표가있는 문자열로 직접 변환 할 수 있습니다. 4. '|'.join (f "[{item}]"furiteminitems) 출력과 같은 join ()과 결합 된 생성기 표현식으로 사용자 정의 형식을 구현할 수 있습니다.

Python Web Crawlers를 마스터하려면 세 가지 핵심 단계를 파악해야합니다. 1. 요청을 사용하여 요청을 시작하고 GET 메소드를 통해 웹 페이지 컨텐츠를 얻고, 헤더 설정에주의를 기울이고, 예외를 처리하고, robots.txt를 준수합니다. 2. BeautifulSoup 또는 XPath를 사용하여 데이터 추출. 전자는 간단한 구문 분석에 적합하지만 후자는 더 유연하고 복잡한 구조에 적합합니다. 3. 셀레늄을 사용하여 동적 로딩 컨텐츠에 대한 브라우저 작업을 시뮬레이션하십시오. 속도는 느리지 만 복잡한 페이지에 대처할 수 있습니다. 또한 효율성을 향상시키기 위해 웹 사이트 API 인터페이스를 찾을 수도 있습니다.
