Introduction to the Streaming API

Stewart Townsend | 15th November 2011

At its simplest, the DataSift Streaming API lets you send a request to DataSift, receive data from a stream, and close the connection when you are done. Beyond that, the Streaming API is well suited to capturing or recording a continuous, high-throughput stream of data, whether it has a predefined end or no end at all. The Streaming API ensures you capture every message in the stream – you won't miss a Tweet.

Streaming API vs. REST API

Both have their pros and cons, and they were designed for different use cases. The Streaming API was designed predominantly to run uninterrupted, capturing everything from a continuous stream of data in real time. Like other REST APIs, our REST API was designed to handle individual requests for a range of tasks, from compiling CSDL rules and costing streams to taking smaller samples of streams.

One of the key advantages of receiving a stream with the Streaming API rather than the REST API is that the Streaming API receives all its data in real time. As soon as you Tweet something, it appears in your stream. The REST API returns a buffered stream of up to 200 interactions from the DataSift buffer on each request. If you are consuming a high-throughput stream, some interactions may be dropped from the buffer if your connection can't keep up with the rate at which they are created.
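To make the buffering argument concrete, here is a toy simulation of polling a bounded buffer. It is only a sketch: the buffer capacity, the arrival rate, and the policy of discarding whatever overflows are assumptions made for illustration, not documented DataSift behaviour. Only the batch size of 200 comes from the text above.

```php
<?php
// Toy model only: capacity, arrival rate and the overflow policy are
// assumptions, not documented DataSift behaviour.
function simulatePolling(int $createdPerTick, int $capacity, int $batchSize, int $ticks): array
{
    $buffered = 0; // interactions currently waiting in the buffer
    $received = 0; // interactions handed to the client
    $dropped  = 0; // interactions lost because the buffer was full

    for ($tick = 0; $tick < $ticks; $tick++) {
        // New interactions arrive; anything beyond the capacity is lost.
        $buffered += $createdPerTick;
        if ($buffered > $capacity) {
            $dropped += $buffered - $capacity;
            $buffered = $capacity;
        }

        // One request per tick returns at most $batchSize interactions.
        $taken = min($batchSize, $buffered);
        $buffered -= $taken;
        $received += $taken;
    }

    return ['received' => $received, 'dropped' => $dropped, 'buffered' => $buffered];
}

// 300 interactions created per tick, but only 200 fetched per request.
$result = simulatePolling(300, 1000, 200, 20);
echo "received={$result['received']} dropped={$result['dropped']} buffered={$result['buffered']}\n";
```

Whenever interactions are created faster than they are fetched, the backlog grows until the buffer is full, and from then on the surplus is lost on every tick. A slower stream, for example 100 interactions per tick, never fills the buffer in this model.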

For more information on the differences between the Streaming and REST APIs, take a look at Things Every Developer Should Know.

What can we do with the data?

Interactions from DataSift are returned as JSON or JSONP, and what you do with this data is up to you.
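As a rough sketch of handling one interaction in its JSON form, the snippet below decodes a payload and pulls out a few fields. The sample payload is hypothetical and trimmed to the fields used in the example later in this post; real interactions carry many more. The helper name summariseInteraction is made up for illustration.

```php
<?php
// Hypothetical sample payload, reduced to the fields used in this post.
$json = '{"twitter":{"user":{"screen_name":"example_user"},'
      . '"place":{"country":"United Kingdom"},"text":"Time for coffee"}}';

// Decode one interaction and return the fields we care about,
// or null if the input is not a JSON object.
function summariseInteraction(string $json): ?array
{
    $data = json_decode($json, true); // true: decode objects as associative arrays
    if (!is_array($data)) {
        return null;
    }

    // The ?? operator yields null when a key is missing, instead of a notice.
    return [
        'user'    => $data['twitter']['user']['screen_name'] ?? null,
        'country' => $data['twitter']['place']['country'] ?? null,
        'text'    => $data['twitter']['text'] ?? null,
    ];
}

print_r(summariseInteraction($json));
```

Reading fields defensively is worthwhile because not every interaction has every field; a Tweet without location data, for instance, has no place entry.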

One powerful feature that helps with post-processing is Tagging. You can automatically tag each interaction according to a set of rules to help categorize each post. More information and examples can be found in our Tags Blog Post.

Another way you could use the Streaming API is to dynamically build a CSDL rule. It would simply be a case of writing a script to extract data, such as the Twitter username, from each interaction, build this data into a list, and either insert it into your CSDL periodically or build a new CSDL rule.
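The idea can be sketched roughly as follows. This assumes the CSDL target twitter.user.screen_name and an "in" operator that takes a comma-separated list inside a quoted string; check both against the CSDL reference before relying on them. The function names are hypothetical.

```php
<?php
// Sketch only: the target name and the "in" operator syntax are assumptions;
// confirm both in the CSDL reference.

// Record the author's screen name from one decoded interaction.
function collectScreenName(array &$names, array $interaction): void
{
    $name = $interaction['twitter']['user']['screen_name'] ?? null;
    // Accept only letters, digits and underscores so that a name
    // cannot break out of the quoted CSDL string.
    if (is_string($name) && preg_match('/\A[A-Za-z0-9_]+\z/', $name) === 1) {
        $lower = strtolower($name);
        $names[$lower] = $lower; // keyed by name, so duplicates collapse
    }
}

// Turn the collected names into a CSDL filter string.
function buildScreenNameCsdl(array $names): string
{
    if (count($names) === 0) {
        throw new InvalidArgumentException('No screen names collected yet.');
    }
    return 'twitter.user.screen_name in "' . implode(',', $names) . '"';
}

$names = [];
collectScreenName($names, ['twitter' => ['user' => ['screen_name' => 'Alice']]]);
collectScreenName($names, ['twitter' => ['user' => ['screen_name' => 'bob_1']]]);
echo buildScreenNameCsdl($names), "\n";
```

In a real script, collectScreenName() would be called from the interaction callback, and the resulting string would be used periodically to create a new definition.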

Streaming API Example

We have written a simple example that looks for Tweets that mention coffee and have location data attached. We have chosen to output the user's Twitter username, their country, and the text of the Tweet.

This example uses DataSift's PHP API Client Library.

<?php
    require '../lib/datasift.php';
    require '../config.php';

    date_default_timezone_set('Europe/London');

    $user = new DataSift_User(USERNAME, API_KEY);
    $def = $user->createDefinition('twitter.text contains "coffee" AND 
                                   twitter.place.country exists');
    try {
        $consumer = $def->getConsumer(DataSift_StreamConsumer::TYPE_HTTP, 'display', 'stopped');
        $consumer->consume();
    } catch (Exception $e) {
        echo 'An error has occurred : '.$e->getMessage();
        exit(1); // stop the script with a non-zero exit status
    }

    function display($consumer, $data) {
        echo "\n@".$data['twitter']['user']['screen_name']."\n";
        echo $data['twitter']['place']['country']."\n";
        echo $data['twitter']['text']."\n--";
    }

    function stopped($consumer, $reason) {
        echo "\nStream stopped : ".$reason."\n";
    }
?>

Let's take a walk through the code:

Line 7 – Create and authorize a new user object from the credentials stored in config.php

Line 8 – Create your CSDL rule. Ensure the full CSDL string is wrapped in single quotes (''), and any strings inside the statement are wrapped in double quotes ("").

Line 11 – Create the consumer, by compiling the CSDL code, and setting the onInteraction and onStopping callback functions.

The getConsumer() method looks for the following three parameters:

Type – The type of Stream Consumer to use. The default is DataSift_StreamConsumer::TYPE_HTTP

onInteraction – The function to call each time an interaction is received.

onStopping – The function to call each time the stream stops.

Line 12 – Consume – This is the method we call to start consuming the stream.

Line 18 – onInteraction function – This is run each time a new interaction is received by the API client. In this case we are simply printing content from each interaction to the screen. You could choose to store them in a local array, print to screen, save to a database, or perform any number of other functions.

We access each returned interaction by reading keys of the $data array, which holds the decoded JSON. Full details about the returned JSON object can be found under HTTP Streaming.

Line 24 – onStopping function – This is run if the stream connection ends for any reason, whether a network error or a user request to stop the stream. In this case, the stopped() function prints a message stating why we were disconnected. You could also use it to trigger post-processing of the data you collected, or to reconnect. Please note that if you are building your own client library, you will need to adhere to our reconnection timing restrictions.
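If you do reconnect from your own code, you will want to wait between attempts rather than retry immediately. The helper below is a minimal sketch of an exponential backoff delay. The function name and the base and maximum values are placeholders, not DataSift's actual limits; take the real figures from the reconnection timing restrictions.

```php
<?php
// Placeholder values: the real limits should come from DataSift's
// reconnection timing restrictions, not from this sketch.
function reconnectDelay(int $attempt, int $baseSeconds = 1, int $maxSeconds = 60): int
{
    // Attempt 1 waits $baseSeconds; each further attempt doubles the wait.
    // The exponent is capped so the multiplication cannot overflow.
    $exponent = min(max($attempt, 1) - 1, 16);

    // Never wait longer than $maxSeconds.
    return min($baseSeconds * (2 ** $exponent), $maxSeconds);
}

// Show the delay that would be used before each of the first 8 attempts.
foreach (range(1, 8) as $attempt) {
    echo "attempt {$attempt}: wait " . reconnectDelay($attempt) . "s\n";
}
```

One possible arrangement is to wrap consume() in a loop, call sleep(reconnectDelay($attempt)) before each retry, and reset the attempt counter once interactions are flowing again.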

Summary

The Streaming API is one of the fastest and easiest ways for developers to pull data from the DataSift API. It is an incredibly powerful way to receive a curated stream of real-time data, and the only way to ensure you receive everything – you won't miss a Tweet.

