1. Flink CEP 原理和案例詳解

        共 5871字,需瀏覽 12分鐘

         ·

        2020-08-18 09:00

        點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

        回復(fù)”資源“獲取更多資源

        大數(shù)據(jù)技術(shù)與架構(gòu)
        點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

        暴走大數(shù)據(jù)
        點(diǎn)擊右側(cè)關(guān)注,暴走大數(shù)據(jù)!

        1 概念

        (1)定義
        復(fù)合事件處理(Complex Event Processing,CEP)是一種基于動(dòng)態(tài)環(huán)境中事件流的分析技術(shù),事件在這里通常是有意義的狀態(tài)變化,通過(guò)分析事件間的關(guān)系,利用過(guò)濾、關(guān)聯(lián)、聚合等技術(shù),根據(jù)事件間的時(shí)序關(guān)系和聚合關(guān)系制定檢測(cè)規(guī)則,持續(xù)地從事件流中查詢出符合要求的事件序列,最終分析得到更復(fù)雜的復(fù)合事件。
        (2)特征
        CEP的特征如下:
        ??????目標(biāo):從有序的簡(jiǎn)單事件流中發(fā)現(xiàn)一些高階特征;
        ??????輸入:一個(gè)或多個(gè)簡(jiǎn)單事件構(gòu)成的事件流;
        ??????處理:識(shí)別簡(jiǎn)單事件之間的內(nèi)在聯(lián)系,多個(gè)符合一定規(guī)則的簡(jiǎn)單事件構(gòu)成復(fù)雜事件;
        ??????輸出:滿足規(guī)則的復(fù)雜事件。

        (3)功能
        CEP用于分析低延遲、頻繁產(chǎn)生的不同來(lái)源的事件流。CEP可以幫助在復(fù)雜的、不相關(guān)的時(shí)間流中找出有意義的模式和復(fù)雜的關(guān)系,以接近實(shí)時(shí)或準(zhǔn)實(shí)時(shí)的獲得通知或組織一些行為。
        CEP支持在流上進(jìn)行模式匹配,根據(jù)模式的條件不同,分為連續(xù)的條件或不連續(xù)的條件;模式的條件允許有時(shí)間的限制,當(dāng)條件范圍內(nèi)沒(méi)有達(dá)到滿足的條件時(shí),會(huì)導(dǎo)致模式匹配超時(shí)。
        ??????看起來(lái)很簡(jiǎn)單,但是它有很多不同的功能:
        ???
        ???① 輸入的流數(shù)據(jù),盡快產(chǎn)生結(jié)果;
        ??????② 在2個(gè)事件流上,基于時(shí)間進(jìn)行聚合類的計(jì)算;
        ??????③ 提供實(shí)時(shí)/準(zhǔn)實(shí)時(shí)的警告和通知;
        ??????④ 在多樣的數(shù)據(jù)源中產(chǎn)生關(guān)聯(lián)分析模式;
        ??????⑤ 高吞吐、低延遲的處理
        ??????市場(chǎng)上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒(méi)有提供專門的庫(kù)支持。然而,F(xiàn)link提供了專門的CEP庫(kù)。
        (4)主要組件
        ????? Flink為CEP提供了專門的Flink CEP library,它包含如下組件:Event Stream、Pattern定義、Pattern檢測(cè)和生成Alert。
        首先,開發(fā)人員要在DataStream流上定義出模式條件,之后Flink CEP引擎進(jìn)行模式檢測(cè),必要時(shí)生成警告。

        2 Pattern API

        處理事件的規(guī)則,被叫作模式(Pattern)。
        Flink CEP提供了Pattern API用于對(duì)輸入流數(shù)據(jù)進(jìn)行復(fù)雜事件規(guī)則定義,用來(lái)提取符合規(guī)則的事件序列。
        模式大致分為三類:
        ?① 個(gè)體模式(Individual Patterns)
        組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義,就是個(gè)體模式。

        start.times(3).where(_.behavior.startsWith(‘fav’))

        ?② 組合模式(Combining Patterns,也叫模式序列)
        很多個(gè)體模式組合起來(lái),就形成了整個(gè)的模式序列。
        模式序列必須以一個(gè)初始模式開始:

        val start = Pattern.begin(‘start’)

        ③ 模式組(Group of Pattern)

        將一個(gè)模式序列作為條件嵌套在個(gè)體模式里,成為一組模式。

        2.1個(gè)體模式

        個(gè)體模式包括單例模式和循環(huán)模式。單例模式只接收一個(gè)事件,而循環(huán)模式可以接收多個(gè)事件。
        (1)量詞
        可以在一個(gè)個(gè)體模式后追加量詞,也就是指定循環(huán)次數(shù)

        // 匹配出現(xiàn)4次start.time(4)// 匹配出現(xiàn)0次或4次start.time(4).optional// 匹配出現(xiàn)2、3或4次start.time(2,4)// 匹配出現(xiàn)2、3或4次,并且盡可能多地重復(fù)匹配start.time(2,4).greedy// 匹配出現(xiàn)1次或多次start.oneOrMore// 匹配出現(xiàn)0、2或多次,并且盡可能多地重復(fù)匹配start.timesOrMore(2).optional.greedy

        (2)條件

        每個(gè)模式都需要指定觸發(fā)條件,作為模式是否接受事件進(jìn)入的判斷依據(jù)。CEP中的個(gè)體模式主要通過(guò)調(diào)用.where()、.or()和.until()來(lái)指定條件。按不同的調(diào)用方式,可以分成以下幾類:
        ??????① 簡(jiǎn)單條件
        通過(guò).where()方法對(duì)事件中的字段進(jìn)行判斷篩選,決定是否接收該事件

        start.where(event=>event.getName.startsWith(“foo”))

        ??????② 組合條件
        將簡(jiǎn)單的條件進(jìn)行合并;or()方法表示或邏輯相連,where的直接組合就相當(dāng)于與and。

        Pattern.where(event =>/*some condition*/).or(event => /*or condition*/)

        ??????③ 終止條件
        如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)。
        ??????④ 迭代條件
        能夠?qū)δJ街八薪邮盏氖录M(jìn)行處理;調(diào)用.where((value,ctx) => {…}),可以調(diào)用ctx.getEventForPattern(“name”)

        2.2 模式序列

        ??????不同的近鄰模式如下圖:

        (1)嚴(yán)格近鄰
        所有事件按照嚴(yán)格的順序出現(xiàn),中間沒(méi)有任何不匹配的事件,由.next()指定。例如對(duì)于模式“a next b”,事件序列“a,c,b1,b2”沒(méi)有匹配。
        (2)寬松近鄰
        允許中間出現(xiàn)不匹配的事件,由.followedBy()指定。例如對(duì)于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。
        (3)非確定性寬松近鄰
        進(jìn)一步放寬條件,之前已經(jīng)匹配過(guò)的事件也可以再次使用,由.followedByAny()指定。例如對(duì)于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為{ab1},{a,b2}。
        除了以上模式序列外,還可以定義“不希望出現(xiàn)某種近鄰關(guān)系”:
        ????? .notNext():不想讓某個(gè)事件嚴(yán)格緊鄰前一個(gè)事件發(fā)生。
        ????? .notFollowedBy():不想讓某個(gè)事件在兩個(gè)事件之間發(fā)生。
        需要注意:①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結(jié)束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時(shí)間約束,用來(lái)要求在多長(zhǎng)時(shí)間內(nèi)匹配有效。

        next.within(Time.seconds(10))

        2.3 模式的檢測(cè)

        指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測(cè)潛在匹配。調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個(gè)PatternStream。

        val input:DataStream[Event] = …val pattern:Pattern[Event,_] = …val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)


        2.4 匹配事件的提取

        創(chuàng)建PatternStream之后,就可以應(yīng)用select或者flatSelect方法,從檢測(cè)到的事件序列中提取事件了。
        select()方法需要輸入一個(gè)select function作為參數(shù),每個(gè)成功匹配的事件序列都會(huì)調(diào)用它。
        select()以一個(gè)Map[String,Iterable[IN]]來(lái)接收匹配到的事件序列,其中key就是每個(gè)模式的名稱,而value就是所有接收到的事件的Iterable類型。

        def selectFn(pattern : Map[String,Iterable[IN]]):OUT={  val startEvent = pattern.get(“start”).get.next  val endEvent = pattern.get(“end”).get.next  OUT(startEvent, endEvent)}


        flatSelect通過(guò)實(shí)現(xiàn)PatternFlatSelectFunction實(shí)現(xiàn)與select相似的功能。唯一的區(qū)別就是flatSelect方法可以返回多條記錄,它通過(guò)一個(gè)Collector[OUT]類型的參數(shù)來(lái)將要輸出的數(shù)據(jù)傳遞到下游。

        2.5超時(shí)事件的提取

        當(dāng)一個(gè)模式通過(guò)within關(guān)鍵字定義了檢測(cè)窗口時(shí)間時(shí),部分事件序列可能因?yàn)槌^(guò)窗口長(zhǎng)度而被丟棄;為了能夠處理這些超時(shí)的部分匹配,select和flatSelect API調(diào)用允許指定超時(shí)處理程序。

        3 Flink CEP實(shí)戰(zhàn)

        為了使用Flink CEP,需要導(dǎo)入pom依賴。

        <dependency><groupId>org.apache.flinkgroupId><artifactId>flink-cep-scala_2.11artifactId><version>1.7.0version>dependency>

        LoginLog.csv中的數(shù)據(jù)格式為:

        5402,83.149.11.115,success,155843081523064,66.249.3.15,fail,15584308265692,80.149.25.29,fail,15584308337233,86.226.15.75,success,15584308325692,80.149.25.29,success,155843084029607,66.249.73.135,success,1558430841

        需求:檢測(cè)一個(gè)用戶在3秒內(nèi)連續(xù)登陸失敗。

        // 輸入的登錄事件樣例類case class LoginEvent( userId: Long, ip: String, eventType: String, eventTime: Long )// 輸出的異常報(bào)警信息樣例類case class Warning( userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
        object LoginFailWithCep { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
        // 1. 讀取事件數(shù)據(jù),創(chuàng)建簡(jiǎn)單事件流 val resource = getClass.getResource("/LoginLog.csv") val loginEventStream = env.readTextFile(resource.getPath) .map( data => { val dataArray = data.split(",") LoginEvent( dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong ) } ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) { override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L } ) .keyBy(_.userId)
        // 2. 定義匹配模式 val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail") .next("next").where(_.eventType == "fail") .within(Time.seconds(3))
        // 3. 在事件流上應(yīng)用模式,得到一個(gè)pattern stream val patternStream = CEP.pattern(loginEventStream, loginFailPattern)
        // 4. 從pattern stream上應(yīng)用select function,檢出匹配事件序列 val loginFailDataStream = patternStream.select( new LoginFailMatch() )
        loginFailDataStream.print()
        env.execute("login fail with cep job") }}
        class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{ override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = { // 從map中按照名稱取出對(duì)應(yīng)的事件 // val iter = map.get("begin").iterator() val firstFail = map.get("begin").iterator().next() val lastFail = map.get("next").iterator().next() Warning( firstFail.userId, firstFail.eventTime, lastFail.eventTime, "login fail!" ) }}


        4 總結(jié)

        本章主要圍繞scala語(yǔ)言來(lái)講解Flink CEP庫(kù)。其實(shí),F(xiàn)link CEP也有SQL的實(shí)現(xiàn)。

        歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連



        文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

        瀏覽 41
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 男女大鸡巴激情免费黄色视频网站 | 野外被三个男人躁一夜漫画 | 熟女 的搜索结果 - 91n | 午夜成人性爱视频 | 高清一区二区三区 |