Skip to content

Instantly share code, notes, and snippets.

@richpsharp
Created February 13, 2021 21:11
Show Gist options
  • Save richpsharp/b847b0f5881bcac20267f4ac48e2bb3a to your computer and use it in GitHub Desktop.
Save richpsharp/b847b0f5881bcac20267f4ac48e2bb3a to your computer and use it in GitHub Desktop.
Demo of how to extract streams and subwatersheds
"""Tracer for floodplain extraction function with custom parameters."""
import logging
import os
import sys
import pygeoprocessing
import pygeoprocessing.routing
import numpy
logging.basicConfig(
level=logging.DEBUG,
format=(
'%(asctime)s (%(relativeCreated)d) %(processName)s %(levelname)s '
'%(name)s [%(funcName)s:%(lineno)d] %(message)s'),
stream=sys.stdout)
LOGGER = logging.getLogger(__name__)
logging.getLogger('taskgraph').setLevel(logging.INFO)
WORKSPACE_DIR = 'workspace'
def _scrub_invalid_values(base_array, nodata, new_nodata):
result = numpy.copy(base_array)
invalid_mask = (
~numpy.isfinite(base_array) |
numpy.isclose(result, nodata))
result[invalid_mask] = new_nodata
return result
def main():
"""Entry point."""
os.makedirs(WORKSPACE_DIR, exist_ok=True)
min_flow_accum_threshold = 1000
dem_path = "sample_data/MERIT DEM Pro Agua Purus Acre clip2.tif"
target_stream_vector_path = os.path.join(
WORKSPACE_DIR, f'stream_segments_fa{min_flow_accum_threshold}.gpkg')
target_watershed_boundary_vector_path = os.path.join(
WORKSPACE_DIR, f'subwatersheds_fa{min_flow_accum_threshold}.gpkg')
dem_info = pygeoprocessing.get_raster_info(dem_path)
dem_type = dem_info['numpy_type']
nodata = dem_info['nodata'][0]
new_nodata = float(numpy.finfo(dem_type).min)
scrubbed_dem_path = os.path.join(
WORKSPACE_DIR, 'scrubbed_dem.tif')
LOGGER.debug('scrub the dem')
pygeoprocessing.raster_calculator(
[(dem_path, 1), (nodata, 'raw'), (new_nodata, 'raw')],
_scrub_invalid_values, scrubbed_dem_path,
dem_info['datatype'], new_nodata)
LOGGER.info('fill pits')
filled_pits_path = os.path.join(
WORKSPACE_DIR, 'filled_pits_dem.tif')
pygeoprocessing.routing.fill_pits(
(scrubbed_dem_path, 1), filled_pits_path,
max_pixel_fill_count=1000000)
LOGGER.info('flow dir d8')
flow_dir_d8_path = os.path.join(WORKSPACE_DIR, 'flow_dir_d8.tif')
pygeoprocessing.routing.flow_dir_d8(
(filled_pits_path, 1), flow_dir_d8_path)
LOGGER.info('flow accum d8')
flow_accum_d8_path = os.path.join(WORKSPACE_DIR, 'flow_accum_d8.tif')
pygeoprocessing.routing.flow_accumulation_d8(
(flow_dir_d8_path, 1), flow_accum_d8_path)
pygeoprocessing.routing.extract_strahler_streams_d8(
(flow_dir_d8_path, 1), (flow_accum_d8_path, 1),
(filled_pits_path, 1), target_stream_vector_path,
min_flow_accum_threshold=min_flow_accum_threshold,
river_order=7)
pygeoprocessing.routing.calculate_subwatershed_boundary(
(flow_dir_d8_path, 1), target_stream_vector_path,
target_watershed_boundary_vector_path,
outlet_at_confluence=False)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment