|
import unittest |
|
|
|
# NOTE: you will probably need to adjust the import below so it points at your own job's entrypoint module.
|
from testcicd.jobs.sample.entrypoint import SampleJob |
|
from uuid import uuid4 |
|
from pyspark.dbutils import DBUtils # noqa |
|
|
|
from typing import Dict |
|
import coverage |
|
from argparse import ArgumentParser |
|
import sys |
|
|
|
|
|
class SampleJobIntegrationTest(unittest.TestCase):
    """End-to-end check that launching ``SampleJob`` writes a non-empty output.

    Every test gets its own output directory (a fresh UUID under
    ``dbfs:/tmp/tests/sample/``), which is removed again in ``tearDown``.
    """

    def setUp(self):
        """Create a unique output location, the job under test and its helpers."""
        run_id = uuid4()
        self.test_dir = "dbfs:/tmp/tests/sample/%s" % run_id

        # Configuration handed to the job: where to write and in which format.
        self.test_config = {
            "output_format": "delta",
            "output_path": self.test_dir,
        }

        job = SampleJob(init_conf=self.test_config)
        self.job = job
        # DBUtils is built from the job's Spark session; tearDown uses it
        # to delete the output directory.
        self.dbutils = DBUtils(job.spark)
        self.spark = job.spark

    def test_sample(self):
        """Launch the job, read its output back and require at least one row."""
        self.job.launch()

        output_format = self.test_config["output_format"]
        output_path = self.test_config["output_path"]

        reader = self.spark.read.format(output_format)
        written = reader.load(output_path)
        output_count = written.count()

        self.assertGreater(output_count, 0)

    def tearDown(self):
        """Recursively delete the per-test output directory."""
        recursive = True
        self.dbutils.fs.rm(self.test_dir, recursive)
|
|
|
|
|
class CoverageIntegrationTest:
    """Optionally wrap the integration-test run in a ``coverage`` measurement.

    Behaviour is driven by command-line options (see ``_parse_args``):

    * ``--cov``         enables coverage collection,
    * ``--cov-config``  path of the coverage configuration file,
    * ``--xml``         output file for the XML coverage report.

    When ``--cov`` is absent, ``start()`` and ``stop()`` are no-ops.
    """

    def __init__(self):
        """Parse the command line and create a collector if coverage was requested."""
        self.conf = self._parse_args()

        # Always define the attribute. Previously it was only assigned when
        # coverage was enabled, so start()/stop() raised AttributeError on a
        # plain run without --cov.
        self.cov = None
        if self.conf['cov']:
            self.cov = coverage.Coverage(config_file=self.conf['cov_config'])

    @staticmethod
    def _dbfs_path(path):
        """Return ``path`` with each ``dbfs:`` occurrence replaced by ``/dbfs``.

        A falsy ``path`` (``None`` or an empty string) yields ``None``.
        """
        # NOTE(review): presumably this maps a dbfs: URI onto the local /dbfs
        # mount so the config file can be opened with plain file I/O - confirm.
        if not path:
            return None
        return path.replace('dbfs:', '/dbfs')

    @staticmethod
    def _parse_args() -> Dict[str, str]:
        """Read the coverage-related options from ``sys.argv``.

        Unknown arguments are ignored (``parse_known_args``), so the script
        can be launched together with unrelated parameters.

        Returns:
            A dict with the keys ``"cov"``, ``"cov_config"`` and ``"xml"``.
            Any of the values may be ``None`` when the matching option was not
            supplied. ``"cov"`` has every ``pkg-`` substring removed and
            ``"cov_config"`` is passed through ``_dbfs_path``.
        """
        # --cov-config and --xml only become mandatory when the literal
        # "--cov" token is present on the command line.
        cov_requested = '--cov' in sys.argv

        p = ArgumentParser()
        p.add_argument("--cov", required=False, type=str)
        p.add_argument("--cov-config", required=cov_requested, type=str)
        p.add_argument("--xml", required=cov_requested, type=str)

        namespace = p.parse_known_args(sys.argv[1:])[0]
        return {
            "cov": namespace.cov.replace('pkg-', '') if namespace.cov else None,
            "cov_config": CoverageIntegrationTest._dbfs_path(namespace.cov_config),
            "xml": namespace.xml,
        }

    def start(self):
        """Begin coverage collection; does nothing when coverage is disabled."""
        if self.cov is None:
            return

        self.cov.start()

    def stop(self):
        """Stop coverage collection and, if ``--xml`` was given, write the XML report.

        Does nothing when coverage is disabled.
        """
        if self.cov is None:
            return

        self.cov.stop()

        if self.conf['xml']:
            self.cov.xml_report(outfile=self.conf['xml'])
|
|
|
|
|
if __name__ == "__main__":
    # Coverage is optional: start()/stop() only act when it was requested
    # on the command line.
    coverage_session = CoverageIntegrationTest()
    coverage_session.start()

    # Please don't change the logic of the test result checks below.
    # It is intentionally written this way to comply with jobs run result checks.
    # For other tests, simply replace SampleJobIntegrationTest with your custom class name.
    suite = unittest.TestLoader().loadTestsFromTestCase(SampleJobIntegrationTest)
    outcome = unittest.TextTestRunner(verbosity=2).run(suite)

    coverage_session.stop()

    # Raise so that a failed test run surfaces as an error of this script.
    tests_passed = outcome.wasSuccessful()
    if not tests_passed:
        raise RuntimeError(
            "One or multiple tests failed. Please check job logs for additional information."
        )