Last active
April 6, 2020 15:20
-
-
Save iboates/dc0eb07be5382191117fb43e0981246c to your computer and use it in GitHub Desktop.
Remove pseudonodes from geodataframe full of linestrings
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import combinations

import geopandas as gpd
import networkx as nx
from shapely.ops import linemerge
def remove_pseudonodes(gdf, geom_col="geometry"):
    """Merge chains of linestrings that are joined only by pseudonodes.

    The linestrings are turned into an undirected graph whose nodes are the
    distinct first/last coordinates of the geometries. Nodes of degree 1
    ("tips") and degree > 2 ("forks") are kept; whenever the shortest path
    between two of them passes only through degree-2 nodes (pseudonodes), the
    linestrings along that path are merged with ``shapely.ops.linemerge`` and
    replaced by a single edge.

    Args:
        gdf: GeoDataFrame whose ``geom_col`` column holds geometries exposing
            ``.coords`` (linestrings). It is read only; no columns are added
            to it.
        geom_col: Name of the column holding the geometries. The same name is
            used for the geometry column of the result.

    Returns:
        A new GeoDataFrame with one row per remaining graph edge and the
        columns ``source``, ``target`` and ``geom_col``, with ``geom_col`` set
        as the active geometry. No CRS is assigned and no other columns of
        ``gdf`` are carried over.

    NOTE(review): the network is an ``nx.Graph``, so two linestrings sharing
    both end points end up as a single edge (the later row wins). Chains that
    are not the shortest path between their end nodes, and closed rings made
    only of pseudonodes, are left unmerged.
    """

    def _gdf_to_graph(gdf, geom_col) -> nx.Graph:
        """Build the network graph without modifying ``gdf``."""
        node_ids = {}
        graph = nx.Graph()
        for geom in gdf[geom_col]:
            # Number each distinct end point in order of first appearance.
            source = node_ids.setdefault(geom.coords[0], len(node_ids))
            target = node_ids.setdefault(geom.coords[-1], len(node_ids))
            graph.add_edge(source, target, **{geom_col: geom})
        return graph

    def _path_to_edges(path, cycle=False):
        """Turn a traversed node path into a list of (u, v) edge tuples."""
        edges = list(zip(path, path[1:]))
        if cycle:
            edges.append((path[-1], path[0]))
        return edges

    # Make the graph and find the tips and the forks.
    graph = _gdf_to_graph(gdf, geom_col)
    tips_and_forks = [
        node for node, degree in graph.degree() if degree == 1 or degree > 2
    ]

    # Check every unordered pair of tips/forks for a connecting chain.
    for start, end in combinations(tips_and_forks, 2):
        try:
            node_path = nx.shortest_path(graph, start, end)
        except nx.NetworkXNoPath:
            # The two nodes lie in different connected components.
            continue

        # A chain needs at least one interior node and no fork in between.
        interior = node_path[1:-1]
        if not interior or any(graph.degree(node) > 2 for node in interior):
            continue

        edge_path = _path_to_edges(node_path)
        # Look the geometries up per edge: the graph is undirected, so the
        # orientation reported by graph.edges() need not match the path.
        linestrings = [graph.edges[u, v][geom_col] for u, v in edge_path]
        new_linestring = linemerge(linestrings)

        graph.remove_edges_from(edge_path)
        # Store the merged line under the caller's column name, not a
        # hard-coded "geometry" attribute.
        graph.add_edge(start, end, **{geom_col: new_linestring})

    # Convert back to a GeoDataFrame with geom_col as the active geometry.
    edge_table = nx.to_pandas_edgelist(graph)
    return gpd.GeoDataFrame(edge_table, geometry=geom_col)
if __name__ == "__main__":
    # Placeholders: substitute real paths and an EPSG code before running.
    input_path = "<Path fo file containing linestrings with pseudonodes>"
    output_path = "<Path to file to contain linestrings with removed pseudonodes>"
    target_crs = {"init": "epsg:<Desired EPSG code>"}

    # Read the network, merge away the pseudonodes, then tag the result with
    # the desired CRS and write it out as a GeoPackage.
    gdf = gpd.GeoDataFrame.from_file(input_path)
    clean_gdf = remove_pseudonodes(gdf)
    clean_gdf.crs = target_crs
    clean_gdf.to_file(output_path, driver="GPKG")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment