> 文章列表 > 结合Presto CLI,Presto Client学习

结合Presto CLI,Presto Client学习

结合Presto CLI,Presto Client学习

1. 序言

  • 作为Presto的客户端之一,Presto CLI是一个基于终端的交互式shell,它依赖Presto client向Presto server提交查询并获取查询执行进度、执行结果等。
  • Presto使用Master-Slave架构,对外提供服务的接口都在coordinator中,上面的描述可以改写为:Pressto client向coordinator提交查询
  • 笔者认为,在学习Presto CLI前,了解Presto client与coordinator之间如何进行HTTP通信是极有必要的
  • 官网有Presto Client的相关文档:Presto Client REST API,本文对Presto client的介绍也会参考该文档,并辅以Presto 0.279的源码

2. Presto client如何提交一个查询?

2.1 关键类StatementClientV1

  • StatementClientV1内含OkHttpClient,是实现client与coordinator通信的关键

  • 与Presto Client有关的代码中,大多直接使用client表示StatementClientV1

    class StatementClientV1 implements StatementClient
    
  • StatementClientV1的一些重要成员如下:

    private final OkHttpClient httpClient; 
    // 当前session的user,每次访问/v1/statement接口,都必须将user放在resquest header的X-Presto-User中
    private final String user; // 来自response的QueryResults,里面记录了查询的相关信息,如query_id、nextUri、data、error等
    private final AtomicReference<QueryResults> currentResults = new AtomicReference<>();// 来自response header,需要在之后的request header中使用
    private final AtomicReference<String> setCatalog = new AtomicReference<>();
    private final AtomicReference<String> setSchema = new AtomicReference<>();
    ... // 还有更多来自response header的属性,这里不再赘述// 初始化为RUNNING状态,还有CLIENT_ERROR、CLIENT_ABORTED、FINISHED三种状态
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    

2.2 提交一个查询

  • 创建StatementClientV1时,通过httpClient向coordinator发起一个查询请求,然后获取并处理请求结果

    1. 向coordinator的/v1/statement接口、以POST方式发起查询请求,body中包含query string
    2. coordinator的QueuedStatementResource.postStatement()方法将处理查询请求,并返回一个包含QueryResults的response
    3. 这个response将被转化为JsonResponse<QueryResults>,内含QueryResults类型的JSON文档、response headers等信息
    4. 执行processResponse()方法,使用response中的信息更新StatementClientV1
    public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
    {... // 其他代码省略// 1. 构建request,用于向coordinator提交查询;此时查询尚未执,需要访问response中nextUri触发查询的执行Request request = buildQueryRequest(session, query);// 2. 提交查询,response将被封装为JsonResponse<QueryResults>JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);// 如果发起查询的请求失败,更新client的状态并上抛异常if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("starting query", request, response);}// 3. 处理response,使用response header和body更新statementClientprocessResponse(response.getHeaders(), response.getValue());
    }
    
  • processResponse()方法的代码如下:

    1. 保存response header,新的request中将使用这些headrs
    2. respopnse中的QueryResults赋值给自身的currentResults字段
    private void processResponse(Headers headers, QueryResults results)
    {setCatalog.set(headers.get(PRESTO_SET_CATALOG));setSchema.set(headers.get(PRESTO_SET_SCHEMA));for (String setSession : headers.values(PRESTO_SET_SESSION)) {List<String> keyValue = SESSION_HEADER_SPLITTER.splitToList(setSession);if (keyValue.size() != 2) {continue;}setSessionProperties.put(keyValue.get(0), urlDecode(keyValue.get(1)));}resetSessionProperties.addAll(headers.values(PRESTO_CLEAR_SESSION));... // 对response header的记录,省略部分代码// 更新执行结果currentResults.set(results);
    }
    
  • StatementClientV1每次成功请求coordinator后,都将使用processResponse()方法更新自身

