1. 首页
  2. 技术知识

Apache FlinkCEP 实现超时状态监控的步骤详解

CEP – Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob


FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

public

static

<IN, OUT>

SingleOutputStreamOperator

<OUT> createPatternStream(…){…}

public

static

<IN, OUT1, OUT2>

SingleOutputStreamOperator

<OUT1> createTimeoutPatternStream(…){…}

final

SingleOutputStreamOperator

<OUT> patternStream;SingleOutputStreamOperator

@Public

public

class

SingleOutputStreamOperator

<T>

extends

DataStream

<T> {…}
PatternStream的构造方法:

PatternStream

(

final

DataStream

<T> inputStream,

final

Pattern

<T, ?> pattern) {

this

.inputStream = inputStream;

this

.pattern = pattern;

this

.comparator =

null

;

}

PatternStream

(

final

DataStream

<T> inputStream,

final

Pattern

<T, ?> pattern,

final

EventComparator

<T> comparator) {

this

.inputStream = inputStream;

this

.pattern = pattern;

this

.comparator = comparator;

}Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

public

class

Pattern

<T, F

extends

T> {

/** 模式名称 */

private

final

String

name;

/** 前面一个模式 */

private

final

Pattern

<T, ?

extends

T> previous;

/** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */

private

IterativeCondition

<F> condition;

/** 时间窗口长度,在时间长度内进行模式匹配 */

private

Time

windowTime;

/** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */

private

Quantifier

quantifier =

Quantifier

.one(

ConsumingStrategy

.STRICT);

/** 停止将事件收集到循环状态时,事件必须满足的条件 */

private

IterativeCondition

<F> untilCondition;

/**

   * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数

   */

private

Times

times;

// 匹配到事件之后的跳过策略

private

final

AfterMatchSkipStrategy

afterMatchSkipStrategy;

  …

}Quantifier是用来描述具体模式行为的,主要有三大类:

Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

public

class

Quantifier

{

  …

/**

   * 5个属性,可以组合,但并非所有的组合都是有效的

   */

public

enum

QuantifierProperty

{

    SINGLE,

    LOOPING,

    TIMES,

    OPTIONAL,

    GREEDY

  }

/**

   * 描述在此模式中匹配哪些事件的策略

   */

public

enum

ConsumingStrategy

{

    STRICT,

    SKIP_TILL_NEXT,

    SKIP_TILL_ANY,

    NOT_FOLLOW,

    NOT_NEXT

  }

/**

   * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到

   */

public

static

class

Times

{

private

final

int

from;

private

final

int

to;

private

Times

(

int

from,

int

to) {

Preconditions

.checkArgument(from >

0

,

“The from should be a positive number greater than 0.”

);

Preconditions

.checkArgument(to >= from,

“The to should be a number greater than or equal to from: ”

+ from +

“.”

);

this

.from = from;

this

.to = to;

    }

public

int

getFrom() {

return

from;

    }

public

int

getTo() {

return

to;

    }

// 次数范围

public

static

Times

of(

int

from,

int

to) {

return

new

Times

(from, to);

    }

// 指定具体次数

public

static

Times

of(

int

times) {

return

new

Times

(times, times);

    }

@Override

public

boolean

equals(

Object

o) {

if

(

this

== o) {

return

true

;

      }

if

(o ==

null

|| getClass() != o.getClass()) {

return

false

;

      }

Times

times = (

Times

) o;

return

from == times.from &&

        to == times.to;

    }

@Override

public

int

hashCode() {

return

Objects

.hash(from, to);

    }

  }

  …

}EventComparator,自定义事件比较器,实现EventComparator接口。

public

interface

EventComparator

<T>

extends

Comparator

<T>,

Serializable

{

long

serialVersionUID =

1L

;

}
NFACompiler和NFA


NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

public

class

NFACompiler

{

  …

/**

   * NFAFactory 创建NFA的接口

   *

   * @param <T> Type of the input events which are processed by the NFA

   */

public

interface

NFAFactory

<T>

extends

Serializable

{

    NFA<T> createNFA();

  }

/**

   * NFAFactory的具体实现NFAFactoryImpl

   *

   * <p>The implementation takes the input type serializer, the window time and the set of

   * states and their transitions to be able to create an NFA from them.

   *

   * @param <T> Type of the input events which are processed by the NFA

   */

private

static

class

NFAFactoryImpl

<T>

implements

NFAFactory

<T> {

private

static

final

long

serialVersionUID =

8939783698296714379L

;

private

final

long

windowTime;

private

final

Collection

<

State

<T>> states;

private

final

boolean

timeoutHandling;

private

NFAFactoryImpl

(

long

windowTime,

Collection

<

State

<T>> states,

boolean

timeoutHandling) {

this

.windowTime = windowTime;

this

.states = states;

this

.timeoutHandling = timeoutHandling;

    }

@Override

public

NFA<T> createNFA() {

// 一个NFA由状态集合、时间窗口的长度和是否处理超时组成

return

new

NFA<>(states, windowTime, timeoutHandling);

    }

  }

}NFA:Non-deterministic finite automaton – 非确定的有限(状态)自动机。

