【问题标题】:Bigquery: Extract Job does not create fileBigquery:提取作业不创建文件
【发布时间】:2012-07-30 05:50:41
【问题描述】:

我正在开发一个使用 Bigquery 作为分析引擎的 Java 应用程序。能够使用Insert a Query Job 上的代码运行查询作业(并获得结果)。必须修改代码以在 stackoverflow 上使用 this comment 使用服务帐户。

现在,需要运行提取作业以将表导出到 GoogleStorage 上的存储桶。基于 Exporting a Table,能够修改 Java 代码以插入提取作业(代码如下)。运行时,提取作业的状态从 PENDING 变为 RUNNING 再到 DONE。问题是实际上没有文件上传到指定的bucket。

可能有用的信息:

  • createAuthorizedClient 函数返回一个 Bigquery 实例并适用于查询作业,因此服务帐户、私钥等可能没有问题。
  • 还尝试在google's api-explorer 上手动创建和运行插入作业,并在存储桶中成功创建文件。对项目、数据集、表和目标 uri 使用与代码中相同的值,因此这些值应该是正确的。

这是代码(粘贴完整的文件以防其他人发现它有用):

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.List;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;

public class BigQueryJavaGettingStarted {
    
    private static final String PROJECT_ID = "123456789012";
    private static final String DATASET_ID = "MY_DATASET_NAME";
    private static final String TABLE_TO_EXPORT = "MY_TABLE_NAME";
    private static final String SERVICE_ACCOUNT_ID = "123456789012-...@developer.gserviceaccount.com";
    private static final File PRIVATE_KEY_FILE = new File("/path/to/privatekey.p12");
    private static final String DESTINATION_URI = "gs://mybucket/file.csv";

    private static final List<String> SCOPES =  Arrays.asList(BigqueryScopes.BIGQUERY);
    private static final HttpTransport TRANSPORT = new NetHttpTransport();
    private static final JsonFactory JSON_FACTORY = new JacksonFactory();
    
    public static void main (String[] args) {
        try {
            executeExtractJob();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static final void executeExtractJob() throws IOException, InterruptedException, GeneralSecurityException {
        Bigquery bigquery = createAuthorizedClient();
        
        //Create a new Extract job
        Job job = new Job();
        JobConfiguration config = new JobConfiguration();
        JobConfigurationExtract extractConfig = new JobConfigurationExtract();
        TableReference sourceTable = new TableReference();

        sourceTable.setProjectId(PROJECT_ID).setDatasetId(DATASET_ID).setTableId(TABLE_TO_EXPORT);
        extractConfig.setSourceTable(sourceTable);
        extractConfig.setDestinationUri(DESTINATION_URI);
        config.setExtract(extractConfig);
        job.setConfiguration(config);

        //Insert/Execute the created extract job
        Insert insert = bigquery.jobs().insert(PROJECT_ID, job);
        insert.setProjectId(PROJECT_ID);
        JobReference jobId = insert.execute().getJobReference();
        
        //Now check to see if the job has successfuly completed (Optional for extract jobs?)
        long startTime = System.currentTimeMillis();
        long elapsedTime;
        while (true) {
            Job pollJob = bigquery.jobs().get(PROJECT_ID, jobId.getJobId()).execute();
            elapsedTime = System.currentTimeMillis() - startTime;
            System.out.format("Job status (%dms) %s: %s\n", elapsedTime, jobId.getJobId(), pollJob.getStatus().getState());
            if (pollJob.getStatus().getState().equals("DONE")) {
                break;
            }
            //Wait a second before rechecking job status
            Thread.sleep(1000);
        }
        
    }

    private static Bigquery createAuthorizedClient() throws GeneralSecurityException, IOException {
        GoogleCredential credential = new GoogleCredential.Builder()
            .setTransport(TRANSPORT)
            .setJsonFactory(JSON_FACTORY)
            .setServiceAccountScopes(SCOPES)
            .setServiceAccountId(SERVICE_ACCOUNT_ID)
            .setServiceAccountPrivateKeyFromP12File(PRIVATE_KEY_FILE)
            .build();
        
        return Bigquery.builder(TRANSPORT, JSON_FACTORY)
            .setApplicationName("My Reports")
            .setHttpRequestInitializer(credential)
            .build();
    }
}

这是输出:

Job status (337ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: PENDING
...
Job status (9186ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: PENDING
Job status (10798ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: RUNNING
...
Job status (53952ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: RUNNING
Job status (55531ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: DONE

这是一张小桌子(大约 4MB),所以大约需要一分钟的工作似乎还可以。不知道为什么没有在存储桶中创建文件或如何进行调试。任何帮助将不胜感激。

正如 Craig 指出的,打印了 status.errorResult() 和 status.errors() 值。

  • getErrorResults(): {"message":"后端错误。作业中止。","re​​ason":"internalError"}
  • getErrors(): null

【问题讨论】:

    标签: google-bigquery


    【解决方案1】:

    看起来在写入路径时出现访问被拒绝错误:gs://pixalate_test/from_java.csv。您能否确保执行导出作业的用户对存储桶具有写入权限(并且该文件尚不存在)? 我已经就这个问题提交了一个内部 bigquery 错误……在这种情况下,我们应该给出一个更好的错误。 .

    【讨论】:

    • 谢谢乔丹。将存储桶权限更改为公共和提取作业完美运行。需要弄清楚如何将服务帐户用户添加到存储桶的 acl。当我弄清楚时会更新评论(以供将来搜索)。 (现在使用:gsutil setacl public-read-write gs://&lt;mybucket&gt;
    【解决方案2】:

    我认为问题出在您使用的存储桶名称上——上面的mybucket 只是一个示例,您需要将其替换为您在 Google 存储中实际拥有的存储桶。如果您以前从未使用过 GS,intro docs 会有所帮助。

    您的第二个问题是如何调试它——我建议在状态设置为DONE 后查看返回的Job 对象。以错误结尾的作业仍然会进入DONE 状态,不同之处在于它们附加了错误结果,因此job.getStatus().hasErrorResult() 应该为真。 (我从未使用过 Java 客户端库,所以我猜测该方法名称。)您可以在 jobs docs 中找到更多信息。

    【讨论】:

    • 感谢您的快速回复。 1-我使用的是实际的存储桶名称而不是“mybucket”,所以这不是问题。 2- 感谢您指出错误结果。打印出来。这是输出(用于投票作业): getErrorResults(): {"message":"Backend error. Job aborted.","re​​ason":"internalError"} getErrors(): null 有什么想法吗?
    • 啊哈 - 重读这个问题,我想知道您是否有权访问存储桶,但您使用的服务帐户没有?就像检查一样,您能否将存储桶设置为世界可写并重新运行作业,以至少缩小范围?
    • 谢谢克雷格。是的,这是一个权限问题。由于我只能接受一个答案,我在下面接受乔丹的答案。
    • 为了更新bucket acl,你需要做一个gsutil getacl gs://mybucket &gt;acls.xml,添加一个UserByEmail范围的条目,然后gsutil setacl
    【解决方案3】:

    还有一个区别,我注意到您没有将作业类型传递为 config.setJobType(JOB_TYPE); 其中常量是私有静态最终字符串 JOB_TYPE = "extract"; json也是一样,需要设置格式。

    【讨论】:

      【解决方案4】:

      我遇到了同样的问题。但结果是我输入了错误的表名。但是,Google 并没有生成一条错误消息,说“该表不存在”。这会帮助我找到我的问题。

      谢谢!

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-02-01
        • 2014-10-13
        • 1970-01-01
        • 2018-02-22
        • 1970-01-01
        相关资源
        最近更新 更多