-
-
Save oscarandreu/5564d37ac0a83e7b9cbef6e4123932c6 to your computer and use it in GitHub Desktop.
An Apache NiFi InvokeScriptedProcessor template (in Jython) for running ExecuteScript Jython scripts faster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from org.apache.nifi.processor import Processor,Relationship | |
from org.apache.nifi.components import PropertyDescriptor | |
from org.apache.nifi.processor.util import StandardValidators | |
from java.lang import Throwable | |
class ScriptBody():
    """Holds the per-FlowFile logic, separate from the processor plumbing.

    The body of ``executeScript`` is the part meant to be customised; as
    written it forwards every FlowFile it receives to the success relationship.
    """

    def __init__(self):
        pass

    def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE):
        """Fetch one FlowFile from ``session`` and transfer it to ``REL_SUCCESS``.

        ``context``, ``log`` and ``REL_FAILURE`` are accepted so custom logic
        can use them, but this template body does not read them.

        Returns None. When ``session.get()`` yields nothing, no transfer is made.
        """
        flowFile = session.get()
        if not flowFile:
            # Nothing queued for this trigger; leave the session untouched.
            return
        # transfer
        session.transfer(flowFile, REL_SUCCESS)
#end class
class JythonProcessor(Processor):
    """Jython implementation of the NiFi ``Processor`` interface.

    Declares two relationships and two properties, and on every trigger hands
    a fresh session to the ``ScriptBody`` instance held in ``eb``.
    """

    # Relationships returned by getRelationships().
    REL_SUCCESS = (Relationship.Builder()
                   .name("success")
                   .description('FlowFiles that were successfully processed are routed here')
                   .build())
    REL_FAILURE = (Relationship.Builder()
                   .name("failure")
                   .description('FlowFiles that were not successfully processed are routed here')
                   .build())

    # Properties returned by getPropertyDescriptors().
    importFilesPath = (PropertyDescriptor.Builder()
                       .name('importFilesPath')
                       .description("Defines the importFilesPath.")
                       .required(True)
                       .defaultValue('/mnt/foo/bar')
                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                       .build())
    importFile = (PropertyDescriptor.Builder()
                  .name('importFile')
                  .description("filename to be imported")
                  .expressionLanguageSupported(True)
                  .required(True)
                  .defaultValue('import.csv')
                  .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                  .build())

    # Logger; assigned in initialize() from the initialization context.
    log = None
    # Script logic, created once when the class body is evaluated and reused
    # for every onTrigger call.
    eb = ScriptBody()

    def initialize(self, context):
        """Store the logger exposed by the initialization ``context``."""
        self.log = context.logger

    def getRelationships(self):
        """Return the set containing the success and failure relationships."""
        return set([self.REL_SUCCESS, self.REL_FAILURE])

    def validate(self, context):
        """No custom validation is performed; returns None."""
        pass

    def onPropertyModified(self, descriptor, oldValue, newValue):
        """Property changes are ignored."""
        pass

    def getPropertyDescriptors(self):
        """Return the list of supported property descriptors."""
        return [
            self.importFile,
            self.importFilesPath
        ]

    def getIdentifier(self):
        """Return None; this template does not supply an identifier."""
        return None

    def onTrigger(self, context, sessionFactory):
        """Run the script body in a new session, committing on success.

        If a Java ``Throwable`` is raised, the error is logged, the session is
        rolled back with ``True`` as its argument, and the throwable is
        re-raised to the caller.
        """
        session = sessionFactory.createSession()
        try:
            self.eb.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        # NOTE(review): only Java Throwables are caught here; Python-level
        # exceptions raised by the script body presumably bypass the rollback
        # -- confirm whether that is intended.
        except Throwable as t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            # Must be the Python literal True: the lowercase name `true` is
            # undefined and raised NameError here, skipping the rollback and
            # hiding the original failure.
            session.rollback(True)
            raise t
