Skip to content

Instantly share code, notes, and snippets.

@mierzejk
Last active January 17, 2025 15:28
Show Gist options
  • Save mierzejk/8faa3669c435106634af31b4c7c274d0 to your computer and use it in GitHub Desktop.
Save mierzejk/8faa3669c435106634af31b4c7c274d0 to your computer and use it in GitHub Desktop.
Recursively stops version control of all NiFi Process Groups, or only of the specified top-level ones.
import requests
import warnings
from json.decoder import JSONDecodeError
from urllib3.exceptions import InsecureRequestWarning
# Suppress urllib3's InsecureRequestWarning (request() below sends with verify=False).
warnings.filterwarnings(r'ignore', r'.*', category=InsecureRequestWarning, module=r'urllib3')
# Base URL of the NiFi REST API; request() prepends it to every endpoint path.
nifi_url = 'https://server:8443/nifi-api'
# Bearer token that request() sends in the Authorization header; while it is
# falsy (as here) no Authorization header is added.
token = None
def request(verb, get_response, args, kwargs):
    """Send one HTTP call to the NiFi REST API and post-process the response.

    Args:
        verb: Callable that performs the HTTP call (e.g. ``requests.get``).
            It is invoked as ``verb(url, *args[1:], **options)`` and its
            result is used as a context manager.
        get_response: Callable that receives the response object and returns
            the value handed back to the caller.
        args: Positional arguments. ``args[0]`` is the endpoint path that is
            appended to ``nifi_url``; the rest are forwarded to ``verb``.
        kwargs: Keyword arguments forwarded to ``verb``. Neither this mapping
            nor a ``headers`` mapping inside it is modified.

    Returns:
        Whatever ``get_response`` returns for the response.
    """
    path = args[0]
    # Work on a copy so the caller's mapping is left untouched.
    options = dict(kwargs)
    # Certificate verification is off by default, but an explicit ``verify``
    # supplied by the caller is respected.
    options.setdefault('verify', False)
    # The token endpoint itself is called without an Authorization header.
    if token and not path.endswith(r'access/token'):
        # Copy the headers too: the caller may reuse the same dict elsewhere.
        headers = dict(options.get('headers') or {})
        headers[r'Authorization'] = 'Bearer ' + token
        options['headers'] = headers
    with verb(nifi_url + path, *args[1:], **options) as response:
        return get_response(response)
def post(*args, **kwargs):
    """POST to a NiFi API endpoint and return the response body as text.

    ``args[0]`` is the endpoint path relative to ``nifi_url``; all other
    positional and keyword arguments are handed to ``requests.post`` through
    ``request``.
    """
    def read_text(response):
        return response.text

    return request(requests.post, read_text, args, kwargs)
def get(*args, **kwargs):
    """GET a NiFi API endpoint and return its decoded JSON body.

    ``args[0]`` is the endpoint path relative to ``nifi_url``; all other
    positional and keyword arguments are handed to ``requests.get`` through
    ``request``.

    Returns:
        The decoded JSON document, or the tuple ``(status_code, text)`` when
        the response body is not valid JSON.
    """
    def get_json(response):
        try:
            return response.json()
        except ValueError:
            # Every JSON decode error that ``Response.json()`` can raise
            # (stdlib json, simplejson, requests' own wrapper) derives from
            # ValueError, so the fallback works whichever backend is in use.
            return response.status_code, response.text

    return request(requests.get, get_json, args, kwargs)
def delete(*args, **kwargs):
    """Send DELETE to a NiFi API endpoint and return the response body as text.

    ``args[0]`` is the endpoint path relative to ``nifi_url``; all other
    positional and keyword arguments are handed to ``requests.delete``
    through ``request``.
    """
    def read_text(response):
        return response.text

    return request(requests.delete, read_text, args, kwargs)
def dfs(group, stop_pg = None):
    """Stop version control of a process group and its descendants, depth first.

    Child groups are handled before ``group`` itself. For every visited group
    whose component carries ``versionControlInformation``, the group name and
    registry name are printed, a DELETE is sent to
    ``/versions/process-groups/{groupId}`` and the response text is printed.

    Args:
        group: Process group entity (a mapping with ``id`` and ``component``)
            as returned by the NiFi API.
        stop_pg: Optional collection of names. When given, only direct
            children of ``group`` whose name is in it are descended into.
            ``None`` means every child is visited.
    """
    children = get(f'/process-groups/{group["id"]}/process-groups')['processGroups']
    for child in children:
        if stop_pg is None or child['component']['name'] in stop_pg:
            # The name filter is not forwarded: below a selected top-level
            # group every descendant is processed.
            dfs(child)

    component = group['component']
    if r'versionControlInformation' not in component:
        return

    group_revision = get(f'/versions/process-groups/{group["id"]}')['processGroupRevision']
    control_inf = component['versionControlInformation']
    print(f'{component["name"]} {control_inf["registryName"]}')

    # Passing the query as ``params`` lets the HTTP layer URL-encode the values.
    params = {
        'version': group_revision['version'],
        'disconnectedNodeAcknowledged': 'true',
    }
    # A revision does not necessarily carry a client id; send it only if present
    # instead of failing with KeyError.
    client_id = group_revision.get('clientId')
    if client_id is not None:
        params['clientId'] = client_id
    print(delete(f'/versions/process-groups/{control_inf["groupId"]}', params=params))
# Log in: the text of the /access/token response becomes the bearer token that
# request() attaches to every later call. The username/password values look like
# placeholders to be replaced; alternatively skip the login and assign an
# existing JWT directly (commented-out line below).
token=post(r'/access/token', data = {'username': 'username', 'password': 'password'}) # or
# token=r'jwt token'
# Walk the tree from the root process group. Only top-level groups whose name is
# in the list are entered; dfs() does not pass the list on to its recursive
# calls, so everything below a selected group is processed.
dfs(get(r'/process-groups/root'), ['ProcessGroup1', 'ProcessGroup2'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment