Skip to content

Instantly share code, notes, and snippets.

Revisions

  1. swarbhanu created this gist Feb 1, 2012.
    167 changes: 167 additions & 0 deletions test_integration_worker_couch_integration.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,167 @@
    #!/usr/bin/env python

    '''
    @file ion/services/dm/ingestion/test/test_ingestion.py
    @author Swarbhanu Chatterjee
    @test ion.services.dm.ingestion.ingestion_management_service test suite to cover all ingestion mgmt service code
    '''

    import gevent
    from mock import Mock, sentinel, patch
    from pyon.util.unit_test import PyonTestCase
    from pyon.util.int_test import IonIntegrationTestCase
    from ion.services.dm.ingestion.ingestion_management_service import IngestionManagementService, IngestionManagementServiceException
    from nose.plugins.attrib import attr
    from pyon.core.exception import NotFound, BadRequest
    import unittest
    from pyon.public import CFG, IonObject, log, RT, PRED, LCS, StreamPublisher, StreamSubscriber
    from pyon.public import Container
    from pyon.public import Container
    from pyon.util.containers import DotDict
    from interface.objects import ProcessDefinition, StreamQuery, ExchangeQuery
    from interface.services.icontainer_agent import ContainerAgentClient
    from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
    from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
    from interface.services.dm.itransform_management_service import TransformManagementServiceClient
    from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient

    from interface.objects import BlogPost, BlogComment
    from pyon.datastore.couchdb.couchdb_dm_datastore import CouchDB_DM_DataStore


    @attr('INT', group='dm')
    class IngestionManagementServiceIntTest(IonIntegrationTestCase):

    def setUp(self):
    # set up the container for testing

    #------------------------------------------------------------------------
    # Container
    #----------------------------------------------------------------------
    self._start_container()

    self.cc = ContainerAgentClient(node=self.container.node,name=self.container.name)

    self.cc.start_rel_from_url('res/deploy/r2dm.yml')

    #------------------------------------------------------------------------
    # Service clients
    #----------------------------------------------------------------------
    self.pubsub_cli = PubsubManagementServiceClient(node=self.cc.node)
    self.tms_cli = TransformManagementServiceClient(node=self.cc.node)
    self.ingestion_cli = IngestionManagementServiceClient(node=self.cc.node)
    self.rr_cli = ResourceRegistryServiceClient(node=self.cc.node)

    #------------------------------------------------------------------------
    # Configuration parameters
    #----------------------------------------------------------------------
    self.exchange_point_id = 'science_data'
    self.datastore_name = 'dm_datastore'
    self.number_of_workers = 2
    self.hdf_storage = {'root_path': '', 'filesystem' : ''}
    self.couch_storage = {'server': '', 'couchstorage': '', 'database': self.datastore_name }
    self.default_policy = {}
    self.XP = 'science_data'
    self.exchange_name = 'ingestion_queue'

    #------------------------------------------------------------------------
    # Refresh datastore before testing
    #----------------------------------------------------------------------

    self.db = CouchDB_DM_DataStore()
    self.db.delete_datastore(self.datastore_name)

    #------------------------------------------------------------------------
    # Stream publisher
    #----------------------------------------------------------------------

    self.input_stream_id = self.pubsub_cli.create_stream(name='input_stream',original=True)
    stream_route = self.pubsub_cli.register_producer(exchange_name=self.exchange_name, stream_id=self.input_stream_id)
    self.ctd_stream1_publisher = StreamPublisher(node=self.cc.node, name=('science_data',stream_route.routing_key), \
    process=self.cc)

    def tearDown(self):
    """
    Cleanup. Delete Subscription, Stream, Process Definition
    """
    self.pubsub_cli.delete_stream(self.input_stream_id)
    self._stop_container()


    def test_ingestion_workers_writes_to_couch(self):
    """
    Test that the ingestion workers are writing messages to couch
    """

    #------------------------------------------------------------------------
    # Create ingestion configuration and activate it
    #----------------------------------------------------------------------
    ingestion_configuration_id = self.ingestion_cli.create_ingestion_configuration(self.exchange_point_id, \
    self.couch_storage, self.hdf_storage, self.number_of_workers, self.default_policy)
    self.ingestion_cli.activate_ingestion_configuration(ingestion_configuration_id)

    #------------------------------------------------------------------------
    # Publish messages
    #----------------------------------------------------------------------

    post = BlogPost( post_id = '1234', title = 'The beautiful life',author = {}, updated = 'too early', content ='summer' )

    self.ctd_stream1_publisher.publish(post)

    comment = BlogComment(ref_id = '1234',author = {}, updated = 'too late',content = 'when summer comes')

    self.ctd_stream1_publisher.publish(comment)


    #------------------------------------------------------------------------
    # List the posts and the comments that should have been written to couch
    #----------------------------------------------------------------------

    objs = self.db.list_objects(self.couch_storage['database'])

    # the list of ion_objects... in our case BlogPost and BlogComment
    ion_objs = []

    for obj in objs:

    # read the document returned by list
    result = self.db.read_doc(objs[0], '', 'dm_datastore')

    # convert the persistence dict to an ion_object
    ion_obj = self.db._persistence_dict_to_ion_object(result)

    if isinstance(ion_obj, BlogPost):
    log.debug("ION OBJECT: %s\n" % ion_obj)
    log.debug("POST: %s\n" % post)

    # since the retrieved document has an extra attribute, rev_id, which the orginal post did not have
    # it is easier to compare the attributes than the whole objects
    self.assertTrue(ion_obj.author == post.author), "The post is not to be found in couch storage"
    self.assertTrue(ion_obj.title == post.title), "The post is not to be found in couch storage"
    self.assertTrue(ion_obj.post_id == post.post_id), "The post is not to be found in couch storage"


    elif isinstance(ion_obj, BlogComment):
    log.debug("ION OBJECT: %s\n" % ion_obj)
    log.debug("COMMENT: %s\n" % comment)

    # since the retrieved document has an extra attribute, rev_id, which the orginal post did not have
    # it is easier to compare the attributes than the whole objects
    self.assertTrue(ion_obj.author == comment.author), "The comment is not to be found in couch storage"
    self.assertTrue(ion_obj.content == comment.content), "The comment is not to be found in couch storage"
    self.assertTrue(ion_obj.ref_id == comment.ref_id), "The comment is not to be found in couch storage"

    #------------------------------------------------------------------------
    # Cleanup
    #----------------------------------------------------------------------

    self.ingestion_cli.deactivate_ingestion_configuration(ingestion_configuration_id)
    self.ingestion_cli.delete_ingestion_configuration(ingestion_configuration_id)


    def test_ingestion_worker_receives_message(self):
    """
    Test the activation of the ingestion configuration
    """
    pass