Created
May 24, 2022 03:30
-
-
Save rabernat/15f77fb447e2cdbc73c4031c59768886 to your computer and use it in GitHub Desktop.
WIP Xarray to Zarr pipeline with Beam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "id": "07aa0beb", | |
| "metadata": {}, | |
| "source": [ | |
| "What is hard about Pangeo Forge.\n", | |
| "\n", | |
| "- Unknown size of inputs\n", | |
| "- Possibly uneven size of inputs\n", | |
| "- Need to write regular Zarr chunks\n", | |
| "- Need to initialize the Zarr dataset" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "id": "f6478a1a-89ec-4120-a738-2502d671994d", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# copied / modified from tests\n", | |
| "\n", | |
| "import xarray as xr\n", | |
| "import numpy as np\n", | |
| "import pandas as pd\n", | |
| "import os\n", | |
| "\n", | |
| "from tempfile import TemporaryDirectory\n", | |
| "\n", | |
| "def daily_xarray_dataset():\n", | |
| " \"\"\"Return a synthetic random xarray dataset.\"\"\"\n", | |
| " np.random.seed(1)\n", | |
| " # TODO: change nt to 11 in order to catch the edge case where\n", | |
| " # items_per_input does not evenly divide the length of the sequence dimension\n", | |
| " nt, ny, nx = 10, 18, 36\n", | |
| " time = pd.date_range(start=\"2010-01-01\", periods=nt, freq=\"D\")\n", | |
| " lon = (np.arange(nx) + 0.5) * 360 / nx\n", | |
| " lon_attrs = {\"units\": \"degrees_east\", \"long_name\": \"longitude\"}\n", | |
| " lat = (np.arange(ny) + 0.5) * 180 / ny\n", | |
| " lat_attrs = {\"units\": \"degrees_north\", \"long_name\": \"latitude\"}\n", | |
| " foo = np.random.rand(nt, ny, nx)\n", | |
| " foo_attrs = {\"long_name\": \"Fantastic Foo\"}\n", | |
| " # make sure things work with heterogenous data types\n", | |
| " bar = np.random.randint(0, 10, size=(nt, ny, nx))\n", | |
| " bar_attrs = {\"long_name\": \"Beautiful Bar\"}\n", | |
| " dims = (\"time\", \"lat\", \"lon\")\n", | |
| " ds = xr.Dataset(\n", | |
| " {\"bar\": (dims, bar, bar_attrs), \"foo\": (dims, foo, foo_attrs)},\n", | |
| " coords={\n", | |
| " \"time\": (\"time\", time),\n", | |
| " \"lat\": (\"lat\", lat, lat_attrs),\n", | |
| " \"lon\": (\"lon\", lon, lon_attrs),\n", | |
| " },\n", | |
| " attrs={\"conventions\": \"CF 1.6\"},\n", | |
| " )\n", | |
| " return ds\n", | |
| "\n", | |
| "\n", | |
| "def split_up_files_by_day(ds, day_param):\n", | |
| " gb = ds.resample(time=f\"{day_param}D\")\n", | |
| " _, datasets = zip(*gb)\n", | |
| " fnames = [f\"{n:03d}.nc\" for n in range(len(datasets))]\n", | |
| " return datasets, fnames\n", | |
| "\n", | |
| "\n", | |
| "def make_netcdf_local_paths(items_per_file=2):\n", | |
| " td = TemporaryDirectory()\n", | |
| " tmp_path = td.name\n", | |
| " \n", | |
| " ds = daily_xarray_dataset()\n", | |
| " datasets, fnames = split_up_files_by_day(ds, items_per_file)\n", | |
| "\n", | |
| " full_paths = [os.path.join(tmp_path, fname) for fname in fnames]\n", | |
| " xr.save_mfdataset(datasets, [str(path) for path in full_paths])\n", | |
| " return td, full_paths\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "id": "e4690e7a-5f19-4b38-a46c-8532ab1030f3", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "['/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc',\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc',\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc',\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc',\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc']" | |
| ] | |
| }, | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "td, paths = make_netcdf_local_paths()\n", | |
| "paths" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "id": "1c6b92d7-8d0d-4c29-a866-212a736f6322", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "ds0 = xr.open_dataset(paths[0])\n", | |
| "assert not ds0.foo._in_memory\n", | |
| "ds0_dict = ds0.to_dict(data=False)\n", | |
| "assert not ds0.foo._in_memory" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "id": "98162cda-7993-4ef0-b1ff-671fa80103e0", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "{'unlimited_dims': set(),\n", | |
| " 'source': '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'}" | |
| ] | |
| }, | |
| "execution_count": 4, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "ds0.encoding" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "id": "ed81fdf4-da1e-434e-9888-89ac30335ae5", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "{'coords': {'time': {'dims': ('time',),\n", | |
| " 'attrs': {},\n", | |
| " 'dtype': 'datetime64[ns]',\n", | |
| " 'shape': (2,)},\n", | |
| " 'lat': {'dims': ('lat',),\n", | |
| " 'attrs': {'units': 'degrees_north', 'long_name': 'latitude'},\n", | |
| " 'dtype': 'float64',\n", | |
| " 'shape': (18,)},\n", | |
| " 'lon': {'dims': ('lon',),\n", | |
| " 'attrs': {'units': 'degrees_east', 'long_name': 'longitude'},\n", | |
| " 'dtype': 'float64',\n", | |
| " 'shape': (36,)}},\n", | |
| " 'attrs': {'conventions': 'CF 1.6'},\n", | |
| " 'dims': {'time': 2, 'lat': 18, 'lon': 36},\n", | |
| " 'data_vars': {'bar': {'dims': ('time', 'lat', 'lon'),\n", | |
| " 'attrs': {'long_name': 'Beautiful Bar'},\n", | |
| " 'dtype': 'int64',\n", | |
| " 'shape': (2, 18, 36)},\n", | |
| " 'foo': {'dims': ('time', 'lat', 'lon'),\n", | |
| " 'attrs': {'long_name': 'Fantastic Foo'},\n", | |
| " 'dtype': 'float64',\n", | |
| " 'shape': (2, 18, 36)}}}" | |
| ] | |
| }, | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# The dict from to_dict(data=False) does not include encoding; otherwise it might be enough to build a template.\n", | |
| "# Maybe that is worth opening an xarray issue about?\n", | |
| "ds0_dict" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "id": "73e40c1a-30e8-4925-8726-7cdf77e628df", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<FilePattern {'time': 5}>" | |
| ] | |
| }, | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, Index, CombineOp\n", | |
| "\n", | |
| "base_path = td.name\n", | |
| "def format_function(time):\n", | |
| " return f\"{base_path}/{time:03d}.nc\"\n", | |
| "\n", | |
| "fp = FilePattern(\n", | |
| " format_function,\n", | |
| " ConcatDim(\"time\", list(range(5)))\n", | |
| ")\n", | |
| "fp" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "id": "25ba5713-d80c-4593-849a-a1f32da52af4", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[(frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'),\n", | |
| " (frozenset({DimIndex(name='time', index=1, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc'),\n", | |
| " (frozenset({DimIndex(name='time', index=2, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc'),\n", | |
| " (frozenset({DimIndex(name='time', index=3, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc'),\n", | |
| " (frozenset({DimIndex(name='time', index=4, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
| " '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc')]" | |
| ] | |
| }, | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "list(fp.items())" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "id": "697d66fb-0b04-4b7a-a503-baa9a96382bf", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})" | |
| ] | |
| }, | |
| "execution_count": 8, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "index, item = next(fp.items())\n", | |
| "index" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 9, | |
| "id": "9d53a649-fb38-47f0-920d-52e50dc6d497", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})" | |
| ] | |
| }, | |
| "execution_count": 9, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "index" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 10, | |
| "id": "7dd68912", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "application/javascript": [ | |
| "\n", | |
| " if (typeof window.interactive_beam_jquery == 'undefined') {\n", | |
| " var jqueryScript = document.createElement('script');\n", | |
| " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", | |
| " jqueryScript.type = 'text/javascript';\n", | |
| " jqueryScript.onload = function() {\n", | |
| " var datatableScript = document.createElement('script');\n", | |
| " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", | |
| " datatableScript.type = 'text/javascript';\n", | |
| " datatableScript.onload = function() {\n", | |
| " window.interactive_beam_jquery = jQuery.noConflict(true);\n", | |
| " window.interactive_beam_jquery(document).ready(function($){\n", | |
| " \n", | |
| " });\n", | |
| " }\n", | |
| " document.head.appendChild(datatableScript);\n", | |
| " };\n", | |
| " document.head.appendChild(jqueryScript);\n", | |
| " } else {\n", | |
| " window.interactive_beam_jquery(document).ready(function($){\n", | |
| " \n", | |
| " });\n", | |
| " }" | |
| ] | |
| }, | |
| "metadata": {}, | |
| "output_type": "display_data" | |
| }, | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "import apache_beam as beam\n", | |
| "from apache_beam.options import pipeline_options\n", | |
| "from apache_beam.runners.interactive import interactive_runner\n", | |
| "import apache_beam.runners.interactive.interactive_beam as ib\n", | |
| "\n", | |
| "from typing import Tuple, List, Sequence, Dict, Any, TypeVar, Optional\n", | |
| "\n", | |
| "\n", | |
| "T = TypeVar('T') \n", | |
| "\n", | |
| "class LoadXarrayDataset(beam.DoFn):\n", | |
| " def process(self, element: Tuple[Index, str]) -> List[Tuple[Index, xr.Dataset]]:\n", | |
| " key, path = element\n", | |
| " ds = xr.open_dataset(path)\n", | |
| " return [(key, ds)]\n", | |
| " \n", | |
| "class XarrayDatasetToSchema(beam.DoFn):\n", | |
| " def process(self, element: Tuple[Index, xr.Dataset]) -> List[Tuple[Index, Dict]]:\n", | |
| " key, ds = element\n", | |
| " ds_dict = ds.to_dict(data=False)\n", | |
| " return [(key, ds_dict)]\n", | |
| " \n", | |
| "class SchemaToDimLen(beam.DoFn):\n", | |
| " def process(self, element: Tuple[Index, Dict], dim: str) -> List[Tuple[Index, int]]:\n", | |
| " index, schema = element\n", | |
| " return [(index, schema['dims'][dim])]\n", | |
| " \n", | |
| "class DropIndex(beam.DoFn):\n", | |
| " def process(self, element: Tuple[Index, T]) -> List[T]:\n", | |
| " return [element[1]]\n", | |
| " \n", | |
| " \n", | |
| "# TODO:\n", | |
| "# - don't hard code time as the concat-dim\n", | |
| "# - allow multiple concat dims\n", | |
| "# - generalize to include metadata harmonization\n", | |
| "def calculate_total_size(schemas: Sequence[Tuple[Index, Dict]], dim: str) -> int:\n", | |
| " # hopefully the data variables have not been loaded\n", | |
| " return sum(time_lens)\n", | |
| " \n", | |
| "\n", | |
| "def is_first_element(element: Tuple[Index, T], concat_dim: str) -> bool:\n", | |
| " index, item = element\n", | |
| " for dindex in index:\n", | |
| " if (dindex.name == concat_dim) and (dindex.operation == CombineOp.CONCAT) and (dindex.index == 0):\n", | |
| " return True\n", | |
| " return False\n", | |
| "\n", | |
| "def get_first_element(elements: Sequence[Tuple[Index, T]], concat_dim: str):\n", | |
| " for e in elements:\n", | |
| " if is_first_element(e, concat_dim):\n", | |
| " return e\n", | |
| "\n", | |
| "\n", | |
| "class DummyPrepareTarget(beam.DoFn):\n", | |
| " def process(self, element: Tuple[Index, xr.Dataset], dim: str, total_len: int) -> None:\n", | |
| " # should actually return a zarr group\n", | |
| " index, ds = element\n", | |
| " print(index, ds.dims, dim, total_len)\n", | |
| " \n", | |
| "\n", | |
| " \n", | |
| "# Build the pipeline graph only; nothing executes until p.run() is called.\n", | |
| "options = pipeline_options.PipelineOptions()\n", | |
| "runner = interactive_runner.InteractiveRunner()\n", | |
| "\n", | |
| "p = beam.Pipeline(runner, options=options)\n", | |
| "\n", | |
| "# One (index, path) element per file in the FilePattern.\n", | |
| "inputs = p | beam.Create(fp.items())\n", | |
| "all_datasets = inputs | beam.ParDo(LoadXarrayDataset())\n", | |
| "# Branch 1: reduce every dataset to its schema, then sum the \"time\" lengths.\n", | |
| "schemas = all_datasets | beam.ParDo(XarrayDatasetToSchema())\n", | |
| "time_lens = schemas | beam.ParDo(SchemaToDimLen(), \"time\")\n", | |
| "time_len = time_lens | beam.ParDo(DropIndex()) | beam.CombineGlobally(sum)\n", | |
| "# Branch 2: pick the dataset at position 0 along \"time\".\n", | |
| "first_ds = all_datasets | beam.CombineGlobally(get_first_element, \"time\")\n", | |
| "# Join: the total length is handed to the DoFn as a singleton side input.\n", | |
| "target = first_ds | beam.ParDo(DummyPrepareTarget(), \"time\", beam.pvalue.AsSingleton(time_len))\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 11, | |
| "id": "c59bd22f", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/html": [ | |
| "\n", | |
| " <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n", | |
| " <div id=\"progress_indicator_b33df83a230e72a76d738872b71da9ed\" class=\"spinner-border text-info\" role=\"status\">\n", | |
| " </div>" | |
| ], | |
| "text/plain": [ | |
| "<IPython.core.display.HTML object>" | |
| ] | |
| }, | |
| "metadata": {}, | |
| "output_type": "display_data" | |
| }, | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n", | |
| "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n", | |
| " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n", | |
| "<!-- Generated by graphviz version 2.49.1 (0)\n", | |
| " -->\n", | |
| "<!-- Title: G Pages: 1 -->\n", | |
| "<svg width=\"514pt\" height=\"1300pt\"\n", | |
| " viewBox=\"0.00 0.00 513.50 1299.95\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n", | |
| "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 1295.95)\">\n", | |
| "<title>G</title>\n", | |
| "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-1295.95 509.5,-1295.95 509.5,4 -4,4\"/>\n", | |
| "<!-- [10]: Create -->\n", | |
| "<g id=\"node1\" class=\"node\">\n", | |
| "<title>[10]: Create</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"289,-1291.95 205,-1291.95 205,-1255.95 289,-1255.95 289,-1291.95\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"247\" y=\"-1270.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: Create</text>\n", | |
| "</g>\n", | |
| "<!-- inputs -->\n", | |
| "<g id=\"node2\" class=\"node\">\n", | |
| "<title>inputs</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-1186.8\" rx=\"33.29\" ry=\"33.29\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"247\" y=\"-1183.1\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">inputs</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: Create->inputs -->\n", | |
| "<g id=\"edge1\" class=\"edge\">\n", | |
| "<title>[10]: Create->inputs</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M247,-1255.72C247,-1248.3 247,-1239.28 247,-1230.26\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1230.16 247,-1220.16 243.5,-1230.16 250.5,-1230.16\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(LoadXarrayDataset) -->\n", | |
| "<g id=\"node3\" class=\"node\">\n", | |
| "<title>[10]: ParDo(LoadXarrayDataset)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"348,-1117.66 146,-1117.66 146,-1081.66 348,-1081.66 348,-1117.66\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"247\" y=\"-1095.96\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(LoadXarrayDataset)</text>\n", | |
| "</g>\n", | |
| "<!-- inputs->[10]: ParDo(LoadXarrayDataset) -->\n", | |
| "<g id=\"edge2\" class=\"edge\">\n", | |
| "<title>inputs->[10]: ParDo(LoadXarrayDataset)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M247,-1153.61C247,-1145.23 247,-1136.27 247,-1128.16\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1128.09 247,-1118.09 243.5,-1128.09 250.5,-1128.09\"/>\n", | |
| "</g>\n", | |
| "<!-- all_datasets -->\n", | |
| "<g id=\"node4\" class=\"node\">\n", | |
| "<title>all_datasets</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-992.36\" rx=\"53.09\" ry=\"53.09\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"247\" y=\"-988.66\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">all_datasets</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(LoadXarrayDataset)->all_datasets -->\n", | |
| "<g id=\"edge3\" class=\"edge\">\n", | |
| "<title>[10]: ParDo(LoadXarrayDataset)->all_datasets</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M247,-1081.27C247,-1074.06 247,-1065.22 247,-1055.96\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1055.8 247,-1045.8 243.5,-1055.8 250.5,-1055.8\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(XarrayDatasetToSchema) -->\n", | |
| "<g id=\"node5\" class=\"node\">\n", | |
| "<title>[10]: ParDo(XarrayDatasetToSchema)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"232,-903.06 0,-903.06 0,-867.06 232,-867.06 232,-903.06\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"116\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(XarrayDatasetToSchema)</text>\n", | |
| "</g>\n", | |
| "<!-- all_datasets->[10]: ParDo(XarrayDatasetToSchema) -->\n", | |
| "<g id=\"edge4\" class=\"edge\">\n", | |
| "<title>all_datasets->[10]: ParDo(XarrayDatasetToSchema)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M205.91,-958.33C186.13,-942.43 162.88,-923.75 144.98,-909.36\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"147.15,-906.61 137.17,-903.08 142.77,-912.07 147.15,-906.61\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: CombineGlobally(get_first_element) -->\n", | |
| "<g id=\"node13\" class=\"node\">\n", | |
| "<title>[10]: CombineGlobally(get_first_element)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"505.5,-903.06 250.5,-903.06 250.5,-867.06 505.5,-867.06 505.5,-903.06\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"378\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(get_first_element)</text>\n", | |
| "</g>\n", | |
| "<!-- all_datasets->[10]: CombineGlobally(get_first_element) -->\n", | |
| "<g id=\"edge5\" class=\"edge\">\n", | |
| "<title>all_datasets->[10]: CombineGlobally(get_first_element)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M288.09,-958.33C307.87,-942.43 331.12,-923.75 349.02,-909.36\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"351.23,-912.07 356.83,-903.08 346.85,-906.61 351.23,-912.07\"/>\n", | |
| "</g>\n", | |
| "<!-- schemas -->\n", | |
| "<g id=\"node6\" class=\"node\">\n", | |
| "<title>schemas</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"143\" cy=\"-789.47\" rx=\"41.69\" ry=\"41.69\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"143\" y=\"-785.77\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">schemas</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(XarrayDatasetToSchema)->schemas -->\n", | |
| "<g id=\"edge6\" class=\"edge\">\n", | |
| "<title>[10]: ParDo(XarrayDatasetToSchema)->schemas</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M120.95,-866.9C123.21,-859.06 126.02,-849.34 128.86,-839.49\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"132.29,-840.24 131.7,-829.66 125.56,-838.3 132.29,-840.24\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(SchemaToDimLen) -->\n", | |
| "<g id=\"node7\" class=\"node\">\n", | |
| "<title>[10]: ParDo(SchemaToDimLen)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"295,-691.53 97,-691.53 97,-655.53 295,-655.53 295,-691.53\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"196\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(SchemaToDimLen)</text>\n", | |
| "</g>\n", | |
| "<!-- schemas->[10]: ParDo(SchemaToDimLen) -->\n", | |
| "<g id=\"edge7\" class=\"edge\">\n", | |
| "<title>schemas->[10]: ParDo(SchemaToDimLen)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M160.21,-751.47C167.89,-734.95 176.77,-715.86 183.77,-700.82\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"187.01,-702.14 188.06,-691.6 180.67,-699.19 187.01,-702.14\"/>\n", | |
| "</g>\n", | |
| "<!-- time_lens -->\n", | |
| "<g id=\"node8\" class=\"node\">\n", | |
| "<title>time_lens</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"198\" cy=\"-553.03\" rx=\"46.29\" ry=\"46.29\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"198\" y=\"-549.33\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_lens</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(SchemaToDimLen)->time_lens -->\n", | |
| "<g id=\"edge8\" class=\"edge\">\n", | |
| "<title>[10]: ParDo(SchemaToDimLen)->time_lens</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M196.29,-655.3C196.5,-643.18 196.78,-626.22 197.06,-609.65\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"200.56,-609.49 197.23,-599.43 193.57,-609.37 200.56,-609.49\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(DropIndex) -->\n", | |
| "<g id=\"node9\" class=\"node\">\n", | |
| "<title>[10]: ParDo(DropIndex)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"279.5,-470.89 126.5,-470.89 126.5,-434.89 279.5,-434.89 279.5,-470.89\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"203\" y=\"-449.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DropIndex)</text>\n", | |
| "</g>\n", | |
| "<!-- time_lens->[10]: ParDo(DropIndex) -->\n", | |
| "<g id=\"edge9\" class=\"edge\">\n", | |
| "<title>time_lens->[10]: ParDo(DropIndex)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M200.31,-506.69C200.75,-497.99 201.2,-489.13 201.61,-481.22\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"205.11,-481.28 202.12,-471.12 198.12,-480.93 205.11,-481.28\"/>\n", | |
| "</g>\n", | |
| "<!-- pcoll682 -->\n", | |
| "<g id=\"node10\" class=\"node\">\n", | |
| "<title>pcoll682</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"212\" cy=\"-380.89\" rx=\"18\" ry=\"18\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(DropIndex)->pcoll682 -->\n", | |
| "<g id=\"edge10\" class=\"edge\">\n", | |
| "<title>[10]: ParDo(DropIndex)->pcoll682</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M205.22,-434.58C206.22,-426.87 207.41,-417.6 208.51,-409\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"212,-409.36 209.8,-398.99 205.05,-408.46 212,-409.36\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: CombineGlobally(sum) -->\n", | |
| "<g id=\"node11\" class=\"node\">\n", | |
| "<title>[10]: CombineGlobally(sum)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"320,-326.89 140,-326.89 140,-290.89 320,-290.89 320,-326.89\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"230\" y=\"-305.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(sum)</text>\n", | |
| "</g>\n", | |
| "<!-- pcoll682->[10]: CombineGlobally(sum) -->\n", | |
| "<g id=\"edge11\" class=\"edge\">\n", | |
| "<title>pcoll682->[10]: CombineGlobally(sum)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M216.27,-363.3C218.27,-355.51 220.71,-346.02 222.97,-337.22\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"226.41,-337.89 225.51,-327.33 219.63,-336.15 226.41,-337.89\"/>\n", | |
| "</g>\n", | |
| "<!-- time_len -->\n", | |
| "<g id=\"node12\" class=\"node\">\n", | |
| "<title>time_len</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"258\" cy=\"-212.64\" rx=\"42.49\" ry=\"42.49\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"258\" y=\"-208.94\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_len</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: CombineGlobally(sum)->time_len -->\n", | |
| "<g id=\"edge12\" class=\"edge\">\n", | |
| "<title>[10]: CombineGlobally(sum)->time_len</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M235.14,-290.6C237.46,-282.79 240.33,-273.12 243.25,-263.3\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"246.67,-264.08 246.16,-253.5 239.96,-262.09 246.67,-264.08\"/>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(DummyPrepareTarget) -->\n", | |
| "<g id=\"node15\" class=\"node\">\n", | |
| "<title>[10]: ParDo(DummyPrepareTarget)</title>\n", | |
| "<polygon fill=\"none\" stroke=\"blue\" points=\"401,-134.39 185,-134.39 185,-98.39 401,-98.39 401,-134.39\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"293\" y=\"-112.69\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DummyPrepareTarget)</text>\n", | |
| "</g>\n", | |
| "<!-- time_len->[10]: ParDo(DummyPrepareTarget) -->\n", | |
| "<g id=\"edge13\" class=\"edge\">\n", | |
| "<title>time_len->[10]: ParDo(DummyPrepareTarget)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M272.44,-172.76C276,-163.18 279.73,-153.14 283.02,-144.27\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"286.32,-145.44 286.52,-134.85 279.76,-143 286.32,-145.44\"/>\n", | |
| "</g>\n", | |
| "<!-- first_ds -->\n", | |
| "<g id=\"node14\" class=\"node\">\n", | |
| "<title>first_ds</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"364\" cy=\"-673.53\" rx=\"38.19\" ry=\"38.19\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"364\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">first_ds</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: CombineGlobally(get_first_element)->first_ds -->\n", | |
| "<g id=\"edge14\" class=\"edge\">\n", | |
| "<title>[10]: CombineGlobally(get_first_element)->first_ds</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M376.85,-866.92C374.76,-835.57 370.24,-767.99 367.16,-721.88\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"370.65,-721.56 366.49,-711.82 363.67,-722.03 370.65,-721.56\"/>\n", | |
| "</g>\n", | |
| "<!-- first_ds->[10]: ParDo(DummyPrepareTarget) -->\n", | |
| "<g id=\"edge15\" class=\"edge\">\n", | |
| "<title>first_ds->[10]: ParDo(DummyPrepareTarget)</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M361.79,-635.01C359.42,-591.48 356,-517.51 356,-453.89 356,-453.89 356,-453.89 356,-307.89 356,-246.49 326.14,-179.34 307.53,-143.46\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"310.62,-141.81 302.84,-134.62 304.44,-145.09 310.62,-141.81\"/>\n", | |
| "</g>\n", | |
| "<!-- target -->\n", | |
| "<g id=\"node16\" class=\"node\">\n", | |
| "<title>target</title>\n", | |
| "<ellipse fill=\"none\" stroke=\"blue\" cx=\"293\" cy=\"-31.2\" rx=\"31.4\" ry=\"31.4\"/>\n", | |
| "<text text-anchor=\"middle\" x=\"293\" y=\"-27.5\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">target</text>\n", | |
| "</g>\n", | |
| "<!-- [10]: ParDo(DummyPrepareTarget)->target -->\n", | |
| "<g id=\"edge16\" class=\"edge\">\n", | |
| "<title>[10]: ParDo(DummyPrepareTarget)->target</title>\n", | |
| "<path fill=\"none\" stroke=\"black\" d=\"M293,-98.16C293,-90.69 293,-81.6 293,-72.59\"/>\n", | |
| "<polygon fill=\"black\" stroke=\"black\" points=\"296.5,-72.53 293,-62.53 289.5,-72.53 296.5,-72.53\"/>\n", | |
| "</g>\n", | |
| "</g>\n", | |
| "</svg>\n" | |
| ], | |
| "text/plain": [ | |
| "<IPython.core.display.HTML object>" | |
| ] | |
| }, | |
| "metadata": {}, | |
| "output_type": "display_data" | |
| }, | |
| { | |
| "data": { | |
| "application/javascript": [ | |
| "\n", | |
| " if (typeof window.interactive_beam_jquery == 'undefined') {\n", | |
| " var jqueryScript = document.createElement('script');\n", | |
| " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", | |
| " jqueryScript.type = 'text/javascript';\n", | |
| " jqueryScript.onload = function() {\n", | |
| " var datatableScript = document.createElement('script');\n", | |
| " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", | |
| " datatableScript.type = 'text/javascript';\n", | |
| " datatableScript.onload = function() {\n", | |
| " window.interactive_beam_jquery = jQuery.noConflict(true);\n", | |
| " window.interactive_beam_jquery(document).ready(function($){\n", | |
| " \n", | |
| " $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n", | |
| " });\n", | |
| " }\n", | |
| " document.head.appendChild(datatableScript);\n", | |
| " };\n", | |
| " document.head.appendChild(jqueryScript);\n", | |
| " } else {\n", | |
| " window.interactive_beam_jquery(document).ready(function($){\n", | |
| " \n", | |
| " $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n", | |
| " });\n", | |
| " }" | |
| ] | |
| }, | |
| "metadata": {}, | |
| "output_type": "display_data" | |
| } | |
| ], | |
| "source": [ | |
| "ib.show_graph(p)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 12, | |
| "id": "a70e158a", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n", | |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" | |
| ] | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Index({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}) Frozen({'time': 2, 'lat': 18, 'lon': 36}) time 10\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "result = p.run()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "4ef5fb31", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.9.9" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 5 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment