1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Hive基于UDF進行文本分詞

        共 12431字,需瀏覽 25分鐘

         ·

        2020-12-31 23:49

        本文大綱

        UDF 簡介

        Hive作為一個sql查詢引擎,自帶了一些基本的函數(shù),比如count(計數(shù)),sum(求和),有時候這些基本函數(shù)滿足不了我們的需求,這時候就要寫hive hdf(user defined funation),又叫用戶自定義函數(shù)。編寫Hive UDF的步驟:

        • 添加相關(guān)依賴,創(chuàng)建項目,這里我用的管理工具是maven,所以我創(chuàng)建的也是一個maven 項目(這個時候你需要選擇合適的依賴版本,主要是Hadoop 和 Hive,可以使用hadoop versionhive --version 來分別查看版本)

        • 繼承org.apache.hadoop.hive.ql.exec.UDF類,實現(xiàn)evaluate方法,然后打包;

        • 使用 add方法添加jar 包到分布式緩存,如果jar包是上傳到$HIVE_HOME/lib/目錄以下,就不需要執(zhí)行add命令了

        • 通過create temporary function創(chuàng)建臨時函數(shù),不加temporary就創(chuàng)建了一個永久函數(shù);

        • 在SQL 中使用你創(chuàng)建的UDF;

        UDF分詞

        這個是一個比較常見的場景,例如公司的產(chǎn)品有每天都會產(chǎn)生大量的彈幕或者評論,這個時候我們可能會想去分析一下大家最關(guān)心的熱點話題是什么,或者是我們會分析最近一段時間的網(wǎng)絡趨勢是什么,但是這里有一個問題就是你的詞庫建設的問題,因為你使用通用的詞庫可能不能達到很好的分詞效果,尤其有很多網(wǎng)絡流行用語它是不在詞庫里的,還有一個就是停用詞的問題了,因為很多時候停用詞是沒有意義的,所以這里我們需要將其過濾,而過濾的方式就是通過停用詞詞表進行過濾。

        這個時候我們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫,還有一種是自建詞庫,然后有專人去維護,這個也是比較常見的一種情況。

        最后一個就是我們使用的分詞工具,因為目前主流的分詞器很多,選擇不同的分詞工具可能對我們的分詞結(jié)果有很多影響。

        分詞工具

        1:Elasticsearch的開源中文分詞器 IK Analysis(Star:2471)

        IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從文件系統(tǒng)中讀取詞典,es-ik本身可擴展成從不同的源讀取詞典。目前提供從sqlite3數(shù)據(jù)庫中讀取。es-ik-plugin-sqlite3使用方法:1. 在elasticsearch.yml中設置你的sqlite3詞典的位置:ik_analysis_db_path: /opt/ik/dictionary.db

        2:開源的java中文分詞庫 IKAnalyzer(Star:343)

        IK Analyzer 是一個開源的,基于java語言開發(fā)的輕量級的中文分詞工具包。從2006年12月推出1.0版開始, IKAnalyzer已經(jīng)推出了4個大版本。最初,它是以開源項目Luence為應用主體的,結(jié)合詞典分詞和文法分析算法的中文分詞組件。從3.0版本開始,IK發(fā)展為面向Java的公用分詞組件,獨立于Lucene項目

        3:java開源中文分詞 Ansj(Star:3019)

        Ansj中文分詞 這是一個ictclas的java實現(xiàn).基本上重寫了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開源版的ictclas所提供的.并且進行了部分的人工優(yōu)化 分詞速度達到每秒鐘大約200萬字左右,準確率能達到96%以上。

        目前實現(xiàn)了.中文分詞. 中文姓名識別 . 詞性標注、用戶自定義詞典,關(guān)鍵字提取,自動摘要,關(guān)鍵字標記等功能。

        可以應用到自然語言處理等方面,適用于對分詞效果要求高的各種項目.

        4:結(jié)巴分詞 ElasticSearch 插件(Star:188)

        elasticsearch官方只提供smartcn這個中文分詞插件,效果不是很好,好在國內(nèi)有medcl大神(國內(nèi)最早研究es的人之一)寫的兩個中文分詞插件,一個是ik的,一個是mmseg的

        5:Java分布式中文分詞組件 - word分詞(Star:672)

        word分詞是一個Java實現(xiàn)的分布式的中文分詞組件,提供了多種基于詞典的分詞算法,并利用ngram模型來消除歧義。能準確識別英文、數(shù)字,以及日期、時間等數(shù)量詞,能識別人名、地名、組織機構(gòu)名等未登錄詞

        6:Java開源中文分詞器jcseg(Star:400)

        Jcseg是什么?Jcseg是基于mmseg算法的一個輕量級開源中文分詞器,同時集成了關(guān)鍵字提取,關(guān)鍵短語提取,關(guān)鍵句子提取和文章自動摘要等功能,并且提供了最新版本的lucene, solr, elasticsearch的分詞接口, Jcseg自帶了一個 jcseg.properties文件…

        7:中文分詞庫Paoding

        庖丁中文分詞庫是一個使用Java開發(fā)的,可結(jié)合到Lucene應用中的,為互聯(lián)網(wǎng)、企業(yè)內(nèi)部網(wǎng)使用的中文搜索引擎分詞組件。Paoding填補了國內(nèi)中文分詞方面開源組件的空白,致力于此并希翼成為互聯(lián)網(wǎng)網(wǎng)站首選的中文分詞開源組件。Paoding中文分詞追求分詞的高效率和用戶良好體驗。

        8:中文分詞器mmseg4j

        mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法(http://technology.chtsai.org/mmseg/ )實現(xiàn)的中文分詞器,并實現(xiàn) lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使…

        9:中文分詞Ansj(Star:3015)

        Ansj中文分詞 這是一個ictclas的java實現(xiàn).基本上重寫了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開源版的ictclas所提供的.并且進行了部分的人工優(yōu)化 內(nèi)存中中文分詞每秒鐘大約100萬字(速度上已經(jīng)超越ictclas) 文件讀取分詞每秒鐘大約30萬字 準確率能達到96%以上 目前實現(xiàn)了….

        10:Lucene中文分詞庫ICTCLAS4J

        ictclas4j中文分詞系統(tǒng)是sinboy在中科院張華平和劉群老師的研制的FreeICTCLAS的基礎上完成的一個java開源分詞項目,簡化了原分詞程序的復雜度,旨在為廣大的中文分詞愛好者一個更好的學習機會。

        代碼實現(xiàn)

        第一步:引入依賴

        這里我們引入了兩個依賴,其實是兩個不同分詞工具


        ??org.ansj
        ??ansj_seg
        ??5.1.6
        ??compile


        ??com.janeluo
        ??ikanalyzer
        ??2012_u6

        在開始之前我們先寫一個demo 玩玩,讓大家有個基本的認識

        @Test
        public??void?testAnsjSeg()?{
        ????String?str?=?"我叫李太白,我是一個詩人,我生活在唐朝"?;
        ??????//?選擇使用哪種分詞器?BaseAnalysis?ToAnalysis?NlpAnalysis??IndexAnalysis
        ????Result?result?=?ToAnalysis.parse(str);
        ????System.out.println(result);
        ????KeyWordComputer?kwc?=?new?KeyWordComputer(5);
        ????Collection?keywords?=?kwc.computeArticleTfidf(str);
        ????System.out.println(keywords);
        }

        輸出結(jié)果

        /r,叫/v,李太白/nr,,/w,我/r,是/v,一個/m,詩人/n,,/w,我/r,生活/vn,在/p,唐朝/t
        [李太白/24.72276098504223,?詩人/3.0502185968368885,?唐朝/0.8965677022546215,?生活/0.6892230219652541]

        第二步:引入停用詞詞庫

        因為是停用詞詞庫,本身也不是很大,所以我直接放在項目里了,當然你也可以放在其他地方,例如HDFS 上

        第三步:編寫UDF

        代碼很簡單我就不不做詳細解釋了,需要注意的是GenericUDF 里面的一些方法的使用規(guī)則,至于代碼設計的好壞以及還有什么改進的方案我們后面再說,下面兩套實現(xiàn)的思路幾乎是一致的,不一樣的是在使用的分詞工具上的不一樣

        ansj的實現(xiàn)

        /**
        ?*?Chinese?words?segmentation?with?user-dict?in?com.kingcall.dic
        ?*?use?Ansj(a?java?open?source?analyzer)
        ?*/


        //?這個信息就是你每次使用desc?進行獲取函數(shù)信息的時候返回的
        @Description(name?=?"ansj_seg",?value?=?"_FUNC_(str)?-?chinese?words?segment?using?ansj.?Return?list?of?words.",
        ????????extended?=?"Example:?select?_FUNC_('我是測試字符串')?from?src?limit?1;\n"
        ????????????????+?"[\"我\",?\"是\",?\"測試\",?\"字符串\"]")

        public?class?AnsjSeg?extends?GenericUDF?{
        ????private?transient?ObjectInspectorConverters.Converter[]?converters;
        ????private?static?final?String?userDic?=?"/app/stopwords/com.kingcall.dic";

        ????//load?userDic?in?hdfs
        ????static?{
        ????????try?{
        ????????????FileSystem?fs?=?FileSystem.get(new?Configuration());
        ????????????FSDataInputStream?in?=?fs.open(new?Path(userDic));
        ????????????BufferedReader?br?=?new?BufferedReader(new?InputStreamReader(in));

        ????????????String?line?=?null;
        ????????????String[]?strs?=?null;
        ????????????while?((line?=?br.readLine())?!=?null)?{
        ????????????????line?=?line.trim();
        ????????????????if?(line.length()?>?0)?{
        ????????????????????strs?=?line.split("\t");
        ????????????????????strs[0]?=?strs[0].toLowerCase();
        ????????????????????DicLibrary.insert(DicLibrary.DEFAULT,?strs[0]);?//ignore?nature?and?freq
        ????????????????}
        ????????????}
        ????????????MyStaticValue.isNameRecognition?=?Boolean.FALSE;
        ????????????MyStaticValue.isQuantifierRecognition?=?Boolean.TRUE;
        ????????}?catch?(Exception?e)?{
        ????????????System.out.println("Error?when?load?userDic"?+?e.getMessage());
        ????????}
        ????}

        ????@Override
        ????public?ObjectInspector?initialize(ObjectInspector[]?arguments)?throws?UDFArgumentException?{
        ????????if?(arguments.length?1?||?arguments.length?>?2)?{
        ????????????throw?new?UDFArgumentLengthException(
        ????????????????????"The?function?AnsjSeg(str)?takes?1?or?2?arguments.");
        ????????}

        ????????converters?=?new?ObjectInspectorConverters.Converter[arguments.length];
        ????????converters[0]?=?ObjectInspectorConverters.getConverter(arguments[0],?PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        ????????if?(2?==?arguments.length)?{
        ????????????converters[1]?=?ObjectInspectorConverters.getConverter(arguments[1],?PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        ????????}
        ????????return?ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        ????}


        ????@Override
        ????public?Object?evaluate(DeferredObject[]?arguments)?throws?HiveException?{
        ????????boolean?filterStop?=?false;
        ????????if?(arguments[0].get()?==?null)?{
        ????????????return?null;
        ????????}
        ????????if?(2?==?arguments.length)?{
        ????????????IntWritable?filterParam?=?(IntWritable)?converters[1].convert(arguments[1].get());
        ????????????if?(1?==?filterParam.get())?filterStop?=?true;
        ????????}

        ????????Text?s?=?(Text)?converters[0].convert(arguments[0].get());
        ????????ArrayList?result?=?new?ArrayList<>();

        ????????if?(filterStop)?{
        ????????????for?(Term?words?:?DicAnalysis.parse(s.toString()).recognition(StopLibrary.get()))?{
        ????????????????if?(words.getName().trim().length()?>?0)?{
        ????????????????????result.add(new?Text(words.getName().trim()));
        ????????????????}
        ????????????}
        ????????}?else?{
        ????????????for?(Term?words?:?DicAnalysis.parse(s.toString()))?{
        ????????????????if?(words.getName().trim().length()?>?0)?{
        ????????????????????result.add(new?Text(words.getName().trim()));
        ????????????????}
        ????????????}
        ????????}
        ????????return?result;
        ????}


        ????@Override
        ????public?String?getDisplayString(String[]?children)?{
        ????????return?getStandardDisplayString("ansj_seg",?children);
        ????}
        }

        ikanalyzer的實現(xiàn)

        @Description(name?=?"ansj_seg",?value?=?"_FUNC_(str)?-?chinese?words?segment?using?Iknalyzer.?Return?list?of?words.",
        ????????extended?=?"Example:?select?_FUNC_('我是測試字符串')?from?src?limit?1;\n"
        ????????????????+?"[\"我\",?\"是\",?\"測試\",?\"字符串\"]")
        public?class?IknalyzerSeg?extends?GenericUDF?{
        ????private?transient?ObjectInspectorConverters.Converter[]?converters;
        ????//用來存放停用詞的集合
        ????Set?stopWordSet?=?new?HashSet();

        ????@Override
        ????public?ObjectInspector?initialize(ObjectInspector[]?arguments)?throws?UDFArgumentException?{
        ????????if?(arguments.length?1?||?arguments.length?>?2)?{
        ????????????throw?new?UDFArgumentLengthException(
        ????????????????????"The?function?AnsjSeg(str)?takes?1?or?2?arguments.");
        ????????}
        ????????//讀入停用詞文件
        ????????BufferedReader?StopWordFileBr?=?null;
        ????????try?{
        ????????????StopWordFileBr?=?new?BufferedReader(new?InputStreamReader(new?FileInputStream(new?File("stopwords/baidu_stopwords.txt"))));
        ????????????//初如化停用詞集
        ????????????String?stopWord?=?null;
        ????????????for(;?(stopWord?=?StopWordFileBr.readLine())?!=?null;){
        ????????????????stopWordSet.add(stopWord);
        ????????????}
        ????????}?catch?(FileNotFoundException?e)?{
        ????????????e.printStackTrace();
        ????????}?catch?(IOException?e)?{
        ????????????e.printStackTrace();
        ????????}

        ????????converters?=?new?ObjectInspectorConverters.Converter[arguments.length];
        ????????converters[0]?=?ObjectInspectorConverters.getConverter(arguments[0],?PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        ????????if?(2?==?arguments.length)?{
        ????????????converters[1]?=?ObjectInspectorConverters.getConverter(arguments[1],?PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        ????????}
        ????????return?ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

        ????}

        ????@Override
        ????public?Object?evaluate(DeferredObject[]?arguments)?throws?HiveException?{
        ????????boolean?filterStop?=?false;
        ????????if?(arguments[0].get()?==?null)?{
        ????????????return?null;
        ????????}
        ????????if?(2?==?arguments.length)?{
        ????????????IntWritable?filterParam?=?(IntWritable)?converters[1].convert(arguments[1].get());
        ????????????if?(1?==?filterParam.get())?filterStop?=?true;
        ????????}
        ????????Text?s?=?(Text)?converters[0].convert(arguments[0].get());
        ????????StringReader?reader?=?new?StringReader(s.toString());
        ????????IKSegmenter?iks?=?new?IKSegmenter(reader,?true);
        ????????List?list?=?new?ArrayList<>();
        ????????if?(filterStop)?{
        ????????????try?{
        ????????????????Lexeme?lexeme;
        ????????????????while?((lexeme?=?iks.next())?!=?null)?{
        ????????????????????if?(!stopWordSet.contains(lexeme.getLexemeText()))?{
        ????????????????????????list.add(new?Text(lexeme.getLexemeText()));
        ????????????????????}
        ????????????????}
        ????????????}?catch?(IOException?e)?{
        ????????????}
        ????????}?else?{
        ????????????try?{
        ????????????????Lexeme?lexeme;
        ????????????????while?((lexeme?=?iks.next())?!=?null)?{
        ????????????????????list.add(new?Text(lexeme.getLexemeText()));
        ????????????????}
        ????????????}?catch?(IOException?e)?{
        ????????????}
        ????????}
        ????????return?list;
        ????}

        ????@Override
        ????public?String?getDisplayString(String[]?children)?{
        ????????return?"Usage:?evaluate(String?str)";
        ????}
        }

        第四步:編寫測試用例

        GenericUDF 給我們提供了一些方法,這些方法可以用來構(gòu)建測試需要的環(huán)境和參數(shù),這樣我們就可以測試這些代碼了

        @Test
        public?void?testAnsjSegFunc()?throws?HiveException?{
        ????AnsjSeg?udf?=?new?AnsjSeg();
        ????ObjectInspector?valueOI0?=?PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ????ObjectInspector?valueOI1?=?PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        ????ObjectInspector[]?init_args?=?{valueOI0,?valueOI1};
        ????udf.initialize(init_args);

        ????Text?str?=?new?Text("我是測試字符串");

        ????GenericUDF.DeferredObject?valueObj0?=?new?GenericUDF.DeferredJavaObject(str);
        ????GenericUDF.DeferredObject?valueObj1?=?new?GenericUDF.DeferredJavaObject(0);
        ????GenericUDF.DeferredObject[]?args?=?{valueObj0,?valueObj1};
        ????ArrayList?res?=?(ArrayList)?udf.evaluate(args);
        ????System.out.println(res);
        }


        @Test
        public?void?testIkSegFunc()?throws?HiveException?{
        ????IknalyzerSeg?udf?=?new?IknalyzerSeg();
        ????ObjectInspector?valueOI0?=?PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ????ObjectInspector?valueOI1?=?PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        ????ObjectInspector[]?init_args?=?{valueOI0,?valueOI1};
        ????udf.initialize(init_args);

        ????Text?str?=?new?Text("我是測試字符串");

        ????GenericUDF.DeferredObject?valueObj0?=?new?GenericUDF.DeferredJavaObject(str);
        ????GenericUDF.DeferredObject?valueObj1?=?new?GenericUDF.DeferredJavaObject(0);
        ????GenericUDF.DeferredObject[]?args?=?{valueObj0,?valueObj1};
        ????ArrayList?res?=?(ArrayList)?udf.evaluate(args);
        ????System.out.println(res);
        }

        我們看到加載停用詞沒有找到,但是整體還是跑起來了,因為讀取不到HDFS 上的文件

        但是我們第二個樣例是不需要從HDFS 上加載停用詞信息,所以可以完美的測試運行

        后來為了能在外部更新文件,我將其放在了HDFS 上,和AnsjSeg 中的代碼一樣

        第五步:創(chuàng)建UDF 并使用

        add?jar?/Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
        create?temporary?function?ansjSeg?as?'com.kingcall.bigdata.HiveUDF.AnsjSeg';
        select?ansjSeg("我是字符串,你是啥");
        --?開啟停用詞過濾
        select?ansjSeg("我是字符串,你是啥",1);
        create?temporary?function?ikSeg?as?'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
        select?ikSeg("我是字符串,你是啥");
        select?ikSeg("我是字符串,你是啥",1);

        上面方法的第二個參數(shù),就是是否開啟停用詞過濾,我們使用ikSeg函數(shù)演示一下

        下面我們嘗試獲取一下函數(shù)的描述信息

        如果沒有寫的話,就是下面的這樣的

        其它應用場景

        通過編寫Hive UDF可以輕松幫我們實現(xiàn)大量常見需求,其它應該場景還有:

        • ip地址轉(zhuǎn)地區(qū):將上報的用戶日志中的ip字段轉(zhuǎn)化為國家-省-市格式,便于做地域分布統(tǒng)計分析;

        • 使用Hive SQL計算的標簽數(shù)據(jù),不想編寫Spark程序,可以通過UDF在靜態(tài)代碼塊中初始化連接池,利用Hive啟動的并行MR任務,并行快速導入大量數(shù)據(jù)到codis中,應用于一些推薦業(yè)務;

        • 還有其它sql實現(xiàn)相對復雜的任務,都可以編寫永久Hive UDF進行轉(zhuǎn)化;

        總結(jié)

        1. 這一節(jié)我們學習了一個比較常見的UDF,通過實現(xiàn)GenericUDF 抽象類來實現(xiàn),這一節(jié)的重點在于代碼的實現(xiàn)以及對GenericUDF類中方法的理解

        2. 上面的代碼實現(xiàn)上有一個問題,那就是關(guān)于停用詞的加載,就是我們能不能動態(tài)加載停用詞呢?


        瀏覽 54
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            亚洲视频在线观看网站 | 午夜精品秘 一区二区三区 | 国产欧美亚洲视频 | 用力挺进她的花苞啊太视频 | 国偷自产av一区二区三区麻豆 | 天天拍天天干天天射 | 极品少妇一区二区三区 | 国产精品成人自产拍在线观看 | 操肏一区| 意大利黄色片 |