Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)
本文分享自天翼云開發(fā)者社區(qū)《Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)》,作者:l****n
但在分布式系統(tǒng)中,這種驅(qū)動方式又會有一些問題。因為數(shù)據(jù)本身在處理轉(zhuǎn)換的過程中會變化,如果遇到窗口聚合這樣的操作,其實是要攢一批數(shù)據(jù)才會輸出一個結(jié)果,那么下游的數(shù)據(jù)就會變少,時間進(jìn)度的控制就不夠精細(xì)了。
所以我們應(yīng)該把時鐘也以數(shù)據(jù)的形式傳遞出去,告訴下游任務(wù)當(dāng)前時間的進(jìn)展;而且這個時鐘的傳遞不會因為窗口聚合之類的運(yùn)算而停滯。一種簡單的想法是,在數(shù)據(jù)流中加入一個時鐘標(biāo)記,記錄當(dāng)前的事件時間;這個標(biāo)記可以直接廣播到下游,當(dāng)下游任務(wù)收到這個標(biāo)記,就可以更新自己的時鐘了。由于類似于水流中用來做標(biāo)志的記號,在 Flink 中,這種用來衡量事件時間(Event Time)進(jìn)展的標(biāo)記,就被稱作“水位線”(Watermark)。
水位線設(shè)置package cn.ctyun.demo.api.watermark;import cn.ctyun.demo.api.utils.TransformUtil;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/**
 * @classname: ViewContentStreamWithWaterMark
 * @description: 擁有水位線
 * @author: Liu Xinyuan
 * @create: 2023-04-14 09:50
 **/public class ViewContentStreamWithWaterMark {
    public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
        // 1.創(chuàng)建Flink-MySQL-CDC的Source
        MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
                .hostname("***")
                .port(3306)
                .username("***")
                .password("***")
                .databaseList("test_cdc_source")
                .tableList("test_cdc_source.user_view")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2.使用CDC Source從MySQL讀取數(shù)據(jù)
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                viewContentSouce,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long l) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "ViewContentStreamWithWatermark Source"
        );
        // 3.轉(zhuǎn)換為指定格式
        return mysqlDataStreamSource.map(TransformUtil::formatResult);
    }}我們在cdc傳來的數(shù)據(jù)中獲取他的日志自帶更新時間戳字段ts_ms時間戳作為我們的事件時間,并生成水位線,此后此數(shù)據(jù)流將包含水位線進(jìn)行后續(xù)地傳遞。
在窗口中,有著不同的設(shè)置,可以面對不同的場景。我們按照數(shù)據(jù)不同的分配規(guī)則,將窗口的具體實現(xiàn)分為了以下四種,如下所示:
窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(shù)(Window Functions)
窗口函數(shù)MapReduce在這里,我們首先定義一個MapReduce過程,用來統(tǒng)計目前十秒內(nèi)的訪問統(tǒng)計數(shù)量,這里的水位線設(shè)定請參考代碼ViewContentStreamWithWaterMark(上文中提供的代碼)
package cn.ctyun.demo.api;import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;import com.alibaba.fastjson.JSONObject;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;/**
 * @classname: ApiTimeWindow
 * @description: 時間窗的使用
 * @author: Liu Xinyuan
 * @create: 2023-04-17 20:39
 **/public class ApiTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);
        viewContentDataStream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                // 不將刪除的數(shù)據(jù)考慮在內(nèi)
                return !value.getString("op").equals("d");
            }
        }).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(JSONObject value) throws Exception {
                return Tuple2.of(value.getString("user_name"), 1L);
            }
        }).keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 設(shè)定一個累加規(guī)則
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();
        env.execute();
    }}這里設(shè)定了一個時間窗口為10秒,最終的結(jié)果為每十秒鐘將統(tǒng)計一個登錄統(tǒng)計,并輸出到控制臺。使用時間窗口后和不加的唯一區(qū)別是計算的范圍變?yōu)榱藭r間窗內(nèi)計算。
*博客內(nèi)容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點(diǎn),如有侵權(quán)請聯(lián)系工作人員刪除。

 加入技術(shù)交流群
加入技術(shù)交流群



 
						
					



