Table des matières
Definition
Code source du déclencheur
Code source du déclencheurResult
Flink preset Trigger
%%PRE_BLOCK_4%%
Maison Java javaDidacticiel Comment utiliser le déclencheur de fenêtre Java Flink Trigger

Comment utiliser le déclencheur de fenêtre Java Flink Trigger

May 03, 2023 pm 01:10 PM
java trigger flink

Definition

Trigger détermine quand la fenêtre (formée par l'allocateur de fenêtre) est prête à être traitée par la fonction window. Chaque WindowAssigner est livré avec une valeur par défaut Trigger. Si le déclencheur par défaut ne répond pas à vos besoins, vous pouvez utiliser trigger(…).

Code source du déclencheur

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法
	 * @param element 收到的元素
	 * @param timestamp 元素抵达时间.
	 * @param window 元素所属的window窗口.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow
	 *
	 * @param window 合并后的新窗口对象
	 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

Code source du déclencheurResult

public enum TriggerResult {
	// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
    CONTINUE(false, false),
    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
    FIRE_AND_PURGE(true, true),
    // 触发窗口计算,但是保留窗口元素
    FIRE(true, false),
    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

Une fois que le déclencheur détermine que la fenêtre est prête à être traitée, elle se déclenchera et le le statut de retour peut être FIRE ou FIRE_AND_PURGE. Parmi eux, FIRE déclenche le calcul de la fenêtre et conserve le contenu de la fenêtre, tandis que FIRE_AND_PURGE déclenche le calcul de la fenêtre et supprime le contenu de la fenêtre. Par défaut, les déclencheurs pré-implémentés déclenchent simplement FIRE sans effacer l'état de la fenêtre.

  • EventTimeTrigger : Déterminez s'il faut déclencher le calcul de la fenêtre en comparant EventTime avec l'heure de fin de la fenêtre si EventTime est supérieur à. Window EndTime, Trigger, sinon il ne se déclenchera pas et la fenêtre continuera à attendre.

  • ProcessTimeTrigger : Déterminez s'il faut déclencher la fenêtre en comparant ProcessTime et window EndTme. Si ProcessTime est supérieur à EndTime, le calcul est déclenché, sinon la fenêtre continue d'attendre.

  • ContinuousEventTimeTrigger : calculé en fonction de la fenêtre de déclenchement périodique de l'intervalle ou de l'heure de fin de la fenêtre est inférieure à la fenêtre de déclenchement EndTime actuelle.

  • ContinuousProcessingTimeTrigger : calculé en fonction de la fenêtre de déclenchement périodique de l'intervalle ou de l'heure de fin de la fenêtre est inférieure à la fenêtre de déclenchement ProcessTime actuelle.

  • CountTrigger : Déterminez s'il faut déclencher le calcul de la fenêtre en fonction du fait que la quantité de données accédées dépasse le seuil défini.

  • DeltaTrigger : Le fait que l'indicateur Delta calculé en fonction des données d'accès dépasse le seuil spécifié est utilisé pour déterminer s'il faut déclencher le calcul de la fenêtre.

  • PurgingTrigger : N'importe quel déclencheur peut être converti en déclencheur de type Purge en tant que paramètre, et les données seront nettoyées une fois le calcul terminé.

  • NeverTrigger : Ne jamais déclencher le calcul de fenêtre à aucun moment

Comment utiliser le déclencheur de fenêtre Java Flink Trigger

# 🎜🎜# Regardez principalement le code source de EventTimeTrigger et ProcessingTimeTrigger.

EventTimeTrigger code source

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger code source

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

Dans la méthode onElement(), ctx.registerProcessingTimeTimer(window.maxTimestamp()) sera être enregistré Un minuteur ProcessingTime, le paramètre time est window.maxTimestamp(), qui est l'heure finale de la fenêtre. Lorsque l'heure atteint l'heure finale de la fenêtre, le minuteur se déclenche et appelle la méthode onProcessingTime() dans onProcessingTime. (), renvoie TriggerResult.FIRE Autrement dit, renvoie FIRE, déclenchant le calcul des données dans la fenêtre, mais en conservant les éléments de la fenêtre.

Il est à noter que la classe ProcessingTimeTrigger ne déclenchera le calcul de la fonction fenêtre que lorsque l'heure finale de la fenêtre arrivera. Une fois le calcul terminé, les données de la fenêtre ne seront pas effacées. Ces données sont stockées en mémoire sauf si vous appelez PURGE ou FIRE_AND_PURGE, sinon les données resteront en mémoire. En fait, aucune des classes Trigger fournies dans Flink, à l'exception de la classe PurgingTrigger, n'effacera les données de la fenêtre.

Déclencheur de fenêtre commun

Fenêtre roulante

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows: ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

Fenêtre coulissante#🎜 🎜#
SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}
🎜🎜 #
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

Video Face Swap

Video Face Swap

Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Sujets chauds

Tutoriel PHP
1538
276
Excel trouver et remplacer ne fonctionne pas Excel trouver et remplacer ne fonctionne pas Aug 13, 2025 pm 04:49 PM

CheckkSearchSettings like "MatchEnteRireCellContents" et "MatchCase" ByExpandingOptionsInFindanDreplace, garantissant "lookin" issettominuesand »dans" TOCORRECTSCOPE; 2.LOORHFORHIDDENCHARACTER

Comment déployer une application Java Comment déployer une application Java Aug 17, 2025 am 12:56 AM

Préparez-vous en application par rapport à Mavenorgradletobuildajarorwarfile, externalisationConfiguration.2.ChoOSEADPLOYENDIRONMENT: Runonbaremetal / vmwithjava-jarandsystemd, deploywarontomcat, compeneriserisewithdocker, orusecloudplatformslikelise.

Comment configurer la journalisation dans une application Java? Comment configurer la journalisation dans une application Java? Aug 15, 2025 am 11:50 AM

L'utilisation de SLF4J combinée avec la journalisation ou le log4j2 est le moyen recommandé de configurer les journaux dans les applications Java. Il introduit des bibliothèques API et implémentation en ajoutant des dépendances Maven correspondantes; 2. Obtenez l'enregistreur via le loggerfactory de SLF4J dans le code et écrivez le code journal découplé et efficace à l'aide de méthodes de journalisation paramétrée; 3. Définir le format de sortie du journal, le niveau, la cible (console, le fichier) et le contrôle du journal du package via Logback.xml ou les fichiers de configuration log4j2.xml; 4. Activer éventuellement la fonction de balayage de fichiers de configuration pour atteindre un ajustement dynamique du niveau de journal, et Springboot peut également être géré via des points de terminaison de l'actionneur; 5. Suivez les meilleures pratiques, y compris

Liaison des données XML avec Castor en Java Liaison des données XML avec Castor en Java Aug 15, 2025 am 03:43 AM

CASTORENablesxml-to-javaObjectMappingViadefaultConverionsOrexplicitMappingFiles; 1) DefinejavaclasseswithGetters / seters; 2) useUnmarShallertOConvertXmltoObjects; 3)

