|
use pyo3::prelude::*; |
|
use pyo3::types::{PyDict, PyList, PyString}; |
|
use pyo3::exceptions::PyValueError; |
|
use hcl_edit::ast::*; |
|
use hcl_edit::parser; |
|
use std::collections::BTreeMap; |
|
|
|
// ─── HclTraversal (for full HCL docs) ─────────────────────────────────────
|
#[pyclass] |
|
pub struct HclTraversal { path: String } |
|
#[pymethods] impl HclTraversal { |
|
#[new] fn new(path: &str) -> Self { Self { path: path.to_string() } } |
|
} |
|
|
|
// ─── HclDoc: Full HCL2 parser/generator ──────────────────────────────────
|
#[pyclass] |
|
pub struct HclDoc { inner: Body } |
|
|
|
#[pymethods] |
|
impl HclDoc { |
|
#[staticmethod] |
|
fn parse(hcl_str: &str) -> PyResult<Self> { |
|
let body = parser::parse(hcl_str) |
|
.map_err(|e| PyValueError::new_err(format!("Parse error: {}", e)))?; |
|
Ok(Self { inner: body }) |
|
} |
|
|
|
#[staticmethod] |
|
fn new() -> Self { Self { inner: Body::default() } } |
|
|
|
fn dumps(&self) -> String { self.inner.to_string() } |
|
|
|
fn add_comment_to_attribute(&mut self, key: &str, text: &str, position: &str) -> PyResult<()> { |
|
let comment = Comment { text: text.to_string() }; |
|
if let Some(attr) = self.inner.attributes.iter_mut().find(|a| a.key.as_str() == key) { |
|
match position { |
|
"leading" | "before" => attr.leading_comments.push(comment), |
|
"trailing" | "after" => attr.trailing_comments.push(comment), |
|
_ => return Err(PyValueError::new_err("position must be 'leading' or 'trailing'")), |
|
} |
|
Ok(()) |
|
} else { |
|
Err(PyValueError::new_err(format!("Attribute '{}' not found", key))) |
|
} |
|
} |
|
|
|
fn add_comment_to_block(&mut self, block_type: &str, labels: Vec<&str>, text: &str, position: &str) -> PyResult<()> { |
|
let comment = Comment { text: text.to_string() }; |
|
if let Some(block) = self.inner.blocks.iter_mut().find(|b| { |
|
b.identifier.as_str() == block_type && b.labels.iter().map(|l| l.as_str()).collect::<Vec<_>>() == labels |
|
}) { |
|
match position { |
|
"leading" | "before" => block.leading_comments.push(comment), |
|
"trailing" | "after" => block.trailing_comments.push(comment), |
|
_ => return Err(PyValueError::new_err("position must be 'leading' or 'trailing'")), |
|
} |
|
Ok(()) |
|
} else { |
|
Err(PyValueError::new_err("Block not found")) |
|
} |
|
} |
|
|
|
fn get_attribute(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> { |
|
if let Some(attr) = self.inner.attributes.iter().find(|a| a.key.as_str() == key) { |
|
Ok(Some(expr_to_py(&attr.value, py)?)) |
|
} else { Ok(None) } |
|
} |
|
|
|
fn set_attribute(&mut self, key: &str, value: &PyAny) -> PyResult<()> { |
|
let expr = py_to_expr(value)?; |
|
let attr = Attribute { |
|
key: key.parse().map_err(|_| PyValueError::new_err("Invalid key"))?, |
|
value: expr, leading_comments: vec![], trailing_comments: vec![], |
|
}; |
|
if let Some(existing) = self.inner.attributes.iter_mut().find(|a| a.key.as_str() == key) { |
|
*existing = attr; |
|
} else { |
|
self.inner.attributes.push(attr); |
|
} |
|
Ok(()) |
|
} |
|
|
|
fn add_block(&mut self, block_type: &str, labels: Vec<String>, body_dict: &PyDict) -> PyResult<()> { |
|
let mut block_body = Body::default(); |
|
for (k, v) in body_dict { |
|
let key = k.extract::<String>()?; |
|
let val = py_to_expr(v)?; |
|
block_body.attributes.push(Attribute { |
|
key: key.parse().map_err(|_| PyValueError::new_err("Invalid key"))?, |
|
value: val, leading_comments: vec![], trailing_comments: vec![], |
|
}); |
|
} |
|
let block = Block { |
|
identifier: block_type.parse().map_err(|_| PyValueError::new_err("Invalid block type"))?, |
|
labels: labels.into_iter().map(|l| l.parse().unwrap()).collect(), |
|
body: block_body, leading_comments: vec![], trailing_comments: vec![], |
|
}; |
|
self.inner.blocks.push(block); |
|
Ok(()) |
|
} |
|
} |
|
|
|
// ─── HclVars: Strict tfvars parser/generator ────────────────────────────
|
#[pyclass] |
|
pub struct HclVars { inner: Body } |
|
|
|
#[pymethods] |
|
impl HclVars { |
|
#[staticmethod] |
|
fn parse(tfvars_str: &str) -> PyResult<Self> { |
|
let body = parser::parse(tfvars_str) |
|
.map_err(|e| PyValueError::new_err(format!("tfvars parse error: {}", e)))?; |
|
validate_tfvars_body(&body)?; |
|
Ok(Self { inner: body }) |
|
} |
|
|
|
#[staticmethod] |
|
fn new() -> Self { Self { inner: Body::default() } } |
|
|
|
fn dumps(&self) -> PyResult<String> { |
|
validate_tfvars_body(&self.inner)?; |
|
Ok(self.inner.to_string()) |
|
} |
|
|
|
fn set(&mut self, key: &str, value: &PyAny) -> PyResult<()> { |
|
let expr = py_to_tfvars_expr(value)?; |
|
let attr = Attribute { |
|
key: key.parse().map_err(|_| PyValueError::new_err("Invalid tfvars key"))?, |
|
value: expr, leading_comments: vec![], trailing_comments: vec![], |
|
}; |
|
if let Some(existing) = self.inner.attributes.iter_mut().find(|a| a.key.as_str() == key) { |
|
*existing = attr; |
|
} else { |
|
self.inner.attributes.push(attr); |
|
} |
|
Ok(()) |
|
} |
|
|
|
fn get(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> { |
|
if let Some(attr) = self.inner.attributes.iter().find(|a| a.key.as_str() == key) { |
|
Ok(Some(expr_to_py(&attr.value, py)?)) |
|
} else { Ok(None) } |
|
} |
|
|
|
fn add_comment(&mut self, key: &str, text: &str, position: &str) -> PyResult<()> { |
|
let comment = Comment { text: text.to_string() }; |
|
if let Some(attr) = self.inner.attributes.iter_mut().find(|a| a.key.as_str() == key) { |
|
match position { |
|
"leading" | "before" => attr.leading_comments.push(comment), |
|
"trailing" | "after" => attr.trailing_comments.push(comment), |
|
_ => return Err(PyValueError::new_err("position must be 'leading' or 'trailing'")), |
|
} |
|
Ok(()) |
|
} else { |
|
Err(PyValueError::new_err(format!("Key '{}' not found", key))) |
|
} |
|
} |
|
} |
|
|
|
// ─── Expression Conversion ──────────────────────────────────────────────
|
fn py_to_expr(obj: &PyAny) -> PyResult<Expression> { |
|
if obj.is_instance_of::<HclTraversal>() { |
|
let trav = obj.extract::<HclTraversal>()?; |
|
let parts: Vec<&str> = trav.path.split('.').collect(); |
|
if parts.is_empty() { return Err(PyValueError::new_err("Traversal path cannot be empty")); } |
|
let root: Identifier = parts[0].parse() |
|
.map_err(|e| PyValueError::new_err(format!("Invalid root '{}': {}", parts[0], e)))?; |
|
let operators: Result<Vec<_>, _> = parts[1..].iter().map(|&p| { |
|
let ident: Identifier = p.parse() |
|
.map_err(|e| PyValueError::new_err(format!("Invalid identifier '{}': {}", p, e)))?; |
|
Ok(TraversalOperator::AttrAccess(ident)) |
|
}).collect(); |
|
return Ok(Expression::Traversal(Traversal { root, operators: operators? })); |
|
} |
|
|
|
if obj.is_instance_of::<PyString>() { |
|
Ok(Expression::StringLiteral(StringLiteral::new(obj.extract()?))) |
|
} else if obj.is_instance_of::<pyo3::types::PyBool>() { |
|
Ok(Expression::BoolLiteral(BoolLiteral::new(obj.extract()?))) |
|
} else if obj.is_instance_of::<pyo3::types::PyInt>() { |
|
Ok(Expression::NumberLiteral(NumberLiteral::new(obj.extract::<i64>()? as f64))) |
|
} else if obj.is_instance_of::<PyList>() { |
|
let list = obj.downcast::<PyList>()?; |
|
let items = list.iter().map(py_to_expr).collect::<PyResult<_>>()?; |
|
Ok(Expression::ArrayLiteral(ArrayLiteral::new(items))) |
|
} else if obj.is_instance_of::<PyDict>() { |
|
let dict = obj.downcast::<PyDict>()?; |
|
let mut map = BTreeMap::new(); |
|
for (k, v) in dict { map.insert(k.extract::<String>()?, py_to_expr(v)?); } |
|
Ok(Expression::ObjectLiteral(ObjectLiteral::new(map))) |
|
} else if obj.is_none() { |
|
Ok(Expression::NullLiteral(NullLiteral::default())) |
|
} else { |
|
Err(PyValueError::new_err("Unsupported type. Use str, int, bool, list, dict, None, or hcl_py.HclTraversal")) |
|
} |
|
} |
|
|
|
fn py_to_tfvars_expr(obj: &PyAny) -> PyResult<Expression> { |
|
if obj.is_instance_of::<HclTraversal>() { |
|
return Err(PyValueError::new_err("tfvars does not support traversals or references. Use literal values only.")); |
|
} |
|
if obj.is_instance_of::<PyString>() { |
|
Ok(Expression::StringLiteral(StringLiteral::new(obj.extract()?))) |
|
} else if obj.is_instance_of::<pyo3::types::PyBool>() { |
|
Ok(Expression::BoolLiteral(BoolLiteral::new(obj.extract()?))) |
|
} else if obj.is_instance_of::<pyo3::types::PyInt>() { |
|
Ok(Expression::NumberLiteral(NumberLiteral::new(obj.extract::<i64>()? as f64))) |
|
} else if obj.is_instance_of::<pyo3::types::PyFloat>() { |
|
Ok(Expression::NumberLiteral(NumberLiteral::new(obj.extract::<f64>()?))) |
|
} else if obj.is_instance_of::<PyList>() { |
|
let list = obj.downcast::<PyList>()?; |
|
let items = list.iter().map(py_to_tfvars_expr).collect::<PyResult<_>>()?; |
|
Ok(Expression::ArrayLiteral(ArrayLiteral::new(items))) |
|
} else if obj.is_instance_of::<PyDict>() { |
|
let dict = obj.downcast::<PyDict>()?; |
|
let mut map = BTreeMap::new(); |
|
for (k, v) in dict { map.insert(k.extract::<String>()?, py_to_tfvars_expr(v)?); } |
|
Ok(Expression::ObjectLiteral(ObjectLiteral::new(map))) |
|
} else if obj.is_none() { |
|
Ok(Expression::NullLiteral(NullLiteral::default())) |
|
} else { |
|
Err(PyValueError::new_err("tfvars only supports literals: str, int, float, bool, list, dict, or None.")) |
|
} |
|
} |
|
|
|
fn expr_to_py(expr: &Expression, py: Python<'_>) -> PyResult<PyObject> { |
|
match expr { |
|
Expression::StringLiteral(s) => Ok(s.value.to_object(py)), |
|
Expression::NumberLiteral(n) => Ok(if n.value.fract() == 0.0 { (n.value as i64).to_object(py) } else { n.value.to_object(py) }), |
|
Expression::BoolLiteral(b) => Ok(b.value.to_object(py)), |
|
Expression::ArrayLiteral(arr) => { |
|
let list = PyList::empty(py); |
|
for item in &arr.elements { list.append(expr_to_py(item, py)?)?; } |
|
Ok(list.to_object(py)) |
|
} |
|
Expression::ObjectLiteral(map) => { |
|
let dict = PyDict::new(py); |
|
for (k, v) in &map.values { dict.set_item(k, expr_to_py(v, py)?)?; } |
|
Ok(dict.to_object(py)) |
|
} |
|
Expression::NullLiteral(_) => Ok(py.None()), |
|
Expression::Traversal(t) => { |
|
let mut path = t.root.as_str().to_string(); |
|
for op in &t.operators { |
|
if let TraversalOperator::AttrAccess(id) = op { path.push('.'); path.push_str(id.as_str()); } |
|
} |
|
Ok(path.to_object(py)) |
|
} |
|
_ => Err(PyValueError::new_err("Complex expression type not yet mapped")), |
|
} |
|
} |
|
|
|
// ─── Validators ──────────────────────────────────────────────────────────
|
fn validate_tfvars_body(body: &Body) -> PyResult<()> { |
|
if !body.blocks.is_empty() { return Err(PyValueError::new_err("tfvars does not support blocks")); } |
|
for attr in &body.attributes { validate_tfvars_expr(&attr.value)?; } |
|
Ok(()) |
|
} |
|
|
|
fn validate_tfvars_expr(expr: &Expression) -> PyResult<()> { |
|
match expr { |
|
Expression::NullLiteral(_) | Expression::BoolLiteral(_) | |
|
Expression::NumberLiteral(_) | Expression::StringLiteral(_) => Ok(()), |
|
Expression::ArrayLiteral(arr) => { |
|
for item in &arr.elements { validate_tfvars_expr(item)?; } |
|
Ok(()) |
|
} |
|
Expression::ObjectLiteral(map) => { |
|
for val in map.values.values() { validate_tfvars_expr(val)?; } |
|
Ok(()) |
|
} |
|
_ => Err(PyValueError::new_err("tfvars only supports literal values (no traversals, templates, functions, or conditionals)")), |
|
} |
|
} |
|
|
|
// ─── Module Registration ─────────────────────────────────────────────────
|
#[pymodule] |
|
fn hcl_py(_py: Python, m: &PyModule) -> PyResult<()> { |
|
m.add_class::<HclTraversal>()?; |
|
m.add_class::<HclDoc>()?; |
|
m.add_class::<HclVars>()?; |
|
Ok(()) |
|
} |