Skip to content

Instantly share code, notes, and snippets.

@rssnyder
Created April 7, 2025 14:05
Show Gist options
  • Save rssnyder/6d8661b875549a3432bd09737497579a to your computer and use it in GitHub Desktop.
Save rssnyder/6d8661b875549a3432bd09737497579a to your computer and use it in GitHub Desktop.

auth

HARNESS_ACCOUNT_ID: harness account id

HARNESS_PLATFORM_API_KEY: harness api key, requires delegate:read and connectors:read/write

settings

NEW_FEATURE: name of the feature to add to each cloud cost connector's featuresEnabled list

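LOG_LEVEL: logging level for the script (default: INFO)

HARNESS_ENDPOINT: base URL of the Harness instance (default: https://app.harness.io)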

from os import getenv
from logging import getLogger, debug, info, error
from requests import post, put
# Root logger verbosity is taken from LOG_LEVEL (INFO when unset); the value
# is upper-cased so "debug" and "DEBUG" are both accepted.
getLogger().setLevel(getenv("LOG_LEVEL", "INFO").upper())

# Base URL of the Harness instance; overridable through HARNESS_ENDPOINT.
HARNESS_ENDPOINT = getenv("HARNESS_ENDPOINT", "https://app.harness.io")

# Query parameters sent with every request.
PARAMS = {"accountIdentifier": getenv("HARNESS_ACCOUNT_ID")}

# Headers sent with every request; the API key authenticates the call.
HEADERS = {"x-api-key": getenv("HARNESS_PLATFORM_API_KEY")}
def get_ce_connectors() -> list:
    """Fetch the cloud cost connectors visible to the configured account.

    Posts a filter for the ``CEAws``, ``CEAzure`` and ``GcpCloudCost``
    connector types to the ``connectors/listV2`` endpoint.

    Returns:
        The ``data.content`` list of the response. An empty list is returned
        when the request fails, when the API answers with
        ``RESOURCE_NOT_FOUND_EXCEPTION``, or when the response carries no
        content, so the result can always be iterated.
    """
    # NOTE(review): listV2 is presumably paginated and only the first page is
    # read here -- confirm against the Harness API docs for large accounts.
    resp = post(
        f"{HARNESS_ENDPOINT}/gateway/ng/api/connectors/listV2",
        params=PARAMS,
        headers=HEADERS,
        json={"filterType": "Connector", "types": ["CEAws", "CEAzure", "GcpCloudCost"]},
        # requests never times out by default; bound the wait so the script
        # cannot hang forever on an unresponsive endpoint.
        timeout=30,
    )

    # Parse the body once. Error responses are not guaranteed to be JSON
    # (e.g. a proxy error page), in which case .json() raises a ValueError
    # subclass.
    try:
        body = resp.json()
    except ValueError:
        body = None
    if not isinstance(body, dict):
        error(f"error getting connector: {resp.text}")
        return []

    # "Not found" is an expected, non-error outcome: there is nothing to list.
    if body.get("code") == "RESOURCE_NOT_FOUND_EXCEPTION":
        return []

    if resp.status_code != 200:
        error(f"error getting connector: {resp.text}")
        return []

    # Tolerate a missing or null "data"/"content" instead of returning None.
    return (body.get("data") or {}).get("content") or []
def update_connector(connector: dict) -> dict:
    """Send an updated connector definition to Harness.

    Args:
        connector: The connector payload to PUT to the ``connectors``
            endpoint; it is sent as the JSON request body unchanged.

    Returns:
        The decoded JSON response on success, or an empty dict when the
        request does not return HTTP 200 or the body is not valid JSON
        (the failure is logged in both cases).
    """
    resp = put(
        f"{HARNESS_ENDPOINT}/gateway/ng/api/connectors",
        params=PARAMS,
        headers=HEADERS,
        json=connector,
        # requests never times out by default; bound the wait so the script
        # cannot hang forever on an unresponsive endpoint.
        timeout=30,
    )
    if resp.status_code != 200:
        error(f"error updating connector: {resp.text}")
        return {}

    # .json() raises a ValueError subclass when the body is not JSON.
    try:
        return resp.json()
    except ValueError:
        error(f"error updating connector: {resp.text}")
        return {}
if __name__ == "__main__":
    # Entry point: add NEW_FEATURE to every cloud cost connector that does
    # not already list it in its spec's featuresEnabled.
    new_feature = getenv("NEW_FEATURE")
    if not new_feature:
        # Without this guard an unset variable would append None to every
        # connector's feature list and push that to Harness.
        error("NEW_FEATURE must be set")
        raise SystemExit(1)

    # "or []" keeps the loop safe if the listing comes back empty/None.
    for connector in get_ce_connectors() or []:
        spec = connector["connector"]["spec"]
        # Treat a missing or null feature list as empty.
        features = spec.get("featuresEnabled") or []

        if new_feature in features:
            debug(f"{connector['connector']['name']} already in {new_feature}")
            continue

        # add new feature
        spec["featuresEnabled"] = [*features, new_feature]
        # update_connector logs its own error and returns {} on failure.
        if update_connector(connector):
            info(f"added {new_feature} to {connector['connector']['name']}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment