Flink Basics 03: Getting-Started Examples


        1   Preliminaries

        1.1   API

        Flink provides APIs at several levels of abstraction. The higher the level, the more abstract and the easier it is to use; the lower the level, the closer it is to the runtime and the harder it is to use.

        Note: Flink 1.12 unified stream and batch processing, and the DataSet API is no longer recommended. Apart from a few examples that use DataSet, the rest of this course prefers the DataStream API, which handles both unbounded data (stream processing) and bounded data (batch processing). The Table & SQL API will be covered separately.

        https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/

        https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

        https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

        1.2   Programming Model

        https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

        A Flink application consists of three main parts, Source, Transformation, and Sink, as the sketch below illustrates.
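
        Below is a minimal sketch of this three-part structure, assuming the Flink streaming dependency from section 2.1 below; the class name and sample data are illustrative:

        package com.song.flink;

        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class PipelineSkeleton {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // Source: where the data comes from
                DataStream<String> source = env.fromElements("a", "b", "c");

                // Transformation: how the data is processed
                DataStream<String> transformed = source.map(String::toUpperCase).returns(Types.STRING);

                // Sink: where the results go
                transformed.print();

                // Trigger execution
                env.execute("pipeline-skeleton");
            }
        }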

        2   Project Setup

        2.1  The pom file

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <flink.version>1.13.2</flink.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.20</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
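
        The _2.11 suffix on the Flink artifacts denotes the Scala version they were compiled against, and flink-clients is what allows the examples to run directly from the IDE. Lombok, the MySQL driver, and the JDBC/Kafka connectors are not used by the WordCount examples in this lesson; they are presumably carried along for later lessons.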

        2.2   log4j.properties

        log4j.rootLogger=WARN, console
        log4j.appender.console=org.apache.log4j.ConsoleAppender
        log4j.appender.console.layout=org.apache.log4j.PatternLayout
        log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
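
        With the root logger set to WARN, Flink's verbose INFO output is suppressed, so the console shows only warnings and the results printed by the job.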

        3   A First Taste of Flink

        3.1   Requirement

        Implement WordCount with Flink.

        3.2   Coding Steps

        https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

        1. Prepare the environment (env)

        2. Prepare the data (source)

        3. Process the data (transformation)

        4. Output the result (sink)

        5. Trigger execution (execute)

        The execution environment can be created in any of the following three ways:

        getExecutionEnvironment() // recommended
        createLocalEnvironment()
        createRemoteEnvironment(String host, int port, String... jarFiles)
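
        A short sketch of the three styles (the host, port, and jar path in the remote case are placeholders):

        // Recommended: returns a local environment inside the IDE and the cluster
        // environment automatically when the job is submitted with flink run
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicit local environment, useful for debugging
        StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();

        // Explicit remote environment pointing at a running cluster
        StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/job.jar");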

        3.3   Implementation

        3.3.1   DataSet Version

        package com.song.flink;

        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.common.operators.Order;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.*;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.util.Collector;

        /**
         * Requirement: implement WordCount with Flink using the DataSet API
         * Coding steps:
         * 1. Prepare the environment (env)
         * 2. Prepare the data (source)
         * 3. Process the data (transformation)
         * 4. Output the result (sink)
         * 5. Trigger execution (execute) // with print(), DataSet does not need execute(), DataStream does
         */

        public class WordCountWithDataSet {
            public static void main(String[] args) throws Exception {
                //The legacy batch API is shown below, but it is no longer recommended
                //1. Prepare the environment (env)
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                //2. Prepare the data (source)
                DataSet<String> lineDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                //3. Process the data (transformation)
                //3.1 Split each line on spaces into individual words
                DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        // line is a single line of input
                        String[] words = line.split(" ");
                        for (String word : words) {
                            // collect each split word and emit it downstream
                            out.collect(word);
                        }
                    }
                });

                // 3.2 Map each word to a tuple of (word, 1)
                DataSet<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                });

                // 3.3 Group the data by the word (the key)
                // 0 means group by the tuple field at index 0, i.e. the word
                UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOneDS.groupBy(0);

                // 3.4 Sum the counts (the values) within each group
                // 1 means aggregate the tuple field at index 1, i.e. the count
                DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

                // 3.5 Sort by count in descending order
                DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);

                // 4. Output the result (sink)
                result.print();

                // 5. Trigger execution (execute)
                // With print(), DataSet does not need execute(); DataStream does
                // env.execute(); // execute(), count(), collect(), and print() all trigger execution
            }
        }

        The execution result is as follows:
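
        Since the result is sorted on the count field with parallelism 1, the output should be roughly the following (the relative order of the words with count 1 is not guaranteed):

        (Hello,3)
        (Flink,1)
        (Spark,1)
        (Hadoop,1)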

        3.3.2   DataStream Version

        package com.song.flink;

        import org.apache.flink.api.common.RuntimeExecutionMode;
        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;

        /**
         * Author SongXitang
         * Requirement: implement WordCount with Flink using the DataStream API
         * Coding steps:
         * 1. Prepare the environment (env)
         * 2. Prepare the data (source)
         * 3. Process the data (transformation)
         * 4. Output the result (sink)
         * 5. Trigger execution (execute)
         */

        public class WordCountWithDataStream {
            public static void main(String[] args) throws Exception {
                // The new unified stream/batch API supports both stream processing and batch processing
                // 1. Prepare the environment (env)
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
                //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
                //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

                // 2. Prepare the data (source)
                DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                // 3. Process the data (transformation)
                DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(word);
                        }
                    }
                });
                DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                });
                //KeyedStream<Tuple2<String, Integer>, Tuple> keyByDS = wordAndOneDS.keyBy(0); // index-based keyBy is deprecated; prefer a key selector, as below
                KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyByDS.sum(1);

                // 4. Output the result (sink)
                result.print();

                // 5. Trigger execution (execute)
                // DataStream programs must call execute()
                env.execute();
            }
        }

        The execution result is as follows:
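
        Note that with RuntimeExecutionMode.AUTOMATIC and a bounded source such as fromElements, the job runs in batch mode, so each key prints only its final count. With RuntimeExecutionMode.STREAMING, every incremental update is printed instead, each line prefixed with the index of the printing subtask, for example 3> (Hello,1), 3> (Hello,2), 3> (Hello,3).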

        3.3.3   Lambda Version

        https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions
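
        Because the Java compiler erases generic type information from lambda expressions, Flink cannot always infer a lambda's output type on its own. The returns(...) hint, or the operator overload that takes an explicit TypeInformation, supplies the missing type; the code below shows both styles.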

        package com.song.flink;

        import org.apache.flink.api.common.RuntimeExecutionMode;
        import org.apache.flink.api.common.typeinfo.TypeHint;
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;

        import java.util.Arrays;

        public class WordCountLambda {
            public static void main(String[] args) throws Exception {
                // 1.env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // 2.source
                DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                // 3.transformation
                DataStream<String> wordsDS = linesDS.flatMap((String value, Collector<String> out) ->
                        Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);
                //DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
                DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1),
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
                KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

                // 4.sink
                result.print();

                // 5.execute
                env.execute();
            }
        }

        The execution result is as follows:

        3.3.4   Running on YARN

        Note

        If writing to HDFS runs into permission problems:

        relax the permissions as follows:

        hadoop fs -chmod -R 777 /

        and add this to the code:

        System.setProperty("HADOOP_USER_NAME", "root");
        1. Modify the code

          package com.song.flink;

          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;
          import org.apache.flink.api.common.typeinfo.Types;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.utils.ParameterTool;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.KeyedStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.util.Collector;

          import java.util.Arrays;

          public class WordCountYarn {
              public static void main(String[] args) throws Exception {
                  // Set the user for the YARN submission
                  System.setProperty("HADOOP_USER_NAME", "song");

                  // Parse command-line arguments
                  ParameterTool params = ParameterTool.fromArgs(args);
                  String output = null;
                  if (params.has("output")) {
                      output = params.get("output");
                  } else {
                      output = "hdfs://nameservice1/data/flink/wordcount/output_" + System.currentTimeMillis();
                  }

                  // env
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                  // source
                  DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                  // transformation
                  DataStream<String> wordsDS = linesDS.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" "))
                          .forEach(out::collect)).returns(Types.STRING);
                  DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String word) -> Tuple2.of(word, 1),
                          TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
                  KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                  DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

                  // sink (writeAsText still works here, though newer Flink versions deprecate it in favor of the file sink)
                  result.writeAsText(output).setParallelism(1);

                  // execute
                  env.execute();
              }
          }
        2. Package the project into a jar

        3. Inspect the packaged jar

        4. Upload it to the cluster

          [song@cdh68 jars]$ pwd
          /home/song/data/jars
          [song@cdh68 jars]$ ll
          total 16
          -rw-r--r-- 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
          [song@cdh68 jars]$ chmod 755 WordCount-1.0-SNAPSHOT.jar 
          [song@cdh68 jars]$ ll
          total 16
          -rwxr-xr-x 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
        5. Submit the job

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

          [song@cdh68 ~]$ flink run-application -t yarn-application \
          -Dexecution.runtime-mode=BATCH \
          -yjm 4096 \
          -ytm 16384 \
          -ys 4 \
          -c com.song.flink.WordCountYarn \
          /home/song/data/jars/WordCount-1.0-SNAPSHOT.jar \
          --output hdfs://nameservice1/data/flink/wordcount/output
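
          Note: the -yjm, -ytm, and -ys shortcuts are inherited from the legacy flink run -m yarn-cluster mode; with run-application -t yarn-application, the generic -D options (for example -Djobmanager.memory.process.size=4096m, -Dtaskmanager.memory.process.size=16384m, -Dtaskmanager.numberOfTaskSlots=4) are the documented way to pass the same settings.
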
        6. The submitted job can be observed in the web UI

