Last active
December 30, 2020 14:33
Revisions
-
ashigeru revised this gist
Mar 13, 2014 . No changes.There are no files selected for viewing
-
ashigeru revised this gist
Mar 13, 2014 . No changes.There are no files selected for viewing
-
ashigeru revised this gist
Mar 13, 2014 . 1 changed file with 16 additions and 13 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,21 +1,21 @@ # Presto メモ * presto 0.60 * [ `QueryResource` , `TaskResource` ) ## 環境 なんかそのまま `mvn clean install eclipse:eclipse -DskipTests` コマンド叩いても checkstyle で落ちたので: ```sh mvn clean install -DskipTests mvn eclipse:eclipse -Dcheckstyle.skip ``` ちなみに、EclipseじゃなくてIDEA推奨の模様。 # エントリポイント サービスエントリ。 リクエストを受け取る `QueryResource` から。 ```java // presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L89 @@ -145,7 +145,7 @@ statement returns [Statement value] ******** `query` 構文の `.value` は `Query` 型。 ```g // presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L76 query returns [Query value] : ^(QUERY queryExpr) { $value = $queryExpr.value; } @@ -378,7 +378,7 @@ TupleAnalyzer analyzer = new TupleAnalyzer(analysis, session, metadata, approxim TupleDescriptor descriptor = analyzer.process(node.getQueryBody(), context); ``` `TupleAnalyzer extends AstVisitor` なので、 `analyzer.process()` は `TupleAnalyzer.visitQuerySpecification()` へ。 ******** `TupleAnalyzer.visitQuerySpecification()` の本体。 @@ -405,6 +405,7 @@ analysis.setOutputDescriptor(node, descriptor); ざっくり: * `RelationPlanner.process()` でクエリ全体の論理オペレータツリーを作成 * `createOutputPlan()` で論理オペレータツリーの最後に出力用のオペレータを接続 * `PlanOptimizer.optimize()` で最適化 * `Plan` オブジェクトで包んで返す ```java @@ -562,7 +563,7 @@ public Plan plan(Analysis analysis) ``` `PlanOptimizersFactory` のあたりで `PlanOptimizer` を正しい順序で登録している。 適用順序が超重要なので、コメントが長め。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizersFactory.java:L57 @@ -602,10 +603,11 @@ public SubPlan createSubPlans(Plan plan, boolean createSingleNodePlan) ``` ざっくり言うと: 0. `PlanNode` の木をいくつかの部分木に分解する 0. 部分木に分解した際の切断面のうち、根に `SinkNode` を配置する → 別のサブプランにデータを渡す 0. 部分木に分解した際の切断面のうち、末端に `ExchangeNode` を配置する → 別のサブプランからデータを受け取る 0. それぞれの部分木を `SubPlan` とする という感じ。 それぞれの `PlanNode` に対し、対応する `DistributedLogicalPlanner.Visitor.visit*()` で上記を行っていく。 @@ -644,7 +646,7 @@ public class PlanFragment `PlanDistribution` : * `NONE` - 1台のノードで実行する ( `OutputNode`, `TopNNode`, `LimitNode`, グループ化なしの `AggregationNode` など) * `FIXED` - N台のノードで実行する ( `MarkDistinctNode`, `WindowNode`, グループ化ありの `AggregationNode` など ) * `SOURCE` - データのあるノードで実行する ( `TableScanNode` のみ ) * `COORDINATOR_ONLY` - コーディネータノードのみで実行する ( `TableCommitNode` のみ ) @@ -707,6 +709,7 @@ public SubPlanBuilder visitJoin(JoinNode node, Void context) これを見る限り、内部結合でも左と右が重要? あと、 `CROSS` は `case` に含まれていない模様。 `PredicatePushDown.Rewriter.rewriteJoin()` あたりで、`JOIN ... ON 0 = 0` にしている。 ******** `OutputNode` の例。 -
ashigeru created this gist
Mar 12, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,1394 @@ # Presto メモ * http://atnd.org/events/47810 * presto 0.60 * [QueryResource, TaskResource) ## 環境 なんかそのまま `mvn` コマンド叩いても checkstyle で落ちたので: ```sh mvn clean install eclipse:eclipse -DskipTests -Dcheckstyle.skip ``` ちなみに、EclipseじゃなくてIDEA推奨の模様。 # エントリポイント サービスエントリ。 リクエストを受け取る QueryResource から。 ```java // presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L89 @POST @Produces(MediaType.APPLICATION_JSON) public Response createQuery(String query, @HeaderParam(PRESTO_USER) String user, @HeaderParam(PRESTO_SOURCE) String source, @HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog, @HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema, @HeaderParam(USER_AGENT) String userAgent, @Context HttpServletRequest requestContext, @Context UriInfo uriInfo) ``` ******** クエリ発行リクエストの処理。 ```java // presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L106 QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query); ``` `QueryManager.createQuery()` を起動しているが、 `QueryManager` はインターフェース。 ******** `QueryManager` のバインディング。 `SqlQueryManager.createQuery()` へ。 ```java // presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L104 binder.bind(QueryManager.class).to(SqlQueryManager.class).in(Scopes.SINGLETON); ``` ******** `SqlQueryManager.createQuery()` の先頭。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L191 public QueryInfo createQuery(Session session, String query) { checkNotNull(query, "query is null"); Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string"); QueryId queryId = queryIdGenerator.createNextQueryId(); Statement statement; try { statement = SqlParser.createStatement(query); ``` `SqlParser.createStatement()` でクエリ文字列をパース。 # 構文解析周り `SqlParser.createStatement()` の本体。 ```java // presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L32 public static Statement createStatement(String sql) { try { return createStatement(parseStatement(sql)); } catch (StackOverflowError e) { throw new ParsingException("statement is too large (stack overflow while parsing)"); } } ``` ******** `CommonTree` はANTLRのジェネリックなASTモデル。 ```java // presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L78 static CommonTree parseStatement(String sql) { try { return (CommonTree) getParser(sql).singleStatement().getTree(); } catch (RecognitionException e) { throw new AssertionError(e); // RecognitionException is not thrown } } ``` ******** ANTLRの機構を使って2-passでASTモデル構築。 ```java // presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L53 static Statement createStatement(CommonTree tree) { TreeNodeStream stream = new BufferedTreeNodeStream(tree); StatementBuilder builder = new StatementBuilder(stream); try { return builder.statement().value; } catch (RecognitionException e) { throw new AssertionError(e); // RecognitionException is not thrown } } ``` ******** `statement` 構文規則。 ```antlr // presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L58 statement returns [Statement value] : query { $value = $query.value; } | explain { $value = $explain.value; } | showTables { $value = $showTables.value; } | showSchemas { $value = $showSchemas.value; } | showCatalogs { $value = $showCatalogs.value; } | showColumns { $value = $showColumns.value; } | showPartitions { $value = $showPartitions.value; } | showFunctions { $value = $showFunctions.value; } | useCollection { $value = $useCollection.value; } | createTable { $value = $createTable.value; } | createMaterializedView { $value = $createMaterializedView.value; } | refreshMaterializedView { $value = $refreshMaterializedView.value; } | createAlias { $value = $createAlias.value; } | dropAlias { $value = $dropAlias.value; } | dropTable { $value = $dropTable.value; } ; ``` 今回は `query` を見る。 ******** `query` 構文の `.value` は `Query` 型。 ```antlr // presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L76 query returns [Query value] : ^(QUERY queryExpr) { $value = $queryExpr.value; } ; ``` ******** `Query` クラスのフィールド。 ```java // presto-parser/src/main/java/com/facebook/presto/sql/tree/Query.java:L23 public class Query extends Statement { private final Optional<With> with; private final QueryBody queryBody; private final List<SortItem> orderBy; private final Optional<String> limit; private final Optional<Approximate> approximate; ``` `QueryBody` 型の `queryBody` がリレーションの計算部分。 `QueryBody` は以下のサブクラスがある。 * `QuerySpecification` * `Except` * `Intersect` * `Union` * `Table` * `TableSubquery` `SELECT ... FROM ... WHERE ...` は `QuerySpecification` で表現。 `FROM` 句にテーブルを直接指定すると `Table` で表現。 # クエリの実行オブジェクト `QueryExecution` の生成。 `executionFactories : Map<Class<? extends Statement>, QueryExecutionFactory>` からファクトリを探す。 `statement.getClass()` は `Query` クラス。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L206 QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass()); Preconditions.checkState(queryExecutionFactory != null, "Unsupported statement type %s", statement.getClass().getName()); final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement); ``` ******** バインディングの解決。 `Query` の場合は `SqlQueryExecution` 系がバインディングされている。 ```java // presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L152 MapBinder<Class<? extends Statement>, QueryExecution.QueryExecutionFactory<?>> executionBinder = newMapBinder(binder, new TypeLiteral<Class<? extends Statement>>() {}, new TypeLiteral<QueryExecution.QueryExecutionFactory<?>>() {} ); ... executionBinder.addBinding(Query.class).to(SqlQueryExecution.SqlQueryExecutionFactory.class).in(Scopes.SINGLETON); ``` ******** いちおう `SqlQueryExecutionFactory` をみておくと、 `SqlQueryExecution` を作るだけ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L453 @Override public SqlQueryExecution createQueryExecution(QueryId queryId, String query, Session session, Statement statement) { SqlQueryExecution queryExecution = new SqlQueryExecution(queryId, ... ``` ******** `queryExecutor` 経由でクエリを実行。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L227 // start the query in the background queryExecutor.submit(new QueryStarter(queryExecution, stats)); ``` ******** `queryExecutor` は普通のスレッドプール。 `queryExecutor.submit(Runnable)` で `run()` メソッドを起動するだけ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L101 this.queryExecutor = Executors.newCachedThreadPool(threadsNamed("query-scheduler-%d")); ``` ******** `QueryStarter implements Runnable` の本体。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L378 public QueryStarter(QueryExecution queryExecution, SqlQueryManagerStats stats) { this.queryExecution = queryExecution; this.stats = stats; } @Override public void run() { try (SetThreadName setThreadName = new SetThreadName("Query-%s", queryExecution.getQueryInfo().getQueryId())) { stats.queryStarted(); queryExecution.start(); } } ``` `QueryExecution.start()` を呼んでいるので、 `SqlQueryExecution.start()` へ。 # 論理実行計画の生成 `SqlQueryExecution.start()` の前半部分。 `analyzeQuery()` メソッドで `SubPlan` を作る。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L151 // transition to planning if (!stateMachine.beginPlanning()) { // query already started or finished return; } // analyze query SubPlan subplan = analyzeQuery(); ``` ******** `SubPlan` はサブプラン本体の `PlanFragment` と、再帰的なサブプランをもっている感じ。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java:L31 public class SubPlan { private final PlanFragment fragment; private final List<SubPlan> children; ``` ******** `PlanFragment` でまず重要そうなのは、 `PlanNode` のあたり。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42 public class PlanFragment { ... private final PlanNode root; ... private final List<PlanNode> sources; ``` `PlanNode` は論理オペレータを表していて、たとえば以下のサブクラスがある。 * `TableScanNode` * `ProjectNode` * `SelectNode` * `JoinNode` * ... それぞれも、オペレータの入力としてさらに `PlanNode` を持っている感じ。 なお、以下のサブクラスは `SubPlan` 以降にしか出現しない。 * `SinkNode` - `PlanFragment` の出力 * `ExchangeNode` - `SinkNode` による出力を入力にとる ******** `analyzeQuery()` 本体。 重要そうなのは: * `Analyzer.analyze()` * `LogicalPlanner.plan()` * `DistributedLogicalPlanner.createSubPlans()` ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L210 Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, Optional.of(queryExplainer), approximateQueriesEnabled); Analysis analysis = analyzer.analyze(statement); PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); // plan query LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, periodicImportManager, storageManager); Plan plan = logicalPlanner.plan(analysis); List<Input> inputs = new InputExtractor(metadata).extract(plan.getRoot()); stateMachine.setInputs(inputs); // fragment the plan SubPlan subplan = new DistributedLogicalPlanner(metadata, idAllocator).createSubPlans(plan, false); stateMachine.recordAnalysisTime(analysisStart); return subplan; ``` ******** なお、 `LogicalPlanner.plan()` が生成する `Plan` は、ステージ区切り前の論理実行計画。 ルートとして `PlanNode` のツリーを持っている。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/Plan.java:L22 public class Plan { private final PlanNode root; ``` ## 意味解析 (Analysis) `Analyzer.analyze()` 本体。 名前表を作ったり、型の計算をしたりして、 `Analysis` オブジェクトに積んでいく感じ。 戻り値の `TupleDescriptor` は主に結果リレーションの型情報。 ```java // presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java:L50 Analysis analysis = new Analysis(); StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, session, approximateQueriesEnabled, queryExplainer); TupleDescriptor outputDescriptor = analyzer.process(statement, new AnalysisContext()); analysis.setOutputDescriptor(outputDescriptor); return analysis; ``` `StatementAnalyzer extends AstVisitor` なので、 `visitQuery()` へ。 ******** `StatementAnalyzer.visitQuery()` の一部を抜粋。 `Query.getQueryBody() : QueryBody` は、 `QuerySpecification` として進める。 ```java // presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java:L504 TupleAnalyzer analyzer = new TupleAnalyzer(analysis, session, metadata, approximateQueriesEnabled); TupleDescriptor descriptor = analyzer.process(node.getQueryBody(), context); ``` `TupleAnalyzer extends AstVisitor` なので、 `analyzer.process()` は `TupleAnalyzer.visitQueryBody()` へ。 ******** `TupleAnalyzer.visitQuerySpecification()` の本体。 ざっくり言えば、 `FROM` 句で入力した型から、 `SELECT` 句で出力する型を計算している感じ。 ```java // presto-main/src/main/java/com/facebook/presto/sql/analyzer/TupleAnalyzer.java:L258 TupleDescriptor tupleDescriptor = analyzeFrom(node, context); analyzeWhere(node, tupleDescriptor, context); List<FieldOrExpression> outputExpressions = analyzeSelect(node, tupleDescriptor, context); List<FieldOrExpression> groupByExpressions = analyzeGroupBy(node, tupleDescriptor, context, outputExpressions); List<FieldOrExpression> orderByExpressions = analyzeOrderBy(node, tupleDescriptor, context, outputExpressions); analyzeHaving(node, tupleDescriptor, context); analyzeAggregations(node, tupleDescriptor, groupByExpressions, outputExpressions, orderByExpressions); analyzeWindowFunctions(node, outputExpressions, orderByExpressions); TupleDescriptor descriptor = computeOutputDescriptor(node, tupleDescriptor); analysis.setOutputDescriptor(node, descriptor); ``` ## 論理オペレータへの変換 (Plan) `LogicalPlanner.plan()` 本体。 ざっくり: * `RelationPlanner.process()` でクエリ全体の論理オペレータツリーを作成 * `createOutputPlan()` で論理オペレータツリーの最後に出力用のオペレータを接続 * `Plan` オブジェクトで包んで返す ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92 public Plan plan(Analysis analysis) { RelationPlan plan; ... else { RelationPlanner planner = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session); plan = planner.process(analysis.getQuery(), null); } PlanNode root = createOutputPlan(plan, analysis); ... for (PlanOptimizer optimizer : planOptimizers) { root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator); } ... return new Plan(root, symbolAllocator); } ``` `RelationPlanner extends AstVisitor` なので、 `RelationPlanner.visitQuery()` へ。 ******** `RelationPlanner.visitQuery()` 本体。 さらに `QueryPlanner.visitQuery()` へ。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L215 protected RelationPlan visitQuery(Query node, Void context) { PlanBuilder subPlan = new QueryPlanner(analysis, symbolAllocator, idAllocator, metadata, session).process(node, null); ImmutableList.Builder<Symbol> outputSymbols = ImmutableList.builder(); for (FieldOrExpression fieldOrExpression : analysis.getOutputExpressions(node)) { outputSymbols.add(subPlan.translate(fieldOrExpression)); } return new RelationPlan(subPlan.getRoot(), analysis.getOutputDescriptor(node), outputSymbols.build()); } ``` ******** `QueryPlanner.visitQuery()` 本体。 おそらく、 `QuerySpecification (<: QueryBody)` ) を論理オペレーターに変換した後、 `IN`, `ORDER BY`, `LIMIT`, `SELECT` 句あたりを適当に処理するのではないかと。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L103 protected PlanBuilder visitQuery(Query query, Void context) { PlanBuilder builder = planQueryBody(query); Set<InPredicate> inPredicates = analysis.getInPredicates(query); builder = appendSemiJoins(builder, inPredicates); List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(query); List<FieldOrExpression> outputs = analysis.getOutputExpressions(query); builder = project(builder, Iterables.concat(orderBy, outputs)); builder = sort(builder, query); builder = project(builder, analysis.getOutputExpressions(query)); builder = limit(builder, query); return builder; } ``` 省略して `RelationPlanner.visitQuerySpecification()` へ。 ******** `QueryPlanner.visitQuerySpecification()` 本体。 `FROM`, `IN`, `WHERE` などを順番に適用していっている感じ。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L121 protected PlanBuilder visitQuerySpecification(QuerySpecification node, Void context) { PlanBuilder builder = planFrom(node); Set<InPredicate> inPredicates = analysis.getInPredicates(node); builder = appendSemiJoins(builder, inPredicates); builder = filter(builder, analysis.getWhere(node)); builder = aggregate(builder, node); builder = filter(builder, analysis.getHaving(node)); builder = window(builder, node); List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(node); List<FieldOrExpression> outputs = analysis.getOutputExpressions(node); builder = project(builder, Iterables.concat(orderBy, outputs)); builder = distinct(builder, node, outputs, orderBy); builder = sort(builder, node); builder = project(builder, analysis.getOutputExpressions(node)); builder = limit(builder, node); return builder; } ``` `QueryPlanner.planFrom()` だけ読んでみる。 ******** `QueryPlanner.planFrom()` 本体。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L160 private PlanBuilder planFrom(QuerySpecification node) { RelationPlan relationPlan; ... else { relationPlan = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session) .process(Iterables.getOnlyElement(node.getFrom()), null); } ... return new PlanBuilder(translations, relationPlan.getRoot()); } ``` `FROM` 句に指定されたリレーションを再帰的に `RelationPlanner` で変換している。 テーブルの場合を見るということで、 `RelationPlanner.visitTable()` へ。 ******** `RelationPlanner.visitTable()` 本体。 かなり省略すると、テーブルに対応するハンドルを探して、 `TableScanNode` というオペレータを作る。 さらに、サンプリングの設定がなされていたら、テーブルスキャン結果を `MaterializeSampleNode` で包む。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L92 protected RelationPlan visitTable(Table node, Void context) { ... TableHandle handle = analysis.getTableHandle(node); ... PlanNode root = new TableScanNode(idAllocator.getNextId(), handle, nodeOutputSymbols, columns.build(), null, Optional.<GeneratedPartitions>absent()); if (sampleWeightSymbol != null) { root = new MaterializeSampleNode(idAllocator.getNextId(), root, sampleWeightSymbol); } return new RelationPlan(root, descriptor, planOutputSymbols); } ``` ******** `LogicalPlanner.plan()` まで戻って、 `PlanOptimizer` について少しだけ。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92 public Plan plan(Analysis analysis) { RelationPlan plan; ... for (PlanOptimizer optimizer : planOptimizers) { root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator); } ... return new Plan(root, symbolAllocator); } ``` `PlanOptimizersFactory` のあたりで `PlanOptimizer` を正しい順序で登録している。 適用が超重要なので、コメントが長め。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizersFactory.java:L57 builder.add(new ImplementSampleAsFilter(), new SimplifyExpressions(metadata), new UnaliasSymbolReferences(), new PruneRedundantProjections(), new SetFlatteningOptimizer(), new MaterializeSamplePullUp(), new LimitPushDown(), // Run the LimitPushDown after flattening set operators to make it easier to do the set flattening new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()), new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()), // Run predicate push down one more time in case we can leverage new information from generated partitions new MergeProjections(), new SimplifyExpressions(metadata), // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations new UnaliasSymbolReferences(), // Run again because predicate pushdown might add more projections new PruneUnreferencedOutputs(), // Make sure to run this at the end to help clean the plan for logging/execution and not remove info that other optimizers might need at an earlier point new PruneRedundantProjections()); // This MUST run after PruneUnreferencedOutputs as it may introduce new redundant projections ``` 論理実行計画はこの辺りで。 ## 論理オペレータのステージ分割 (SubPlan) `DistributedLogicalPlanner.createSubPlans()` 本体。 `DistributedLogicalPlanner.Visitor (extends PlanVisitor)` に処理を移譲し、 `SubPlanBuilder.build()` で `SubPlan` を作る。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L82 public SubPlan createSubPlans(Plan plan, boolean createSingleNodePlan) { Visitor visitor = new Visitor(plan.getSymbolAllocator(), createSingleNodePlan); SubPlanBuilder builder = plan.getRoot().accept(visitor, null); SubPlan subplan = builder.build(); subplan.sanityCheck(); return subplan; } ``` ざっくり言うと: 1. `PlanNode` の木をいくつかの部分木に分解する 1. 部分木に分解した際の切断面のうち、根に `SinkNode` を配置する → 別のサブプランにデータを渡す 1. 部分木に分解した際の切断面のうち、末端に `ExchangeNode` を配置する → 別のサブプランからデータを受け取る 1. それぞれの部分木を `SubPlan` とする という感じ。 それぞれの `PlanNode` に対し、対応する `DistributedLogicalPlanner.Visitor.visit*()` で上記を行っていく。 ただし、引数の `createSingleNodePlan` が `true` の場合、 `SinkNode` や `ExchangeNode` は生成されない模様。 ******** 分散処理にあたって `PlanFragment` をもういちど。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42 public class PlanFragment { public enum PlanDistribution { NONE, FIXED, SOURCE, COORDINATOR_ONLY } public static enum OutputPartitioning { NONE, HASH } ... private final PlanDistribution distribution; ... private final OutputPartitioning outputPartitioning; private final List<Symbol> partitionBy; ``` 今のところの理解では、 `PlanDistribution` は処理を分散する際の制約。 `OutputPartitioning` は `SinkNode` が出力する内容のパーティショニング方法。 `PlanDistribution` : * `NONE` - 1台のノードで実行する ( `OutputNode`, `TopNNode`, `LimitNode`, グループ化なしの `AggregationNode` など) * `FIXED` - パーティションごとに決まったノードで実行する ( `MarkDistinctNode`, `WindowNode`, グループ化ありの `AggregationNode` など ) * `SOURCE` - データのあるノードで実行する ( `TableScanNode` のみ ) * `COORDINATOR_ONLY` - コーディネータノードのみで実行する ( `TableCommitNode` のみ ) ※上記以外の `PlanDistribution` はないため、データは小さめでCPUだけぶん回すような計算はあまり想定していないか `OutputPartitioning` : * `NONE` - 適当に出力 * `HASH` - 特定の属性でパーティション化して出力 ******** `TableScanNode` の例。 ノード自身からなるサブプランを新しく作成している。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L353 public SubPlanBuilder visitTableScan(TableScanNode node, Void context) { return createSourceDistributionPlan(node, node.getId()); } ``` ******** `FilterNode` の例。 サブプランの区切りは特に出現しない。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L228 public SubPlanBuilder visitFilter(FilterNode node, Void context) { SubPlanBuilder current = node.getSource().accept(this, context); current.setRoot(new FilterNode(node.getId(), current.getRoot(), node.getPredicate())); return current; } ``` ******** `JoinNode` の例。 `SinkNode` と `ExchangeNode` を作って、 `SubPlanBuilder.build()` を実行している。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L436 public SubPlanBuilder visitJoin(JoinNode node, Void context) { SubPlanBuilder left = node.getLeft().accept(this, context); SubPlanBuilder right = node.getRight().accept(this, context); if (left.isDistributed() || right.isDistributed()) { switch (node.getType()) { case INNER: case LEFT: right.setRoot(new SinkNode(idAllocator.getNextId(), right.getRoot(), right.getRoot().getOutputSymbols())); left.setRoot(new JoinNode(node.getId(), node.getType(), left.getRoot(), new ExchangeNode(idAllocator.getNextId(), right.getId(), right.getRoot().getOutputSymbols()), node.getCriteria())); left.addChild(right.build()); return left; ``` これを見る限り、内部結合でも左と右が重要? あと、 `CROSS` は `case` に含まれていない模様。 ******** `OutputNode` の例。 出力対象の `node.getSource()` までを `SinkNode` につなげてサブプランを区切りつつ、 `ExchangeNode` で受け取って `OutputNode` で出力するところは、 `DistributionPlan.NONE` で単一ノードにかき集めている。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L298 public SubPlanBuilder visitOutput(OutputNode node, Void context) { SubPlanBuilder current = node.getSource().accept(this, context); if (current.isDistributed()) { current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols())); // create a new non-partitioned fragment current = createSingleNodePlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols())) .addChild(current.build()); } current.setRoot(new OutputNode(node.getId(), current.getRoot(), node.getColumnNames(), node.getOutputSymbols())); return current; } ``` ## ステージ別実行計画の生成 (StageExecutionPlan) `SqlQueryExecution.planDistribution()` に戻ってくる。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L228 private void planDistribution(SubPlan subplan) { // time distribution planning long distributedPlanningStart = System.nanoTime(); // plan the execution on the active nodes DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager); StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan); ``` `DistributedExecutionPlanner.plan()` へ。 ******** `DistributedExecutionPlanner.plan()` の前半。 `DistributedExecutionPlanner.Visitor extends PlanVisitor` で、 自身の `PlanFragment` 内にテーブルがあれば、それを `SplitSource` として記録している。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79 public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate) { PlanFragment currentFragment = root.getFragment(); // get splits for this fragment, this is lazy so split assignments aren't actually calculated here Visitor visitor = new Visitor(); NodeSplits nodeSplits = currentFragment.getRoot().accept(visitor, materializedViewPartitionPredicate); ``` 見どころだけ書くと: * `PlanFragment` の末端が `ExchangeNode` の場合、 `SplitSource` なしの扱い * `JoinNode` の入力が二つとも `SplitSource` を持つことはできない * `TableScanNode` と `SampleNode` が同時にあって、かつサンプルにベルヌーイ分布を使っていない場合、スプリットごと間引いている ******** `DistributedExecutionPlanner.plan()` の後半。 `SubPlan` の子要素についても、再帰的に `DistributedExecutionPlanner.plan()` を実行している。 自身の `PlanFragment` と、子要素から `StageExecutionPlan` を構築。 ```java // presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79 public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate) { ... for (SubPlan childPlan : root.getChildren()) { dependencies.add(plan(childPlan, materializedViewPartitionPredicate)); } return new StageExecutionPlan(currentFragment, nodeSplits.getDataSource(), dependencies.build(), visitor.getOutputReceivers()); } ``` ******** `SqlQueryExecution.planDistribution()` の続きでは、根の `StageExecutionPlan` から `SqlStageExecution` を作っている。 根の `StageExecutionPlan` は子の `StageExecutionPlan` も含んでいるため、 `SqlStageExecution` はクエリ全体のプラン実行を表している。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L233 // plan the execution on the active nodes DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager); StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan); ... // build the stage execution objects (this doesn't schedule execution) SqlStageExecution outputStage = new SqlStageExecution(stateMachine.getQueryId(), locationFactory, outputStageExecutionPlan, nodeScheduler, remoteTaskFactory, stateMachine.getSession(), scheduleSplitBatchSize, maxPendingSplitsPerNode, initialHashPartitions, queryExecutor, ROOT_OUTPUT_BUFFERS); this.outputStage.set(outputStage); ``` この結果を `SqlQueryExecution.outputStage` に保持させている。 これは後でキャンセルしたり、情報を取り出したりする際にも使う模様。 # 論理実行計画の発行 `SqlQueryExecution.start()` の最後の部分で、 `SqlStageExecution.start()` を実行している。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L147 public void start() { ... // plan distribution of query planDistribution(subplan); ... SqlStageExecution stage = outputStage.get(); if (!stateMachine.isDone()) { stage.start(); } ``` ******** `SqlStageExecution` のコンストラクタ入口。 ルートステージは `parent` が `null` に。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L174 private SqlStageExecution(@Nullable StageExecutionNode parent, QueryId queryId, AtomicInteger nextStageId, LocationFactory locationFactory, StageExecutionPlan plan, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, Session session, int splitBatchSize, int maxPendingSplitsPerNode, int initialHashPartitions, ExecutorService executor) ``` ******** `SqlStageExecution` のコンストラクタ内で、再帰的にサブステージに関するも作っている。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L213 ImmutableMap.Builder<PlanFragmentId, StageExecutionNode> subStages = ImmutableMap.builder(); for (StageExecutionPlan subStagePlan : plan.getSubStages()) { PlanFragmentId subStageFragmentId = subStagePlan.getFragment().getId(); StageExecutionNode subStage = new SqlStageExecution(this, queryId, nextStageId, locationFactory, subStagePlan, nodeScheduler, remoteTaskFactory, session, splitBatchSize, maxPendingSplitsPerNode, initialHashPartitions, executor); subStage.addStateChangeListener(new StateChangeListener<StageInfo>() { @Override public void stateChanged(StageInfo stageInfo) { doUpdateState(); } }); subStages.put(subStageFragmentId, subStage); } this.subStages = subStages.build(); ``` ******** `SqlStageExecution.start()` のエントリ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L508 public Future<?> start() { try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) { return scheduleStartTasks(); } } ``` `SqlStageExecution.scheduleStartTasks()` へ。 ******** `SqlStageExecution.scheduleStartTasks()` の本体。 サブステージの ``SqlStageExecution.scheduleStartTasks()`` を再帰的に実行したあと、 別スレッドで自身の `SqlStageExecution.startTasks()` を実行。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L517 public Future<?> scheduleStartTasks() { try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) { // start sub-stages (starts bottom-up) for (StageExecutionNode subStage : subStages.values()) { subStage.scheduleStartTasks(); } return executor.submit(new Runnable() { @Override public void run() { startTasks(); } }); } } ``` ******** `SqlStageExecution.startTasks()` の本体。 `SubPlan` の作成時に解析した `PlanDistribution` の種類に従って、タスクに関する `schedule*()` を起動している。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L535 private void startTasks() { ... // schedule tasks if (fragment.getDistribution() == PlanDistribution.NONE) { scheduleFixedNodeCount(1); } else if (fragment.getDistribution() == PlanDistribution.FIXED) { scheduleFixedNodeCount(initialHashPartitions); } else if (fragment.getDistribution() == PlanDistribution.SOURCE) { scheduleSourcePartitionedNodes(); } else if (fragment.getDistribution() == PlanDistribution.COORDINATOR_ONLY) { scheduleOnCurrentNode(); } ``` 先に斜め読みしておくと、以下のように `scheduleTask()` に辿り着く模様。 * `scheduleFixedNodeCount()` → `scheduleTask()` * `scheduleSourcePartitionedNodes()` → `assignSplits()` → `scheduleTask()` * `scheduleOnCurrentNode()` → `scheduleTask()` 一番ナイーブそうな `PlanDistribution.COORDINATOR_ONLY` (コーディネーターで処理) → `SqlStageExecution.scheduleOnCurrentNode()` へ。 ******** コーディネータノード上でタスクを実行する `scheduleOnCurrentNode()` の本体。 `nodeSelector` は `NodeScheduler.NodeSelector` クラス。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610 private void scheduleOnCurrentNode() { // create task on current node Node node = nodeSelector.selectCurrentNode(); scheduleTask(0, node); // tell sub stages about all nodes and that there will not be more nodes for (StageExecutionNode subStage : subStages.values()) { subStage.parentNodesAdded(ImmutableList.of(node), true); } } ``` `NodeScheduler.NodeSelector.selectCurrentNode()` へ。 ******** `NodeScheduler.NodeSelector.selectCurrentNode()` 本体。 `nodeManager` は `NodeManager` インターフェース。 ```java // presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java:L157 public Node selectCurrentNode() { // TODO: this is a hack to force scheduling on the coordinator return nodeManager.getCurrentNode(); } ``` ******** `NodeManager` インターフェースのバインディング。 `InternalNodeManager` → `DiscoveryNodeManager` あたりがバインドされている感じ。 ```java // presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L121 // node scheduler binder.bind(InternalNodeManager.class).to(DiscoveryNodeManager.class).in(Scopes.SINGLETON); binder.bind(NodeManager.class).to(Key.get(InternalNodeManager.class)).in(Scopes.SINGLETON); bindConfig(binder).to(NodeSchedulerConfig.class); binder.bind(NodeScheduler.class).in(Scopes.SINGLETON); newExporter(binder).export(NodeScheduler.class).withGeneratedName(); ``` 少し深そうなので、今回はあまり立ち入らないことにして、それっぽいノード情報が返ってきているということにする。 ちなみに、 `Node` インターフェースはこんな感じで、エントリに関する情報を保持している。 ```java // presto-spi/src/main/java/com/facebook/presto/spi/Node.java:L18 public interface Node { HostAddress getHostAndPort(); URI getHttpUri(); String getNodeIdentifier(); } ``` ******** というわけで `SqlStageExecution` を続けて読む。 自身を表す `Node` オブジェクトを使って、 `scheduleTask` を起動している。 第一引数の `0` はステージ内のタスクIDの模様。 その後、サブステージに対して `parentNodesAdded()` で通知イベントを飛ばしている感じ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610 private void scheduleOnCurrentNode() { // create task on current node Node node = nodeSelector.selectCurrentNode(); scheduleTask(0, node); // tell sub stages about all nodes and that there will not be more nodes for (StageExecutionNode subStage : subStages.values()) { subStage.parentNodesAdded(ImmutableList.of(node), true); } } ``` `scheduleTask()` へ。 ******** 2引数の `scheduleTask()` から、4引数の `scheduleTask()` へ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L722 private RemoteTask scheduleTask(int id, Node node) { return scheduleTask(id, node, null, ImmutableList.<Split>of()); } ``` ## タスクの発行 `scheduleTask()` の先頭部分。 タスクの入出力をやっつけてそうな `addNewExchangesAndBuffers()` を先頭で起動している。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727 private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits) { // before scheduling a new task update all existing tasks with new exchanges and output buffers addNewExchangesAndBuffers(); ``` ******** `addNewExchangesAndBuffers()` の先頭部分。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791 private boolean addNewExchangesAndBuffers() { // get new exchanges and update exchange state Set<PlanNodeId> completeSources = updateCompleteSources(); boolean allSourceComplete = completeSources.containsAll(fragment.getSourceIds()); Multimap<PlanNodeId, URI> newExchangeLocations = getNewExchangeLocations(); exchangeLocations.set(ImmutableMultimap.<PlanNodeId, URI>builder() .putAll(exchangeLocations.get()) .putAll(newExchangeLocations) .build()); ``` ざっくりと概要。 * `updateCompleteSources()` - 現在のステージの入力 ( `ExchangeNode` ) のうち、それを生成するステージの実行準備が整ったものの一覧を抽出 * `getNewExchangeLocations()` - 現時点で把握していないサブステージのタスク出力一覧を抽出 * `exchangeLocations` - `getNewExchangeLocations()` を追加している ******** `updateCompleteSources()` 本体。 0. 未解決の `ExchangeNode` を探す 0. そのデータを生成するステージ一覧を取りだす 0. それらのステージがいずれも準備が整っていたら、 `ExchangeNode` にマークをつける ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L853 private Set<PlanNodeId> updateCompleteSources() { for (PlanNode planNode : fragment.getSources()) { if (!completeSources.contains(planNode.getId()) && planNode instanceof ExchangeNode) { ExchangeNode exchangeNode = (ExchangeNode) planNode; boolean exchangeFinished = true; for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) { StageExecutionNode subStage = subStages.get(planFragmentId); switch (subStage.getState()) { case PLANNED: case SCHEDULING: exchangeFinished = false; break; } } if (exchangeFinished) { completeSources.add(planNode.getId()); } } } return completeSources; } ``` ******** `getNewExchangeLocations()` 本体。 0. すべての `ExchangeNode` を探す 0. それぞれのデータを生成するステージ一覧を取りだす 0. それぞれのタスク一覧を取り出す 0. 未見のタスクがあれば、それらを返す ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L465 private Multimap<PlanNodeId, URI> getNewExchangeLocations() { Multimap<PlanNodeId, URI> exchangeLocations = this.exchangeLocations.get(); ImmutableMultimap.Builder<PlanNodeId, URI> newExchangeLocations = ImmutableMultimap.builder(); for (PlanNode planNode : fragment.getSources()) { if (planNode instanceof ExchangeNode) { ExchangeNode exchangeNode = (ExchangeNode) planNode; for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) { StageExecutionNode subStage = subStages.get(planFragmentId); checkState(subStage != null, "Unknown sub stage %s, known stages %s", planFragmentId, subStages.keySet()); // add new task locations for (URI taskLocation : subStage.getTaskLocations()) { if (!exchangeLocations.containsEntry(exchangeNode.getId(), taskLocation)) { newExchangeLocations.putAll(exchangeNode.getId(), taskLocation); } } } } } return newExchangeLocations.build(); } ``` ******** `addNewExchangesAndBuffers()` の残り部分は、タスクを発行していない今はあまり関係なさそうなので読み飛ばす。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791 private boolean addNewExchangesAndBuffers() { ... // get new output buffer and update output buffer state OutputBuffers outputBuffers = updateToNextOutputBuffers(); // finished state must be decided before update to avoid race conditions boolean finished = allSourceComplete && outputBuffers.isNoMoreBufferIds(); // update tasks for (RemoteTask task : tasks.values()) { ... } return finished; } ``` ******** `scheduleTask()` の続きの部分。 `addNewExchangesAndBuffers()` の結果も利用して、 `initialSplits` を構築している。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727 private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits) { // before scheduling a new task update all existing tasks with new exchanges and output buffers addNewExchangesAndBuffers(); TaskId taskId = new TaskId(stageId, String.valueOf(id)); ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder(); for (Split sourceSplit : sourceSplits) { initialSplits.put(sourceId, sourceSplit); } for (Entry<PlanNodeId, URI> entry : exchangeLocations.get().entries()) { initialSplits.put(entry.getKey(), createRemoteSplitFor(node.getNodeIdentifier(), entry.getValue())); } ``` ******** 軽く `createRemoteSplitFor()` を読んでみると、タスクのURIから出力データの位置を気合で計算しているように見える。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L960 private RemoteSplit createRemoteSplitFor(String nodeId, URI taskLocation) { URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(nodeId).build(); return new RemoteSplit(splitLocation, tupleInfos); } ``` ******** さらに `scheduleTask()` の続きの部分。 `RemoteTaskFactory.createRemoteTask()` を利用してリモートタスクを作成し、`RemoteTask.start()` で実行を開始している感じ。 ```java // presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727 private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits) { ... RemoteTask task = remoteTaskFactory.createRemoteTask(session, taskId, node, fragment, initialSplits.build(), outputReceivers, currentOutputBuffers); ... // create and update task task.start(); // record this task tasks.put(node, task); ``` ******** `RemoteTaskFactory` には、 `HttpRemoteTaskFactory` がバインディングされている。 ついでに、その付近で `LocationFactory` に対して `HttpLocationFactory` もバインディングされている。 ```java // presto-server/src/main/java/com/facebook/presto/server/ServerMainModule.java:L149 // execution binder.bind(LocationFactory.class).to(HttpLocationFactory.class).in(Scopes.SINGLETON); binder.bind(RemoteTaskFactory.class).to(HttpRemoteTaskFactory.class).in(Scopes.SINGLETON); httpClientBinder(binder).bindAsyncHttpClient("scheduler", ForScheduler.class).withTracing(); ``` ******** `HttpRemoteTaskFactory.createRemoteTask()` の本体。 `HttpRemoteTask` のコンストラクタを呼び出している。 このとき、 `HttpLocationFactory.createTaskLocation()` を使って、対象タスクのURIを算出している。 ```java // presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTaskFactory.java:L81 public RemoteTask createRemoteTask(Session session, TaskId taskId, com.facebook.presto.spi.Node node, PlanFragment fragment, Multimap<PlanNodeId, Split> initialSplits, Map<PlanNodeId, OutputReceiver> outputReceivers, OutputBuffers outputBuffers) { return new HttpRemoteTask(session, taskId, node.getNodeIdentifier(), locationFactory.createTaskLocation(node, taskId), fragment, initialSplits, outputReceivers, outputBuffers, httpClient, executor, maxConsecutiveErrorCount, minErrorDuration, taskInfoCodec, taskUpdateRequestCodec ); } ``` ******** `HttpLocationFactory.createTaskLocation()` の本体。 `"/v1/task"` というパスが接頭辞としてついている。 ```java // presto-server/src/main/java/com/facebook/presto/server/HttpLocationFactory.java:L76 public URI createTaskLocation(Node node, TaskId taskId) { Preconditions.checkNotNull(node, "node is null"); Preconditions.checkNotNull(taskId, "taskId is null"); return uriBuilderFrom(node.getHttpUri()) .appendPath("/v1/task") .appendPath(taskId.toString()) .build(); } ``` なお、対応するリソースは `TaskResource` の模様。 ```java // presto-server/src/main/java/com/facebook/presto/server/TaskResource.java:L60 @Path("/v1/task") public class TaskResource ``` ******** `HttpRemoteTask` のコンストラクタ。 処理対象のスプリット情報 ( `initialSplits` ) から `pendingSplits` を作っていたり、 タスク情報 ( `taskInfo` ) に `TaskState.PLANNED` という状態のステートマシンを登録していたりする。 ```java // presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L144 public HttpRemoteTask(Session session, ... { ... for (Entry<PlanNodeId, Split> entry : checkNotNull(initialSplits, "initialSplits is null").entries()) { ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getValue()); pendingSplits.put(entry.getKey(), scheduledSplit); } ... TaskStats taskStats = new TaskContext(taskId, executor, session).getTaskStats(); taskInfo = new StateMachine<>("task " + taskId, executor, new TaskInfo( taskId, TaskInfo.MIN_VERSION, TaskState.PLANNED, location, DateTime.now(), new SharedBufferInfo(QueueState.OPEN, 0, 0, bufferStates), ImmutableSet.<PlanNodeId>of(), taskStats, ImmutableList.<FailureInfo>of(), ImmutableMap.<PlanNodeId, Set<?>>of())); } } ``` ******** `HttpRemoteTask.start()` の本体。`HttpRemoteTask.scheduleUpdate()` を呼び出している。 `scheduleUpdate()` は様々なところから呼び出されている。 ```java // presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L229 public void start() { try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", taskId)) { // to start we just need to trigger an update scheduleUpdate(); } } ``` ******** `HttpRemoteTask.scheduleUpdate()` の本体。 リクエストオブジェクトを作成し、 `/v1/task` に投げている。 ```java // presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L350 private synchronized void scheduleUpdate() { ... TaskUpdateRequest updateRequest = new TaskUpdateRequest(session, planFragment, sources, outputBuffers.get()); Request request = preparePost() .setUri(uriBuilderFrom(taskInfo.get().getSelf()).build()) .setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()) .setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest)) .build(); ... } ``` おそらくこの辺りでタスクが起動するはず。 ********