结合Presto CLI,Presto Client学习
1. 序言
- 作为Presto的客户端之一,Presto CLI是一个基于终端的交互式shell,它依赖Presto client向Presto server提交查询并获取查询执行进度、执行结果等。
- Presto使用Master-Slave架构,对外提供服务的接口都在coordinator中,上面的描述可以改写为:Pressto client向coordinator提交查询
- 笔者认为,在学习Presto CLI前,了解Presto client与coordinator之间如何进行HTTP通信是极有必要的
- 官网有Presto Client的相关文档:Presto Client REST API,本文对Presto client的介绍也会参考该文档,并辅以Presto 0.279的源码
2. Presto client如何提交一个查询?
2.1 关键类StatementClientV1
-
StatementClientV1
内含OkHttpClient
,是实现client与coordinator通信的关键 -
与Presto Client有关的代码中,大多直接使用client表示StatementClientV1
class StatementClientV1 implements StatementClient
-
StatementClientV1的一些重要成员如下:
private final OkHttpClient httpClient; // 当前session的user,每次访问/v1/statement接口,都必须将user放在resquest header的X-Presto-User中 private final String user; // 来自response的QueryResults,里面记录了查询的相关信息,如query_id、nextUri、data、error等 private final AtomicReference<QueryResults> currentResults = new AtomicReference<>();// 来自response header,需要在之后的request header中使用 private final AtomicReference<String> setCatalog = new AtomicReference<>(); private final AtomicReference<String> setSchema = new AtomicReference<>(); ... // 还有更多来自response header的属性,这里不再赘述// 初始化为RUNNING状态,还有CLIENT_ERROR、CLIENT_ABORTED、FINISHED三种状态 private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
2.2 提交一个查询
-
创建StatementClientV1时,通过httpClient向coordinator发起一个查询请求,然后获取并处理请求结果
- 向coordinator的
/v1/statement
接口、以POST方式发起查询请求,body中包含query string - coordinator的
QueuedStatementResource.postStatement()
方法将处理查询请求,并返回一个包含QueryResults
的response - 这个response将被转化为
JsonResponse<QueryResults>
,内含QueryResults类型的JSON文档、response headers等信息 - 执行processResponse()方法,使用response中的信息更新StatementClientV1
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query) {... // 其他代码省略// 1. 构建request,用于向coordinator提交查询;此时查询尚未执,需要访问response中nextUri触发查询的执行Request request = buildQueryRequest(session, query);// 2. 提交查询,response将被封装为JsonResponse<QueryResults>JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);// 如果发起查询的请求失败,更新client的状态并上抛异常if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("starting query", request, response);}// 3. 处理response,使用response header和body更新statementClientprocessResponse(response.getHeaders(), response.getValue()); }
- 向coordinator的
-
processResponse()方法的代码如下:
- 保存response header,新的request中将使用这些headrs
- respopnse中的QueryResults赋值给自身的
currentResults
字段
private void processResponse(Headers headers, QueryResults results) {setCatalog.set(headers.get(PRESTO_SET_CATALOG));setSchema.set(headers.get(PRESTO_SET_SCHEMA));for (String setSession : headers.values(PRESTO_SET_SESSION)) {List<String> keyValue = SESSION_HEADER_SPLITTER.splitToList(setSession);if (keyValue.size() != 2) {continue;}setSessionProperties.put(keyValue.get(0), urlDecode(keyValue.get(1)));}resetSessionProperties.addAll(headers.values(PRESTO_CLEAR_SESSION));... // 对response header的记录,省略部分代码// 更新执行结果currentResults.set(results); }
-
StatementClientV1每次成功请求coordinator后,都将使用processResponse()方法更新自身
2.3 关于QueryResults
-
QueryResults就是查询结果,其重要字段如下:
public class QueryResultsimplements QueryStatusInfo, QueryData {// qiery_id,YYYYMMdd_HHmmss_index_coordId,例如:20230405_045317_00000_tagb8private final String id; // 查询对应的web UI地址private final URI infoUri;// 决定查询是否结束的关键字段,如果nextUri不为null,则client需要访问nextUri以获取下一批查询结果private final URI nextUri;// 查询结果的列名和数据类型private final List<Column> columns; // 查询结果,一行对应一个List<Object>,并按照columns属性指定的顺序组织行中的列值private final Iterable<List<Object>> data; // 查询的状态,可能与server上的查询状态不同步。因此,切记不要使用该字段判断查询是否执行结束private final StatementStats stats; // 记录查询失败的原因,error不为null,表示查询失败private final QueryError error;// 表示操作的human-readable的字符串,例如,对于 CREATE TABLE 请求,updateType 将为“CREATE TABLE”private final String updateType; }
-
由于使用JSON记录QueryResults的内容,当某些字段为
null
时,则JSON文档中不存在该字段 -
例如,若JSON不包含nextUri,说明查询已结束,可能是成功也可能是失败;若JSON中包含nextUri,则还需要继续访问nextUri以获取下一批的查询结果
-
以下是发起查询
select * from presto_test.user_info limit 5
时,response中的QueryResults数据示例:{"id": "20230405_052739_00010_tagb8","infoUri": "http://localhost:8080/ui/query.html?20230405_052739_00010_tagb8","nextUri": "http://localhost:8080/v1/statement/queued/20230405_052739_00010_tagb8/1?slug=xf2c83b975c12420fa978f47d5cf59404","stats": {"state": "WAITING_FOR_PREREQUISITES","waitingForPrerequisites": true,... // 其他stats信息省略},"warnings": [] }
3. Presto client如何获取查询执行结果?
3.1 Presto查询的lazy execution机制
- 笔者基本没见过直接调用REST API向Presto提交查询的,基本都是通过Presto CLI、jdbc、presto-python-client等封装好的client向Presto提交查询
- 而笔者接触过的另一个OLAP组件Druid,访问一次REST API(
/druid/v2/sql
)就能提交查询并获取查询结果 - 同时,在debug学习Presto CLI时,发现new StatementClientV1()之后,访问response中的infoUri(
http://localhost:8080/ui/query.html?20230405_052739_00010_tagb8
)提示Query not found
- 这是因为Presot查询具有lazy execution机制,
QueuedStatementResource.postStatement()
方法的注释也证明了该说法:
HTTP endpoint for submitting queries to the Presto Coordinator. Presto performs
lazy execution
. The submission of a query returns a placeholder for the result set, but the query gets scheduled/dispatched only when the client polls for results - 方法注释的大意:
Presto中的查询采用
lazy execution
机制,创建StatementClientV1时,对/v1/statement
的访问只是提交了一个查询。只有继续访问response中的nextUri,才会触发查询的执行
3.2 对nextUri的GET访问
- Presto Client REST API,介绍HTTP Methods时有如下描述:
A GET to the nextUri attribute returns the next batch of query results.
- 这里的
query results
是指QueryResults,包括执行进度(QueryResults.stats
)和执行结果(QueryResults.data
),而非狭义的查询执行结果 - 结合Presto查询的lazy execution机制,笔者认为对nextUri的GET访问还有一个关键作用 —— 推动查询的执行
- 具体来说,StatementClientV1创建好以后,上层类将多次调用
StatementClientV1.advance()
方法,以访问response中nextUri,从而推动查询的执行并获取执行结果
3.3 StatementClientV1.advance()方法
-
advance()方法的代码如下:
- 如果nextUri为
null
,说明查询结束,更新StatementClientV1的状态为FINISHED
并退出advance()方法 - 否则,以GET方式访问nextUri,获取下一批查询结果
- response状态码response状态码为200,则表示请求成功;状态码为503,则表示Presto server处于繁忙状态,可以等待一小段时间后再次发起请求,直到请求成功或超时;状态码不是200或503,则表示请求失败
public boolean advance() {... // 省略其他代码// nextUri为null,statementClient状态更新为FINISHEDURI nextUri = currentStatusInfo().getNextUri();if (nextUri == null) {state.compareAndSet(State.RUNNING, State.FINISHED);return false;}Request request = prepareRequest(HttpUrl.get(nextUri)).build();Exception cause = null;long start = System.nanoTime();long attempts = 0;//状态码503,说明server繁忙,允许多次尝试访问nextUri,直到请求超时while (true) {if (isClientAborted()) {return false;}Duration sinceStart = Duration.nanosSince(start);if (attempts > 0 && sinceStart.compareTo(requestTimeoutNanos) > 0) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw new RuntimeException(format("Error fetching next (attempts: %s, duration: %s)", attempts, sinceStart), cause);}if (attempts > 0) {// back-off on retrytry {MILLISECONDS.sleep(attempts * 100);}catch (InterruptedException e) {try {close();}finally {Thread.currentThread().interrupt();}state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw new RuntimeException("StatementClient thread was interrupted");}}attempts++;JsonResponse<QueryResults> response;try {response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);}catch (RuntimeException e) {cause = e;continue;}// 请求成功,处理response,退出advance()方法if ((response.getStatusCode() == HTTP_OK) && response.hasValue()) {processResponse(response.getHeaders(), response.getValue());return true;}// 状态码不是200或503,查询执行失败if (response.getStatusCode() != HTTP_UNAVAILABLE) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("fetching next", request, response);}} }
- 如果nextUri为
3.4 获取查询执行结果
-
Presto采用pipe-lined(流水线设计):有执行结果,coordinator就会返回给client
-
这导致response中的执行结果可能是部分,因此在获取执行结果时,还需要多次调用advance()方法
-
以Presto CLI打印执行结果为例,在按行构建并打印执行结果时,就将多次advance()方法
public void processRows(StatementClient client)throws IOException {while (client.isRunning()) {Iterable<List<Object>> data = client.currentData().getData();if (data != null) {for (List<Object> row : data) {processRow(unmodifiableList(row)); // 超过MAX_BUFFERED_ROWS(10_000)则刷新输出}}// 超过MAX_BUFFER_TIME,也会刷新输出if (nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) {flush(false);}// 访问nextUri,获取新的执行结果client.advance();} }
4. 其他
4.1 何时以DELETE方式访问nextUri?
-
Presto Client REST API,介绍HTTP Methods时有如下描述:
A DELETE to nextUri terminates a running query
-
StatementClientV1的close()方法代码如下,若StatementClientV1处于
RUNNING
状态,则将其置为CLIENT_ABORTED
状态,并通过DELETE方式取消查询@Override public void close() {// If the query is not done, abort the query.// 如果存在多线程,这里的CAS操作不一定成功?if (state.compareAndSet(State.RUNNING, State.CLIENT_ABORTED)) {URI uri = currentResults.get().getNextUri();if (uri != null) {httpDelete(uri);}} }private void httpDelete(URI uri) {Request request = prepareRequest(HttpUrl.get(uri)).delete().build();httpClient.newCall(request).enqueue(new NullCallback()); }
-
以Presto CLI为例,对close()方法的调用主要发生如下情况:
- 在查询被用户主动取消时,例如Query的
renderOutput()
方法和renderResults()
方法 - 按页展示执行结果结束时,即
Query.pageOutput()
方法 - 关闭Query时,即
Query.close()
方法
- 在查询被用户主动取消时,例如Query的
4.2 CLIENT_ERROR状态如何处理?
-
从StatementClientV1的构造函数和advance()方法可以看出,当请求失败时,都会将自身状态设置为
CLIENT_ERROR
并上抛一个RuntimeException
// advance()方法中,当状态码不是200或503时的处理操作 if (response.getStatusCode() != HTTP_UNAVAILABLE) {state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);throw requestFailedException("fetching next", request, response); }
-
在presto cli中,上抛的RuntimeException最终将被Console.process()方法中的catch语句捕获
-
捕获到RuntimeException后,process()方法将打印失败信息并返回false
catch (RuntimeException e) {System.err.println("Error running command: " + e.getMessage());if (queryRunner.isDebug()) {e.printStackTrace();}return false; }
4.3 debug模式跟踪response
-
以下面的简单查询为例,以debug方式观察Presto CLI交互模式下,response中的QueryResults,以体会nextUri在Presto查询推进中的关键作用
select * from presto_test.user_info limit 5
① 提交查询
-
new StatementClientV1()
时,以POST方式向/v1/statement
接口发送查询请求Request{method=POST, url=http://localhost:8080/v1/statement, tag=null}
-
查询请求被
QueuedStatementResource
处理,返回的response如下:{"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","nextUri": "http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9","stats": {"state": "WAITING_FOR_PREREQUISITES","waitingForPrerequisites": true,"queued": false,"scheduled": false,"nodes": 0,"totalSplits": 0,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 0,"cpuTimeMillis": 0,"wallTimeMillis": 0,"waitingForPrerequisitesTimeMillis": 0,"queuedTimeMillis": 0,"elapsedTimeMillis": 0,"processedRows": 0,"processedBytes": 0,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 0,"peakTaskTotalMemoryBytes": 0,"spilledBytes": 0},"warnings": [] }
-
其中,nextUri指向QueuedStatementResource的API
@GET @Path("/v1/statement/queued/{queryId}/{token}") @Produces(APPLICATION_JSON) public void getStatus(@PathParam("queryId") QueryId queryId,@PathParam("token") long token,@QueryParam("slug") String slug,... // 其他入参省略)
② 获取查询结果
-
presto-cli的
StatusPrinter.printInitialStatusUpdates()
方法,用于初始化并获取查询结果。它将调用client.advance()
方法、通过nextUri获取查询结果 -
在
client.advance()
方法中,基于nextUri,构建的GET方式的request如下:Request{method=GET, url=http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9, tag=null}
-
查询请求被
QueuedStatementResource
处理后,返回的response如下,内含nextUri和查询执行进度(stats字段){"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","nextUri": "http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/2?slug=xabd194690e1045648c1418e3817545a9","stats": {"state": "QUEUED","waitingForPrerequisites": false,"queued": false,"scheduled": false,"nodes": 0,"totalSplits": 0,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 0,"cpuTimeMillis": 0,"wallTimeMillis": 0,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 20,"elapsedTimeMillis": 113,"processedRows": 0,"processedBytes": 0,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 0,"peakTaskTotalMemoryBytes": 0,"spilledBytes": 0},"warnings": [] }
-
再次调用
client.advance()
方法,访问nextUri,获取查询结果Request{method=GET, url=http://localhost:8080/v1/statement/queued/20230415_030118_00000_sznzw/2?slug=xabd194690e1045648c1418e3817545a9, tag=null}
-
查询请求被
QueuedStatementResource
处理后,返回的response如下:{"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","partialCancelUri": "http://10.13.208.227:8080/v1/stage/20230415_030118_00000_sznzw.0","nextUri": "http://localhost:8080/v1/statement/executing/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9","columns": [{"name": "id","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "name","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "password","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "age","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "city","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}}],"data": [[500000,"11120066","sunrise1",25,"成都"],[500000,"11120066","sunrise1",25,"成都"],[0,"43689316","rjjxhsmy",49,"成都"],[1,"97888912","eoggm9wn",26,"深圳"],[2,"01196315","laxoh1kv",29,"北京"]],"stats": {"state": "RUNNING","waitingForPrerequisites": false,"queued": false,"scheduled": true,"nodes": 1,"totalSplits": 21,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 21,"cpuTimeMillis": 1035,"wallTimeMillis": 6175,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 42,"elapsedTimeMillis": 128320,"processedRows": 32312,"processedBytes": 932628,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 771017,"peakTaskTotalMemoryBytes": 770483,"spilledBytes": 0,"rootStage": {"stageId": "0","state": "RUNNING","done": false,"nodes": 1,"totalSplits": 17,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 17,"cpuTimeMillis": 74,"wallTimeMillis": 2302,"processedRows": 7,"processedBytes": 657,"subStages": [{"stageId": "1","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 4,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 4,"cpuTimeMillis": 961,"wallTimeMillis": 3873,"processedRows": 32312,"processedBytes": 932628,"subStages": []}]},"runtimeStats": {"S0-taskScheduledTimeNanos": {"name": "S0-taskScheduledTimeNanos","unit": "NANO","sum": 2302434797,"count": 1,"max": 2302434797,"min": 2302434797},"S1-taskScheduledTimeNanos": {"name": "S1-taskScheduledTimeNanos","unit": "NANO","sum": 3873131041,"count": 1,"max": 3873131041,"min": 3873131041},"getMaterializedViewTimeNanos": {"name": "getMaterializedViewTimeNanos","unit": "NANO","sum": 3377709,"count": 1,"max": 3377709,"min": 3377709},"S1-driverCountPerTask": {"name": "S1-driverCountPerTask","unit": "NONE","sum": 4,"count": 1,"max": 4,"min": 4},"getLayoutTimeNanos": {"name": "getLayoutTimeNanos","unit": "NANO","sum": 22208500,"count": 2,"max": 21863750,"min": 344750},"S0-taskBlockedTimeNanos": {"name": "S0-taskBlockedTimeNanos","unit": "NANO","sum": 55196190503,"count": 1,"max": 55196190503,"min": 55196190503},"S1-taskElapsedTimeNanos": {"name": "S1-taskElapsedTimeNanos","unit": "NANO","sum": 1681457292,"count": 1,"max": 1681457292,"min": 1681457292},"fragmentPlanTimeNanos": {"name": "fragmentPlanTimeNanos","unit": "NANO","sum": 159828875,"count": 1,"max": 159828875,"min": 159828875},"S0-taskQueuedTimeNanos": {"name": "S0-taskQueuedTimeNanos","unit": "NANO","sum": 1034353750,"count": 1,"max": 1034353750,"min": 1034353750},"S1-getSplitsTimeNanos": {"name": "S1-getSplitsTimeNanos","unit": "NANO","sum": 3207506458,"count": 1,"max": 3207506458,"min": 3207506458},"logicalPlannerTimeNanos": {"name": "logicalPlannerTimeNanos","unit": "NANO","sum": 397693959,"count": 1,"max": 397693959,"min": 397693959},"S0-driverCountPerTask": {"name": "S0-driverCountPerTask","unit": "NONE","sum": 17,"count": 1,"max": 17,"min": 17},"S0-taskElapsedTimeNanos": {"name": "S0-taskElapsedTimeNanos","unit": "NANO","sum": 0,"count": 1,"max": 0,"min": 0},"S1-taskQueuedTimeNanos": {"name": "S1-taskQueuedTimeNanos","unit": "NANO","sum": 89375917,"count": 1,"max": 89375917,"min": 89375917},"getViewTimeNanos": {"name": "getViewTimeNanos","unit": "NANO","sum": 312521125,"count": 1,"max": 312521125,"min": 312521125},"getTableMetadataTimeNanos": {"name": "getTableMetadataTimeNanos","unit": "NANO","sum": 39864292,"count": 1,"max": 39864292,"min": 39864292},"getTableHandleTimeNanos": {"name": "getTableHandleTimeNanos","unit": "NANO","sum": 3236875,"count": 1,"max": 3236875,"min": 3236875}},"progressPercentage": 100.0},"warnings": [] }
-
此时,response中已经包含查询执行结果,nextUri不再指向QueuedStatementResource的API,而是指向
ExecutingStatementResource
的API@GET @Path("/v1/statement/executing/{queryId}/{token}") @Produces(MediaType.APPLICATION_JSON) public void getQueryResults(@PathParam("queryId") QueryId queryId,@PathParam("token") long token,@QueryParam("slug") String slug,... // 其他入参省略)
③ 打印执行结果
-
调用
Query.renderQueryOutput()
方法打印执行结果,最终将调用Query.pageOutput()
方法按页打印执行结果 -
打印执行结果时,会调用
client.advance()
方法访问nextUri获取更多的查询结果Request{method=GET, url=http://localhost:8080/v1/statement/executing/20230415_030118_00000_sznzw/1?slug=xabd194690e1045648c1418e3817545a9, tag=null}
-
请求被
ExecutingStatementResource
处理后,返回的response如下 -
此时,nextUri不存在,说明查询已结束;
stats.state
的值为FINISHED
,说明查询在Presto集群中已结束{"id": "20230415_030118_00000_sznzw","infoUri": "http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw","columns": [{"name": "id","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "name","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "password","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}},{"name": "age","type": "integer","typeSignature": {"rawType": "integer","typeArguments": [],"literalArguments": [],"arguments": []}},{"name": "city","type": "varchar","typeSignature": {"rawType": "varchar","typeArguments": [],"literalArguments": [],"arguments": [{"kind": "LONG_LITERAL","value": 2147483647}]}}],"stats": {"state": "FINISHED","waitingForPrerequisites": false,"queued": false,"scheduled": true,"nodes": 1,"totalSplits": 21,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 21,"cpuTimeMillis": 1035,"wallTimeMillis": 6175,"waitingForPrerequisitesTimeMillis": 93,"queuedTimeMillis": 42,"elapsedTimeMillis": 128406,"processedRows": 32312,"processedBytes": 932628,"peakMemoryBytes": 0,"peakTotalMemoryBytes": 771017,"peakTaskTotalMemoryBytes": 770483,"spilledBytes": 0,"rootStage": {"stageId": "0","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 17,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 17,"cpuTimeMillis": 74,"wallTimeMillis": 2302,"processedRows": 7,"processedBytes": 657,"subStages": [{"stageId": "1","state": "FINISHED","done": true,"nodes": 1,"totalSplits": 4,"queuedSplits": 0,"runningSplits": 0,"completedSplits": 4,"cpuTimeMillis": 961,"wallTimeMillis": 3873,"processedRows": 32312,"processedBytes": 932628,"subStages": []}]},"runtimeStats": {"S0-taskScheduledTimeNanos": {"name": "S0-taskScheduledTimeNanos","unit": "NANO","sum": 2302434797,"count": 1,"max": 2302434797,"min": 2302434797},"S1-taskScheduledTimeNanos": {"name": "S1-taskScheduledTimeNanos","unit": "NANO","sum": 3873131041,"count": 1,"max": 3873131041,"min": 3873131041},"getMaterializedViewTimeNanos": {"name": "getMaterializedViewTimeNanos","unit": "NANO","sum": 3377709,"count": 1,"max": 3377709,"min": 3377709},"S1-driverCountPerTask": {"name": "S1-driverCountPerTask","unit": "NONE","sum": 4,"count": 1,"max": 4,"min": 4},"getLayoutTimeNanos": {"name": "getLayoutTimeNanos","unit": "NANO","sum": 22208500,"count": 2,"max": 21863750,"min": 344750},"S0-taskBlockedTimeNanos": {"name": "S0-taskBlockedTimeNanos","unit": "NANO","sum": 55196190503,"count": 1,"max": 55196190503,"min": 55196190503},"S1-taskElapsedTimeNanos": {"name": "S1-taskElapsedTimeNanos","unit": "NANO","sum": 1681457292,"count": 1,"max": 1681457292,"min": 1681457292},"fragmentPlanTimeNanos": {"name": "fragmentPlanTimeNanos","unit": "NANO","sum": 159828875,"count": 1,"max": 159828875,"min": 159828875},"S0-taskQueuedTimeNanos": {"name": "S0-taskQueuedTimeNanos","unit": "NANO","sum": 1034353750,"count": 1,"max": 1034353750,"min": 1034353750},"S1-getSplitsTimeNanos": {"name": "S1-getSplitsTimeNanos","unit": "NANO","sum": 3207506458,"count": 1,"max": 3207506458,"min": 3207506458},"logicalPlannerTimeNanos": {"name": "logicalPlannerTimeNanos","unit": "NANO","sum": 397693959,"count": 1,"max": 397693959,"min": 397693959},"S0-driverCountPerTask": {"name": "S0-driverCountPerTask","unit": "NONE","sum": 17,"count": 1,"max": 17,"min": 17},"S0-taskElapsedTimeNanos": {"name": "S0-taskElapsedTimeNanos","unit": "NANO","sum": 125757444000,"count": 1,"max": 125757444000,"min": 125757444000},"S1-taskQueuedTimeNanos": {"name": "S1-taskQueuedTimeNanos","unit": "NANO","sum": 89375917,"count": 1,"max": 89375917,"min": 89375917},"getViewTimeNanos": {"name": "getViewTimeNanos","unit": "NANO","sum": 312521125,"count": 1,"max": 312521125,"min": 312521125},"getTableMetadataTimeNanos": {"name": "getTableMetadataTimeNanos","unit": "NANO","sum": 39864292,"count": 1,"max": 39864292,"min": 39864292},"getTableHandleTimeNanos": {"name": "getTableHandleTimeNanos","unit": "NANO","sum": 3236875,"count": 1,"max": 3236875,"min": 3236875}},"progressPercentage": 100.0},"warnings": [] }
-
再次调用
client.advance()
方法,nextUri为null,将client的状态设置为FINISHED
if (nextUri == null) {state.compareAndSet(State.RUNNING, State.FINISHED);return false; }
-
上层应用发现StatementClientV1的状态为
CLIENT_ERROR
,都会调用
④ 打印finalInfo
-
执行
StatusPrinter.printFinalInfo()
方法,打印finalInfo
,finalInfo
实际来自最后一次response中的stats部分Query 20230415_030118_00000_sznzw, FINISHED, 1 node http://localhost:8080/ui/query.html?20230415_030118_00000_sznzw Splits: 21 total, 21 done (100.00%) CPU Time: 1.0s total, 31.2K rows/s, 880KB/s, 16% active Per Node: 0.0 parallelism, 27 rows/s, 783B/sParallelism: 0.0 logicalPlannerTimeNanos: sum=398ms count=1 min=398ms max=398ms19:51 [32.3K rows, 911KB] [27 rows/s, 783B/s]
⑤ 总结
- 通过debug模式不难看出,Presto Client的REST API不是一蹴而就的
- Presto查询具备lazy execution特性,需要不断访问response中的nextUri才能触发查询的执行并获取查询结果