JS Ajouter un élément au début du tableau JS Ajouter un élément au début du tableau Aug 14, 2025 am 11:51 AM

Dans JavaScript, la méthode la plus courante pour ajouter des éléments au début d'un tableau est d'utiliser la méthode Unsich (); 1. En utilisant unsith () modifiera directement le tableau d'origine, vous pouvez ajouter un ou plusieurs éléments pour retourner la nouvelle longueur du tableau ajouté; 2. Si vous ne souhaitez pas modifier le tableau d'origine, il est recommandé d'utiliser l'opérateur d'extension (tel que [Newelement, ... Arr]) pour créer un nouveau tableau; 3. Vous pouvez également utiliser la méthode CONCAT () pour combiner le nouveau tableau d'éléments avec le numéro d'origine, renvoyez le nouveau tableau sans modifier le tableau d'origine; En résumé, utilisez Unsich () lors de la modification du tableau d'origine et recommandez l'opérateur d'extension lorsque vous gardez le tableau d'origine inchangé.

Comparaison des performances: Java Vs. GO pour les services backend Comparaison des performances: Java Vs. GO pour les services backend Aug 14, 2025 pm 03:32 PM

GOTYPICAL OFFERSBETTERRUNTIMEPERFORMANCE AVEC LA MAINTRÉE DE PUTHROUGHTANDLOWERLATENCE, ENTERTFORI / O-HEAVYSERVICES, DUETOITSLIGHT LONDEGOROUTINESANDERFICENTSCHEDULL

Comment travailler avec JSON à Java Comment travailler avec JSON à Java Aug 14, 2025 pm 03:40 PM

ToworkwithJSONinJava,useathird-partylibrarylikeJackson,Gson,orJSON-B,asJavalacksbuilt-insupport;2.Fordeserialization,mapJSONtoJavaobjectsusingObjectMapperinJacksonorGson.fromJson;3.Forserialization,convertJavaobjectstoJSONstringsviawriteValueAsString

Quel est le mot-clé Assert en Java? Quel est le mot-clé Assert en Java? Aug 17, 2025 am 12:52 AM

TheassertKeywordInjavaisUsedTovalIdateShandshandingsDuringDevelopment, ThrowinganAssertionErroriftheconditionisfalse.2.ithastwoforms: AssertCondition; AndSersertCondition: Message; avecthelatterProvidActureCustomerMessage.3.

See all articles