#end class
# Module-level instance of the processor defined in this script. NiFi's
# InvokeScriptedProcessor is documented to look for a variable named
# `processor`, so the name should not be changed.
processor = JythonProcessor()
Author
oscarandreu
commented
Jan 22, 2020
via email
Currently on holidays for 2 weeks, after that I'll take a look.
Maybe it is a problem related to the NiFi version.
El mié., 22 ene. 2020 a las 17:30, Juanma-GP (<[email protected]>)
escribió:
… Hi, thanks for sharing it.
I'm finding a few errors in NiFi's processor when I use your script.
First of all, I find that the "true" variable on line 68 is undefined. Could it be
"True"?
Secondly, I'm finding a continuous error in line 69:
`
nifi_1 | 2020-01-22 16:25:56,457 ERROR [Timer-Driven Process Thread-10]
o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=c8..dd]
***@***.*** failed to
process due to java.lang.NullPointerException; rolling back session
...
nifi_1 | 2020-01-22 16:25:56,464 ERROR [Timer-Driven Process Thread-10]
o.a.n.p.script.InvokeScriptedProcessor
InvokeScriptedProcessor[id=c80d3300-016f-1000-b353-4b50ed36c6dd]
InvokeScriptedProcessor[id=c80d3300-016f-1000-b353-4b50ed36c6dd] failed to
process session due to java.lang.reflect.UndeclaredThrowableException;
Processor Administratively Yielded for 1 sec:
java.lang.reflect.UndeclaredThrowableException
nifi_1 | java.lang.reflect.UndeclaredThrowableException: null
nifi_1 | at com.sun.proxy.$Proxy110.onTrigger(Unknown Source)
nifi_1 | at
org.apache.nifi.processors.script.InvokeScriptedProcessor.onTrigger(InvokeScriptedProcessor.java:547)
nifi_1 | at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
nifi_1 | at
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
nifi_1 | at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
nifi_1 | at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_1 | at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_1 | at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_1 | at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_1 | at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_1 | at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_1 | at java.lang.Thread.run(Thread.java:748)
nifi_1 | Caused by: javax.script.ScriptException:
java.lang.NullPointerException: java.lang.NullPointerException in <script>
at line number 224 (69 in your script, "raise"'s line)
nifi_1 | at
org.python.jsr223.PyScriptEngine.scriptException(PyScriptEngine.java:222)
nifi_1 | at
org.python.jsr223.PyScriptEngine.access$200(PyScriptEngine.java:20)
nifi_1 | at
org.python.jsr223.PyScriptEngine$1.invoke(PyScriptEngine.java:187)
nifi_1 | ... 12 common frames omitted
nifi_1 | Caused by: org.python.core.PyException: null
nifi_1 | at org.python.core.PyException.doRaise(PyException.java:233)
nifi_1 | at org.python.core.Py.makeException(Py.java:1565)
nifi_1 | at org.python.core.Py.makeException(Py.java:1569)
nifi_1 | at org.python.core.Py.makeException(Py.java:1573)
nifi_1 | at org.python.pycode.
*pyx116.onTrigger$16(<script>:224) nifi_1 | at
org.python.pycode.pyx116.call_function(<script>) nifi_1 | at
org.python.core.PyTableCode.call(PyTableCode.java:171) nifi_1 | at
org.python.core.PyBaseCode.call(PyBaseCode.java:308) nifi_1 | at
org.python.core.PyBaseCode.call(PyBaseCode.java:199) nifi_1 | at
org.python.core.PyFunction.call(PyFunction.java:482) nifi_1 | at
org.python.core.PyMethod.instancemethod___call*(PyMethod.java:237)
nifi_1 | at org.python.core.PyMethod.*call*(PyMethod.java:228)
nifi_1 | at org.python.core.PyMethod.*call*(PyMethod.java:218)
nifi_1 | at org.python.core.PyMethod.*call*(PyMethod.java:213)
nifi_1 | at
org.python.jsr223.PyScriptEngine$1.invoke(PyScriptEngine.java:181)
nifi_1 | ... 12 common frames omitted
`
Have you ever found a solution for "UndeclaredThrowableException"?
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<https://gist.github.com/5564d37ac0a83e7b9cbef6e4123932c6?email_source=notifications&email_token=AAL37KOLZWS76LDG5GOA6X3Q7BYCTA5CNFSM4KKIXGEKYY3PNVWWK3TUL52HS4DFVNDWS43UINXW23LFNZ2KUY3PNVWWK3TUL5UWJTQAGAG6O#gistcomment-3149287>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAL37KIZFF5S4HRLWT7BEXDQ7BYCTANCNFSM4KKIXGEA>
.
--
Un saludo.
Óscar Andreu.
Hey I have the same issue, were you able to resolve it?
Hey I have the same issue, were you able to resolve it?
Hey @karanmankar95!
I don't know if you still have this problem, but I'm replying anyway for posterity and for the sake of everyone else who may hit the error.
The error on line 68 is probably the lowercase "true"; it should be True. The former is not a defined name in Python, so it raises a NameError at runtime.
The error on line 69 is probably due to line 66 (again, not valid Python syntax). It should be except Throwable as t:
In the end, lines 61 to 69 (i.e., the onTrigger function) should be something like this:
def onTrigger(self, context, sessionFactory):
    """Run the script body in a new session; commit on success, roll back on a Java Throwable."""
    session = sessionFactory.createSession()
    try:
        self.eb.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
        session.commit()
    except Throwable as t:  # CHANGE 1: Use Python syntax for capturing exception objects
        self.log.error('{} failed to process due to {}; rolling back session', [self, t])
        session.rollback(True)  # CHANGE 2: Should be uppercased
        raise t
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment