1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Apache Iceberg核心原理分析文件存儲(chǔ)及數(shù)據(jù)寫入流程

        共 92882字,需瀏覽 186分鐘

         ·

        2022-07-09 10:47

        點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
        回復(fù)"面試"獲取更多驚喜

        全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!

        第一部分:Iceberg文件存儲(chǔ)格式

        Apache Iceberg作為一款新興的數(shù)據(jù)湖解決方案在實(shí)現(xiàn)上高度抽象,在存儲(chǔ)上能夠?qū)赢?dāng)前主流的HDFS,S3文件系統(tǒng)并且支持多種文件存儲(chǔ)格式,例如Parquet、ORC、AVRO。相較于Hudi、Delta與Spark的強(qiáng)耦合,Iceberg可以與多種計(jì)算引擎對(duì)接,目前社區(qū)已經(jīng)支持Spark讀寫Iceberg、Impala/Hive查詢Iceberg。本文基于Apache Iceberg 0.10.0,介紹Iceberg文件的組織方式以及不同文件的存儲(chǔ)格式。

        Iceberg Table Format

        從圖中可以看到iceberg將數(shù)據(jù)進(jìn)行分層管理,主要分為元數(shù)據(jù)管理層和數(shù)據(jù)存儲(chǔ)層。元數(shù)據(jù)管理層又可以細(xì)分為三層:

        • VersionMetadata
        • Snapshot
        • Manifest

        VersionMetadata存儲(chǔ)當(dāng)前版本的元數(shù)據(jù)信息(所有snapshot信息);Snapshot表示當(dāng)前操作的一個(gè)快照,每次commit都會(huì)生成一個(gè)快照,一個(gè)快照中包含多個(gè)Manifest,每個(gè)Manifest中記錄了當(dāng)前操作生成數(shù)據(jù)所對(duì)應(yīng)的文件地址,也就是data files的地址。基于snapshot的管理方式,iceberg能夠進(jìn)行time travel(歷史版本讀取以及增量讀?。?,并且提供了serializable isolation。

        數(shù)據(jù)存儲(chǔ)層支持不同的文件格式,目前支持Parquet、ORC、AVRO。

        下面以HadoopTableOperation commit生成的數(shù)據(jù)為例介紹各層的數(shù)據(jù)格式。iceberg生成的數(shù)據(jù)目錄結(jié)構(gòu)如下所示:

        ├── data
        │   ├── id=1
        │   │   ├── 00000-0-04ae60eb-657d-45cb-bb99-d1cb7fe0ad5a-00001.parquet
        │   │   └── 00000-4-487b841b-13b4-4ae8-9238-f70674d5102e-00001.parquet
        │   ├── id=2
        │   │   ├── 00001-1-e85b018b-e43a-44d7-9904-09c80a9b9c24-00001.parquet
        │   │   └── 00001-5-0e2be766-c921-4269-8e1e-c3cff4b98a5a-00001.parquet
        │   ├── id=3
        │   │   ├── 00002-2-097171c5-d810-4de9-aa07-58f3f8a3f52e-00001.parquet
        │   │   └── 00002-6-9d738169-1dbe-4cc5-9a87-f79457a9ec0b-00001.parquet
        │   └── id=4
        │       ├── 00003-3-b0c91d66-9e4e-4b7a-bcd5-db3dc1b847f2-00001.parquet
        │       └── 00003-7-68c45a24-21a2-41e8-90f1-ef4be42f3002-00001.parquet
        └── metadata
            ├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro
            ├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro
            ├── f475511f-877e-4da5-90aa-efa5928a7759-m0.avro
            ├── snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro
            ├── snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro
            ├── v1.metadata.json
            ├── v2.metadata.json
            ├── v3.metadata.json
            └── version-hint.text

        其中metadata目錄存放元數(shù)據(jù)管理層的數(shù)據(jù):

        • version-hint.text:存儲(chǔ)version.metadata.json的版本號(hào),即下文的number
        • version[number].metadata.json
        • snap-[snapshotID]-[attemptID]-[commitUUID].avro(snapshot文件)
        • [commitUUID]-m-[manifestCount].avr- o(manifest文件)

        data目錄組織形式類似于hive,都是以分區(qū)進(jìn)行目錄組織(上圖中id為分區(qū)列),最終數(shù)據(jù)可以使用不同文件格式進(jìn)行存儲(chǔ):

        • [sparkPartitionID]-[sparkTaskID]-[UUID]-[fileCount].[parquet | avro | orc]
        VersionMetadata
        // 
        {
          // 當(dāng)前文件格式版本信息
          // 目前為version 1
          // 支持row-level delete等功能的version 2還在開發(fā)中
          "format-version" : 1,
          "table-uuid" : "a9114f94-911e-4acf-94cc-6d000b321812",
          // hadoopTable location
          "location" : "hdfs://10.242.199.202:9000/hive/empty_order_item",
          // 最新snapshot的創(chuàng)建時(shí)間
          "last-updated-ms" : 1608810968725,
          "last-column-id" : 6,
          // iceberg schema
          "schema" : {
            "type" : "struct",
            "fields" : [ {
              "id" : 1,
              "name" : "id",
              "required" : false, // 類似probuf中的required
              "type" : "long"
            }, {
              "id" : 2,
              "name" : "order_id",
              "required" : false,
              "type" : "long"
            }, {
              "id" : 3,
              "name" : "product_id",
              "required" : false,
              "type" : "long"
            }, {
              "id" : 4,
              "name" : "product_price",
              "required" : false,
              "type" : "decimal(7, 2)"
            }, {
              "id" : 5,
              "name" : "product_quantity",
              "required" : false,
              "type" : "int"
            }, {
              "id" : 6,
              "name" : "product_name",
              "required" : false,
              "type" : "string"
            } ]
          },
          "partition-spec" : [ {
            "name" : "id",
            "transform" : "identity", // transform類型
            "source-id" : 1,
            "field-id" : 1000
          } ],
          "default-spec-id" : 0,
          // 分區(qū)信息
          "partition-specs" : [ {
            "spec-id" : 0,
            "fields" : [ {
              "name" : "id",
              // transform類型:目前支持identity,year,bucket等
              "transform" : "identity",
              // 對(duì)應(yīng)schema.fields中相應(yīng)field的ID
              "source-id" : 1,
              "field-id" : 1000
            } ]
          } ],
          "default-sort-order-id" : 0,
          "sort-orders" : [ {
            "order-id" : 0,
            "fields" : [ ]
          } ],
          // hive創(chuàng)建該表存儲(chǔ)的一些hive property信息
          "properties" : {
            "totalSize" : "0",
            "rawDataSize" : "0",
            "numRows" : "0",
            "COLUMN_STATS_ACCURATE" : "{\"BASIC_STATS\":\"true\"}",
            "numFiles" : "0"
          },
          // 當(dāng)前snapshot id
          "current-snapshot-id" : 2080639593951710914,
          // snapshot信息
          "snapshots" : [ {
            "snapshot-id" : 5178718682852547007,
            // 創(chuàng)建snapshot時(shí)間
            "timestamp-ms" : 1608809818168,
            "summary" : {
              // spark寫入方式,目前支持overwrite以及append
              "operation" : "overwrite",
              "spark.app.id" : "local-1608809790982",
              "replace-partitions" : "true",
              // 本次snapshot添加的文件數(shù)量
              "added-data-files" : "4",
              // 本次snapshot添加的record數(shù)量
              "added-records" : "4",
              // 本次snapshot添加的文件大小
              "added-files-size" : "7217",
              // 本次snapshot修改的分區(qū)數(shù)量
              "changed-partition-count" : "4",
              // 本次snapshot中record總數(shù) = lastSnapshotTotalRecord - currentSnapshotDeleteRecord + currentSnapshotAddRecord
              "total-records" : "4",
              "total-data-files" : "4",
              "total-delete-files" : "0",
              "total-position-deletes" : "0",
              "total-equality-deletes" : "0"
            },
            "manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro"
          }, {
            "snapshot-id" : 2080639593951710914,
            // 上次snapshotID
            "parent-snapshot-id" : 5178718682852547007,
            "timestamp-ms" : 1608810968725,
            "summary" : {
              "operation" : "overwrite",
              "spark.app.id" : "local-1608809790982",
              "replace-partitions" : "true",
              "added-data-files" : "4",
              "deleted-data-files" : "4",
              "added-records" : "4",
              "deleted-records" : "4",
              "added-files-size" : "7217",
              "removed-files-size" : "7217",
              "changed-partition-count" : "4",
              "total-records" : "4",
              "total-data-files" : "4",
              "total-delete-files" : "0",
              "total-position-deletes" : "0",
              "total-equality-deletes" : "0"
            },
            // snapshot文件路徑
            "manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro"
          } ],
          // snapshot記錄
          "snapshot-log" : [ {
            "timestamp-ms" : 1608809818168,
            "snapshot-id" : 5178718682852547007
          }, {
            "timestamp-ms" : 1608810968725,
            "snapshot-id" : 2080639593951710914
          } ],
          // metada記錄
          "metadata-log" : [ {
            "timestamp-ms" : 1608809758229,
            "metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v1.metadata.json"
          }, {
            "timestamp-ms" : 1608809818168,
            "metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v2.metadata.json"
          } ]
        }

        上例展示的是v3.metadata.json中的數(shù)據(jù),該文件保存了iceberg table schema、partition、snapshot信息,partition中的transform信息使得iceberg能夠根據(jù)字段進(jìn)行hidden partition,而無需像hive一樣顯示的指定分區(qū)字段。由于VersionMetadata中記錄了每次snapshot的id以及create_time,我們可以通過時(shí)間或snapshotId查詢相應(yīng)snapshot的數(shù)據(jù),實(shí)現(xiàn)Time Travel。

        Snapshot
        // Snapshot: 2080639593951710914
        // Location: hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro

        // manifest entry
        {
          "manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro",
          "manifest_length" : 5291,
          "partition_spec_id" : 0,
          // 該manifest entry所屬的snapshot
          "added_snapshot_id" : {
            "long" : 2080639593951710914
          },
          // 該manifest中添加的文件數(shù)量
          "added_data_files_count" : {
            "int" : 4
          },
          // 創(chuàng)建該manifest時(shí)已經(jīng)存在且
          // 沒有被這次創(chuàng)建操作刪除的文件數(shù)量
          "existing_data_files_count" : {
            "int" : 0
          },
          // 創(chuàng)建manifest刪除的文件
          "deleted_data_files_count" : {
            "int" : 0
          },
          // 該manifest中partition字段的范圍
          "partitions" : {
            "array" : [ {
              "contains_null" : false,
              "lower_bound" : {
                "bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              },
              "upper_bound" : {
                "bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              }
            } ]
          },
          "added_rows_count" : {
            "long" : 4
          },
          "existing_rows_count" : {
            "long" : 0
          },
          "deleted_rows_count" : {
            "long" : 0
          }
        }
        // manifest entry
        {
          "manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro",
          "manifest_length" : 5289,
          "partition_spec_id" : 0,
          "added_snapshot_id" : {
            "long" : 2080639593951710914
          },
          "added_data_files_count" : {
            "int" : 0
          },
          "existing_data_files_count" : {
            "int" : 0
          },
          "deleted_data_files_count" : {
            "int" : 4
          },
          "partitions" : {
            "array" : [ {
              "contains_null" : false,
              "lower_bound" : {
                "bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              },
              "upper_bound" : {
                "bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              }
            } ]
          },
          "added_rows_count" : {
            "long" : 0
          },
          "existing_rows_count" : {
            "long" : 0
          },
          "deleted_rows_count" : {
            "long" : 4
          }
        }

        一個(gè)snapshot中可以包含多個(gè)manifest entry,一個(gè)manifest entry表示一個(gè)manifest,其中重點(diǎn)需要關(guān)注的是每個(gè)manifest中的partitions字段,在根據(jù)filter進(jìn)行過濾時(shí)可以首先通過該字段表示的分區(qū)范圍對(duì)manifest進(jìn)行過濾,避免無效的查詢。

        Manifest
        // 
        {
          // 當(dāng)前文件格式版本信息
          // 目前為version 1
          // 支持row-level delete等功能的version 2還在開發(fā)中
          "format-version" : 1,
          "table-uuid" : "a9114f94-911e-4acf-94cc-6d000b321812",
          // hadoopTable location
          "location" : "hdfs://10.242.199.202:9000/hive/empty_order_item",
          // 最新snapshot的創(chuàng)建時(shí)間
          "last-updated-ms" : 1608810968725,
          "last-column-id" : 6,
          // iceberg schema
          "schema" : {
            "type" : "struct",
            "fields" : [ {
              "id" : 1,
              "name" : "id",
              "required" : false, // 類似probuf中的required
              "type" : "long"
            }, {
              "id" : 2,
              "name" : "order_id",
              "required" : false,
              "type" : "long"
            }, {
              "id" : 3,
              "name" : "product_id",
              "required" : false,
              "type" : "long"
            }, {
              "id" : 4,
              "name" : "product_price",
              "required" : false,
              "type" : "decimal(7, 2)"
            }, {
              "id" : 5,
              "name" : "product_quantity",
              "required" : false,
              "type" : "int"
            }, {
              "id" : 6,
              "name" : "product_name",
              "required" : false,
              "type" : "string"
            } ]
          },
          "partition-spec" : [ {
            "name" : "id",
            "transform" : "identity", // transform類型
            "source-id" : 1,
            "field-id" : 1000
          } ],
          "default-spec-id" : 0,
          // 分區(qū)信息
          "partition-specs" : [ {
            "spec-id" : 0,
            "fields" : [ {
              "name" : "id",
              // transform類型:目前支持identity,year,bucket等
              "transform" : "identity",
              // 對(duì)應(yīng)schema.fields中相應(yīng)field的ID
              "source-id" : 1,
              "field-id" : 1000
            } ]
          } ],
          "default-sort-order-id" : 0,
          "sort-orders" : [ {
            "order-id" : 0,
            "fields" : [ ]
          } ],
          // hive創(chuàng)建該表存儲(chǔ)的一些hive property信息
          "properties" : {
            "totalSize" : "0",
            "rawDataSize" : "0",
            "numRows" : "0",
            "COLUMN_STATS_ACCURATE" : "{\"BASIC_STATS\":\"true\"}",
            "numFiles" : "0"
          },
          // 當(dāng)前snapshot id
          "current-snapshot-id" : 2080639593951710914,
          // snapshot信息
          "snapshots" : [ {
            "snapshot-id" : 5178718682852547007,
            // 創(chuàng)建snapshot時(shí)間
            "timestamp-ms" : 1608809818168,
            "summary" : {
              // spark寫入方式,目前支持overwrite以及append
              "operation" : "overwrite",
              "spark.app.id" : "local-1608809790982",
              "replace-partitions" : "true",
              // 本次snapshot添加的文件數(shù)量
              "added-data-files" : "4",
              // 本次snapshot添加的record數(shù)量
              "added-records" : "4",
              // 本次snapshot添加的文件大小
              "added-files-size" : "7217",
              // 本次snapshot修改的分區(qū)數(shù)量
              "changed-partition-count" : "4",
              // 本次snapshot中record總數(shù) = lastSnapshotTotalRecord - currentSnapshotDeleteRecord + currentSnapshotAddRecord
              "total-records" : "4",
              "total-data-files" : "4",
              "total-delete-files" : "0",
              "total-position-deletes" : "0",
              "total-equality-deletes" : "0"
            },
            "manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro"
          }, {
            "snapshot-id" : 2080639593951710914,
            // 上次snapshotID
            "parent-snapshot-id" : 5178718682852547007,
            "timestamp-ms" : 1608810968725,
            "summary" : {
              "operation" : "overwrite",
              "spark.app.id" : "local-1608809790982",
              "replace-partitions" : "true",
              "added-data-files" : "4",
              "deleted-data-files" : "4",
              "added-records" : "4",
              "deleted-records" : "4",
              "added-files-size" : "7217",
              "removed-files-size" : "7217",
              "changed-partition-count" : "4",
              "total-records" : "4",
              "total-data-files" : "4",
              "total-delete-files" : "0",
              "total-position-deletes" : "0",
              "total-equality-deletes" : "0"
            },
            // snapshot文件路徑
            "manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro"
          } ],
          // snapshot記錄
          "snapshot-log" : [ {
            "timestamp-ms" : 1608809818168,
            "snapshot-id" : 5178718682852547007
          }, {
            "timestamp-ms" : 1608810968725,
            "snapshot-id" : 2080639593951710914
          } ],
          // metada記錄
          "metadata-log" : [ {
            "timestamp-ms" : 1608809758229,
            "metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v1.metadata.json"
          }, {
            "timestamp-ms" : 1608809818168,
            "metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v2.metadata.json"
          } ]
        }

        上例展示的是v3.metadata.json中的數(shù)據(jù),該文件保存了iceberg table schema、partition、snapshot信息,partition中的transform信息使得iceberg能夠根據(jù)字段進(jìn)行hidden partition,而無需像hive一樣顯示的指定分區(qū)字段。由于VersionMetadata中記錄了每次snapshot的id以及create_time,我們可以通過時(shí)間或snapshotId查詢相應(yīng)snapshot的數(shù)據(jù),實(shí)現(xiàn)Time Travel。

        Snapshot

        // Snapshot: 2080639593951710914
        // Location: hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro

        // manifest entry
        {
          "manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro",
          "manifest_length" : 5291,
          "partition_spec_id" : 0,
          // 該manifest entry所屬的snapshot
          "added_snapshot_id" : {
            "long" : 2080639593951710914
          },
          // 該manifest中添加的文件數(shù)量
          "added_data_files_count" : {
            "int" : 4
          },
          // 創(chuàng)建該manifest時(shí)已經(jīng)存在且
          // 沒有被這次創(chuàng)建操作刪除的文件數(shù)量
          "existing_data_files_count" : {
            "int" : 0
          },
          // 創(chuàng)建manifest刪除的文件
          "deleted_data_files_count" : {
            "int" : 0
          },
          // 該manifest中partition字段的范圍
          "partitions" : {
            "array" : [ {
              "contains_null" : false,
              "lower_bound" : {
                "bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              },
              "upper_bound" : {
                "bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              }
            } ]
          },
          "added_rows_count" : {
            "long" : 4
          },
          "existing_rows_count" : {
            "long" : 0
          },
          "deleted_rows_count" : {
            "long" : 0
          }
        }
        // manifest entry
        {
          "manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro",
          "manifest_length" : 5289,
          "partition_spec_id" : 0,
          "added_snapshot_id" : {
            "long" : 2080639593951710914
          },
          "added_data_files_count" : {
            "int" : 0
          },
          "existing_data_files_count" : {
            "int" : 0
          },
          "deleted_data_files_count" : {
            "int" : 4
          },
          "partitions" : {
            "array" : [ {
              "contains_null" : false,
              "lower_bound" : {
                "bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              },
              "upper_bound" : {
                "bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
              }
            } ]
          },
          "added_rows_count" : {
            "long" : 0
          },
          "existing_rows_count" : {
            "long" : 0
          },
          "deleted_rows_count" : {
            "long" : 4
          }
        }

        Manifest管理多個(gè)data文件,一條DataFileEntry對(duì)應(yīng)一個(gè)data文件,DataFileEntry中記錄了所屬partition,value bounds等信息,value_counts和null_value_counts可以用于過濾null列,例:column a所對(duì)應(yīng)的value_count為3,且對(duì)應(yīng)的null_value_count也為3,此時(shí)如果select a,則可以根據(jù)value_count-null_value_count=0判斷a全為null直接返回而無需再進(jìn)行parquet文件的查詢;除此之外,可以根據(jù)value bounds進(jìn)行過濾,加速查詢。

        第二部分:Spark寫Iceberg流程分析

        spark寫入示例
        import org.apache.iceberg.hadoop.HadoopTables
        import org.apache.hadoop.conf.Configuration
        import org.apache.iceberg.catalog.TableIdentifier
        import org.apache.iceberg.Schema
        import org.apache.iceberg.types._
        import org.apache.spark.sql.types._
        import org.apache.iceberg.PartitionSpec
        import org.apache.iceberg.spark.SparkSchemaUtil
        import org.apache.spark.sql._
        import spark.implicits._

        val order_item_schema = StructType(List(
              StructField("id", LongType, true),
              StructField("order_id", LongType, true),
              StructField("product_id", LongType, true),
              StructField("product_price", DecimalType(7,2), true),
              StructField("product_quantity", IntegerType, true),
              StructField("product_name", StringType, true)
            ))

        val order_item_action = Seq(
              Row(1L, 1L, 1L, Decimal.apply(50.00, 7, 2), 2, "table lamp"), 
              Row(2L, 1L, 2L, Decimal.apply(100.5, 7, 2), 1, "skirt"),
              Row(3L, 2L, 1L, Decimal.apply(50.00, 7, 2), 1, "table lamp"), 
              Row(4L, 3L, 3L, Decimal.apply(0.99, 7, 2), 1, "match")
            )

        val iceberg_schema = new Schema(
              Types.NestedField.optional(1, "id", Types.LongType.get()),
              Types.NestedField.optional(2, "order_id", Types.LongType.get()),
              Types.NestedField.optional(3, "product_id", Types.LongType.get()),
              Types.NestedField.optional(4, "product_price", Types.DecimalType.of(7, 2)),
              Types.NestedField.optional(5, "product_quantity", Types.IntegerType.get()),
              Types.NestedField.optional(6, "product_name", Types.StringType.get())
            )

        val iceberg_partition = PartitionSpec.builderFor(iceberg_schema).identity("id").build()  

        val hadoopTable = new HadoopTables(sc.hadoopConfiguration);
        val location = "hdfs://10.242.199.202:9000/hive/empty_order_item";

        hadoopTable.create(iceberg_schema, iceberg_partition, location)

        val df = spark.createDataFrame(sc.makeRDD(order_item_action), order_item_schema)

        df.write.format("iceberg").mode("overwrite").save("hdfs://10.242.199.202:9000/hive/empty_order_item")

        spark寫入iceberg主要分為兩步:

        • Executor寫入數(shù)據(jù)
        • Driver commit生成元數(shù)據(jù)
        Executor寫入邏輯

        由上圖可以看到IcebergSource實(shí)現(xiàn)了spark ReadSupport、WriteSupport、StreamWriteSupport等接口,WriteFactory根據(jù)寫入表的類型:(1) 分區(qū)表 (2) 非分區(qū)表,生成不同的writer,最后通過write方法寫入數(shù)據(jù)。

        我們以寫入分區(qū)表為例簡(jiǎn)單介紹一下executor端iceberg寫入數(shù)據(jù)的流程:

        • 根據(jù)file format生成對(duì)應(yīng)的FileAppender,F(xiàn)ileAppender完成實(shí)際的寫文件操作。目前支持3種文件格式的寫入:Parquet、Avro以及Orc
        • iceberg分區(qū)數(shù)據(jù)不直接寫入數(shù)據(jù)文件中,而是通過目錄樹結(jié)構(gòu)來進(jìn)行存儲(chǔ),分區(qū)目錄結(jié)構(gòu)與hive類型,都是以key1=value1/key2=value2的形式進(jìn)行組織。在寫入數(shù)據(jù)之前,partitionWriter首先根據(jù)partition transform函數(shù)得到對(duì)應(yīng)的partition value,然后創(chuàng)建對(duì)應(yīng)的分區(qū)目錄
        • fileAppender通過調(diào)用不同的file format組件將數(shù)據(jù)寫入到文件中。iceberg寫入時(shí)可以通過設(shè)置write.target-file-size-bytes table property調(diào)整寫入文件target大小,默認(rèn)為L(zhǎng)ONG_MAX
        • 當(dāng)所有數(shù)據(jù)寫入完成后,iceberg會(huì)收集寫入的統(tǒng)計(jì)信息,例如record_count, lower_bound, upper_bound, value_count等用于driver端生成對(duì)應(yīng)的manifest文件,最后executor端將這些信息傳回driver端。
        Driver commit邏輯

        iceberg snapshot中的統(tǒng)計(jì)信息實(shí)際是累計(jì)更新的結(jié)果,相較于上次commit,本次commit發(fā)生了哪些變化,例新增了多少條記錄,刪除了多少條記錄,新增了多少文件,刪除了多少文件等等。既然是累計(jì)更新,首先需要知道上次snapshot的信息,然后計(jì)算最后的結(jié)果。iceberg讀取當(dāng)前最新snapshot數(shù)據(jù)過程如下:

        1. 讀取version.hint中記錄的最新metadata版本號(hào)versionNumber
        2. 讀取version[versionNumber].metadata.json文件,根據(jù)metadata中記錄的snpshots信息以及current snapshot id得到最新snapshot的location
        3. 最后根據(jù)獲得的location讀取對(duì)應(yīng)的snapshot文件得到最新的snapshot數(shù)據(jù)

        overwrite實(shí)際上可以等價(jià)劃分成兩個(gè)步驟:

        • delete
        • insert

        那么我們?nèi)绾沃佬枰獎(jiǎng)h除哪些數(shù)據(jù)呢?這里就要用到剛剛讀取的current snapshot數(shù)據(jù)以及executor傳回的信息,根據(jù)這些信息,我們可以計(jì)算得到哪些分區(qū)文件是需要通過覆蓋刪除的,實(shí)際上是將manifest中的對(duì)應(yīng)DataFileEntry標(biāo)記成刪除寫入到新的manifest文件中,沒有被刪除的DataFileEntry則標(biāo)記成Existing寫入到manifest文件中

        在完成了delete操作之后,insert操作就相對(duì)比較簡(jiǎn)單,只要將GenericDataFile全部寫入到新的manifest中即可

        iceberg默認(rèn)開啟merge manifest功能,當(dāng)manifest文件數(shù)量超過commit.manifest.min-count-to-merge時(shí)(默認(rèn)100),將多個(gè)small manifest文件合并成large manifest(large manifest文件大小由commit.manifest.target-size-bytes指定,默認(rèn)為8M)

        最后iceberg根據(jù)這些Added/Deleted/Existing DataFileEntry得到本次commit的差值統(tǒng)計(jì)信息,與前一次snapshot統(tǒng)計(jì)信息累加最終得到本次snapshot的統(tǒng)計(jì)信息(added_data_files_count, added_rows_count等)。生成snapshot的整個(gè)過程如下圖所示:

        在生成新的snapshot后,只剩最后一步那就是生成新版本的version.metadata.json文件,同時(shí)將版本號(hào)寫入到version.hint中,至此完成了所有iceberg數(shù)據(jù)的寫入。

        第三部分:Flink寫Iceberg流程分析

        開始實(shí)例

        flink支持DataStream和DataStream寫入iceberg

        StreamExecutionEnvironment env = ...;

        DataStream<RowData> input = ... ;
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
        FlinkSink.forRowData(input, FLINK_SCHEMA)
            .tableLoader(tableLoader)
            .writeParallelism(1)
            .build();
        env.execute("Test Iceberg DataStream");

        input為DataStream和DataStream形式的輸入流,F(xiàn)LINK_SCHEMA為TableSchema;

        首先看build()方法:

        public DataStreamSink<RowData> build() {
                    Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
                    Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");
                    if (this.table == null) {
                        this.tableLoader.open();

                        try {
                            TableLoader loader = this.tableLoader;
                            Throwable var2 = null;

                            try {
                                this.table = loader.loadTable();
                            } catch (Throwable var12) {
                                var2 = var12;
                                throw var12;
                            } finally {
                                if (loader != null) {
                                    if (var2 != null) {
                                        try {
                                            loader.close();
                                        } catch (Throwable var11) {
                                            var2.addSuppressed(var11);
                                        }
                                    } else {
                                        loader.close();
                                    }
                                }

                            }
                        } catch (IOException var14) {
                            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, var14);
                        }
                    }

                    List<Integer> equalityFieldIds = Lists.newArrayList();
                    if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
                        Iterator var16 = this.equalityFieldColumns.iterator();

                        while(var16.hasNext()) {
                            String column = (String)var16.next();
                            NestedField field = this.table.schema().findField(column);
                            Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
                            equalityFieldIds.add(field.fieldId());
                        }
                    }

                    RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
                    this.rowDataInput = this.distributeDataStream(this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);
                    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
                    IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
                    this.writeParallelism = this.writeParallelism == null ? this.rowDataInput.getParallelism() : this.writeParallelism;
                    DataStream<Void> returnStream = this.rowDataInput.transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter).setParallelism(this.writeParallelism).transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter).setParallelism(1).setMaxParallelism(1);
                    return returnStream.addSink(new DiscardingSink()).name(String.format("IcebergSink %s", this.table.name())).setParallelism(1);
                }

        此處創(chuàng)建寫的iceberg核心算子IcebergStreamWriter和IcebergFilesCommitter

        IcebergStreamWriter
        IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);

        build()方法中,調(diào)用createStreamWriter()創(chuàng)建IcebergStreamWriter

        static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
                Map<String, String> props = table.properties();
                long targetFileSize = getTargetFileSizeBytes(props);
                FileFormat fileFormat = getFileFormat(props);
                TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props, equalityFieldIds);
                return new IcebergStreamWriter(table.name(), taskWriterFactory);
            }

        根據(jù)表信息構(gòu)建TaskWriterFactory,并傳入到IcebergStreamWriter

        class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
            private static final long serialVersionUID = 1L;
            private final String fullTableName;
            private final TaskWriterFactory<T> taskWriterFactory;
            private transient TaskWriter<T> writer;
            private transient int subTaskId;
            private transient int attemptId;

            IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
                this.fullTableName = fullTableName;
                this.taskWriterFactory = taskWriterFactory;
                this.setChainingStrategy(ChainingStrategy.ALWAYS);
            }

            public void open() {
                this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
                this.attemptId = this.getRuntimeContext().getAttemptNumber();
                this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
                this.writer = this.taskWriterFactory.create();
            }
            public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
                this.emit(this.writer.complete());
                this.writer = this.taskWriterFactory.create();
            }

            public void processElement(StreamRecord<T> element) throws Exception {
                this.writer.write(element.getValue());
            }
        }

        在open中通過傳入的taskWriterFactory構(gòu)建TaskWriter

        public TaskWriter<RowData> create() {
                Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");
                if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
                    return (TaskWriter)(this.spec.isUnpartitioned() ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds) : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds));
                } else {
                    return (TaskWriter)(this.spec.isUnpartitioned() ? new UnpartitionedWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes) : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema));
                }
            }

        此方法中根據(jù)是否指定字段,構(gòu)造分區(qū)寫(PartitionedDeltaWriter/RowDataPartitionedFanoutWriter)和非分區(qū)寫實(shí)例(UnpartitionedDeltaWriter/UnpartitionedWriter) 四個(gè)類的調(diào)用關(guān)系:

        指定字段:

        UnpartitionedDeltaWriter -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add() PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> RollingFileWriter.write() -> appender.add()

        未指定字段:

        UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()

        RowDataPartitionedFanoutWriter -> BaseRollingWriter.write -> RollingFileWriter.write() -> appender.add()

        底層調(diào)用的appender為創(chuàng)建TaskWriter傳入的FlinkAppenderFactory創(chuàng)建的

        在processElement()中調(diào)用write(element.getValue())方法,將數(shù)據(jù)寫入,最后在checkpoint時(shí)提交。

        提示:task執(zhí)行三部曲:beforeInvoke() -> runMailboxLoop() -> afterInvoke() beforeInvoke調(diào)用open()和initializeState(),runMailboxLoop調(diào)用processElement()處理數(shù)據(jù)

        IcebergFilesCommitter
        IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

        build()方法中,傳入tableLoader和overwrite直接創(chuàng)建IcebergFilesCommitter。

        checkpoint初始化操作在IcebergFilesCommitter的initializeState()

        public void initializeState(StateInitializationContext context) throws Exception {
                super.initializeState(context);
                this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
                this.tableLoader.open();
                this.table = this.tableLoader.loadTable();
                int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
                int attemptId = this.getRuntimeContext().getAttemptNumber();
                this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, (long)attemptId);
                this.maxCommittedCheckpointId = -1L;
                this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
                this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
                if (context.isRestored()) {
                    String restoredFlinkJobId = (String)((Iterable)this.jobIdState.get()).iterator().next();
                    Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
                    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);
                    NavigableMap<Long, byte[]> uncommittedDataFiles = Maps.newTreeMap((SortedMap)((Iterable)this.checkpointsState.get()).iterator().next()).tailMap(this.maxCommittedCheckpointId, false);
                    if (!uncommittedDataFiles.isEmpty()) {
                        long maxUncommittedCheckpointId = (Long)uncommittedDataFiles.lastKey();
                        this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
                    }
                }

            }

        checkpoint提交流程在IcebergFilesCommitter的snapshotState中

        public void snapshotState(StateSnapshotContext context) throws Exception {
                super.snapshotState(context);
                long checkpointId = context.getCheckpointId();
                LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);
                this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
                this.checkpointsState.clear();
                this.checkpointsState.add(this.dataFilesPerCheckpoint);
                this.jobIdState.clear();
                this.jobIdState.add(this.flinkJobId);
                this.writeResultsOfCurrentCkpt.clear();
            }

        this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId)); 為更新當(dāng)前的checkpointId和manifest元文件信息

        dataFilesPerCheckpoint與調(diào)用關(guān)系如下:

        private byte[] writeToManifest(long checkpointId) throws IOException {
                if (this.writeResultsOfCurrentCkpt.isEmpty()) {
                    return EMPTY_MANIFEST_DATA;
                } else {
                    WriteResult result = WriteResult.builder().addAll(this.writeResultsOfCurrentCkpt).build();
                    DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> {
                        return this.manifestOutputFileFactory.create(checkpointId);
                    }, this.table.spec());
                    return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
                }
            }

        writeResultsOfCurrentCkpt中包含了datafile文件、deletefile文件和referenced數(shù)據(jù)文件。然后,根據(jù)result創(chuàng)建deltaManifests ,并且返回序列化后的manifest信息。

        deltaManifests 值如下:

        static DeltaManifests writeCompletedFiles(WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
                ManifestFile dataManifest = null;
                ManifestFile deleteManifest = null;
                if (result.dataFiles() != null && result.dataFiles().length > 0) {
                    dataManifest = writeDataFiles((OutputFile)outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
                }

                if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
                    OutputFile deleteManifestFile = (OutputFile)outputFileSupplier.get();
                    ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
                    ManifestWriter<DeleteFile> writer = deleteManifestWriter;
                    Throwable var8 = null;

                    try {
                        DeleteFile[] var9 = result.deleteFiles();
                        int var10 = var9.length;

                        for(int var11 = 0; var11 < var10; ++var11) {
                            DeleteFile deleteFile = var9[var11];
                            writer.add(deleteFile);
                        }
                    } catch (Throwable var16) {
                        var8 = var16;
                        throw var16;
                    } finally {
                        if (writer != null) {
                            $closeResource(var8, writer);
                        }

                    }

                    deleteManifest = deleteManifestWriter.toManifestFile();
                }

                return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
            }

        從上面寫入過程可以看出,datafile和deletefile寫入后,分別生成各自的Manifest文件,最后創(chuàng)建DeltaManifests返回。

        最后通知checkpoint完成,提交checkpoint

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
                super.notifyCheckpointComplete(checkpointId);
                if (checkpointId > this.maxCommittedCheckpointId) {
                    this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
                    this.maxCommittedCheckpointId = checkpointId;
                }

            }
        private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
                NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
                List<ManifestFile> manifests = Lists.newArrayList();
                NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
                Iterator var8 = pendingMap.entrySet().iterator();

                while(var8.hasNext()) {
                    Entry<Long, byte[]> e = (Entry)var8.next();
                    if (!Arrays.equals(EMPTY_MANIFEST_DATA, (byte[])e.getValue())) {
                        DeltaManifests deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, (byte[])e.getValue());
                        pendingResults.put((Long)e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
                        manifests.addAll(deltaManifests.manifests());
                    }
                }

                if (this.replacePartitions) {
                    this.replacePartitions(pendingResults, newFlinkJobId, checkpointId);
                } else {
                    this.commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
                }

                pendingMap.clear();
                var8 = manifests.iterator();

                while(var8.hasNext()) {
                    ManifestFile manifest = (ManifestFile)var8.next();

                    try {
                        this.table.io().deleteFile(manifest.path());
                    } catch (Exception var12) {
                        String details = MoreObjects.toStringHelper(this).add("flinkJobId", newFlinkJobId).add("checkpointId", checkpointId).add("manifestPath", manifest.path()).toString();
                        LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, var12);
                    }
                }

            }

        這里會(huì)反序列化之前序列化的值,生成deltaManifests,添加到manifests列表中,manifests值:

        然后根據(jù)replacePartitions(創(chuàng)建時(shí)傳入的overwrite值,默認(rèn)為false)值提交事務(wù),默認(rèn)情況下調(diào)用commitDeltaTxn()

            private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
                int deleteFilesNum = pendingResults.values().stream().mapToInt((r) -> {
                    return r.deleteFiles().length;
                }).sum();
                Stream var10000;
                if (deleteFilesNum == 0) {
                    AppendFiles appendFiles = this.table.newAppend();
                    int numFiles = 0;
                    Iterator var8 = pendingResults.values().iterator();

                    while(var8.hasNext()) {
                        WriteResult result = (WriteResult)var8.next();
                        Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
                        numFiles += result.dataFiles().length;
                        var10000 = Arrays.stream(result.dataFiles());
                        Objects.requireNonNull(appendFiles);
                        var10000.forEach(appendFiles::appendFile);
                    }

                    this.commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
                } else {
                    Iterator var12 = pendingResults.entrySet().iterator();

                    while(var12.hasNext()) {
                        Entry<Long, WriteResult> e = (Entry)var12.next();
                        WriteResult result = (WriteResult)e.getValue();
                        RowDelta rowDelta = this.table.newRowDelta().validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())).validateDeletedFiles();
                        int numDataFiles = result.dataFiles().length;
                        var10000 = Arrays.stream(result.dataFiles());
                        Objects.requireNonNull(rowDelta);
                        var10000.forEach(rowDelta::addRows);
                        int numDeleteFiles = result.deleteFiles().length;
                        var10000 = Arrays.stream(result.deleteFiles());
                        Objects.requireNonNull(rowDelta);
                        var10000.forEach(rowDelta::addDeletes);
                        this.commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, (Long)e.getKey());
                    }
                }

            }

        創(chuàng)建一個(gè)RowDelta的對(duì)象rowDelta或MergeAppend的appendFiles,rowDelta的實(shí)現(xiàn)類為BaseRowDelta繼承自MergingSnapshotProducer作為一個(gè)新的snapshot提交;appendFiles的實(shí)現(xiàn)類MergeAppend,同樣繼承MergingSnapshotProducer。

        private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
                LOG.info("Committing {} with {} data files and {} delete files to table {}", new Object[]{description, numDataFiles, numDeleteFiles, this.table});
                operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
                operation.set("flink.job-id", newFlinkJobId);
                long start = System.currentTimeMillis();
                operation.commit();
                long duration = System.currentTimeMillis() - start;
                LOG.info("Committed in {} ms", duration);
            }

        operation.commit()會(huì)調(diào)用SnapshotProducer中的commit()方法

        public void commit() {
                AtomicLong newSnapshotId = new AtomicLong(-1L);

                try {
                    Tasks.foreach(new TableOperations[]{this.ops}).retry(this.base.propertyAsInt("commit.retry.num-retries", 4)).exponentialBackoff((long)this.base.propertyAsInt("commit.retry.min-wait-ms", 100), (long)this.base.propertyAsInt("commit.retry.max-wait-ms", 60000), (long)this.base.propertyAsInt("commit.retry.total-timeout-ms", 1800000), 2.0D).onlyRetryOn(CommitFailedException.class).run((taskOps) -> {
                        Snapshot newSnapshot = this.apply();
                        newSnapshotId.set(newSnapshot.snapshotId());
                        TableMetadata updated;
                        if (this.stageOnly) {
                            updated = this.base.addStagedSnapshot(newSnapshot);
                        } else {
                            updated = this.base.replaceCurrentSnapshot(newSnapshot);
                        }

                        if (updated != this.base) {
                            taskOps.commit(this.base, updated.withUUID());
                        }
                    });
                } catch (RuntimeException var5) {
                    Exceptions.suppressAndThrow(var5, this::cleanAll);
                }

                LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), this.getClass().getSimpleName());

                try {
                    Snapshot saved = this.ops.refresh().snapshot(newSnapshotId.get());
                    if (saved != null) {
                        this.cleanUncommitted(Sets.newHashSet(saved.allManifests()));
                        Iterator var3 = this.manifestLists.iterator();

                        while(var3.hasNext()) {
                            String manifestList = (String)var3.next();
                            if (!saved.manifestListLocation().equals(manifestList)) {
                                this.deleteFile(manifestList);
                            }
                        }
                    } else {
                        LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
                    }
                } catch (RuntimeException var6) {
                    LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", var6);
                }

                this.notifyListeners();
            }

        SnapshotProducer.apply() 方法執(zhí)行寫入manifestFiles數(shù)據(jù),返回快照數(shù)據(jù);

        public Snapshot apply() {
                this.base = this.refresh();
                Long parentSnapshotId = this.base.currentSnapshot() != null ? this.base.currentSnapshot().snapshotId() : null;
                long sequenceNumber = this.base.nextSequenceNumber();
                this.validate(this.base);
                List<ManifestFile> manifests = this.apply(this.base);
                if (this.base.formatVersion() <= 1 && !this.base.propertyAsBoolean("write.manifest-lists.enabled"true)) {
                    return new BaseSnapshot(this.ops.io(), this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifests);
                } else {
                    OutputFile manifestList = this.manifestListPath();

                    try {
                        ManifestListWriter writer = ManifestLists.write(this.ops.current().formatVersion(), manifestList, this.snapshotId(), parentSnapshotId, sequenceNumber);
                        Throwable var7 = null;

                        try {
                            this.manifestLists.add(manifestList.location());
                            ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
                            Tasks.range(manifestFiles.length).stopOnFailure().throwFailureWhenFinished().executeWith(ThreadPools.getWorkerPool()).run((index) -> {
                                manifestFiles[index] = (ManifestFile)this.manifestsWithMetadata.get((ManifestFile)manifests.get(index));
                            });
                            writer.addAll(Arrays.asList(manifestFiles));
                        } catch (Throwable var13) {
                            var7 = var13;
                            throw var13;
                        } finally {
                            if (writer != null) {
                                $closeResource(var7, writer);
                            }

                        }
                    } catch (IOException var15) {
                        throw new RuntimeIOException(var15, "Failed to write manifest list file", new Object[0]);
                    }

                    return new BaseSnapshot(this.ops.io(), sequenceNumber, this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifestList.location());
                }
            }

        然后生成表的元數(shù)據(jù)updated

        public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
                if (this.snapshotsById.containsKey(snapshot.snapshotId())) {
                    return this.setCurrentSnapshotTo(snapshot);
                } else {
                    ValidationException.check(this.formatVersion == 1 || snapshot.sequenceNumber() > this.lastSequenceNumber, "Cannot add snapshot with sequence number %s older than last sequence number %s", new Object[]{snapshot.sequenceNumber(), this.lastSequenceNumber});
                    List<Snapshot> newSnapshots = ImmutableList.builder().addAll(this.snapshots).add(snapshot).build();
                    List<HistoryEntry> newSnapshotLog = ImmutableList.builder().addAll(this.snapshotLog).add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())).build();
                    return new TableMetadata((InputFile)null, this.formatVersion, this.uuid, this.location, snapshot.sequenceNumber(), snapshot.timestampMillis(), this.lastColumnId, this.schema, this.defaultSpecId, this.specs, this.defaultSortOrderId, this.sortOrders, this.properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, this.addPreviousFile(this.file, this.lastUpdatedMillis));
                }
            }

        調(diào)用BaseMetastoreTableOperations算子的commit()方法

        public void commit(TableMetadata base, TableMetadata metadata) {
                if (base != this.current()) {
                    throw new CommitFailedException("Cannot commit: stale table metadata", new Object[0]);
                } else if (base == metadata) {
                    LOG.info("Nothing to commit.");
                } else {
                    long start = System.currentTimeMillis();
                    this.doCommit(base, metadata);
                    this.deleteRemovedMetadataFiles(base, metadata);
                    this.requestRefresh();
                    LOG.info("Successfully committed to table {} in {} ms", this.tableName(), System.currentTimeMillis() - start);
                }
            }

        最后調(diào)用HiveTableOperations的doCommit(),執(zhí)行提交操作。

        protected void doCommit(TableMetadata base, TableMetadata metadata) {
                String newMetadataLocation = this.writeNewMetadata(metadata, this.currentVersion() + 1);
                boolean hiveEngineEnabled = hiveEngineEnabled(metadata, this.conf);
                boolean threw = true;
                boolean updateHiveTable = false;
                Optional lockId = Optional.empty();

                try {
                    lockId = Optional.of(this.acquireLock());
                    Table tbl = this.loadHmsTable();
                    if (tbl != null) {
                        if (base == null && tbl.getParameters().get("metadata_location") != null) {
                            throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
                        }

                        updateHiveTable = true;
                        LOG.debug("Committing existing table: {}", this.fullName);
                    } else {
                        tbl = this.newHmsTable();
                        LOG.debug("Committing new table: {}", this.fullName);
                    }

                    tbl.setSd(this.storageDescriptor(metadata, hiveEngineEnabled));
                    String metadataLocation = (String)tbl.getParameters().get("metadata_location");
                    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
                    if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
                        throw new CommitFailedException("Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", new Object[]{baseMetadataLocation, metadataLocation, this.database, this.tableName});
                    }

                    this.setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
                    this.persistTable(tbl, updateHiveTable);
                    threw = false;
                } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException var16) {
                    throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
                } catch (UnknownHostException | TException var17) {
                    if (var17.getMessage() != null && var17.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
                        throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't exist, this probably happened when using embedded metastore or doesn't create a transactional meta table. To fix this, use an alternative metastore", var17);
                    }

                    throw new RuntimeException(String.format("Metastore operation failed for %s.%s", this.database, this.tableName), var17);
                } catch (InterruptedException var18) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during commit", var18);
                } finally {
                    this.cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
                }
        附:flink task執(zhí)行流程

        task的生命周期:

        StreamTask是所有stream task的基本類。一個(gè)task 運(yùn)行一個(gè)或者多個(gè)StreamOperator(如果成chain)。成chain的算子在同一個(gè)線程內(nèi)同步運(yùn)行。

        執(zhí)行過程:

        @Override
         public final void invoke() throws Exception {
          try {
           beforeInvoke();

           // final check to exit early before starting to run
           if (canceled) {
            throw new CancelTaskException();
           }

           // let the task do its work
           runMailboxLoop();

           // if this left the run() method cleanly despite the fact that this was canceled,
           // make sure the "clean shutdown" is not attempted
           if (canceled) {
            throw new CancelTaskException();
           }

           afterInvoke();
          }
          catch (Exception invokeException) {
           try {
            cleanUpInvoke();
           }
           catch (Throwable cleanUpException) {
            throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
           }
           throw invokeException;
          }
          cleanUpInvoke();
         }

        在beforeInvoke中會(huì)做一些初始化工作,包括提取出所有的operator等。 在runMailboxLoop中調(diào)用task運(yùn)行 在afterInvoke中結(jié)束

        調(diào)用關(guān)系:

        -- invoke()
        *        |
        *        +----> Create basic utils (config, etc) and load the chain of operators
        *        +----> operators.setup()
        *        +----> task specific init()
        *        +----> initialize-operator-states()
        *        +----> open-operators()
        *        +----> run()
        * --------------> mailboxProcessor.runMailboxLoop();
        * --------------> StreamTask.processInput()
        * --------------> StreamTask.inputProcessor.processInput()
        * --------------> 間接調(diào)用 operator的processElement()和processWatermark()方法
        *        +----> close-operators()
        *        +----> dispose-operators()
        *        +----> common cleanup
        *        +----> task specific cleanup()
        1. 創(chuàng)建狀態(tài)存儲(chǔ)后端,為 OperatorChain 中的所有算子提供狀態(tài)
        2. 加載 OperatorChain 中的所有算子
        3. 所有的 operator 調(diào)用 setup
        4. task 相關(guān)的初始化操作
        5. 所有 operator 調(diào)用 initializeState 初始化狀態(tài)
        6. 所有的 operator 調(diào)用 open
        7. run 方法循環(huán)處理數(shù)據(jù)
        8. 所有 operator 調(diào)用 close
        9. 所有 operator 調(diào)用 dispose
        10. 通用的 cleanup 操作
        11. task 相關(guān)的 cleanup 操作
        如果這個(gè)文章對(duì)你有幫助,不要忘記 「在看」 「點(diǎn)贊」 「收藏」 三連啊喂!

        2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
        互聯(lián)網(wǎng)最壞的時(shí)代可能真的來了
        我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
        我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
        193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
        Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
        Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
        我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
        在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
        硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
        數(shù)據(jù)治理方法論和實(shí)踐小百科全書
        標(biāo)簽體系下的用戶畫像建設(shè)小指南
        4萬字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
        【面試&個(gè)人成長(zhǎng)】2021年過半,社招和校招的經(jīng)驗(yàn)之談
        大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié)
        我寫過的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
        當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
        瀏覽 69
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            色戒完整版在线观看免费视频电影 | 免费性爱视频在线观看 | 囯产精品久久久久久久久九秃爱 | 麻豆天堂在线观看了 | 蘑菇视频logo | 成人AV影院久久 | 女人被暴躁c到高潮 | 又黄又免费的动漫 | 国产黄免费 | 国产很黄很色的视频 |