首頁>技術>

批處理經常要解決的問題是將兩個資料來源做關聯Join操作。比如,很多手機APP都有一個使用者資料源User,同時APP會記錄使用者的行為,我們稱之為Behavior,兩個表按照userId來進行Join。在流處理場景下,Flink也支援了Join,只不過Flink是在一個時間視窗上來進行兩個表的Join。

Join示例圖

目前,Flink支援了兩種Join:Window Join(視窗連線)和Interval Join(時間間隔連線。

Window Join

從名字中能猜到,Window Join主要在Flink的視窗上進行操作,它將兩個流中落在相同視窗的元素按照某個Key進行Join。一個Window Join的大致骨架結構為:

input1.join(input2)    .where(<KeySelector>)      <- input1使用哪個欄位作為Key    .equalTo(<KeySelector>)    <- input2使用哪個欄位作為Key    .window(<WindowAssigner>)  <- 指定WindowAssigner    [.trigger(<Trigger>)]      <- 指定Trigger(可選)    [.evictor(<Evictor>)]      <- 指定Evictor(可選)    .apply(<JoinFunction>)     <- 指定JoinFunction

下圖展示了Join的大致過程。兩個輸入資料流先分別按Key進行分組,然後將元素劃分到視窗中。視窗的劃分需要使用WindowAssigner來定義,這裡可以使用Flink提供的滾動視窗、滑動視窗或會話視窗等預設的WindowAssigner。隨後兩個資料流中的元素會被分配到各個視窗上,也就是說一個視窗會包含來自兩個資料流的元素。相同視窗內的資料會以INNER JOIN的語義來相互關聯,形成一個數據對。當視窗的時間結束,Flink會呼叫JoinFunction來對視窗內的資料對進行處理。當然,我們也可以使用Trigger或Evictor做一些自定義優化,他們的使用方法和普通視窗的使用方法一樣。

接下來我們重點分析一下兩個資料流是如何INNER JOIN的:

視窗內資料INNER JOIN示意圖

一般滴,INNER JOIN只對兩個資料來源都出現的元素做Join,形成一個數據對,即資料來源input1中的某個元素與資料來源input2中的所有元素逐個配對。當資料來源某個視窗內沒資料時,比如圖中的第三個視窗,Join的結果也是空的。

class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] { override def join(input1: (String, Int), input2: (String, Int)): String = { "input 1 :" + input1._2 + ", input 2 :" + input2._2 }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyJoinFunction)

上面的程式碼自定義了JoinFunction,並將Join結果打印出來。無論程式碼中演示的滾動視窗,還是滑動視窗或會話視窗,其原理都是一樣的。除了JoinFunction,Flink還提供了FlatJoinFunction,其功能是輸出零到多個結果。

如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在呼叫時,要寫成input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelecotr>)。

class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] { // 這裡的型別是Java的Iterable,需要引用 collection.JavaConverters._ 並轉成Scala override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = { input1.asScala.foreach(element => out.collect("input1 :" + element.toString())) input2.asScala.foreach(element => out.collect("input2 :" + element.toString())) }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyCoGroupFunction)Interval Join

與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據一個時間間隔(Interval)界定時間。Interval需要一個時間下界(lower bound)和上界(upper bound),如果我們將input1和input2進行Interval Join,input1中的某個元素為input1.element1,時間戳為input1.element1.ts,那麼一個Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個時間段內的元素將會和input1.element1組成一個數據對。用數學公式表達為,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數也可以是負數。

注意,目前Flink(1.9)的Interval Join只支援Event Time語義。

Interval Join示意圖

下面的程式碼展示了如何對兩個資料流進行Interval Join:

class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] { override def processElement(input1: (String, Long, Int), input2: (String, Long, Int), context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context, out: Collector[String]): Unit = { out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString) }}// 資料流有三個欄位:(key, 時間戳, 數值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .process(new MyProcessFunction)

預設的時間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。

val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .upperBoundExclusive() .lowerBoundExclusive() .process(new MyProcessFunction)

Interval Join內部是用快取來儲存所有資料的,因此需要注意快取資料不能太大,以免對記憶體造成絕大壓力。

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • win10系統安裝SQL Server2005中文版安裝教程(親測成功)