更多内容参见

https://zh.wik1pedia.org/wik1/非确定有限状态自动机

public

class

NFA<T> {

/**

   * NFACompiler返回的所有有效的NFA状态集合

   * These are directly derived from the user-specified pattern.

   */

private

final

Map

<

String

,

State

<T>> states;

/**

   * Pattern.within(Time)指定的时间窗口长度

   */

private

final

long

windowTime;

/**

   * 一个超时匹配的标记

   */

private

final

boolean

handleTimeout;

  …

}


PatternSelectFunction和PatternFlatSelectFunction


当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

public

interface

PatternSelectFunction

<IN, OUT>

extends

Function

,

Serializable

{

/**

   * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识

   */

  OUT select(

Map

<

String

,

List

<IN>> pattern)

throws

Exception

;

}

PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

public

interface

PatternFlatSelectFunction

<IN, OUT>

extends

Function

,

Serializable

{

/**

   * 生成一个或多个结果

   */

void

flatSelect(

Map

<

String

,

List

<IN>> pattern,

Collector

<OUT> out)

throws

Exception

;

}
SelectTimeoutCepOperator、PatternTimeoutFunction


SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法…对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

public

class

SelectTimeoutCepOperator

<IN, OUT1, OUT2, KEY>

extends

AbstractKeyedCEPPatternOperator

<IN, KEY, OUT1,

SelectTimeoutCepOperator

.

SelectWrAPPer

<IN, OUT1, OUT2>> {

private

OutputTag

<OUT2> timedOutOutputTag;

public

SelectTimeoutCepOperator

(

TypeSerializer

<IN> inputSerializer,

boolean

isProcessingTime,

NFACompiler

.

NFAFactory

<IN> nfaFactory,

final

EventComparator

<IN> comparator,

AfterMatchSkipStrategy

skipStrategy,

// 参数命名混淆了flat…包括SelectWrapper类中的成员命名…

PatternSelectFunction

<IN, OUT1> flatSelectFunction,

PatternTimeoutFunction

<IN, OUT2> flatTimeoutFunction,

OutputTag

<OUT2> outputTag,

OutputTag

<IN> lateDataOutputTag) {

super

(

      inputSerializer,

      isProcessingTime,

      nfaFactory,

      comparator,

      skipStrategy,

new

SelectWrapper

<>(flatSelectFunction, flatTimeoutFunction),

      lateDataOutputTag);

this

.timedOutOutputTag = outputTag;

  }

  …

}

public

interface

PatternTimeoutFunction

<IN, OUT>

extends

Function

,

Serializable

{

  OUT timeout(

Map

<

String

,

List

<IN>> pattern,

long

timeoutTimestamp)

throws

Exception

;

}

public

interface

PatternFlatTimeoutFunction

<IN, OUT>

extends

Function

,

Serializable

{

void

timeout(

Map

<

String

,

List

<IN>> pattern,

long

timeoutTimestamp,

Collector

<OUT> out)

throws

Exception

;

}


CEP和CEPOperatorUtils

CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

public

class

CEP {

public

static

<T>

PatternStream

<T> pattern(

DataStream

<T> input,

Pattern

<T, ?> pattern) {

return

new

PatternStream

<>(input, pattern);

  }

public

static

<T>

PatternStream

<T> pattern(

DataStream

<T> input,

Pattern

<T, ?> pattern,

EventComparator

<T> comparator) {

return

new

PatternStream

<>(input, pattern, comparator);

  }

}

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

public

class

CEPOperatorUtils

{

  …

private

static

<IN, OUT, K>

SingleOutputStreamOperator

<OUT> createPatternStream(

final

DataStream

<IN> inputStream,

final

Pattern

<IN, ?> pattern,

final

TypeInformation

<OUT> outTypeInfo,

final

boolean

timeoutHandling,

final

EventComparator

<IN> comparator,

final

OperatorBuilder

<IN, OUT> operatorBuilder) {

final

TypeSerializer

<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time

final

boolean

isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==

TimeCharacteristic

.

ProcessingTime

;

// compile our pattern into a NFAFactory to instantiate NFAs later on

final

NFACompiler

.

NFAFactory

<IN> nfaFactory =

NFACompiler

.compileFactory(pattern, timeoutHandling);

final

SingleOutputStreamOperator

<OUT> patternStream;

if

(inputStream

instanceof

KeyedStream

) {

KeyedStream

<IN, K> keyedStream = (

KeyedStream

<IN, K>) inputStream;

      patternStream = keyedStream.transform(

        operatorBuilder.getKeyedOperatorName(),

        outTypeInfo,

        operatorBuilder.build(

          inputSerializer,

          isProcessingTime,

          nfaFactory,

          comparator,

          pattern.getAfterMatchSkipStrategy()));

    }

else

{

KeySelector

<IN,

Byte

> keySelector =

new

NullByteKeySelector

<>();

      patternStream = inputStream.keyBy(keySelector).transform(

        operatorBuilder.getOperatorName(),

        outTypeInfo,

        operatorBuilder.build(

          inputSerializer,

          isProcessingTime,

          nfaFactory,

          comparator,

          pattern.getAfterMatchSkipStrategy()

        )).forceNonParallel();

    }

return

patternStream;

  }

  …

}FlinkCEP实现步骤

    IN: DataSource -> DataStream -> Transformations -> DataStreamPattern: Pattern.begin.where.next.where…times…PatternStream: CEP.pattern(DataStream, Pattern)DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超时实现步骤

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

KeySelector

<IN,

Byte

> keySelector =

new

NullByteKeySelector

<>();

Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(…)就可以了。

    IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStreamPattern: Pattern.begin.where.next.where…within(Time windowTime)PatternStream: CEP.pattern(KeyedStream, Pattern)OutputTag: new OutputTag(…)SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超时不足

和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

FlinkCEP超时完整demo

public

class

CEPTimeoutEventJob

{

private

static

final

String

LOCAL_KAFKA_BROKER =

“localhost:9092”

;

private

static

final

String

GROUP_ID =

CEPTimeoutEventJob

.

class

.getSimpleName();

private

static

final

String

GROUP_TOPIC = GROUP_ID;

public

static

void

main(

String

[] args)

throws

Exception

{

// 参数

ParameterTool

params =

ParameterTool

.fromArgs(args);

StreamExecutionEnvironment

env =

StreamExecutionEnvironment

.getExecutionEnvironment();

// 使用事件时间

    env.setStreamTimeCharacteristic(

TimeCharacteristic

.

EventTime

);

    env.enableCheckpointing(

5000

);

    env.getCheckpointConfig().enableExternalizedCheckpoints(

CheckpointConfig

.

ExternalizedCheckpointCleanup

.RETAIN_ON_CANCELLATION);

    env.getConfig().disableSysoutLogging();

    env.getConfig().setRestartStrategy(

RestartStrategies

.fixedDelayRestart(

5

,

10000

));

// 不使用POJO的时间

final

AssignerWithPeriodicWatermarks

extractor =

new

IngestionTimeExtractor

<POJO>();

// 与Kafka Topic的Partition保持一致

    env.setParallelism(

3

);

Properties

kafkaProps =

new

Properties

();

    kafkaProps.setProperty(

“bootstrap.servers”

, LOCAL_KAFKA_BROKER);

    kafkaProps.setProperty(

“group.id”

, GROUP_ID);

// 接入Kafka的消息

FlinkKafkaConsumer011

<POJO> consumer =

new

FlinkKafkaConsumer011

<>(GROUP_TOPIC,

new

POJOSchema

(), kafkaProps);

DataStream

<POJO> pojoDataStream = env.addSource(consumer)

        .assignTimestampsAndWatermarks(extractor);

    pojoDataStream.print();

// 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】

// 1.

DataStream

<POJO> keyedPojos = pojoDataStream

        .keyBy(

“aid”

);

// 从初始化到终态-一个完整的POJO事件序列

// 2.

Pattern

<POJO, POJO> completedPojo =

Pattern

.<POJO>begin(

“init”

)

            .where(

new

SimpleCondition

<POJO>() {

private

static

final

long

serialVersionUID = –

6847788055093903603L

;

@Override

public

boolean

filter(POJO pojo)

throws

Exception

{

return

“02”

.equals(pojo.getAstatus());

              }

            })

            .followedBy(

“end”

)

//            .next(“end”)

            .where(

new

SimpleCondition

<POJO>() {

private

static

final

long

serialVersionUID = –

2655089736460847552L

;

@Override

public

boolean

filter(POJO pojo)

throws

Exception

{

return

“00”

.equals(pojo.getAstatus()) ||

“01”

.equals(pojo.getAstatus());

              }

            });

// 找出1分钟内【便于测试】都没有到终态的事件aid

// 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream

// 3.

PatternStream

<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(

Time

.minutes(

1

)));

// 定义侧面输出timedout

// 4.

OutputTag

<POJO> timedout =

new

OutputTag

<POJO>(

“timedout”

) {

private

static

final

long

serialVersionUID =

773503794597666247L

;

    };

// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction

// 5.

SingleOutputStreamOperator

<POJO> timeoutPojos = patternStream.flatSelect(

        timedout,

new

POJOTimedOut

(),

new

FlatSelectNothing

()

    );

// 打印输出超时的POJO

// 6.7.

    timeoutPojos.getSideOutput(timedout).print();

    timeoutPojos.print();

    env.execute(

CEPTimeoutEventJob

.

class

.getSimpleName());

  }

/**

   * 把超时的事件收集起来

   */

public

static

class

POJOTimedOut

implements

PatternFlatTimeoutFunction

<POJO, POJO> {

private

static

final

long

serialVersionUID = –

4214641891396057732L

;

@Override

public

void

timeout(

Map

<

String

,

List

<POJO>> map,

long

l,

Collector

<POJO> collector)

throws

Exception

{

if

(

null

!= map.get(

“init”

)) {

for

(POJO pojoInit : map.get(

“init”

)) {

System

.out.println(

“timeout init:”

+ pojoInit.getAid());

          collector.collect(pojoInit);

        }

      }

// 因为end超时了,还没收到end,所以这里是拿不到end的

System

.out.println(

“timeout end: ”

+ map.get(

“end”

));

    }

  }

/**

   * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了

   * 一分钟时间内走完init和end的数据

   *

   * @param <T>

   */

public

static

class

FlatSelectNothing

<T>

implements

PatternFlatSelectFunction

<T, T> {

private

static

final

long

serialVersionUID = –

3029589950677623844L

;

@Override

public

void

flatSelect(

Map

<

String

,

List

<T>> pattern,

Collector

<T> collector) {

System

.out.println(

“flatSelect: ”

+ pattern);

    }

  }

}测试结果(followedBy):

3

> POJO{aid=

‘ID000-0’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419728242

, energy=

529.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-1’

, astyle=

‘STYLE000-2’

, aname=

‘NAME-1’

, logTime=

1563419728783

, energy=

348.00

, age=

26

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-0’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419749259

, energy=

492.00

, age=

0

, tt=

2022



07



18

, astatus=

’00’

, createTime=

null

, updateTime=

null

}

flatSelect: {init=[POJO{aid=

‘ID000-0’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419728242

, energy=

529.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}],

end

=[POJO{aid=

‘ID000-0’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419749259

, energy=

492.00

, age=

0

, tt=

2022



07



18

, astatus=

’00’

, createTime=

null

, updateTime=

null

}]}

timeout init:ID000-

1

3

> POJO{aid=

‘ID000-1’

, astyle=

‘STYLE000-2’

, aname=

‘NAME-1’

, logTime=

1563419728783

, energy=

348.00

, age=

26

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

timeout

end

:

null

3

> POJO{aid=

‘ID000-2’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419829639

, energy=

467.00

, age=

0

, tt=

2022



07



18

, astatus=

’03’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-2’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419841394

, energy=

107.00

, age=

0

, tt=

2022



07



18

, astatus=

’00’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-3’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419967721

, energy=

431.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-3’

, astyle=

‘STYLE000-2’

, aname=

‘NAME-0’

, logTime=

1563419979567

, energy=

32.00

, age=

26

, tt=

2022



07



18

, astatus=

’03’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-3’

, astyle=

‘STYLE000-2’

, aname=

‘NAME-0’

, logTime=

1563419993612

, energy=

542.00

, age=

26

, tt=

2022



07



18

, astatus=

’01’

, createTime=

null

, updateTime=

null

}

flatSelect: {init=[POJO{aid=

‘ID000-3’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563419967721

, energy=

431.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}],

end

=[POJO{aid=

‘ID000-3’

, astyle=

‘STYLE000-2’

, aname=

‘NAME-0’

, logTime=

1563419993612

, energy=

542.00

, age=

26

, tt=

2022



07



18

, astatus=

’01’

, createTime=

null

, updateTime=

null

}]}

3

> POJO{aid=

‘ID000-4’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563420063760

, energy=

122.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

‘ID000-4’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563420228008

, energy=

275.00

, age=

0

, tt=

2022



07



18

, astatus=

’03’

, createTime=

null

, updateTime=

null

}

timeout init:ID000-

4

3

> POJO{aid=

‘ID000-4’

, astyle=

‘STYLE000-0’

, aname=

‘NAME-0’

, logTime=

1563420063760

, energy=

122.00

, age=

0

, tt=

2022



07



18

, astatus=

’02’

, createTime=

null

, updateTime=

null

}

timeout

end

:

null


总结

以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

原创文章,作者:starterknow,如若转载,请注明出处:https://www.starterknow.com/108543.html

联系我们