2.3 关于QueryResults

  • QueryResults就是查询结果,其重要字段如下:

    public class QueryResultsimplements QueryStatusInfo, QueryData
    {// qiery_id,YYYYMMdd_HHmmss_index_coordId,例如:20230405_045317_00000_tagb8private final String id; // 查询对应的web UI地址private final URI infoUri;// 决定查询是否结束的关键字段,如果nextUri不为null,则client需要访问nextUri以获取下一批查询结果private final URI nextUri;// 查询结果的列名和数据类型private final List<Column> columns; // 查询结果,一行对应一个List<Object>,并按照columns属性指定的顺序组织行中的列值private final Iterable<List<Object>> data; // 查询的状态,可能与server上的查询状态不同步。因此,切记不要使用该字段判断查询是否执行结束private final StatementStats stats; // 记录查询失败的原因,error不为null,表示查询失败private final QueryError error;// 表示操作的human-readable的字符串,例如,对于 CREATE TABLE 请求,updateType 将为“CREATE TABLE”private final String updateType;
    }
    
  • 由于使用JSON记录QueryResults的内容,当某些字段为null时,则JSON文档中不存在该字段

  • 例如,若JSON不包含nextUri,说明查询已结束,可能是成功也可能是失败;若JSON中包含nextUri,则还需要继续访问nextUri以获取下一批的查询结果

  • 以下是发起查询select * from presto_test.user_info limit 5时,response中的QueryResults数据示例:

    {"id": "20230405_052739_00010_tagb8","infoUri": "http://localhost:8080/ui/query.html?20230405_052739_00010_tagb8","nextUri": "http://localhost:8080/v1/statement/queued/20230405_052739_00010_tagb8/1?slug=xf2c83b975c12420fa978f47d5cf59404","stats": {"state": "WAITING_FOR_PREREQUISITES","waitingForPrerequisites": true,... // 其他stats信息省略},"warnings": []
    }
    

3. Presto client如何获取查询执行结果?

3.1 Presto查询的lazy execution机制

  • 笔者基本没见过直接调用REST API向Presto提交查询的,基本都是通过Presto CLI、jdbc、presto-python-client等封装好的client向Presto提交查询
  • 而笔者接触过的另一个OLAP组件Druid,访问一次REST API(/druid/v2/sql)就能提交查询并获取查询结果
  • 同时,在debug学习Presto CLI时,发现new StatementClientV1()之后,访问response中的infoUri(http://localhost:8080/ui/query.html?20230405_052739_00010_tagb8)提示Query not found
  • 这是因为Presot查询具有lazy execution机制,QueuedStatementResource.postStatement()方法的注释也证明了该说法:

    HTTP endpoint for submitting queries to the Presto Coordinator. Presto performs lazy execution. The submission of a query returns a placeholder for the result set, but the query gets scheduled/dispatched only when the client polls for results

  • 方法注释的大意:

    Presto中的查询采用lazy execution机制,创建StatementClientV1时,对/v1/statement的访问只是提交了一个查询。只有继续访问response中的nextUri,才会触发查询的执行

3.2 对nextUri的GET访问

  • Presto Client REST API,介绍HTTP Methods时有如下描述:

    A GET to the nextUri attribute returns the next batch of query results.

  • 这里的query results是指QueryResults,包括执行进度(QueryResults.stats)和执行结果( QueryResults.data),而非狭义的查询执行结果
  • 结合Presto查询的lazy execution机制,笔者认为对nextUri的GET访问还有一个关键作用 —— 推动查询的执行
  • 具体来说,StatementClientV1创建好以后,上层类将多次调用StatementClientV1.advance()方法,以访问response中nextUri,从而推动查询的执行并获取执行结果

3.3 StatementClientV1.advance()方法

  • advance()方法的代码如下:

    • 如果nextUri为null,说明查询结束,更新StatementClientV1的状态为FINISHED并退出advance()方法
    • 否则,以GET方式访问nextUri,获取下一批查询结果
    • response状态码response状态码为200,则表示请求成功;状态码为503,则表示Presto server处于繁忙状态,可以等待一小段时间后再次发起请求,直到请求成功或超时;状态码不是200或503,则表示请求失败
    public boolean advance()
    {... // 省略其他代码// nextUri为null,statementClient状态更新为FINISHEDURI nextUri = currentStatusInfo().getNextUri();if (nextUri == null) {state.compareAndSet(State.RUNNING, State.FINISHED);return false;}Request request = prepareRequest(HttpUrl.get(nextUri)).build();Exception cause = null;long start = System.nanoTime();long attempts = 0;//状态码503,说明server繁忙,允许多次尝试访问nextUri,直到请求超时while (true) {if (isClientAborted()) {return false;}Duration sinceStart = Duration.nanosSince(start);if (attempts > 0 && sinceStart.compareTo(requestTimeoutNanos) > 0) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw new RuntimeException(format("Error fetching next (attempts: %s, duration: %s)", attempts, sinceStart), cause);}if (attempts > 0) {// back-off on retrytry {MILLISECONDS.sleep(attempts * 100);}catch (InterruptedException e) {try {close();}finally {Thread.currentThread().interrupt();}state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw new RuntimeException("StatementClient thread was interrupted");}}attempts++;JsonResponse<QueryResults> response;try {response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);}catch (RuntimeException e) {cause = e;continue;}// 请求成功,处理response,退出advance()方法if ((response.getStatusCode() == HTTP_OK) && response.hasValue()) {processResponse(response.getHeaders(), response.getValue());return true;}// 状态码不是200或503,查询执行失败if (response.getStatusCode() != HTTP_UNAVAILABLE) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("fetching next", request, response);}}
    }
    

3.4 获取查询执行结果

  • Presto采用pipe-lined(流水线设计):有执行结果,coordinator就会返回给client

  • 这导致response中的执行结果可能是部分,因此在获取执行结果时,还需要多次调用advance()方法

  • 以Presto CLI打印执行结果为例,在按行构建并打印执行结果时,就将多次advance()方法

    public void processRows(StatementClient client)throws IOException
    {while (client.isRunning()) {Iterable<List<Object>> data = client.currentData().getData();if (data != null) {for (List<Object> row : data) {processRow(unmodifiableList(row)); // 超过MAX_BUFFERED_ROWS(10_000)则刷新输出}}// 超过MAX_BUFFER_TIME,也会刷新输出if (nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) {flush(false);}// 访问nextUri,获取新的执行结果client.advance();}
    }
    

4. 其他

4.1 何时以DELETE方式访问nextUri?

  • Presto Client REST API,介绍HTTP Methods时有如下描述:

    A DELETE to nextUri terminates a running query

  • StatementClientV1的close()方法代码如下,若StatementClientV1处于RUNNING状态,则将其置为CLIENT_ABORTED状态,并通过DELETE方式取消查询

    @Override
    public void close()
    {// If the query is not done, abort the query.// 如果存在多线程,这里的CAS操作不一定成功?if (state.compareAndSet(State.RUNNING, State.CLIENT_ABORTED)) {URI uri = currentResults.get().getNextUri();if (uri != null) {httpDelete(uri);}}
    }private void httpDelete(URI uri)
    {Request request = prepareRequest(HttpUrl.get(uri)).delete().build();httpClient.newCall(request).enqueue(new NullCallback());
    }
    
  • 以Presto CLI为例,对close()方法的调用主要发生如下情况:

    1. 在查询被用户主动取消时,例如Query的renderOutput()方法和renderResults()方法
    2. 按页展示执行结果结束时,即Query.pageOutput()方法
    3. 关闭Query时,即Query.close()方法

4.2 CLIENT_ERROR状态如何处理?

  • 从StatementClientV1的构造函数和advance()方法可以看出,当请求失败时,都会将自身状态设置为CLIENT_ERROR并上抛一个RuntimeException

    // advance()方法中,当状态码不是200或503时的处理操作
    if (response.getStatusCode() != HTTP_UNAVAILABLE) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("fetching next", request, response);
    }
    
  • 在presto cli中,上抛的RuntimeException最终将被Console.process()方法中的catch语句捕获

  • 捕获到RuntimeException后,process()方法将打印失败信息并返回false

    catch (RuntimeException e) {System.err.println("Error running command: " + e.getMessage());if (queryRunner.isDebug()) {e.printStackTrace();}return false;
    }
    

4.3 debug模式跟踪response

  • 以下面的简单查询为例,以debug方式观察Presto CLI交互模式下,response中的QueryResults,以体会nextUri在Presto查询推进中的关键作用

    select * from presto_test.user_info limit 5
    

① 提交查询

  • new StatementClientV1()时,以POST方式向/v1/statement接口发送查询请求

    Request{method=POST, url=http://localhost:8080/v1/statement, tag=null}
    
  • 查询请求被QueuedStatementResource处理,返回的response如下:

    {"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","nextUri": "http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9","stats": {"state": "WAITING_FOR_PREREQUISITES","waitingForPrerequisites": true,"queued": false,"scheduled": false,"nodes": 0,"totalSplits": 0,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 0,"cpuTimeMillis": 0,"wallTimeMillis": 0,"waitingForPrerequisitesTimeMillis": 0,"queuedTimeMillis": 0,"elapsedTimeMillis": 0,"processedRows": 0,"processedBytes": 0,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 0,"peakTaskTotalMemoryBytes": 0,"spilledBytes": 0},"warnings": []
    }
    
  • 其中,nextUri指向QueuedStatementResource的API

    @GET
    @Path("/v1/statement/queued/{queryId}/{token}")
    @Produces(APPLICATION_JSON)
    public void getStatus(@PathParam("queryId") QueryId queryId,@PathParam("token") long token,@QueryParam("slug") String slug,... // 其他入参省略)
    

② 获取查询结果

  • presto-cli的StatusPrinter.printInitialStatusUpdates()方法,用于初始化并获取查询结果。它将调用client.advance()方法、通过nextUri获取查询结果

  • client.advance()方法中,基于nextUri,构建的GET方式的request如下:

    Request{method=GET, url=http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9, tag=null}
    
  • 查询请求被QueuedStatementResource处理后,返回的response如下,内含nextUri和查询执行进度(stats字段)

    {"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","nextUri": "http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/2?slug=xabd194690e1045648c1418e3817545a9","stats": {"state": "QUEUED","waitingForPrerequisites": false,"queued": false,"scheduled": false,"nodes": 0,"totalSplits": 0,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 0,"cpuTimeMillis": 0,"wallTimeMillis": 0,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 20,"elapsedTimeMillis": 113,"processedRows": 0,"processedBytes": 0,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 0,"peakTaskTotalMemoryBytes": 0,"spilledBytes": 0},"warnings": []
    }
    
  • 再次调用client.advance()方法,访问nextUri,获取查询结果

    Request{method=GET, url=http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/2?slug=xabd194690e1045648c1418e3817545a9, tag=null}
    
  • 查询请求被QueuedStatementResource处理后,返回的response如下:

    {"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","partialCancelUri": "http://10.13.208.227:8080/v1/stage/20230415_030118_00000_sznzw.0","nextUri": "http://localhost:8080/v1/statement/executing/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9","columns": [{"name": "id","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "name","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "password","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "age","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "city","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}}],"data": [[500000,"11120066","sunrise1",25,"成都"],[500000,"11120066","sunrise1",25,"成都"],[0,"43689316","rjjxhsmy",49,"成都"],[1,"97888912","eoggm9wn",26,"深圳"],[2,"01196315","laxoh1kv",29,"北京"]],"stats": {"state": "RUNNING","waitingForPrerequisites": false,"queued": false,"scheduled": true,"nodes": 1,"totalSplits": 21,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 21,"cpuTimeMillis": 1035,"wallTimeMillis": 6175,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 42,"elapsedTimeMillis": 128320,"processedRows": 32312,"processedBytes": 932628,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 771017,"peakTaskTotalMemoryBytes": 770483,"spilledBytes": 0,"rootStage": {"stageId": "0","state": "RUNNING","done": false,"nodes": 1,"totalSplits": 17,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 17,"cpuTimeMillis": 74,"wallTimeMillis": 2302,"processedRows": 7,"processedBytes": 657,"subStages": [{"stageId": "1","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 4,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 4,"cpuTimeMillis": 961,"wallTimeMillis": 3873,"processedRows": 32312,"processedBytes": 932628,"subStages": []}]},"runtimeStats": {"S0-taskScheduledTimeNanos": {"name": "S0-taskScheduledTimeNanos","unit": "NANO","sum": 2302434797,"count": 1,"max": 2302434797,"min": 2302434797},"S1-taskScheduledTimeNanos": {"name": "S1-taskScheduledTimeNanos","unit": "NANO","sum": 3873131041,"count": 1,"max": 3873131041,"min": 3873131041},"getMaterializedViewTimeNanos": {"name": "getMaterializedViewTimeNanos","unit": "NANO","sum": 3377709,"count": 1,"max": 3377709,"min": 3377709},"S1-driverCountPerTask": {"name": "S1-driverCountPerTask","unit": "NONE","sum": 4,"count": 1,"max": 4,"min": 4},"getLayoutTimeNanos": {"name": "getLayoutTimeNanos","unit": "NANO","sum": 22208500,"count": 2,"max": 21863750,"min": 344750},"S0-taskBlockedTimeNanos": {"name": "S0-taskBlockedTimeNanos","unit": "NANO","sum": 55196190503,"count": 1,"max": 55196190503,"min": 55196190503},"S1-taskElapsedTimeNanos": {"name": "S1-taskElapsedTimeNanos","unit": "NANO","sum": 1681457292,"count": 1,"max": 1681457292,"min": 1681457292},"fragmentPlanTimeNanos": {"name": "fragmentPlanTimeNanos","unit": "NANO","sum": 159828875,"count": 1,"max": 159828875,"min": 159828875},"S0-taskQueuedTimeNanos": {"name": "S0-taskQueuedTimeNanos","unit": "NANO","sum": 1034353750,"count": 1,"max": 1034353750,"min": 1034353750},"S1-getSplitsTimeNanos": {"name": "S1-getSplitsTimeNanos","unit": "NANO","sum": 3207506458,"count": 1,"max": 3207506458,"min": 3207506458},"logicalPlannerTimeNanos": {"name": "logicalPlannerTimeNanos","unit": "NANO","sum": 397693959,"count": 1,"max": 397693959,"min": 397693959},"S0-driverCountPerTask": {"name": "S0-driverCountPerTask","unit": "NONE","sum": 17,"count": 1,"max": 17,"min": 17},"S0-taskElapsedTimeNanos": {"name": "S0-taskElapsedTimeNanos","unit": "NANO","sum": 0,"count": 1,"max": 0,"min": 0},"S1-taskQueuedTimeNanos": {"name": "S1-taskQueuedTimeNanos","unit": "NANO","sum": 89375917,"count": 1,"max": 89375917,"min": 89375917},"getViewTimeNanos": {"name": "getViewTimeNanos","unit": "NANO","sum": 312521125,"count": 1,"max": 312521125,"min": 312521125},"getTableMetadataTimeNanos": {"name": "getTableMetadataTimeNanos","unit": "NANO","sum": 39864292,"count": 1,"max": 39864292,"min": 39864292},"getTableHandleTimeNanos": {"name": "getTableHandleTimeNanos","unit": "NANO","sum": 3236875,"count": 1,"max": 3236875,"min": 3236875}},"progressPercentage": 100.0},"warnings": []
    }
    
  • 此时,response中已经包含查询执行结果,nextUri不再指向QueuedStatementResource的API,而是指向ExecutingStatementResource的API

    @GET
    @Path("/v1/statement/executing/{queryId}/{token}")
    @Produces(MediaType.APPLICATION_JSON)
    public void getQueryResults(@PathParam("queryId") QueryId queryId,@PathParam("token") long token,@QueryParam("slug") String slug,... // 其他入参省略)
    

③ 打印执行结果

  • 调用Query.renderQueryOutput()方法打印执行结果,最终将调用Query.pageOutput()方法按页打印执行结果

  • 打印执行结果时,会调用client.advance()方法访问nextUri获取更多的查询结果

    Request{method=GET, url=http://localhost:8080/v1/statement/executing/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9, tag=null}
    
  • 请求被ExecutingStatementResource处理后,返回的response如下

  • 此时,nextUri不存在,说明查询已结束;stats.state的值为FINISHED,说明查询在Presto集群中已结束

    {"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","columns": [{"name": "id","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "name","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "password","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "age","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "city","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}}],"stats": {"state": "FINISHED","waitingForPrerequisites": false,"queued": false,"scheduled": true,"nodes": 1,"totalSplits": 21,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 21,"cpuTimeMillis": 1035,"wallTimeMillis": 6175,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 42,"elapsedTimeMillis": 128406,"processedRows": 32312,"processedBytes": 932628,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 771017,"peakTaskTotalMemoryBytes": 770483,"spilledBytes": 0,"rootStage": {"stageId": "0","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 17,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 17,"cpuTimeMillis": 74,"wallTimeMillis": 2302,"processedRows": 7,"processedBytes": 657,"subStages": [{"stageId": "1","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 4,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 4,"cpuTimeMillis": 961,"wallTimeMillis": 3873,"processedRows": 32312,"processedBytes": 932628,"subStages": []}]},"runtimeStats": {"S0-taskScheduledTimeNanos": {"name": "S0-taskScheduledTimeNanos","unit": "NANO","sum": 2302434797,"count": 1,"max": 2302434797,"min": 2302434797},"S1-taskScheduledTimeNanos": {"name": "S1-taskScheduledTimeNanos","unit": "NANO","sum": 3873131041,"count": 1,"max": 3873131041,"min": 3873131041},"getMaterializedViewTimeNanos": {"name": "getMaterializedViewTimeNanos","unit": "NANO","sum": 3377709,"count": 1,"max": 3377709,"min": 3377709},"S1-driverCountPerTask": {"name": "S1-driverCountPerTask","unit": "NONE","sum": 4,"count": 1,"max": 4,"min": 4},"getLayoutTimeNanos": {"name": "getLayoutTimeNanos","unit": "NANO","sum": 22208500,"count": 2,"max": 21863750,"min": 344750},"S0-taskBlockedTimeNanos": {"name": "S0-taskBlockedTimeNanos","unit": "NANO","sum": 55196190503,"count": 1,"max": 55196190503,"min": 55196190503},"S1-taskElapsedTimeNanos": {"name": "S1-taskElapsedTimeNanos","unit": "NANO","sum": 1681457292,"count": 1,"max": 1681457292,"min": 1681457292},"fragmentPlanTimeNanos": {"name": "fragmentPlanTimeNanos","unit": "NANO","sum": 159828875,"count": 1,"max": 159828875,"min": 159828875},"S0-taskQueuedTimeNanos": {"name": "S0-taskQueuedTimeNanos","unit": "NANO","sum": 1034353750,"count": 1,"max": 1034353750,"min": 1034353750},"S1-getSplitsTimeNanos": {"name": "S1-getSplitsTimeNanos","unit": "NANO","sum": 3207506458,"count": 1,"max": 3207506458,"min": 3207506458},"logicalPlannerTimeNanos": {"name": "logicalPlannerTimeNanos","unit": "NANO","sum": 397693959,"count": 1,"max": 397693959,"min": 397693959},"S0-driverCountPerTask": {"name": "S0-driverCountPerTask","unit": "NONE","sum": 17,"count": 1,"max": 17,"min": 17},"S0-taskElapsedTimeNanos": {"name": "S0-taskElapsedTimeNanos","unit": "NANO","sum": 125757444000,"count": 1,"max": 125757444000,"min": 125757444000},"S1-taskQueuedTimeNanos": {"name": "S1-taskQueuedTimeNanos","unit": "NANO","sum": 89375917,"count": 1,"max": 89375917,"min": 89375917},"getViewTimeNanos": {"name": "getViewTimeNanos","unit": "NANO","sum": 312521125,"count": 1,"max": 312521125,"min": 312521125},"getTableMetadataTimeNanos": {"name": "getTableMetadataTimeNanos","unit": "NANO","sum": 39864292,"count": 1,"max": 39864292,"min": 39864292},"getTableHandleTimeNanos": {"name": "getTableHandleTimeNanos","unit": "NANO","sum": 3236875,"count": 1,"max": 3236875,"min": 3236875}},"progressPercentage": 100.0},"warnings": []
    }
    
  • 再次调用client.advance()方法,nextUri为null,将client的状态设置为FINISHED

    if (nextUri == null) {state.compareAndSet(State.RUNNING, State.FINISHED);return false;
    }
    
  • 上层应用发现StatementClientV1的状态为CLIENT_ERROR,都会调用

④ 打印finalInfo

  • 执行StatusPrinter.printFinalInfo()方法,打印finalInfofinalInfo实际来自最后一次response中的stats部分

    Query 20230415_030118_00000_sznzw, FINISHED, 1 node
    http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw
    Splits: 21 total, 21 done (100.00%)
    CPU Time: 1.0s total, 31.2K rows/s,  880KB/s, 16% active
    Per Node: 0.0 parallelism,    27 rows/s,   783B/sParallelism: 0.0
    logicalPlannerTimeNanos: sum=398ms count=1 min=398ms max=398ms19:51 [32.3K rows, 911KB] [27 rows/s, 783B/s]
    

⑤ 总结

  • 通过debug模式不难看出,Presto Client的REST API不是一蹴而就的
  • Presto查询具备lazy execution特性,需要不断访问response中的nextUri才能触发查询的执行并获取查询结果