Skip to content

Instantly share code, notes, and snippets.

@swarbhanu
Created October 8, 2012 19:12

Revisions

  1. swarbhanu created this gist Oct 8, 2012.
    76 changes: 76 additions & 0 deletions ctd_L1_conductivity.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@

    '''
    @author MManning
    @file ion/processes/data/transforms/ctd/ctd_L1_conductivity.py
    @description Transforms CTD parsed data into L1 product for conductivity
    '''

    from pyon.ion.transforma import TransformDataProcess
    from pyon.core.exception import BadRequest
    from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
    from ion.services.dm.utility.granule.record_dictionary import RecordDictionaryTool
    from ion.core.function.transform_function import SimpleGranuleTransformFunction

    # For usage: please refer to the integration tests in
    # ion/processes/data/transforms/ctd/test/test_ctd_transforms.py

    class CTDL1ConductivityTransform(TransformDataProcess):
    ''' A basic transform that receives input through a subscription,
    parses the input from a CTD, extracts the conductivity value and scales it according to
    the defined algorithm. If the transform
    has an output_stream it will publish the output on the output stream.
    '''

    def on_start(self):
    super(CTDL1ConductivityTransform, self).on_start()

    if not self.CFG.process.publish_streams.has_key('conductivity'):
    raise BadRequest("For CTD transforms, please send the stream_id using "
    "a special keyword (ex: conductivity)")
    self.cond_stream = self.CFG.process.publish_streams.conductivity

    # Read the parameter dict from the stream def of the stream
    pubsub = PubsubManagementServiceProcessClient(process=self)
    self.stream_definition = pubsub.read_stream_definition(stream_id=self.cond_stream)

    def recv_packet(self, packet, stream_route, stream_id):
    """Processes incoming data!!!!
    """
    if packet == {}:
    return

    granule = CTDL1ConductivityTransformAlgorithm.execute(packet, params=self.stream_definition._id)
    self.conductivity.publish(msg=granule)

    class CTDL1ConductivityTransformAlgorithm(SimpleGranuleTransformFunction):

    @staticmethod
    @SimpleGranuleTransformFunction.validate_inputs
    def execute(input=None, context=None, config=None, params=None, state=None):

    rdt = RecordDictionaryTool.load_from_granule(input)
    conductivity = rdt['conductivity']

    cond_value = (conductivity / 100000.0) - 0.5

    values = {}
    for field_name,value in rdt.iteritems():
    values[field_name]=value

    # Update the conductivity values
    values['conductivity'] = cond_value

    # build the granule for conductivity
    result = CTDL1ConductivityTransformAlgorithm._build_granule(stream_definition_id = params,
    values=values)
    return result

    @staticmethod
    def _build_granule(stream_definition_id=None, values=None):

    root_rdt = RecordDictionaryTool(stream_definition_id=stream_definition_id)

    for field_name, value in values.iteritems():
    root_rdt[field_name] = value

    return root_rdt.to_granule()