Flink CEP 原理和案例詳解
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

1 概念
(1)定義
復(fù)合事件處理(Complex Event Processing,CEP)是一種基于動(dòng)態(tài)環(huán)境中事件流的分析技術(shù),事件在這里通常是有意義的狀態(tài)變化,通過(guò)分析事件間的關(guān)系,利用過(guò)濾、關(guān)聯(lián)、聚合等技術(shù),根據(jù)事件間的時(shí)序關(guān)系和聚合關(guān)系制定檢測(cè)規(guī)則,持續(xù)地從事件流中查詢出符合要求的事件序列,最終分析得到更復(fù)雜的復(fù)合事件。
(2)特征
CEP的特征如下:
??????目標(biāo):從有序的簡(jiǎn)單事件流中發(fā)現(xiàn)一些高階特征;
??????輸入:一個(gè)或多個(gè)簡(jiǎn)單事件構(gòu)成的事件流;
??????處理:識(shí)別簡(jiǎn)單事件之間的內(nèi)在聯(lián)系,多個(gè)符合一定規(guī)則的簡(jiǎn)單事件構(gòu)成復(fù)雜事件;
??????輸出:滿足規(guī)則的復(fù)雜事件。
(3)功能
CEP用于分析低延遲、頻繁產(chǎn)生的不同來(lái)源的事件流。CEP可以幫助在復(fù)雜的、不相關(guān)的時(shí)間流中找出有意義的模式和復(fù)雜的關(guān)系,以接近實(shí)時(shí)或準(zhǔn)實(shí)時(shí)的獲得通知或組織一些行為。
CEP支持在流上進(jìn)行模式匹配,根據(jù)模式的條件不同,分為連續(xù)的條件或不連續(xù)的條件;模式的條件允許有時(shí)間的限制,當(dāng)條件范圍內(nèi)沒(méi)有達(dá)到滿足的條件時(shí),會(huì)導(dǎo)致模式匹配超時(shí)。
??????看起來(lái)很簡(jiǎn)單,但是它有很多不同的功能:
??????① 輸入的流數(shù)據(jù),盡快產(chǎn)生結(jié)果;
??????② 在2個(gè)事件流上,基于時(shí)間進(jìn)行聚合類的計(jì)算;
??????③ 提供實(shí)時(shí)/準(zhǔn)實(shí)時(shí)的警告和通知;
??????④ 在多樣的數(shù)據(jù)源中產(chǎn)生關(guān)聯(lián)分析模式;
??????⑤ 高吞吐、低延遲的處理
??????市場(chǎng)上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒(méi)有提供專門的庫(kù)支持。然而,F(xiàn)link提供了專門的CEP庫(kù)。
(4)主要組件
????? Flink為CEP提供了專門的Flink CEP library,它包含如下組件:Event Stream、Pattern定義、Pattern檢測(cè)和生成Alert。
首先,開發(fā)人員要在DataStream流上定義出模式條件,之后Flink CEP引擎進(jìn)行模式檢測(cè),必要時(shí)生成警告。
2 Pattern API
處理事件的規(guī)則,被叫作模式(Pattern)。
Flink CEP提供了Pattern API用于對(duì)輸入流數(shù)據(jù)進(jìn)行復(fù)雜事件規(guī)則定義,用來(lái)提取符合規(guī)則的事件序列。
模式大致分為三類:
?① 個(gè)體模式(Individual Patterns)
組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義,就是個(gè)體模式。
start.times(3).where(_.behavior.startsWith(‘fav’))?② 組合模式(Combining Patterns,也叫模式序列)
很多個(gè)體模式組合起來(lái),就形成了整個(gè)的模式序列。
模式序列必須以一個(gè)初始模式開始:
val start = Pattern.begin(‘start’)
③ 模式組(Group of Pattern)
將一個(gè)模式序列作為條件嵌套在個(gè)體模式里,成為一組模式。
2.1個(gè)體模式
個(gè)體模式包括單例模式和循環(huán)模式。單例模式只接收一個(gè)事件,而循環(huán)模式可以接收多個(gè)事件。
(1)量詞
可以在一個(gè)個(gè)體模式后追加量詞,也就是指定循環(huán)次數(shù)
// 匹配出現(xiàn)4次start.time(4)// 匹配出現(xiàn)0次或4次start.time(4).optional// 匹配出現(xiàn)2、3或4次start.time(2,4)// 匹配出現(xiàn)2、3或4次,并且盡可能多地重復(fù)匹配start.time(2,4).greedy// 匹配出現(xiàn)1次或多次start.oneOrMore// 匹配出現(xiàn)0、2或多次,并且盡可能多地重復(fù)匹配start.timesOrMore(2).optional.greedy
(2)條件
每個(gè)模式都需要指定觸發(fā)條件,作為模式是否接受事件進(jìn)入的判斷依據(jù)。CEP中的個(gè)體模式主要通過(guò)調(diào)用.where()、.or()和.until()來(lái)指定條件。按不同的調(diào)用方式,可以分成以下幾類:
??????① 簡(jiǎn)單條件
通過(guò).where()方法對(duì)事件中的字段進(jìn)行判斷篩選,決定是否接收該事件
start.where(event=>event.getName.startsWith(“foo”))??????② 組合條件
將簡(jiǎn)單的條件進(jìn)行合并;or()方法表示或邏輯相連,where的直接組合就相當(dāng)于與and。
Pattern.where(event => …/*some condition*/).or(event => /*or condition*/)??????③ 終止條件
如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)。
??????④ 迭代條件
能夠?qū)δJ街八薪邮盏氖录M(jìn)行處理;調(diào)用.where((value,ctx) => {…}),可以調(diào)用ctx.getEventForPattern(“name”)
2.2 模式序列
??????不同的近鄰模式如下圖:
(1)嚴(yán)格近鄰
所有事件按照嚴(yán)格的順序出現(xiàn),中間沒(méi)有任何不匹配的事件,由.next()指定。例如對(duì)于模式“a next b”,事件序列“a,c,b1,b2”沒(méi)有匹配。
(2)寬松近鄰
允許中間出現(xiàn)不匹配的事件,由.followedBy()指定。例如對(duì)于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。
(3)非確定性寬松近鄰
進(jìn)一步放寬條件,之前已經(jīng)匹配過(guò)的事件也可以再次使用,由.followedByAny()指定。例如對(duì)于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為{ab1},{a,b2}。
除了以上模式序列外,還可以定義“不希望出現(xiàn)某種近鄰關(guān)系”:
????? .notNext():不想讓某個(gè)事件嚴(yán)格緊鄰前一個(gè)事件發(fā)生。
????? .notFollowedBy():不想讓某個(gè)事件在兩個(gè)事件之間發(fā)生。
需要注意:①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結(jié)束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時(shí)間約束,用來(lái)要求在多長(zhǎng)時(shí)間內(nèi)匹配有效。
next.within(Time.seconds(10))2.3 模式的檢測(cè)
指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測(cè)潛在匹配。調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個(gè)PatternStream。
val input:DataStream[Event] = …val pattern:Pattern[Event,_] = …val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
2.4 匹配事件的提取
創(chuàng)建PatternStream之后,就可以應(yīng)用select或者flatSelect方法,從檢測(cè)到的事件序列中提取事件了。
select()方法需要輸入一個(gè)select function作為參數(shù),每個(gè)成功匹配的事件序列都會(huì)調(diào)用它。
select()以一個(gè)Map[String,Iterable[IN]]來(lái)接收匹配到的事件序列,其中key就是每個(gè)模式的名稱,而value就是所有接收到的事件的Iterable類型。
def selectFn(pattern : Map[String,Iterable[IN]]):OUT={val startEvent = pattern.get(“start”).get.nextval endEvent = pattern.get(“end”).get.nextOUT(startEvent, endEvent)}
flatSelect通過(guò)實(shí)現(xiàn)PatternFlatSelectFunction實(shí)現(xiàn)與select相似的功能。唯一的區(qū)別就是flatSelect方法可以返回多條記錄,它通過(guò)一個(gè)Collector[OUT]類型的參數(shù)來(lái)將要輸出的數(shù)據(jù)傳遞到下游。
2.5超時(shí)事件的提取
當(dāng)一個(gè)模式通過(guò)within關(guān)鍵字定義了檢測(cè)窗口時(shí)間時(shí),部分事件序列可能因?yàn)槌^(guò)窗口長(zhǎng)度而被丟棄;為了能夠處理這些超時(shí)的部分匹配,select和flatSelect API調(diào)用允許指定超時(shí)處理程序。
3 Flink CEP實(shí)戰(zhàn)
為了使用Flink CEP,需要導(dǎo)入pom依賴。
<dependency><groupId>org.apache.flinkgroupId><artifactId>flink-cep-scala_2.11artifactId><version>1.7.0version>dependency>
LoginLog.csv中的數(shù)據(jù)格式為:
5402,83.149.11.115,success,155843081523064,66.249.3.15,fail,15584308265692,80.149.25.29,fail,15584308337233,86.226.15.75,success,15584308325692,80.149.25.29,success,155843084029607,66.249.73.135,success,1558430841
需求:檢測(cè)一個(gè)用戶在3秒內(nèi)連續(xù)登陸失敗。
// 輸入的登錄事件樣例類case class LoginEvent( userId: Long, ip: String, eventType: String, eventTime: Long )// 輸出的異常報(bào)警信息樣例類case class Warning( userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)object LoginFailWithCep {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)// 1. 讀取事件數(shù)據(jù),創(chuàng)建簡(jiǎn)單事件流val resource = getClass.getResource("/LoginLog.csv")val loginEventStream = env.readTextFile(resource.getPath).map( data => {val dataArray = data.split(",")LoginEvent( dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong )} ).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L} ).keyBy(_.userId)// 2. 定義匹配模式val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail").next("next").where(_.eventType == "fail").within(Time.seconds(3))// 3. 在事件流上應(yīng)用模式,得到一個(gè)pattern streamval patternStream = CEP.pattern(loginEventStream, loginFailPattern)// 4. 從pattern stream上應(yīng)用select function,檢出匹配事件序列val loginFailDataStream = patternStream.select( new LoginFailMatch() )loginFailDataStream.print()env.execute("login fail with cep job")}}class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {// 從map中按照名稱取出對(duì)應(yīng)的事件// val iter = map.get("begin").iterator()val firstFail = map.get("begin").iterator().next()val lastFail = map.get("next").iterator().next()Warning( firstFail.userId, firstFail.eventTime, lastFail.eventTime, "login fail!" )}}
4 總結(jié)
本章主要圍繞scala語(yǔ)言來(lái)講解Flink CEP庫(kù)。其實(shí),F(xiàn)link CEP也有SQL的實(shí)現(xiàn)。
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??



