Created
January 20, 2023 05:19
-
-
Save vibhatha/95ef99800974f8f386d21e8f50e5ccb7 to your computer and use it in GitHub Desktop.
Generate Substrait Plans in Google Proto Text Format
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Version 3 | |
--------- | |
Update to naming used for projection | |
Function Performed | |
------------------ | |
Generate Substrait Plans and Corresponding SQL via Ibis | |
Requirements | |
------------ | |
pip install ibis ibis-substrait pyarrow | |
Run Script | |
---------- | |
python generate_substrait_projections_with_names.py <path-to-save-directory> | |
Example: | |
python generate_substrait_projections_with_names.py /home/veloxuser/sandbox/queries | |
Output | |
------ | |
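Within the specified folder, *.sql files will contain the SQL queries and
*_substrait.txt files will contain the Substrait plans in protobuf text format
(the script entry point calls write_sql_and_proto_str). If write_sql_and_json
is used instead, the plans are saved as *_substrait.json files, as in the
listing below. A SQL query and its corresponding plan share the same query id.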
queries/ | |
├── q0.sql | |
├── q0_substrait.json | |
├── q10.sql | |
├── q10_substrait.json | |
├── q1.sql | |
├── q1_substrait.json | |
├── q2.sql | |
├── q2_substrait.json | |
├── q3.sql | |
├── q3_substrait.json | |
├── q4.sql | |
├── q4_substrait.json | |
├── q5.sql | |
├── q5_substrait.json | |
├── q6.sql | |
├── q6_substrait.json | |
├── q7.sql | |
├── q7_substrait.json | |
├── q8.sql | |
├── q8_substrait.json | |
├── q9.sql | |
└── q9_substrait.json | |
""" | |
import os | |
import sys | |
import ibis | |
from ibis_substrait.compiler.core import SubstraitCompiler | |
from google.protobuf import json_format, text_format | |
def separator(char="="):
    """Return ``char`` repeated 80 times, for use as a horizontal rule."""
    line_width = 80
    return line_width * char
def table():
    """Build the unbound ibis table ``t`` that every expression is based on.

    The table has five columns: ``a`` (string), ``b`` (float), ``c`` (int32),
    ``d`` (int64) and ``e`` (int64).
    """
    schema = [
        ("a", "string"),
        ("b", "float"),
        ("c", "int32"),
        ("d", "int64"),
        ("e", "int64"),
    ]
    return ibis.table(schema, "t")
def write_sql(expr, fname_base):
    """Write the SQL for ``expr`` to ``<fname_base>.sql``.

    Args:
        expr: ibis expression handed to ``ibis.show_sql``.
        fname_base: output path without extension; ``".sql"`` is appended.
    """
    # The context manager guarantees the file is closed even if
    # ibis.show_sql raises while rendering the expression.
    with open(fname_base + ".sql", "w") as f:
        ibis.show_sql(expr, file=f)
def write_json_plan(expr, fname_base):
    """Compile ``expr`` to a Substrait plan and save it as JSON.

    The plan is produced by a fresh ``SubstraitCompiler``, serialized with
    ``json_format.MessageToJson`` and written to
    ``<fname_base>_substrait.json``.
    """
    plan_message = SubstraitCompiler().compile(expr)
    serialized = json_format.MessageToJson(plan_message)
    out_path = fname_base + "_substrait.json"
    with open(out_path, "w") as out_file:
        out_file.write(serialized)
def write_proto_str_plan(expr, fname_base):
    """Compile ``expr`` to a Substrait plan and save it in protobuf text format.

    The plan is produced by a fresh ``SubstraitCompiler``, serialized with
    ``text_format.MessageToString`` and written to
    ``<fname_base>_substrait.txt``.
    """
    plan_message = SubstraitCompiler().compile(expr)
    text_plan = text_format.MessageToString(plan_message)
    out_path = fname_base + "_substrait.txt"
    with open(out_path, "w") as out_file:
        out_file.write(text_plan)
def write_sql_and_json(base_path, exprs):
    """Write a SQL file and a JSON Substrait plan for every expression.

    The expression at position ``i`` in ``exprs`` is written under the base
    name ``<base_path>/q<i>``, so a query and its plan share the same id.
    """
    for query_id, query_expr in enumerate(exprs):
        target = os.path.join(base_path, f"q{query_id}")
        write_sql(query_expr, target)
        write_json_plan(query_expr, target)
def write_sql_and_proto_str(base_path, exprs):
    """Write a SQL file and a protobuf-text Substrait plan for every expression.

    The expression at position ``i`` in ``exprs`` is written under the base
    name ``<base_path>/q<i>``, so a query and its plan share the same id.
    """
    for query_id, query_expr in enumerate(exprs):
        target = os.path.join(base_path, f"q{query_id}")
        write_sql(query_expr, target)
        write_proto_str_plan(query_expr, target)
# Input table shared by every expression below.
t = table()


# Small helpers applied to table columns when building the projections.
def f1(x):
    """Add the constant 100 to ``x``."""
    return x + 100


def f2(x, y):
    """Add ``x`` and ``y``."""
    return x + y


def f3(x, y):
    """Compare ``x > y``."""
    return x > y


# Expressions

# q0: plain projection of two columns.
expr0 = t.projection(["b", "a"])

# q1: column plus a constant value.
expr1 = t.projection(add_100=f1(t.d))

# q2: function taking two columns.
expr2 = t.projection(d_plus_e=f2(t.d, t.e))

# q3: column name, function with a scalar, function with two column inputs.
expr3 = t.projection(
    "a",
    d_plus_100=f1(t.d),
    d_plus_e=f2(t.d, t.e),
)

# q4: column name, function with a scalar, function with a column and a
# scalar input.
expr4 = t.projection(
    "a",
    d_plus_100=f1(t.d),
    d_plus_5=f2(t.d, 5),
)

# q5: column name plus a function applied to the result of another function.
expr5 = t.projection(
    "a",
    d_plus_100=f1(t.d),
    d_plus_5_plus_100=f1(f2(t.d, 5)),
)

# q6: comparison operation.
expr6 = t.projection(d_greater_than_e=f3(t.d, t.e))

# q7: projection on top of an aggregation.
expr7 = (
    t.group_by(["a"])
    .aggregate([t.c.sum().name("s1"), t.e.count().name("s2")])
    .projection(["s1", "s2"])
)

# q8: nested projections on top of an aggregation.
expr8 = (
    t.group_by(["a"])
    .aggregate([t.c.sum().name("s1"), t.e.count().name("s2")])
    .projection(["a", "s1", "s2"])
    .projection(["a", "s1"])
)

# q9: projection -> filter -> projection in series.
expr9 = (
    t.projection(["d", "c", "e"])
    .filter(f3(t.d, t.e))
    .projection(["d", "e"])
)

# q10: the series above followed by a projection of computed columns.
expr10 = (
    t.projection(["d", "c", "e"])
    .filter(f3(t.d, t.e))
    .projection(["d", "e"])
    .projection(
        d_plus_100=f1(t.d),
        e_plus_100=f1(t.e),
        d_greater_than_e=f3(t.d, t.e),
    )
)
if __name__ == "__main__":
    # Script entry point: expects exactly one argument, the directory in
    # which the q<i>.sql / q<i>_substrait.txt files are written.
    args = sys.argv
    if len(args) != 2:
        # Derive the script name from argv so the usage text cannot drift
        # from the actual file name.
        print("help>>>")
        print("\t python " + os.path.basename(args[0]) + " <path-to-save-directory>")
        sys.exit(1)
    elif os.path.isdir(args[1]):
        # isdir (not exists): a path to a regular file would pass an
        # existence check and then fail later when opening <path>/q0.sql.
        write_sql_and_proto_str(
            base_path=args[1],
            exprs=[
                expr0, expr1, expr2,
                expr3, expr4, expr5,
                expr6, expr7, expr8,
                expr9, expr10,
            ],
        )
    else:
        print("Please enter a valid path to save files")
        # Non-zero status lets calling scripts detect that nothing was written.
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment