API calls with the 51Degrees Pipeline (Part 1)

Stuart Langridge

6/9/2021 5:00 PM

WebDev API Cloud Development

How to use a pipeline library to make your life easier

Recently I've been working with the team at 51Degrees on some geolocation things, and I realised that one of the tools we were using has a rather more general application than I at first imagined. So now I come, O reader, to tell you about it.

A fair amount of the things I build involve pulling data from multiple APIs. (I suspect this is true of a fair amount of the things you build too, O reader.) And what that involves is a bunch of juggling to handle querying those APIs, collecting the results together, working out what to do if one API call fails and the rest don't, adding caching layers, and so on. This is one of those insidious programming tasks where it looks easy, because you can write a prototype in five minutes, and then gradually over time you add robustness every time a bug's discovered. Think of it like making an HTTP request. This is not hard, in concept; you can write the code to do it in a couple of lines at most, but everyone who has ever considered doing this very quickly realises that you're a lot better off to use one of the convenient higher-level libraries that already exist -- got for Noderequests for Python, etc. This is because no plan survives contact with the enemy, and no freshly-written code survives contact with the real world with all its edge cases and grubby inconsistencies. So if you choose to not use got or requests or similar, you will over time end up doing a half-arsed poorly-tested reimplementation of them inside your own code. I advise against this. Instead, use the libraries that a dedicated open source community has provided for you (and then send them some funding so they keep doing that.)

And this applies to making and collating multiple API requests, too. Looks easy at first: many small edge cases. So that's what the pipeline library is for.

Pipelinish reasons

In essence, you build a reusable function that does an API query for each API you want to call, and then the pipeline strings them all together and takes care of aggregating the results. It's not doing the API querying work itself; it's all the fiddly bookkeeping, plus giving you a few hooks to add conveniences such as caching later if you need it. The pipeline documentation explains in more detail, but the tl;dr is what you've just read. They call the reusable functions "flow elements" and the query parameters "evidence", and the whole thing is implemented in a bunch of different languages: JavaScript and Python, PHP, C#, Java.

An example is useful here. Let's say that, given a geographical location, we want to combine weather data from 7timer and city data from Teleport. This involves making two separate queries, one to each of the APIs. To do that with the pipeline, we have two steps: make a flow element for each of the 7timer API and the teleport API, and then add each to a pipeline and run it. Flow elements first.

These examples will be in JavaScript, but as noted the pipeline library is available in many different languages. Here we're roughly working through the Simple Custom Flow Element example from the docs.

A basic JavaScript example

Add the pipeline libraries to your JS project with npm install --save fiftyone.pipeline.core and npm install --save fiftyone.pipeline.engines. We also for this example want the got HTTP library so we can make HTTP requests, so npm install --save got as well. A flow element is defined by the core, and so we subclass it; we need to define a dataKey, which is the key under which this element's results will show up in the final result, and we need to define a processInternal method which actually does the HTTP request and deals with the result. So a minimal flowElement might look like this:

