1. 如何用 Java 幾分鐘處理完 30 億個(gè)數(shù)據(jù)?

        共 8972字,需瀏覽 18分鐘

         ·

        2022-07-06 10:31

        往期熱門文章:

        1、面試官 | Spring Boot 項(xiàng)目如何統(tǒng)一結(jié)果,統(tǒng)一異常,統(tǒng)一日志?

        2、為什么不建議使用ON DUPLICATE KEY UPDATE?

        3、Java8 Stream,過分絲滑!

        4、8 種最坑SQL語法,工作中踩過嗎?

        5、Java 語言“坑爹” TOP 10

        轉(zhuǎn)自:Dream_it_possible!,

        鏈接:blog.csdn.net/qq_33036061/article/details/124568689


        1. 場景說明


        現(xiàn)有一個(gè) 10G 文件的數(shù)據(jù),里面包含了 18-70 之間的整數(shù),分別表示 18-70 歲的人群數(shù)量統(tǒng)計(jì)。假設(shè)年齡范圍分布均勻,分別表示系統(tǒng)中所有用戶的年齡數(shù),找出重復(fù)次數(shù)最多的那個(gè)數(shù),現(xiàn)有一臺內(nèi)存為 4G、2 核 CPU 的電腦,請寫一個(gè)算法實(shí)現(xiàn)。


        23,31,42,19,60,30,36,........


        2. 模擬數(shù)據(jù)

        Java 中一個(gè)整數(shù)占 4 個(gè)字節(jié),模擬 10G 為 30 億左右個(gè)數(shù)據(jù), 采用追加模式寫入 10G 數(shù)據(jù)到硬盤里。

        每 100 萬個(gè)記錄寫一行,大概 4M 一行,10G 大概 2500 行數(shù)據(jù)。


        package bigdata; import java.io.*;import java.util.Random; /** * @Desc: * @Author: bingbing * @Date: 2022/5/4 0004 19:05 */public class GenerateData {    private static Random random = new Random();     public static int generateRandomData(int start, int end) {        return random.nextInt(end - start + 1) + start;    }     /**     * 產(chǎn)生10G的 1-1000的數(shù)據(jù)在D盤     */    public void generateData() throws IOException {        File file = new File("D:\\ User.dat");        if (!file.exists()) {            try {                file.createNewFile();            } catch (IOException e) {                e.printStackTrace();            }        }         int start = 18;        int end = 70;        long startTime = System.currentTimeMillis();        BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));        for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) {            String data = generateRandomData(start, end) + ",";            bos.write(data);            // 每100萬條記錄成一行,100萬條數(shù)據(jù)大概4M            if (i % 1000000 == 0) {                bos.write("\n");            }        }        System.out.println("寫入完成! 共花費(fèi)時(shí)間:" + (System.currentTimeMillis() - startTime) / 1000 + " s");        bos.close();    }     public static void main(String[] args) {        GenerateData generateData = new GenerateData();        try {            generateData.generateData();        } catch (IOException e) {            e.printStackTrace();        }    }}

        上述代碼調(diào)整參數(shù)執(zhí)行 2 次,湊 10G 數(shù)據(jù)在 D 盤 User.dat 文件里:



        準(zhǔn)備好 10G 數(shù)據(jù)后,接著寫如何處理這些數(shù)據(jù)。

        3. 場景分析

        10G 的數(shù)據(jù)比當(dāng)前擁有的運(yùn)行內(nèi)存大的多,不能全量加載到內(nèi)存中讀取。如果采用全量加載,那么內(nèi)存會(huì)直接爆掉,只能按行讀取。Java 中的 bufferedReader 的 readLine() 按行讀取文件里的內(nèi)容。

        4. 讀取數(shù)據(jù)

        首先,我們寫一個(gè)方法單線程讀完這 30 億數(shù)據(jù)需要多少時(shí)間,每讀 100 行打印一次:


        private static void readData() throws IOException {    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));    String line;    long start = System.currentTimeMillis();    int count = 1;    while ((line = br.readLine()) != null) {        // 按行讀取        if (count % 100 == 0) {            System.out.println("讀取100行,總耗時(shí)間: " + (System.currentTimeMillis() - start) / 1000 + " s");            System.gc();        }        count++;    }    running = false;    br.close();}


        按行讀完 10G 的數(shù)據(jù)大概 20 秒,基本每 100 行,1 億多數(shù)據(jù)花 1 秒,速度還挺快。




        5. 處理數(shù)據(jù)


        5.1 思路一

        通過單線程處理,初始化一個(gè) countMap,key 為年齡,value 為出現(xiàn)的次數(shù)。將每行讀取到的數(shù)據(jù)按照 "," 進(jìn)行分割,然后獲取到的每一項(xiàng)進(jìn)行保存到 countMap 里。如果存在,那么值 key 的 value+1。


        for (int i = start; i <= end; i++) {    try {        File subFile = new File(dir + "\\" + i + ".dat");        if (!file.exists()) {            subFile.createNewFile();        }        countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));    } catch (FileNotFoundException e) {        e.printStackTrace();    } catch (IOException e) {        e.printStackTrace();    }}


        單線程讀取并統(tǒng)計(jì) countMap:


        publicstatic void splitLine(String lineData) {    String[] arr = lineData.split(",");    for (String str : arr) {        if (StringUtils.isEmpty(str)) {            continue;        }        countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();    }}

        通過比較找出年齡數(shù)最多的年齡并打印出來:


        private static void findMostAge() {    Integer targetValue = 0;    String targetKey = null;    Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();    while (entrySetIterator.hasNext()) {        Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();        Integer value = entry.getValue().get();        String key = entry.getKey();        if (value > targetValue) {            targetValue = value;            targetKey = key;        }    }    System.out.println("數(shù)量最多的年齡為:" + targetKey + "數(shù)量為:" + targetValue);}


        測試結(jié)果

        總共花了 3 分鐘讀取完并統(tǒng)計(jì)完所有數(shù)據(jù)。



        內(nèi)存消耗為 2G-2.5G,CPU 利用率太低,只向上浮動(dòng)了 20%-25% 之間。




        要想提高 CPU 利用率,那么可以使用多線程去處理。

        下面我們使用多線程去解決這個(gè) CPU 利用率低的問題。


        5.2 思路二:分治法

        使用多線程去消費(fèi)讀取到的數(shù)據(jù)。采用生產(chǎn)者、消費(fèi)者模式去消費(fèi)數(shù)據(jù)。


        因?yàn)樵谧x取的時(shí)候是比較快的,單線程的數(shù)據(jù)處理能力比較差。因此思路一的性能阻塞在取數(shù)據(jù)的一方且又是同步操作,導(dǎo)致整個(gè)鏈路的性能會(huì)變的很差。


        所謂分治法就是分而治之,也就是說將海量數(shù)據(jù)分割處理。根據(jù) CPU 的能力初始化 n 個(gè)線程,每一個(gè)線程去消費(fèi)一個(gè)隊(duì)列,這樣線程在消費(fèi)的時(shí)候不會(huì)出現(xiàn)搶占隊(duì)列的問題。同時(shí)為了保證線程安全和生產(chǎn)者消費(fèi)者模式的完整,采用阻塞隊(duì)列。Java 中提供了 LinkedBlockingQueue 就是一個(gè)阻塞隊(duì)列。



        初始化阻塞隊(duì)列

        使用 LinkedList 創(chuàng)建一個(gè)阻塞隊(duì)列列表:


        private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();

        在 static 塊里初始化阻塞隊(duì)列的數(shù)量和單個(gè)阻塞隊(duì)列的容量為 256。


        上面講到了 30 億數(shù)據(jù)大概 2500 行,按行塞到隊(duì)列里。20 個(gè)隊(duì)列,那么每個(gè)隊(duì)列 125 個(gè),因此可以容量可以設(shè)計(jì)為 256 即可。


        //每個(gè)隊(duì)列容量為256for (int i = 0; i < threadNums; i++) {    blockQueueLists.add(new LinkedBlockingQueue<>(256));}

        生產(chǎn)者

        為了實(shí)現(xiàn)負(fù)載的功能,首先定義一個(gè) count 計(jì)數(shù)器,用來記錄行數(shù):


        private static AtomicLong count = new AtomicLong(0);

        按照行數(shù)來計(jì)算隊(duì)列的下標(biāo) long index=count.get()%threadNums。 


        下面算法就實(shí)現(xiàn)了對隊(duì)列列表中的隊(duì)列進(jìn)行輪詢的投放:


        static class SplitData {    public static void splitLine(String lineData) {        String[] arr = lineData.split("\n");        for (String str : arr) {            if (StringUtils.isEmpty(str)) {                continue;            }            long index = count.get() % threadNums;            try {                // 如果滿了就阻塞                blockQueueLists.get((int) index).put(str);            } catch (InterruptedException e) {                e.printStackTrace();            }            count.getAndIncrement();        }    }

        消費(fèi)者


        1) 隊(duì)列線程私有化

        消費(fèi)方在啟動(dòng)線程的時(shí)候根據(jù) index 去獲取到指定的隊(duì)列,這樣就實(shí)現(xiàn)了隊(duì)列的線程私有化。


        private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {    //如果共用一個(gè)隊(duì)列,那么線程不宜過多,容易出現(xiàn)搶占現(xiàn)象    System.out.println("開始消費(fèi)...");    for (int i = 0; i < threadNums; i++) {        final int index = i;        // 每一個(gè)線程負(fù)責(zé)一個(gè) queue,這樣不會(huì)出現(xiàn)線程搶占隊(duì)列的情況。        new Thread(() -> {            while (consumerRunning) {                startConsumer = true;                try {                    String str = blockQueueLists.get(index).take();                    countNum(str);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();        }}

        2) 多子線程分割字符串


        由于從隊(duì)列中多到的字符串非常的龐大,如果又是用單線程調(diào)用 split(",") 去分割,那么性能同樣會(huì)阻塞在這個(gè)地方。


        // 按照 arr的大小,運(yùn)用多線程分割字符串private static void countNum(String str) {    int[] arr = new int[2];    arr[1] = str.length() / 3;    for (int i = 0; i < 3; i++) {        final String innerStr = SplitData.splitStr(str, arr);        new Thread(() -> {            String[] strArray = innerStr.split(",");            for (String s : strArray) {                countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();            }        }).start();    }}


        3) 分割字符串算法

        分割時(shí)從 0 開始,按照等分的原則,將字符串 n 等份,每一個(gè)線程分到一份。

        用一個(gè) arr 數(shù)組的 arr[0] 記錄每次的分割開始位置。arr[1] 記錄每次分割的結(jié)束位置,如果遇到的開始的字符不為 "," 那么就 startIndex-1。如果結(jié)束的位置不為 "," 那么將 endIndex 向后移一位。

        如果 endIndex 超過了字符串的最大長度,那么就把最后一個(gè)字符賦值給 arr[1]。


        /** * 按照 x坐標(biāo) 來分割 字符串,如果切到的字符不為“,”, 那么把坐標(biāo)向前或者向后移動(dòng)一位。 * * @param line * @param arr  存放x1,x2坐標(biāo) * @return */public static String splitStr(String line, int[] arr) {    int startIndex = arr[0];    int endIndex = arr[1];    char start = line.charAt(startIndex);    char end = line.charAt(endIndex);    if ((startIndex == 0 || start == ',') && end == ',') {        arr[0] = endIndex + 1;        arr[1] = arr[0] + line.length() / 3;        if (arr[1] >= line.length()) {            arr[1] = line.length() - 1;        }        return line.substring(startIndex, endIndex);    }
        if (startIndex != 0 && start != ',') { startIndex = startIndex - 1; }
        if (end != ',') { endIndex = endIndex + 1; }
        arr[0] = startIndex; arr[1] = endIndex; if (arr[1] >= line.length()) { arr[1] = line.length() - 1; } return splitStr(line, arr);}

        測試結(jié)果


        內(nèi)存和 CPU 初始占用大?。?/span>



        啟動(dòng)后,運(yùn)行時(shí)內(nèi)存穩(wěn)定在 11.7G,CPU 穩(wěn)定利用在 90% 以上。



        總耗時(shí)由 180 秒縮減到 103 秒,效率提升 75%,得到的結(jié)果也與單線程處理的一致。



        6. 遇到的問題


        如果在運(yùn)行了的時(shí)候,發(fā)現(xiàn) GC 突然罷工不工作了,有可能是 JVM 的堆中存在的垃圾太多,沒回收導(dǎo)致內(nèi)存的突增。



        解決方法


        在讀取一定數(shù)量后,可以讓主線程暫停幾秒,手動(dòng)調(diào)用 GC。


        提示: 本 demo 的線程創(chuàng)建都是手動(dòng)創(chuàng)建的,實(shí)際開發(fā)中使用的是線程池。

        最近熱文閱讀:

        1、面試官 | Spring Boot 項(xiàng)目如何統(tǒng)一結(jié)果,統(tǒng)一異常,統(tǒng)一日志?
        2、為什么不建議使用ON DUPLICATE KEY UPDATE?
        3、Java8 Stream,過分絲滑!
        4、8 種最坑SQL語法,工作中踩過嗎?
        5、Java 語言“坑爹” TOP 10
        6、你還不明白如何解決分布式Session?看這篇就夠了!
        7、能解決 80% 故障的排查思路
        8、程序員坐牢了,會(huì)被安排寫代碼嗎?
        9、面試被問Nginx,怎么破?
        10、為什么很多 SpringBoot 開發(fā)者放棄了 Tomcat,選擇了 Undertow?
        關(guān)注公眾號,你想要的Java都在這里

        瀏覽 58
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 久久精品国产9久久综合日本欧 | 小泽玛利亚av在线 | 插进去综合 | 草久在线观看精品免费 | 韩国伦理一区二区 |