목차
정의
트리거 소스 코드
TriggerResult 소스 코드
Flink의 사전 설정 Trigger
EventTimeTrigger 소스 코드
ProcessingTimeTrigger 소스 코드
공용 창에 대한 트리거
스크롤 창
슬라이딩 창
세션 창
전역 창
Java java지도 시간 Java Flink 창 트리거 사용 방법

Java Flink 창 트리거 사용 방법

May 03, 2023 pm 01:10 PM
java trigger flink

정의

Trigger는 창(창 할당자에 의해 형성됨)이 창 기능에 의해 처리될 준비가 되는 시기를 결정합니다. 각 WindowAssigner에는 기본값인 Trigger가 함께 제공됩니다. 기본 트리거가 요구 사항을 충족하지 않는 경우 Trigger(…)를 사용할 수 있습니다.

트리거 소스 코드

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法
	 * @param element 收到的元素
	 * @param timestamp 元素抵达时间.
	 * @param window 元素所属的window窗口.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow
	 *
	 * @param window 合并后的新窗口对象
	 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

TriggerResult 소스 코드

public enum TriggerResult {
	// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
    CONTINUE(false, false),
    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
    FIRE_AND_PURGE(true, true),
    // 触发窗口计算,但是保留窗口元素
    FIRE(true, false),
    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

트리거가 창이 처리할 준비가 되었다고 판단하면 실행되며 반환 상태는 FIRE 또는 FIRE_AND_PURGE일 수 있습니다. 그중 FIRE는 창 계산을 트리거하고 창 내용을 유지하는 반면, FIRE_AND_PURGE는 창 계산을 트리거하고 창 내용을 삭제합니다. 기본적으로 사전 구현된 트리거는 창 상태를 지우지 않고 단순히 FIRE됩니다.

Flink의 사전 설정 Trigger

  • EventTimeTrigger: EventTime과 Window Endtime을 비교하여 창 계산을 트리거할지 여부를 결정합니다. 그렇지 않으면 트리거되지 않습니다. 창은 계속 기다릴 것입니다.

  • ProcessTimeTrigger: ProcessTime과 창 EndTme를 비교하여 창을 트리거할지 여부를 결정합니다. ProcessTime이 EndTime보다 크면 계산이 트리거되고, 그렇지 않으면 창이 계속 대기합니다.

  • ContinuousEventTimeTrigger: 간격 주기적 트리거 창을 기준으로 계산되거나 창의 종료 시간이 현재 EndTime 트리거 창보다 작습니다.

  • ContinuousProcessingTimeTrigger: 간격 주기적 트리거 창을 기준으로 계산되거나 창의 종료 시간이 현재 ProcessTime 트리거 창보다 작습니다.

  • CountTrigger: 액세스된 데이터의 양이 설정된 임계값을 초과하는지 여부에 따라 창 계산을 트리거할지 여부를 결정합니다.

  • DeltaTrigger: 액세스 데이터에서 계산된 델타 표시기가 지정된 임계값을 초과하는지 여부에 따라 창 계산을 트리거할지 여부를 결정합니다.

  • PurgingTrigger: 모든 트리거를 매개변수로 Purge 유형 트리거로 변환할 수 있으며, 계산이 완료된 후 데이터가 정리됩니다.

  • NeverTrigger: 언제든지 창 계산을 트리거하지 않습니다.

Java Flink 창 트리거 사용 방법

주로 EventTimeTrigger 및 ProcessTimeTrigger의 소스 코드를 살펴보세요.

EventTimeTrigger 소스 코드

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger 소스 코드

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

onElement() 메소드에서 ctx.registerProcessingTimer(window.maxTimestamp())는 ProcessTime 타이머를 등록하고 시간 매개변수는 window.maxTimestamp()입니다. 창의 마지막 시간, 시간이 이 창의 마지막 시간에 도달하면 타이머가 onProcessingTime() 메서드를 트리거하고 호출합니다. onProcessingTime() 메서드에서 TriggerResult.FIRE를 반환하면 창의 데이터 계산이 트리거됩니다. , 그러나 창 요소는 유지합니다.

ProcessingTimeTrigger 클래스는 창의 마지막 시간이 도달할 때만 창 함수 계산을 트리거합니다. 계산이 완료된 후에는 창의 데이터가 메모리에 저장되지 않습니다. PURGE 또는 FIRE_AND_PURGE가 호출되지 않는 한 데이터는 항상 메모리에 있습니다. 실제로 PurgingTrigger 클래스를 제외하고 Flink에서 제공하는 Trigger 클래스 중 어느 것도 창의 데이터를 지우지 않습니다.

공용 창에 대한 트리거

스크롤 창

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows: ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

슬라이딩 창

SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

SlidingProcessingTimeWindows: ProcessingTimeTrigger

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

세션 창

EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

ProcessingTimeSessionWindows : ProcessTimeTrigger

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

전역 창

GlobalWindows :NeverTrigger
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new GlobalWindows.NeverTrigger();
        }
}

위 내용은 Java Flink 창 트리거 사용 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Clothoff.io

Clothoff.io

AI 옷 제거제

Video Face Swap

Video Face Swap

완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제

PHP 튜토리얼
1543
276
Excel은 작동하지 않음을 찾고 교체하십시오 Excel은 작동하지 않음을 찾고 교체하십시오 Aug 13, 2025 pm 04:49 PM

cheecksearchsettingslike "matchEntirecellContents"및 "matchcase"exexpandingoptionsinfindandreplace, "tocorrectscope 내에서"lookin "issettovaluesand"를 보장합니다

Java 응용 프로그램을 배포하는 방법 Java 응용 프로그램을 배포하는 방법 Aug 17, 2025 am 12:56 AM

repay yourApplicationBenorgradletobuildajarorwarfile, 외부화 공기

Java 응용 프로그램에서 로깅을 구성하는 방법은 무엇입니까? Java 응용 프로그램에서 로깅을 구성하는 방법은 무엇입니까? Aug 15, 2025 am 11:50 AM

로그백 또는 log4J2와 결합 된 SLF4J를 사용하는 것은 Java 응용 프로그램에서 로그를 구성하는 권장 방법입니다. 해당 Maven 의존성을 추가하여 API 및 구현 라이브러리를 소개합니다. 2. 코드에서 SLF4J의 LoggerFactory를 통해 로거를 가져오고 매개 변수화 된 로깅 방법을 사용하여 분리되고 효율적인 로그 코드를 작성하십시오. 3. 로그 출력 형식, 레벨, 대상 (콘솔, 파일) 및 패키지 레벨 로그 컨트롤을 logback.xml 또는 log4j2.xml 구성 파일을 정의합니다. 4. 선택적으로 구성 파일 스캔 기능을 활성화하여 로그 레벨의 동적 조정을 달성하고 SpringBoot도 액추에이터 엔드 포인트를 통해 관리 할 수도 있습니다. 5. 모범 사례를 포함하여

Java의 Castor와의 XML 데이터 바인딩 Java의 Castor와의 XML 데이터 바인딩 Aug 15, 2025 am 03:43 AM

castorenablesxml-to-javaobjectmappingViAdventionSorxclationSpollicitMappingFiles; 1) definejavaclasseswithgetters/setters; 2) useUnmarshallertoconvertxmltoobjects; 3) USEMARSHALLERTOSERIAZEOBJECTSBACKTOXML;

성능 비교 : Java vs. 백엔드 서비스로 이동합니다 성능 비교 : Java vs. 백엔드 서비스로 이동합니다 Aug 14, 2025 pm 03:32 PM

Proughoughputandlowerlatency, 특히 orfori/o-heavyservices, duetoitslightgeightgoroutinesandefficientscheduler, whilejava, hithlowstart, canmatchgoincpu-boundtasksafterjitoptimization.2.gousessme

JS 배열 시작에 요소를 추가합니다 JS 배열 시작에 요소를 추가합니다 Aug 14, 2025 am 11:51 AM

JavaScript에서 배열의 시작 부분에 요소를 추가하는 가장 일반적인 방법은 Unshift () 메소드를 사용하는 것입니다. 1. Unshift ()를 사용하여 원래 배열을 직접 수정하면 하나 이상의 요소를 추가하여 추가 된 배열의 새 길이를 반환 할 수 있습니다. 2. 원래 배열을 수정하지 않으려면 확장 연산자 (예 : [Newlement, ... ARR])를 사용하여 새 배열을 만드는 것이 좋습니다. 3. Concat () 메소드를 사용하여 새 요소 배열을 원래 번호와 결합하고 원래 배열을 변경하지 않고 새 배열을 반환 할 수 있습니다. 요약하면 원래 배열을 수정할 때 Unshift ()를 사용하고 원래 배열을 변경하지 않으면 확장 연산자를 권장하십시오.

Java에서 JSON과 함께 일하는 방법 Java에서 JSON과 함께 일하는 방법 Aug 14, 2025 pm 03:40 PM

toworkwithjsoninjava, Useathird-Partylibrary Locking Jackson, Gson, Orjson-B, AsjavalacksBuilt-insupport; 2.Fordeserialization, mapjsontojavaObjectSusingObjectMapperSonorgson.Fromjson; 3. Forserialization, ConvertJavaoBoBoBobjsonstojsonstringsviawritevalueastring

Java에서 WebSocket을 통해 메시지를 보내고받는 방법 Java에서 WebSocket을 통해 메시지를 보내고받는 방법 Aug 16, 2025 am 10:36 AM

@serverendpoint를 사용하여 경로를 정의하고 @onopen, @onmessage, @onclose 및 @onerror를 통해 연결, 메시지 수신, 닫기 및 오류를 처리하기 위해 WebSocket 서버 엔드 포인트를 작성하십시오. 2. 배포 중에 Javax.websocket-API 종속성이 도입되어 컨테이너에 의해 자동으로 등록되어 있는지 확인하십시오. 3. Java 클라이언트는 ContainerProvider를 통해 WebSocketContainer를 얻고 ConnectToServer를 호출하여 서버에 연결하고 @ClientendPoint 주석 클래스를 사용하여 메시지를 수신합니다. 4. 세션 getbasicre를 사용하십시오

See all articles