1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Java封裝 bigQuery 查詢工具類,有用

        共 10255字,需瀏覽 21分鐘

         ·

        2022-03-06 00:43

        點擊上方“程序員大白”,選擇“星標(biāo)”公眾號

        重磅干貨,第一時間送達

        緣起

        最近在公司基于bigQuery開發(fā)埋點數(shù)據(jù)分析功能,所以總結(jié)一下自己封裝的bigQuery查詢工具類(網(wǎng)上關(guān)于bigQuery的文章比較少)

        關(guān)于bigQuery的概念功能可以參考:

        https://cloud.google.com/bigquery/docs?hl=zh-CN

        在示例那包含了很多操作

        示例中一段查詢代碼

        import?com.google.cloud.bigquery.BigQuery;
        import?com.google.cloud.bigquery.BigQueryOptions;
        import?com.google.cloud.bigquery.FieldValueList;
        import?com.google.cloud.bigquery.Job;
        import?com.google.cloud.bigquery.JobId;
        import?com.google.cloud.bigquery.JobInfo;
        import?com.google.cloud.bigquery.QueryJobConfiguration;
        import?com.google.cloud.bigquery.TableResult;
        import?java.util.UUID;

        public?class?SimpleApp?{
        ??public?static?void?main(String...?args)?throws?Exception?{
        ????BigQuery?bigquery?=?BigQueryOptions.getDefaultInstance().getService();
        ????QueryJobConfiguration?queryConfig?=
        ????????QueryJobConfiguration.newBuilder(
        ????????????????"SELECT?commit,?author,?repo_name?"
        ????????????????????+?"FROM?`bigquery-public-data.github_repos.commits`?"
        ????????????????????+?"WHERE?subject?like?'%bigquery%'?"
        ????????????????????+?"ORDER?BY?subject?DESC?LIMIT?10")
        ????????????//?Use?standard?SQL?syntax?for?queries.
        ????????????//?See:?https://cloud.google.com/bigquery/sql-reference/
        ????????????.setUseLegacySql(false)
        ????????????.build();

        ????//?Create?a?job?ID?so?that?we?can?safely?retry.
        ????JobId?jobId?=?JobId.of(UUID.randomUUID().toString());
        ????Job?queryJob?=?bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        ????//?Wait?for?the?query?to?complete.
        ????queryJob?=?queryJob.waitFor();

        ????//?Check?for?errors
        ????if?(queryJob?==?null)?{
        ??????throw?new?RuntimeException("Job?no?longer?exists");
        ????}?else?if?(queryJob.getStatus().getError()?!=?null)?{
        ??????//?You?can?also?look?at?queryJob.getStatus().getExecutionErrors()?for?all
        ??????//?errors,?not?just?the?latest?one.
        ??????throw?new?RuntimeException(queryJob.getStatus().getError().toString());
        ????}

        ????//?Get?the?results.
        ????TableResult?result?=?queryJob.getQueryResults();

        ????//?Print?all?pages?of?the?results.
        ????for?(FieldValueList?row?:?result.iterateAll())?{
        ??????//?String?type
        ??????String?commit?=?row.get("commit").getStringValue();
        ??????//?Record?type
        ??????FieldValueList?author?=?row.get("author").getRecordValue();
        ??????String?name?=?author.get("name").getStringValue();
        ??????String?email?=?author.get("email").getStringValue();
        ??????//?String?Repeated?type
        ??????String?repoName?=?row.get("repo_name").getRecordValue().get(0).getStringValue();
        ??????System.out.printf(
        ??????????"Repo?name:?%s?Author?name:?%s?email:?%s?commit:?%s\n",?repoName,?name,?email,?commit);
        ????}
        ??}
        }

        以上這段查詢的代碼,給我的感覺就是步驟都是類似的,都是創(chuàng)建一個JOB,等待查詢,處理查詢的結(jié)果集。如果我每寫一個查詢都要寫這一大坨,那簡直要惡心死。

        其實以上的查詢代碼除了SQL和結(jié)果集的處理不一樣,其他的都是一樣,基于這一點來封裝一個 bigQuery 查詢工具類,想要達到的效果就是:我給你一段SQL,你給我處理好的結(jié)果。

        Maven 依賴

        首先需要引入 Maven 依賴,當(dāng)然這個在 bigQuery 官方示例上是有的

        <dependencyManagement>
        ??<dependencies>
        ????<dependency>
        ??????<groupId>com.google.cloudgroupId>
        ??????<artifactId>libraries-bomartifactId>
        ??????<version>24.0.0version>
        ??????<type>pomtype>
        ??????<scope>importscope>
        ????dependency>
        ??dependencies>
        dependencyManagement>

        <dependencies>
        ??<dependency>
        ????<groupId>com.google.cloudgroupId>
        ????<artifactId>google-cloud-bigqueryartifactId>
        ??dependency>

        設(shè)置身份驗證

        要運行客戶端庫,必須先設(shè)置身份驗證,也就是說需要你的服務(wù)帳號密鑰才能去連接和操作bigQuery,官方文檔中也說到這點,他提供的方案是:設(shè)置環(huán)境變量 GOOGLE_APPLICATION_CREDENTIALS 向應(yīng)用代碼提供身份驗證憑據(jù)

        我覺得這種方式對于我們來說不太友好,我不能在服務(wù)器上設(shè)置環(huán)境變量,我們現(xiàn)在都是微服務(wù),部署在k8s上,所以這種方案也不知道如何使用(會這種方式的小伙伴一定要告訴我怎么使用)

        所以我的做法是將 身份驗證憑據(jù) JSON 文件加在項目 resource 里,通過流的方式讀取憑據(jù)。

        @Value(value?=?"classpath:netpop-e792a-data-analytics.json")
        private?Resource?dataAnalyticsResource;

        配置 BigQuery Bean

        由上面那段查詢案例可知,重要的一個 Bean 就是 BigQuery,所以把這個Bean 注冊到IOC容器中。

        @Configuration
        public?class?BigQueryConfiguration?{

        ????//?加載?身份驗證憑據(jù)
        ????@Value(value?=?"classpath:netpop-e792a-data-analytics.json")
        ????private?Resource?dataAnalyticsResource;

        ????//?配置核心Bean
        ????@Bean
        ????BigQuery?bigQuery()?throws?IOException?{
        ????????GoogleCredentials?credentials?=?GoogleCredentials.fromStream(dataAnalyticsResource.getInputStream());
        ????????BigQuery?bigquery?=?BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
        ????????return?bigquery;
        ????}

        ????//?將bigQuery?分裝工具類注冊到IOC容器中
        ????@Bean
        ????BigQueryHelper?bigQueryHelper(@Autowired?BigQuery?bigQuery)?{
        ????????return?new?BigQueryHelper(bigQuery);
        ????}

        }

        工具類


        package?groot.data.analysis.support;

        import?com.google.cloud.bigquery.*;
        import?lombok.extern.slf4j.Slf4j;
        import?org.springframework.util.Assert;
        import?org.springframework.util.ReflectionUtils;

        import?java.lang.reflect.InvocationTargetException;
        import?java.math.BigDecimal;
        import?java.util.*;

        /**
        ?*?@Classname?BigQueryHelper
        ?*?@Description
        ?*?@Created?by?wangchangjiu
        ?*/

        @Slf4j
        public?class?BigQueryHelper?{

        ????private?BigQuery?bigQuery;

        ????public?BigQueryHelper()?{
        ????}

        ????public?BigQueryHelper(BigQuery?bigQuery)?{
        ????????this.bigQuery?=?bigQuery;
        ????}


        ????/**
        ?????*?獲取列表?返回類型的字段不支持復(fù)雜類型
        ?????*
        ?????*?@param?sql
        ?????*?@param?returnType
        ?????*?@param?
        ?????*?@return
        ?????*?@throws?InterruptedException
        ?????*/

        ????public??List?queryForList(String?sql,?Class?returnType)?throws?InterruptedException?{
        ????????TableResult?result?=?execute(sql);
        ????????Map?fieldMap?=?getStringFieldMap(result);
        ????????List?results?=?new?ArrayList<>();
        ????????result.iterateAll().forEach(row?->?{
        ????????????T?returnObj;
        ????????????try?{
        ????????????????returnObj?=?returnType.getDeclaredConstructor().newInstance();
        ????????????}?catch?(InstantiationException?|?IllegalAccessException?|?InvocationTargetException?|?NoSuchMethodException?ex)?{
        ????????????????throw?new?RuntimeException("reflect?create?object?error?:",?ex);
        ????????????}
        ????????????ReflectionUtils.doWithFields(returnType,?field?->?{
        ????????????????Field?bigQueryField?=?fieldMap.get(field.getName());
        ????????????????if?(bigQueryField?!=?null)?{
        ????????????????????FieldValue?fieldValue?=?row.get(bigQueryField.getName());
        ????????????????????if?(bigQueryField.getType().getStandardType()?==?StandardSQLTypeName.STRUCT)?{
        ????????????????????????throw?new?UnsupportedOperationException("unsupported?returnType?field?include?complex?types");
        ????????????????????}
        ????????????????????field.setAccessible(true);
        ????????????????????ReflectionUtils.setField(field,?returnObj,?resultWrapper(fieldValue,?field.getType()));
        ????????????????}
        ????????????});
        ????????????results.add(returnObj);
        ????????});
        ????????return?results;
        ????}

        ????/**
        ?????*??字段名和字段映射
        ?????*?@param?result
        ?????*?@return
        ?????*/

        ????private?Map?getStringFieldMap(TableResult?result)?{
        ????????FieldList?fieldList?=?result.getSchema().getFields();
        ????????Map?fieldMap?=?new?HashMap<>(fieldList.size());
        ????????for?(int?i?=?0;?i?????????????Field?field?=?fieldList.get(i);
        ????????????fieldMap.put(field.getName(),?field);
        ????????}
        ????????return?fieldMap;
        ????}

        ????/**
        ?????*??執(zhí)行SQL?獲取結(jié)果集
        ?????*?@param?sql
        ?????*?@return
        ?????*?@throws?InterruptedException
        ?????*/

        ????private?TableResult?execute(String?sql)?throws?InterruptedException?{
        ????????Assert.notNull(sql,?"SQL?must?not?be?null");
        ????????QueryJobConfiguration?queryConfig?=?QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();

        ????????//?Create?a?job?ID?so?that?we?can?safely?retry.
        ????????JobId?jobId?=?JobId.of(UUID.randomUUID().toString());
        ????????Job?queryJob?=?bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        ????????//?Wait?for?the?query?to?complete.
        ????????queryJob?=?queryJob.waitFor();

        ????????if?(queryJob?==?null)?{
        ????????????throw?new?RuntimeException("Job?no?longer?exists");
        ????????}?else?if?(queryJob.getStatus().getError()?!=?null)?{
        ????????????throw?new?RuntimeException(queryJob.getStatus().getError().toString());
        ????????}
        ????????//?Get?the?results.
        ????????return?queryJob.getQueryResults();
        ????}


        ????/**
        ?????*??查詢列表,實現(xiàn)?ResultSetExtractor?接口?自定義提取數(shù)據(jù)
        ?????*?@param?sql
        ?????*?@param?rse
        ?????*?@param?
        ?????*?@return
        ?????*?@throws?InterruptedException
        ?????*/

        ????public??List?queryForList(String?sql,?ResultSetExtractor?rse)?throws?InterruptedException?{
        ????????TableResult?tableResult?=?execute(sql);
        ????????List?results?=?new?ArrayList<>();
        ????????tableResult.iterateAll().forEach(row?->?results.add(rse.extractData(row)));
        ????????return?results;
        ????}

        ????/**
        ?????*??查詢返回單個結(jié)果集
        ?????*?@param?sql
        ?????*?@param?returnType
        ?????*?@param?
        ?????*?@return
        ?????*?@throws?InterruptedException
        ?????*/

        ????public??T?queryForSingleResult(String?sql,?Class?returnType)?throws?InterruptedException?{
        ????????TableResult?tableResult?=?execute(sql);
        ????????if?(tableResult.iterateAll().iterator().hasNext())?{
        ????????????//?只有一行
        ????????????FieldValueList?fieldValues?=?tableResult.iterateAll().iterator().next();
        ????????????if?(isBasicType(returnType))?{
        ????????????????return?(T)?resultWrapper(fieldValues.get(0),?returnType);
        ????????????}?else?{
        ????????????????T?returnObj;
        ????????????????try?{
        ????????????????????returnObj?=?returnType.getDeclaredConstructor().newInstance();
        ????????????????}?catch?(InstantiationException?|?IllegalAccessException?|?InvocationTargetException?|?NoSuchMethodException?ex)?{
        ????????????????????throw?new?RuntimeException("reflect?create?object?error?:",?ex);
        ????????????????}

        ????????????????Map?fieldMap?=?getStringFieldMap(tableResult);
        ????????????????ReflectionUtils.doWithFields(returnType,?field?->?{
        ????????????????????Field?bigQueryField?=?fieldMap.get(field.getName());
        ????????????????????if?(bigQueryField?!=?null)?{
        ????????????????????????FieldValue?fieldValue?=?fieldValues.get(bigQueryField.getName());
        ????????????????????????if?(bigQueryField.getType().getStandardType()?==?StandardSQLTypeName.STRUCT)?{
        ????????????????????????????throw?new?UnsupportedOperationException("unsupported?returnType?field?include?complex?types");
        ????????????????????????}
        ????????????????????????field.setAccessible(true);
        ????????????????????????ReflectionUtils.setField(field,?returnObj,?resultWrapper(fieldValue,?field.getType()));
        ????????????????????}
        ????????????????});
        ????????????????return?returnObj;
        ????????????}
        ????????}
        ????????return?null;
        ????}

        ????/**
        ?????*??結(jié)果類型處理
        ?????*?@param?fieldValue
        ?????*?@param?returnType
        ?????*?@return
        ?????*/

        ????private?Object?resultWrapper(FieldValue?fieldValue,?Class?returnType)?{
        ????????if?(returnType?==?Boolean.class?||?returnType?==?boolean.class)?{
        ????????????return?fieldValue.getBooleanValue();
        ????????}?else?if?(returnType?==?Long.class?||?returnType?==?long.class)?{
        ????????????return?fieldValue.getLongValue();
        ????????}?else?if?(returnType?==?Double.class?||?returnType?==?double.class)?{
        ????????????return?fieldValue.getDoubleValue();
        ????????}?else?if?(returnType?==?BigDecimal.class)?{
        ????????????return?fieldValue.getNumericValue();
        ????????}?else?if?(returnType?==?String.class)?{
        ????????????return?fieldValue.getStringValue();
        ????????}
        ????????return?fieldValue.getValue();
        ????}

        ????/**
        ?????*??判斷是否是簡單類型
        ?????*?@param?returnType
        ?????*?@param?
        ?????*?@return
        ?????*/

        ????private??boolean?isBasicType(Class?returnType)?{
        ????????return?returnType?==?String.class?||?returnType.isPrimitive()
        ????????????????||?returnType?
        ==?Boolean.class?||?returnType?==?Byte.class
        ????????????????||?returnType?
        ==?Integer.class?||?returnType?==?Long.class
        ????????????????||?returnType?
        ==?Double.class?||?returnType?==?Short.class
        ????????????????||?returnType?
        ==?Float.class?||?returnType?==?BigDecimal.class;
        ????}

        }

        這里對外主要提供了

        //?獲取列表?返回類型的字段不支持復(fù)雜類型
        public??List?queryForList(String?sql,?Class?returnType)?throws?InterruptedException?

        //?查詢列表,實現(xiàn)?ResultSetExtractor?接口?自定義提取數(shù)據(jù)
        public??List?queryForList(String?sql,?ResultSetExtractor?rse)?throws?InterruptedException

        //?查詢返回單個結(jié)果集
        public??T?queryForSingleResult(String?sql,?Class?returnType)?throws?InterruptedException?

        我這里主要的思想就是利用反射創(chuàng)建目標(biāo)對象,將字段賦值進去

        當(dāng)然這里不支持返回類型是對象嵌套對象的形式,原因是那種比較復(fù)雜,而且我現(xiàn)在這里也沒有這種場景。

        還有就是這里沒有支持分頁等其他操作

        使用工具類

        使用的話就是注入 bigQueryHelper 工具類

        以上就是簡單的一個bigQuery 分裝類,當(dāng)然還可以進一步優(yōu)化封裝的更全一點。

        來源:juejin.cn/post/7036337598025072653


        13個你一定要知道的PyTorch特性

        解讀:為什么要做特征歸一化/標(biāo)準(zhǔn)化?

        一文搞懂 PyTorch 內(nèi)部機制

        張一鳴:每個逆襲的年輕人,都具備的底層能力


        關(guān)


        ,學(xué)西學(xué)學(xué)質(zhì),結(jié)關(guān)[],學(xué)習(xí)!


        瀏覽 65
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            三级片在线视频观看 | www色色色com 免费 成人 外 | 丰满女邻居做爰全黄 | 国产精品嫩草在线 | 搞搞电影 | 亚洲成人性爱网 | 性一交一乱一区二区洋洋av | 外国操逼 | 99国产精品久久久久久久 | 小黄文在线观看 |