const FiftyOnePipelineCore = require("fiftyone.pipeline.core"); const got = require('got'); class Weather extends FiftyOnePipelineCore.FlowElement { constructor () { super(...arguments); this.dataKey = 'weather'; } async processInternal (flowData) { // note that error checking is elided for this example let lat = flowData.evidence.get('query.lat'); let lon = flowData.evidence.get('query.lon'); const response = await got('http://www.7timer.info/bin/api.pl', { responseType: "json", searchParams: { lat: lat, lon: lon, ac: 0, unit: "metric", output: "json", tzshift: 0, product: "civil" } }); const data = new FiftyOnePipelineCore.ElementDataDictionary({ flowElement: this, contents: {short: response.body.dataseries[0].weather} }); flowData.setElementData(data); } }

This element will fetch out query parameters lat and lon that are provided to it, and then construct and fetch an API URL for results. The results are then assembled into an ElementDataDictionary and set on the flowData object, which is the thing that assembles all the results together for use.

To demonstrate, let's construct a pipeline with only this element in it, and run it:

const weather_element = new Weather(); const pipeline = new FiftyOnePipelineCore.PipelineBuilder() .add(weather_element) .build(); const flowData = pipeline.createFlowData(); flowData.evidence.add('query.lat', 52.5); flowData.evidence.add('query.lon', -1.8); flowData.process().then(function () { console.log(flowData.weather.short); // prints something like "lightsnowday" });

This is done in three steps: add elements to a new pipeline and build it; add evidence to the pipeline via flowData; then call flowData.process(), which resolves when the flowData is populated with results. (Here, we simply print that result out; in real use you'd obviously do something more interesting with it.)

The flowData object is more than a simple container. Some flowElements in the pipeline may not be populated, and a flowData is designed to make this possible. In practice, this is not used much in JavaScript, where it's possible to test at runtime whether an object has a property; but it's very useful in more strict and unforgiving languages such as C# or Java, in which a property may not be present and accessing it will cause errors. For more about using hasValue to check for properties, it's worth looking at the documentation's reverse geocoding example, or Which Three Birdies which explains how this might be done in PHP in detail; we'll skip over it in this example, because JavaScript doesn't need it as much.

It must flow

So far, so simple. We haven't really gained all that much by using the pipeline rather than making a request ourselves, of course, but now that the structure is in place, it's easy to add new things. One example here, which is important, is that data flows through the pipeline, to one element after another. This means that later elements can rely on previous elements having run.

The Teleport APIs allow fetching various data about cities -- quality of life indices, population, and the like. This can be done in multiple steps; first look up the nearest city to a geographical point, then look up basic city info for that place. These could be implemented as one Teleport flowElement, but for purposes of illustration on how one might string together multiple elements, let's do it with two; one to look up the nearest city and stash that result into the flowData, and then the next to retrieve that result from the flowData and use it to look up city info. Again, this is a simple example, but having the structure of the pipeline is useful here because implementing caching, for example, is much easier to do with the structure in place.

/* NearestCity needs to run first to geolocate the lat/long to a city */ class NearestCity extends FiftyOnePipelineCore.FlowElement { constructor () { super(...arguments); this.dataKey = 'city'; this.evidenceKeyFilter = new FiftyOnePipelineCore .BasicListEvidenceKeyFilter(['query.lat', 'query.lon']); this.properties = {city: {type: 'string', description: "nearest city"}}; } async processInternal (flowData) { let lat = flowData.evidence.get('query.lat'); let lon = flowData.evidence.get('query.lon'); const response = await got( `https://api.teleport.org/api/locations/${lat}%2C${lon}/`, {responseType: "json"}); let cities = response.body._embedded["location:nearest-cities"]; cities.sort((a, b) => a.distance_km - b.distance_km); const data = new FiftyOnePipelineCore.ElementDataDictionary({ flowElement: this, contents: {city: cities[0]._links["location:nearest-city"]} /* this contains an href element to fetch more data */ }); flowData.setElementData(data); } } /* CityData uses the information from NearestCity */ class CityData extends FiftyOnePipelineCore.FlowElement { constructor () { super(...arguments); this.dataKey = 'city'; this.evidenceKeyFilter = new FiftyOnePipelineCore .BasicListEvidenceKeyFilter(['query.lat', 'query.lon']); this.properties = { population: {type: 'number', description: "number of people"} }; } async processInternal (flowData) { try { let city_nearest_url = flowData.city.city.href; const response = await got(city_nearest_url, {responseType: "json"}); const data = new FiftyOnePipelineCore.ElementDataDictionary({ /* replace the previous flowData.city */ flowElement: this, contents: {population: response.body.population} }); // Set this data on the flowElement flowData.setElementData(data); } catch (error) { console.log(error); throw(error); } } } const nc_element = new NearestCity(); const weather_element = new Weather(); const citydata_element = new CityData(); const pipeline = new FiftyOnePipelineCore.PipelineBuilder() .addParallel([nc_element, weather_element]) // NearestCity runs first .add(citydata_element) // add citydata afterwards so it can use data from NC .build(); const flowData = pipeline.createFlowData(); flowData.evidence.add('query.lat', 52.5); flowData.evidence.add('query.lon', -1.8); flowData.process().then(function () { console.log(flowData.city.population); // => 984333 console.log(flowData.weather.weather); // => lightsnowday });

Each of the flowElements is a reusable components, so we can add Weather to this pipeline easily as well as our two new flowElements. Note that the order that things are added to the pipeline is important. NearestCity and Weather have nothing to do with one another, so we add them in parallel by passing a list of parallel elements to flowData.addParallel. CityData requires the information that NearestCity retrieves, so we add it as a second stage in the pipeline, and then everything works. Data flows from one stage in the pipeline to the next, and can be augmented at each stage.

And for my next trick

That explains how to use the 51Degrees Pipeline library to aggregate API calls to cloud services. What I want to look at in the second half of this series is some more advanced usages: how to use a local ("on-premises") data source, how to add caching in a general way for all flowElements, and error handling and retries.