Source code for raven.processes.wps_raven

import logging
from collections import defaultdict
from pathlib import Path
import json

from pywps import Process, Format, LiteralOutput

from raven.models import Raven
from . import wpsio as wio

LOGGER = logging.getLogger("PYWPS")


[docs]class RavenProcess(Process): identifier = 'raven' abstract = 'Raven hydrological framework' title = "Run the Raven hydrological framework using model configuration files and forcing time series. In " \ "the `rvt` file, only provide the name of the forcing file, not an absolute or relative path." version = '0.1' tuple_inputs = {} inputs = [wio.ts, wio.nc_spec, wio.conf] outputs = [wio.hydrograph, wio.storage, wio.solution, wio.diagnostics, wio.rv_config] model_cls = Raven def __init__(self): super(RavenProcess, self).__init__( self._handler, identifier=self.identifier, title=self.title, version=self.version, abstract=self.abstract, inputs=self.inputs, outputs=self.outputs, status_supported=True, store_supported=True ) def model(self, request): return self.model_cls(workdir=self.workdir) def _handler(self, request, response): response.update_status('PyWPS process {} started.'.format(self.identifier), 0) model = self.model(request) # Model configuration if 'conf' in request.inputs: conf = request.inputs.pop('conf') config = self.get_config(conf) model.configure(config.values()) # Input data files ts = [f.file for f in request.inputs.pop('ts')] # Input specs dictionary. Could a all given in the same dict or a list of dicts. nc_spec = {} for spec in request.inputs.pop('nc_spec', []): nc_spec.update(json.loads(spec.data)) for key, val in nc_spec.items(): model.assign(key, val) # Parse all other input parameters kwds = defaultdict(list) for name, objs in request.inputs.items(): for obj in objs: # Namedtuples if name in self.tuple_inputs: csv = obj.data.replace('(', '').replace(')', '') arr = map(float, csv.split(',')) data = self.tuple_inputs[name](*arr) # Other parameters else: data = obj.data if name in model._parallel_parameters: kwds[name].append(data) else: model.assign(name, data) # Launch model with input files model(ts=ts, **kwds) # Store output files name. If an output counts multiple files, they'll be zipped. for key in response.outputs.keys(): val = model.outputs.get(key) if val is not None: if isinstance(response.outputs[key], LiteralOutput): response.outputs[key].data = str(val) else: response.outputs[key].file = str(val) if val.suffix == '.zip': response.outputs[key].data_format = \ Format('application/zip', extension='.zip', encoding='base64') return response @staticmethod def get_config(conf): """Return a dictionary storing the configuration files content.""" config = defaultdict(dict) for obj in conf: fn = Path(obj.file) config[fn.stem][fn.suffix[1:]] = fn if len(config.keys()) > 1: raise NotImplementedError("Multi-model simulations are not yet supported.") return config[fn.stem]