Created
March 13, 2017 14:52
-
-
Save redhog/a04c7bd71f2457de1612682421ac303c to your computer and use it in GitHub Desktop.
antihadoop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json

import luigi

import btarget
# Delimiter placed between the fields of each record that MapShard.run writes.
FIELDSEP = "--{FIELDSEP}--"
class SubTask(luigi.Task):
    """Base class for tasks that work on behalf of a serialized job task.

    The ``job_task`` parameter holds a JSON object whose keys are passed as
    keyword arguments to ``luigi.task_register.load_task``.
    """

    job_task = luigi.Parameter(description="JobTask")

    @property
    def job_task_inst(self):
        """Return the task described by ``job_task``, loading it on first use.

        The loaded instance is cached on ``self._job_task_inst`` so the JSON
        is decoded and the task is loaded only once per instance.
        """
        if not hasattr(self, "_job_task_inst"):
            # The spec has to be read from the instance: a bare ``job_task``
            # is not defined in this scope and raised NameError.
            spec = json.loads(self.job_task)
            self._job_task_inst = luigi.task_register.load_task(**spec)
        return self._job_task_inst
class MapShard(SubTask):
    """Run the job task's mapper over a single input shard.

    Reads ``infile`` row by row, passes each row to the job task's
    ``mapper`` and writes the FIELDSEP-joined result to ``outfile``.
    """

    # NOTE(review): both descriptions read "JobTask", which looks copied from
    # SubTask.job_task -- confirm and reword if so.
    infile = luigi.Parameter(description="JobTask")
    outfile = luigi.Parameter(description="JobTask")

    def input(self):
        """Return the target wrapping this shard's input file."""
        return btarget.DynamicTarget(self.infile)

    def output(self):
        """Return the target wrapping this shard's output file."""
        return btarget.DynamicTarget(self.outfile)

    def run(self):
        """Initialise the mapper, then map every input row to the output."""
        job = self.job_task_inst
        job.init_mapper()
        # input()/output() are methods and must be called to obtain the
        # targets; accessing ``.open`` on the bound method raised
        # AttributeError.
        with self.output().open("w") as outf, self.input().open() as inf:
            for row in inf:
                # The loop variable is ``row``; the previous code passed an
                # undefined ``item`` to the mapper.
                # NOTE(review): str.join needs the mapper to yield strings,
                # but the default JobTask.mapper yields a (None, item) tuple,
                # and no record terminator is written -- confirm the intended
                # record format.
                outf.write(FIELDSEP.join(job.mapper(row)))
# NOTE(review): ``btask`` is referenced here but is not imported in the
# visible part of the file -- confirm where it is meant to come from.
class Map(btask.ShardedTask, SubTask):
    """Sharded map stage that delegates its wiring to the wrapped job task."""

    def requires(self):
        """Return whatever the job task's ``requires_hadoop`` returns."""
        return self.job_task_inst.requires_hadoop()

    def input(self):
        """Return the job task's ``input_hadoop`` result."""
        return self.job_task_inst.input_hadoop()

    def output(self):
        """Return a GCS shard target at the job task's output path + "-map"."""
        map_path = self.job_task_inst.output().path + "-map"
        return btarget.GCSShardTarget(map_path)

    def _input_complete(self):
        """Report whether the upstream requirement is complete.

        Calls ``complete()`` directly on the value returned by ``requires``,
        so that value is presumably a single task -- confirm.
        """
        upstream = self.requires()
        return upstream.complete()

    def shard_task(self, infile, outfile):
        """Build the MapShard task that maps ``infile`` into ``outfile``."""
        return MapShard(
            job_task=self.job_task,
            infile=infile,
            outfile=outfile,
        )
class JobTask(luigi.Task):
    """Base class for jobs expressed as a mapper and a reducer.

    Subclasses override the hooks below; ``run`` dynamically yields a Map
    task that carries this task's serialized spec.
    """

    # Not referenced in the visible code; presumably the number of leading
    # fields used as map / partition keys -- confirm.
    map_keys = 1
    partition_keys = 1

    def requires_hadoop(self):
        """Return the requirements of the map stage (``requires()`` here)."""
        return self.requires()

    def input_hadoop(self):
        """Return the outputs of ``requires_hadoop()`` via ``getpaths``."""
        return luigi.task.getpaths(self.requires_hadoop())

    def init_mapper(self):
        """Hook called before mapping starts; does nothing by default."""

    def init_reducer(self):
        """Hook for reducer initialisation; does nothing by default."""

    def mapper(self, item):
        """Yield a single ``(None, item)`` pair for the given item."""
        yield (None, item)

    def reducer(self, partition, lines):
        """Yield a single ``(partition, None)`` pair; ``lines`` is unused."""
        yield (partition, None)

    @property
    def task_spec(self):
        """Return a JSON string holding this task's module, family and params."""
        spec = {
            "module": self.task_module,
            "task_name": self.task_family,
            "params_str": self.to_str_params(),
        }
        return json.dumps(spec)

    def run(self):
        """Yield the Map task for this job as a dynamic dependency."""
        yield Map(job_task=self.task_spec)
def |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment