How To Update Filters On-The-Fly And Build Dynamic Social Solutions

Richard Caudle | 10th June 2014

It would be easy if the world around us was static, but in practice things are always changing. Nowhere is this truer than in the world of social networks; users are constantly following new friends and expressing new thoughts. The filter you wrote yesterday is probably already out-of-date!

On the DataSift platform you can update your filters on the fly via the API and avoid downtime for your application. This not only allows you to adapt to real-world changing scenarios, but in fact allows you to build much more powerful, dynamic social solutions. In this post I'll show you how this can be done.

Embracing Change

If you've ever built a software solution you'll know that things aren't quite as simple as you'd hope. The real world is always changing.

For example, imagine you're tracking conversation around a news story. You build a simple filter which looks for the terms and personalities involved in the story. This works great, but a few hours later the story has evolved. As stories evolve, the terms people use to discuss them inevitably change. You'll need to react to this without missing any conversations.

Or, maybe you've built an awesome social app that allows users to input their interests, and you build a filter from that input. The next day a user updates their interests. You'll need to update your filter to match the new interests without interrupting your service to the user.

A well-designed solution takes change in its stride.

Broad Brush Overview

Ok, so we want to build our wonderfully dynamic, super-duper-flexible social solution. What does this mean in practice? On the DataSift side of things we want to be able to update our stream definitions (filtering & tagging) on the fly, delivering data to the same destination, without missing any data.

Before we get to the deeper details, the broad principles are:

  • Create V1 of our stream: Build V1 of our stream definition, for instance from user input.
  • Start consuming V1: Compile and stream V1 as usual via the API.
  • Create V2 of our stream: Something has changed! Build V2 of our stream to adapt.
  • Start consuming V2: In parallel with streaming V1, start streaming V2.
  • Stop consuming V1: When we're happy V2 is streaming nicely, stop streaming V1.

Essentially to avoid downtime (or missing data) we have a brief period where we're streaming both versions in parallel. Note we will need to handle de-duplication during this brief period.

Let's Do It

Ok, so those are the principles. Let's see them in practice.

I wrote a stream last week to track conversations around popular games. Let's use this as an example.

(For the complete example code take a look at this GIST.)

Create Stream V1

Version 1 of our stream will look for mentions of six popular games: 2048, FarmVille 2, Swamp Attack, Smash, Trials Frontier and Don't Step The White Tile.

Note that this is a simple illustrative example. In practice you might also want to catch mentions by inspecting the links being shared, for instance.

// Mentions relating to games
(
    interaction.content contains_any "2048,FarmVille 2,Swamp Attack,Smash,Trials Frontier"
    OR interaction.content contains_all "step,white,tile"
)

// Exclude noise
AND NOT interaction.content contains_any "banned,blocked"
AND NOT interaction.content wildcard "cheat*"
AND NOT interaction.content wildcard "trick*"
AND NOT interaction.content wildcard "hack*"

Start Consuming V1

Now that we have our stream defined, we can compile the definition and start consuming data. In this example we'll use the Pull destination to get the resulting data.

For this example I'll use the Python helper library.

import time

import datasift

# Compiles a stream from CSDL and returns the stream hash
def compile_stream(csdl):
    fltr = client.compile(csdl)
    return fltr['hash']

# Creates a pull subscription given a stream hash
def create_pull_subscription(hash):
    subscription = client.push.create_from_hash(hash,'Example pull subscription','pull',{})
    return subscription['id']

# Pulls data from stream buffer
def pull_data_from_buffer(subscription_id):
    data = client.pull(subscription_id)

    # TODO: Do something useful with data here, for now just print it!
    print data

# Create a client
# TODO: Insert your API credentials
client = datasift.Client('YOUR_USERNAME', 'YOUR_API_KEY')

# stream_v1 holds the CSDL definition shown above, as a string
print 'Compiling and streaming version 1'
hash_v1 = compile_stream(stream_v1)
subscription_id_v1 = create_pull_subscription(hash_v1)

print 'Sleeping...'
time.sleep(10)  # Give the subscription a moment to buffer some data

print 'Pulling data from buffer'
pull_data_from_buffer(subscription_id_v1)

Create Stream V2

We're now happily consuming data. But wait! A new game has entered the charts and we must track it. The game is Clash of Clans, and it needs to be added to our filter.

It's easy to imagine you could generate such a filter from an API which gives you the latest game charts.
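For illustration, here's one way you might build such a clause programmatically. Note that get_top_games() is a hypothetical stand-in for whatever charts API you use, not part of any real library:

# Build the games clause of our CSDL from a dynamic list of titles.
# Note: contains_any takes a comma-separated list, so any title
# containing a comma would need special handling.
def build_games_clause(titles):
    return 'interaction.content contains_any "%s"' % ','.join(titles)

# Hypothetical charts lookup returning e.g.
# ['2048', 'FarmVille 2', 'Swamp Attack', 'Trials Frontier', 'Clash of Clans']
top_games = get_top_games()
games_clause = build_games_clause(top_games)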

The updated filter looks as follows (notice the use of the contains_near operator, which matches "Clash" and "Clans" within five words of each other, so extra words in the title are tolerated):

// Mentions relating to games
(
    interaction.content contains_any "2048,FarmVille 2,Swamp Attack,Smash,Trials Frontier"
    OR interaction.content contains_all "step,white,tile"
    OR interaction.content contains_near "Clash,Clans:5"
)

// Exclude noise
AND NOT interaction.content contains_any "banned,blocked"
AND NOT interaction.content wildcard "cheat*"
AND NOT interaction.content wildcard "trick*"
AND NOT interaction.content wildcard "hack*"

Start Consuming V2

The next step is to start streaming V2 of the stream in parallel with V1.

# stream_v2 holds the updated CSDL definition, as a string
print 'Compiling and streaming version 2'
hash_v2 = compile_stream(stream_v2)
subscription_id_v2 = create_pull_subscription(hash_v2)

print 'Sleeping...'
time.sleep(10)  # Again, give the new subscription a moment to buffer some data

print 'Pulling data from buffer'
pull_data_from_buffer(subscription_id_v2)

De-duplicating Data

We now have two streams running in parallel. Until we stop stream V1 there's a good chance that the same interaction will be received on both streams, so it's important that we de-duplicate the data received.

How you go about this depends entirely on the solution you're building. Whichever way you choose, you can use the interaction.id property of each interaction as a unique identifier. One option is a unique key in a database (if that's where your data is being stored); another simple approach is to keep a rolling in-memory set of IDs seen in, say, the last five minutes. Which makes sense depends on the volume of data you expect and the scale of your solution.
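As a concrete illustration, here's a minimal sketch of the rolling in-memory approach. The names and the five-minute window are assumptions for this example, not part of the DataSift library:

import time

SEEN_TTL = 300  # remember IDs for five minutes, covering the overlap window
seen = {}       # maps interaction ID -> time it was first seen

# Returns True if this interaction has already been processed
def is_duplicate(interaction):
    now = time.time()

    # Expire old entries so the dictionary doesn't grow without bound
    for known_id in list(seen.keys()):
        if now - seen[known_id] > SEEN_TTL:
            del seen[known_id]

    interaction_id = interaction['interaction']['id']
    if interaction_id in seen:
        return True
    seen[interaction_id] = now
    return False

You would then call is_duplicate on every interaction pulled from either subscription, and discard anything that returns True.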

Stop Consuming V1

Now that we have started streaming V2 of the stream we can stop consuming data from V1.

The second subscription starts receiving data as soon as it is created. However, if you want to be doubly sure that you do not miss any data, we recommend waiting for the first interaction to arrive on stream V2 before stopping stream V1. Note that the platform charges you for DPUs consumed and data received on each stream individually, so keep the overlap period short.
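A minimal sketch of that wait, reusing the client, the time import and the V2 subscription ID from the snippets above (the ten-second polling interval is an arbitrary choice for the example):

# Poll the V2 subscription until its first interaction arrives;
# once it has, it is safe to stop V1
def wait_for_first_interaction(subscription_id, poll_interval=10):
    while True:
        data = client.pull(subscription_id)
        if data:
            return data
        time.sleep(poll_interval)

wait_for_first_interaction(subscription_id_v2)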

# Stops a push subscription
def stop_subscription(subscription_id):
    client.push.stop(subscription_id)

print 'Stopping stream v1'
stop_subscription(subscription_id_v1)

In Conclusion

And so ends my quick tour. I hope this post illustrates how you can switch to new stream definitions on the fly. This capability is likely to be key to the real-world solutions you create, and hopefully it inspires you to build some truly responsive applications.

For the complete example code take a look at this GIST.

To stay in touch with all the latest developer news, subscribe to our RSS feed or follow us on Twitter at @DataSiftDev.
