Skip to content

Instantly share code, notes, and snippets.

@bukzor
Last active June 11, 2025 18:55
Show Gist options
  • Save bukzor/9341291cc48963d785b81633abcc377d to your computer and use it in GitHub Desktop.
Save bukzor/9341291cc48963d785b81633abcc377d to your computer and use it in GitHub Desktop.
Passing an array of structs to BigQuery
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from typing import Iterator
from typing import TypeAlias
from google.cloud import bigquery # type:ignore[missingTypeStubs]
# Alias for an iterator over BigQuery result rows; it is the return type of
# `test` and the parameter type of `show_query_result`.
Rows: TypeAlias = Iterator[bigquery.Row]
# Diagnostic query: unnests the `@flaky_test_issues` array parameter and, for
# each element, selects the element itself plus its `nodeid` and `closed_at`
# fields, each paired with the TYPEOF() of that expression.
QUERY = """
SELECT
issue
, TYPEOF(issue) typeof_issue
, issue.nodeid as nodeid
, TYPEOF(issue.nodeid) typeof_nodeid
, issue.closed_at as closed_at
, TYPEOF(issue.closed_at) typeof_closed_at
FROM UNNEST(@flaky_test_issues) as issue
"""
@dataclass(frozen=True)
class FlakyTest:
nodeid: str
closed_at: datetime | None
@classmethod
def bigquery_type(cls):
return bigquery.StructQueryParameterType(
bigquery.ScalarQueryParameterType("STRING", name="nodeid"),
bigquery.ScalarQueryParameterType("TIMESTAMP", name="closed_at"),
name=cls.__name__,
)
def to_api_repr(self) -> dict[Any, Any]:
cls = type(self)
return bigquery.StructQueryParameter(
cls.__name__,
bigquery.ScalarQueryParameter("nodeid", "STRING", self.nodeid),
bigquery.ScalarQueryParameter(
"closed_at", "TIMESTAMP", self.closed_at
),
).to_api_repr()
# Module-level BigQuery client, created with no arguments at import time and
# used by `test` to run the query.
# NOTE(review): presumably picks up project and credentials from the
# environment -- confirm for the target deployment.
client = bigquery.Client()
def json_default(obj: object) -> Any:
    """Fallback encoder for use as `json.dumps(..., default=json_default)`.

    Args:
        obj: A value the json module could not serialize on its own.

    Returns:
        A plain dict for a `bigquery.Row`, or an ISO-format string for a
        `datetime`.

    Raises:
        TypeError: If `obj` is neither of the supported types.
    """
    if isinstance(obj, bigquery.Row):
        return dict(obj)  # type:ignore[reportUnknownTypeVariable]

    if isinstance(obj, datetime):
        return obj.isoformat()

    # Nothing matched: report it the same way the json module would.
    raise TypeError(
        f"Object of type {obj.__class__.__name__} is not JSON serializable"
    )
def test(flaky_test_issues: list[FlakyTest]) -> Rows:
    """Run QUERY with the given issues bound to `@flaky_test_issues`.

    The issues are passed as an array query parameter whose element type
    is given explicitly by `FlakyTest.bigquery_type()`, so the parameter
    is built the same way for an empty list as for a populated one.

    Args:
        flaky_test_issues: Issues to send as the array parameter; may be
            empty.

    Returns:
        An iterator over the rows produced by the query.
    """
    issues_param = bigquery.ArrayQueryParameter(
        "flaky_test_issues",
        FlakyTest.bigquery_type(),
        flaky_test_issues,
    )
    job_config = bigquery.QueryJobConfig(
        query_parameters=[issues_param],
        default_dataset="di-prod-mtr.devinfra_metrics",
    )
    rows = client.query_and_wait(  # type:ignore[unknownMemberType]
        QUERY,
        job_config=job_config,
    )
    return iter(rows)
def show_query_result(result: Rows) -> None:
    """Print every row of `result` on its own line, encoded as JSON.

    Values the json module cannot encode natively are routed through
    `json_default`.
    """
    for record in result:
        encoded = json.dumps(record, default=json_default)
        print(encoded)
# Demo: run the query twice -- once with an empty array parameter and once
# with two structs (one closed, one still open) -- and print the rows.
print("\n# with empty array:")
show_query_result(test([]))

print("\n# with nonempty array:")
show_query_result(
    test([
        # `.astimezone()` attaches the local UTC offset. A bare
        # `datetime.now()` is naive local wall-clock time, which would be
        # serialized for the TIMESTAMP parameter as if it were already UTC,
        # shifting the instant by the local offset.
        FlakyTest("this my test", datetime.now().astimezone()),
        FlakyTest("this isn't closed yet", None),
    ])
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment