Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)
本文分享自天翼云開發(fā)者社區(qū)《Flink 與Flink可視化平臺StreamPark教程(時間相關(guān) 1)》,作者:l****n
但在分布式系統(tǒng)中,這種驅(qū)動方式又會有一些問題。因為數(shù)據(jù)本身在處理轉(zhuǎn)換的過程中會變化,如果遇到窗口聚合這樣的操作,其實是要攢一批數(shù)據(jù)才會輸出一個結(jié)果,那么下游的數(shù)據(jù)就會變少,時間進(jìn)度的控制就不夠精細(xì)了。
所以我們應(yīng)該把時鐘也以數(shù)據(jù)的形式傳遞出去,告訴下游任務(wù)當(dāng)前時間的進(jìn)展;而且這個時鐘的傳遞不會因為窗口聚合之類的運(yùn)算而停滯。一種簡單的想法是,在數(shù)據(jù)流中加入一個時鐘標(biāo)記,記錄當(dāng)前的事件時間;這個標(biāo)記可以直接廣播到下游,當(dāng)下游任務(wù)收到這個標(biāo)記,就可以更新自己的時鐘了。由于類似于水流中用來做標(biāo)志的記號,在 Flink 中,這種用來衡量事件時間(Event Time)進(jìn)展的標(biāo)記,就被稱作“水位線”(Watermark)。
水位線設(shè)置package cn.ctyun.demo.api.watermark;import cn.ctyun.demo.api.utils.TransformUtil;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/**
* @classname: ViewContentStreamWithWaterMark
* @description: 擁有水位線
* @author: Liu Xinyuan
* @create: 2023-04-14 09:50
**/public class ViewContentStreamWithWaterMark {
public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
// 1.創(chuàng)建Flink-MySQL-CDC的Source
MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
.hostname("***")
.port(3306)
.username("***")
.password("***")
.databaseList("test_cdc_source")
.tableList("test_cdc_source.user_view")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai")
.build();
// 2.使用CDC Source從MySQL讀取數(shù)據(jù)
DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
viewContentSouce,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String extractData, long l) {
return JSONObject.parseObject(extractData).getLong("ts_ms");
}
}
),
"ViewContentStreamWithWatermark Source"
);
// 3.轉(zhuǎn)換為指定格式
return mysqlDataStreamSource.map(TransformUtil::formatResult);
}}我們在cdc傳來的數(shù)據(jù)中獲取他的日志自帶更新時間戳字段ts_ms時間戳作為我們的事件時間,并生成水位線,此后此數(shù)據(jù)流將包含水位線進(jìn)行后續(xù)地傳遞。
在窗口中,有著不同的設(shè)置,可以面對不同的場景。我們按照數(shù)據(jù)不同的分配規(guī)則,將窗口的具體實現(xiàn)分為了以下四種,如下所示:
窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(shù)(Window Functions)
窗口函數(shù)MapReduce在這里,我們首先定義一個MapReduce過程,用來統(tǒng)計目前十秒內(nèi)的訪問統(tǒng)計數(shù)量,這里的水位線設(shè)定請參考代碼ViewContentStreamWithWaterMark(上文中提供的代碼)
package cn.ctyun.demo.api;import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;import com.alibaba.fastjson.JSONObject;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;/**
* @classname: ApiTimeWindow
* @description: 時間窗的使用
* @author: Liu Xinyuan
* @create: 2023-04-17 20:39
**/public class ApiTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);
viewContentDataStream.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
// 不將刪除的數(shù)據(jù)考慮在內(nèi)
return !value.getString("op").equals("d");
}
}).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(JSONObject value) throws Exception {
return Tuple2.of(value.getString("user_name"), 1L);
}
}).keyBy(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 設(shè)定一個累加規(guī)則
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute();
}}這里設(shè)定了一個時間窗口為10秒,最終的結(jié)果為每十秒鐘將統(tǒng)計一個登錄統(tǒng)計,并輸出到控制臺。使用時間窗口后和不加的唯一區(qū)別是計算的范圍變?yōu)榱藭r間窗內(nèi)計算。
*博客內(nèi)容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點(diǎn),如有侵權(quán)請聯(lián)系工作人員刪除。









