日本a√视频在线,久久青青亚洲国产,亚洲一区欧美二区,免费g片在线观看网站

        <style id="k3y6c"><u id="k3y6c"></u></style>
        <s id="k3y6c"></s>
        <mark id="k3y6c"></mark>
          
          

          <mark id="k3y6c"></mark>

          "); //-->

          博客專欄

          EEPW首頁 > 博客 > Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)

          Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)

          發(fā)布人:天翼云開發(fā)者 時間:2025-09-12 來源:工程師 發(fā)布文章

          本文分享自天翼云開發(fā)者社區(qū)《Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)》,作者:l****n

          水位線與窗口

          對于流式數(shù)據(jù),時間是一個重要的標(biāo)識。在flink的事件時間語義下,我們不依賴系統(tǒng)時間,而是基于數(shù)據(jù)自帶的時間戳去定義了一個時鐘,用來表示當(dāng)前時間的進(jìn)展。于是每個并行子任務(wù)都會有一個自己的邏輯時鐘,它的前進(jìn)是靠數(shù)據(jù)的時間戳來驅(qū)動的。

          但在分布式系統(tǒng)中,這種驅(qū)動方式又會有一些問題。因為數(shù)據(jù)本身在處理轉(zhuǎn)換的過程中會變化,如果遇到窗口聚合這樣的操作,其實是要攢一批數(shù)據(jù)才會輸出一個結(jié)果,那么下游的數(shù)據(jù)就會變少,時間進(jìn)度的控制就不夠精細(xì)了。

          所以我們應(yīng)該把時鐘也以數(shù)據(jù)的形式傳遞出去,告訴下游任務(wù)當(dāng)前時間的進(jìn)展;而且這個時鐘的傳遞不會因為窗口聚合之類的運(yùn)算而停滯。一種簡單的想法是,在數(shù)據(jù)流中加入一個時鐘標(biāo)記,記錄當(dāng)前的事件時間;這個標(biāo)記可以直接廣播到下游,當(dāng)下游任務(wù)收到這個標(biāo)記,就可以更新自己的時鐘了。由于類似于水流中用來做標(biāo)志的記號,在 Flink 中,這種用來衡量事件時間(Event Time)進(jìn)展的標(biāo)記,就被稱作“水位線”(Watermark)。

          水位線設(shè)置

          這里我們將通過mysql-cdc來生成一個水位線,我們在讀取數(shù)據(jù)源的一側(cè)進(jìn)行設(shè)置。

          package cn.ctyun.demo.api.watermark;import cn.ctyun.demo.api.utils.TransformUtil;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/**
           * @classname: ViewContentStreamWithWaterMark
           * @description: 擁有水位線
           * @author: Liu Xinyuan
           * @create: 2023-04-14 09:50
           **/public class ViewContentStreamWithWaterMark {
          
              public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
                  // 1.創(chuàng)建Flink-MySQL-CDC的Source
                  MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
                          .hostname("***")
                          .port(3306)
                          .username("***")
                          .password("***")
                          .databaseList("test_cdc_source")
                          .tableList("test_cdc_source.user_view")
                          .startupOptions(StartupOptions.initial())
                          .deserializer(new JsonDebeziumDeserializationSchema())
                          .serverTimeZone("Asia/Shanghai")
                          .build();
          
                  // 2.使用CDC Source從MySQL讀取數(shù)據(jù)
                  DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                          viewContentSouce,
                          WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                                  new SerializableTimestampAssigner<String>() {
                                      @Override
                                      public long extractTimestamp(String extractData, long l) {
                                          return JSONObject.parseObject(extractData).getLong("ts_ms");
                                      }
                                  }
                          ),
                          "ViewContentStreamWithWatermark Source"
                  );
                  // 3.轉(zhuǎn)換為指定格式
                  return mysqlDataStreamSource.map(TransformUtil::formatResult);
          
              }}

          我們在cdc傳來的數(shù)據(jù)中獲取他的日志自帶更新時間戳字段ts_ms時間戳作為我們的事件時間,并生成水位線,此后此數(shù)據(jù)流將包含水位線進(jìn)行后續(xù)地傳遞。

          窗口設(shè)置

          在窗口中,有著不同的設(shè)置,可以面對不同的場景。我們按照數(shù)據(jù)不同的分配規(guī)則,將窗口的具體實現(xiàn)分為了以下四種,如下所示:

          • 滾動窗口(Tumbling Windows):滾動窗口有固定的大小,是一種對數(shù)據(jù)進(jìn)行“均勻切片”的劃分方式。窗口之間沒有重疊,也不會有間隔,是“首尾相接”的狀態(tài)。如果我們把多個窗口的創(chuàng)建,看作一個窗口的運(yùn)動,就好像它在不停地向前“翻滾”一樣。這是最簡單的窗口形式,我們之前所舉的例子都是滾動窗口。也正是因為滾動窗口是“無縫銜接”,所以每個數(shù)據(jù)都會被分配到一個窗口,而且只會屬于一個窗口。滾動窗口也是在BI分析中最常用的窗口類型之一。

          • 滑動窗口(Sliding Windows ):與滾動窗口類似,滑動窗口的大小也是固定的。區(qū)別在于,窗口之間并不是首尾相接的,而是可以“錯開”一定的位置。如果看作一個窗口的運(yùn)動,那么就像是向前小步“滑動”一樣。所以定義滑動窗口的參數(shù)有兩個:窗口大?。╳indow size)定義了窗口的大小,還有一個“滑動步長”(window slide),代表了窗口計算的頻率。

          • 會話窗口(Session Windows):會話窗口顧名思義,是基于“會話”(session)來來對數(shù)據(jù)進(jìn)行分組的。這里的會話類似Web 應(yīng)用中 session 的概念,不過并不表示兩端的通訊過程,而是借用會話超時失效的機(jī)制來 描述窗口。簡單來說,就是數(shù)據(jù)來了之后就開啟一個會話窗口,如果接下來還有數(shù)據(jù)陸續(xù)到來,那么就一直保持會話;如果一段時間一直沒收到數(shù)據(jù),那就認(rèn)為會話超時失效,窗口自動關(guān)閉。一般而言將會給數(shù)據(jù)設(shè)置一個超時時間,如果兩個數(shù)據(jù)間間隔過長并大于超時時間。在這里所有能夠控制的就是超時時間(gap),其作為判定新窗口開啟的一個重要指標(biāo)。

          • 全局窗口(Session Windows):這種窗口全局有效,會把相同 key 的所有數(shù)據(jù)都分配到同一個窗口中;無界流的數(shù)據(jù)永無止盡,所以這種窗口也沒有結(jié)束的時候,默認(rèn)是不會做觸發(fā)計算的。如果希望它能對數(shù)據(jù)進(jìn)行計算處理,還需要自定義“觸發(fā)器”(Trigger)。

          窗口API

          窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(shù)(Window Functions)

          窗口函數(shù)MapReduce

          在這里,我們首先定義一個MapReduce過程,用來統(tǒng)計目前十秒內(nèi)的訪問統(tǒng)計數(shù)量,這里的水位線設(shè)定請參考代碼ViewContentStreamWithWaterMark(上文中提供的代碼),具體的MapReduce如下所示

          package cn.ctyun.demo.api;import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;import com.alibaba.fastjson.JSONObject;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;/**
           * @classname: ApiTimeWindow
           * @description: 時間窗的使用
           * @author: Liu Xinyuan
           * @create: 2023-04-17 20:39
           **/public class ApiTimeWindow {
          
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(1);
          
                  DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);
          
                  viewContentDataStream.filter(new FilterFunction<JSONObject>() {
                      @Override
                      public boolean filter(JSONObject value) throws Exception {
                          // 不將刪除的數(shù)據(jù)考慮在內(nèi)
                          return !value.getString("op").equals("d");
                      }
                  }).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
                      @Override
                      public Tuple2<String, Long> map(JSONObject value) throws Exception {
                          return Tuple2.of(value.getString("user_name"), 1L);
                      }
                  }).keyBy(r -> r.f0)
                          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                          .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                              @Override
                              public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                                  // 設(shè)定一個累加規(guī)則
                                  return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                              }
                          }).print();
                  env.execute();
              }}

          這里設(shè)定了一個時間窗口為10秒,最終的結(jié)果為每十秒鐘將統(tǒng)計一個登錄統(tǒng)計,并輸出到控制臺。使用時間窗口后和不加的唯一區(qū)別是計算的范圍變?yōu)榱藭r間窗內(nèi)計算。


          *博客內(nèi)容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點(diǎn),如有侵權(quán)請聯(lián)系工作人員刪除。


          關(guān)鍵詞: 大數(shù)據(jù) flink 計算

          相關(guān)推薦

          技術(shù)專區(qū)

          關(guān)閉