Last active
January 27, 2023 15:04
-
-
Save gmaze/44439a2150068abe395cffa21ca3b765 to your computer and use it in GitHub Desktop.
A Python class to use the OceanOPS API for metadata access to retrieve Argo floats deployment information
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
from aiohttp.helpers import URL | |
from aiohttp.web import HTTPException | |
import nest_asyncio | |
nest_asyncio.apply() # To be used in jupyter notebooks | |
class OceanOPS_Deployments: | |
""" Use the OceanOPS API for metadata access to retrieve Argo floats deployment information | |
API Swagger: https://www.ocean-ops.org/api/swagger/?url=https://www.ocean-ops.org/api/1/oceanops-api.yaml#/ | |
Examples | |
-------- | |
box = [-20, 0, 42, 51] | |
box = [-20, 0, 42, 51, '2023-01', '2024-01'] | |
box = [-180, 180, -90, 90, '2023-01', '2024-01'] | |
OceanOPS_Deployments(box).uri | |
OceanOPS_Deployments(box).uri_decoded | |
df = OceanOPS_Deployments(box).to_dataframe() | |
""" | |
api = "https://www.ocean-ops.org" | |
"""URL to the API""" | |
model = "api/1/data/platform" | |
"""This model represents a Platform entity and is used to retrieve a platform information (schema model named 'Ptf').""" | |
def __init__(self, box): | |
""" | |
Parameters | |
---------- | |
box: list() | |
Define the domain to load Argo index for. The box list is made of: | |
- lon_min: float, lon_max: float, | |
- lat_min: float, lat_max: float, | |
- date_min: str (optional), date_max: str (optional) | |
Longitude and latitude bounds are required, while the two bounding dates are optional. | |
If bounding dates are not specified, the entire time series is fetched. | |
Eg: [-60, -55, 40., 45., '2007-08-01', '2007-09-01'] | |
""" | |
self.box = box | |
self.loop = asyncio.get_event_loop() | |
def __encode_inc(self, inc): | |
"""Return encoded uri expression for 'include' parameter | |
Parameters | |
---------- | |
inc: str | |
Returns | |
------- | |
str | |
""" | |
return inc.replace("\"","%22").replace("[","%5B").replace("]","%5D") | |
def __encode_exp(self, exp): | |
"""Return encoded uri expression for 'exp' parameter | |
Parameters | |
---------- | |
exp: str | |
Returns | |
------- | |
str | |
""" | |
return exp.replace("\"","%22").replace("'","%27").replace(" ","%20").replace(">","%3E").replace("<","%3C") | |
def include(self, encoded=False): | |
"""Return an Ocean-Ops API 'include' expression | |
This is used to determine which variables the API call should return | |
Parameters | |
---------- | |
encoded: bool, default=False | |
Returns | |
------- | |
str | |
""" | |
# inc = ["ref", "ptfDepl.lat", "ptfDepl.lon", "ptfDepl.deplDate", "ptfStatus", "wmos"] | |
# inc = ["ref", "ptfDepl.lat", "ptfDepl.lon", "ptfDepl.deplDate", "ptfStatus.id", "ptfStatus.name", "wmos"] | |
# inc = ["ref", "ptfDepl.lat", "ptfDepl.lon", "ptfDepl.deplDate", "ptfStatus.id", "ptfStatus.name"] | |
inc = ["ref", "ptfDepl.lat", "ptfDepl.lon", "ptfDepl.deplDate", "ptfStatus.id", "ptfStatus.name", | |
"program.nameShort", "program.country.nameShort", "ptfModel.nameShort", "ptfDepl.noSite"] | |
inc = "[%s]" % ",".join(["\"%s\"" % v for v in inc]) | |
return inc if not encoded else self.__encode_inc(inc) | |
def exp(self, encoded=False): | |
"""Return an Ocean-Ops API deployment search expression for an argopy region box definition | |
Parameters | |
---------- | |
encoded: bool, default=False | |
Returns | |
------- | |
str | |
""" | |
exp, arg = "networkPtfs.network.name='Argo'", [] | |
if self.box[0] is not None: | |
exp +=" and ptfDepl.lon>=$var%i" % (len(arg)+1) | |
arg.append(str(self.box[0])) | |
if self.box[1] is not None: | |
exp +=" and ptfDepl.lon<=$var%i" % (len(arg)+1) | |
arg.append(str(self.box[1])) | |
if self.box[2] is not None: | |
exp +=" and ptfDepl.lat>=$var%i" % (len(arg)+1) | |
arg.append(str(self.box[2])) | |
if self.box[3] is not None: | |
exp +=" and ptfDepl.lat<=$var%i" % (len(arg)+1) | |
arg.append(str(self.box[3])) | |
if len(self.box) > 4: | |
if self.box[4] is not None: | |
exp +=" and ptfDepl.deplDate>=$var%i" % (len(arg)+1) | |
arg.append("\"%s\"" % pd.to_datetime(self.box[4]).strftime("%Y-%m-%d %H:%M:%S")) | |
if self.box[5] is not None: | |
exp +=" and ptfDepl.deplDate<=$var%i" % (len(arg)+1) | |
arg.append("\"%s\"" % pd.to_datetime(self.box[5]).strftime("%Y-%m-%d %H:%M:%S")) | |
exp = "[\"%s\", %s]" % (exp, ", ".join(arg)) | |
return exp if not encoded else self.__encode_exp(exp) | |
def __get_uri(self, encoded=False): | |
uri = "exp=%s&include=%s" % (self.exp(encoded=encoded), self.include(encoded=encoded)) | |
url = "%s/%s?%s" % (self.api, self.model, uri) | |
return url | |
@property | |
def uri(self): | |
"""Return encoded URL to post to Ocean-Ops API request | |
Returns | |
------- | |
str | |
""" | |
return self.__get_uri(encoded=True) | |
@property | |
def uri_decoded(self): | |
"""Return decoded URL to post to Ocean-Ops API request | |
Returns | |
------- | |
str | |
""" | |
return self.__get_uri(encoded=False) | |
def to_json(self): | |
"""Return OceanOPS API request response as a json object""" | |
return self.loop.run_until_complete(self._get()) | |
async def _get(self): | |
"""Internal method to send and get the request response""" | |
async with aiohttp.ClientSession() as session: | |
async with session.get(URL(self.uri, encoded=True)) as resp: | |
# print(resp.status) | |
data = await resp.json() | |
return data | |
def to_dataframe(self): | |
"""Return the deployment plan as :class:`pandas.DataFrame`""" | |
data = self.to_json() | |
# res = {'date': [], 'lat': [], 'lon': [], 'wmo': [], 'status_name': [], 'status_code': []} | |
# res = {'date': [], 'lat': [], 'lon': [], 'wmo': [], 'status_name': [], 'status_code': [], 'ship_name': []} | |
res = {'date': [], 'lat': [], 'lon': [], 'wmo': [], 'status_name': [], 'status_code': [], 'program': [], 'country': [], 'model': []} | |
for irow, ptf in enumerate(data['data']): | |
# if irow == 0: | |
# print(ptf) | |
res['lat'].append(ptf['ptfDepl']['lat']) | |
res['lon'].append(ptf['ptfDepl']['lon']) | |
res['date'].append(ptf['ptfDepl']['deplDate']) | |
res['wmo'].append(ptf['ref']) | |
# res['wmo'].append(ptf['wmos'][-1]['wmo']) | |
# res['wmo'].append(float_wmo(ptf['ref'])) # will not work for some CONFIRMED, PROBABLE or REGISTERED floats | |
# res['wmo'].append(float_wmo(ptf['wmos'][-1]['wmo'])) | |
res['status_code'].append(ptf['ptfStatus']['id']) | |
res['status_name'].append(ptf['ptfStatus']['name']) | |
# res['ship_name'].append(ptf['ptfDepl']['shipName']) | |
program = ptf['program']['nameShort'].replace("_"," ") if ptf['program']['nameShort'] else ptf['program']['nameShort'] | |
res['program'].append(program) | |
res['country'].append(ptf['program']['country']['nameShort']) | |
res['model'].append(ptf['ptfModel']['nameShort']) | |
df = pd.DataFrame(res) | |
df = df.sort_values(by='date').reset_index(drop=True) | |
# df = df[ (df['status_name'] == 'CLOSED') | (df['status_name'] == 'OPERATIONAL')] # Select only floats that have been deployed and returned data | |
return df | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment