
Ed Stenson's picture

Using Managed Sources

I've noticed some questions from clients who are using Managed Sources for the first time. In this blog I'm going to go through the steps to run a DataSift filter on a Managed Source:

  1. Create a token
  2. Create a Managed Source
  3. Create a CSDL filter for that Managed Source
  4. Start recording the output of the filter
  5. Start the Managed Source

I'll use Facebook in my examples, but the process is similar for all the Managed Sources the platform offers.

Suppose that you have hundreds of Facebook pages about your brands, plus a body of content created by users or customers. DataSift can aggregate it all: your brand pages, campaign pages, competitor's pages, and pages from industry influencers.

In this blog I'm going to focus on our UI but you can set up and manage everything via API calls instead and, for production use, that's the way to go. To learn more about that process, read our step-by-step guide.

Just to set the scene, DataSift offers two types of data source:

  • Public
  • Managed 

A public source (YouTube, for example) is one that anyone can access. A Managed Source is one that requires you to supply valid authentication credentials before you can use it.

 

Create a token

The first task is to create an OAuth token that DataSift will use for authentication. The good news is that you don't even need to know what an OAuth token is, because it's generated automatically:

1.  Log in and go to Data Sources -> Managed Sources.

 

2.  Click on the Facebook tile.

 

3.  Click Add Token.

 

A popup box appears, inviting you to sign into your Facebook account. If you look at the URL in the popup's address bar, you'll see that it's served by Facebook, not by us. That means you're giving your Facebook credentials to Facebook privately, just as you do any other time you sign in. You are not giving them to us and we cannot see them.

 

 

4. Log in to Facebook in the popup box.

The popup closes and you will now see that you have a token.

 

From now on, any time you run a filter in DataSift against this Managed Source, DataSift will use the token to gain access. It's secure; if you want to stop using the token, you can delete it from DataSift by clicking the red X. Or, in your Apps settings in Facebook, you can revoke it. If you do that, the token becomes useless.

 

Create a Managed Source

Now you can specify what you want to filter for.

5. In the Name field, specify a name for your Managed Source. Here, I've called it "Example".

 

6. Type a search term in the Search box and click Search. Here I'm going to monitor Ferrari cars and merchandise.

7. DataSift lists all the accounts that match your search term. Select the ones you want to include in your filtering. In this example, I've chosen the candidate with the greatest number of likes.

 

 

8. Click Save

 

Create a CSDL filter for that Managed Source

9. Click the My Managed Sources tab. You will see the source you just defined. Notice that it has an orange Start button, whereas the other two sources, which I defined before I took this screenshot, have a Stop button. It's important that you don't click Start yet. The first time you click it, DataSift delivers a backlog of posts from the past seven days. You need to create a stream and start a recording to capture those posts; otherwise they'll be lost. The next few steps explain how to do that.

 

10. Click on your Managed Source, "Example" in this case. DataSift displays the definition page for the source.

 

 

11. Click How to Use. Now you can grab the CSDL code for this Managed Source. It's a simple one-line filter that uses the source.id target and the unique id for the source you just defined.

 

 

12. Copy the CSDL code to the clipboard:

      source.id == "c07504cc3a324848ba1fb5905287799b"

 

13. Create a filter with that CSDL. You're probably very familiar with this step already. Just click the Create Stream button, paste the CSDL code in from your clipboard, and save it.

 

 

Start recording the output of the filter

Now you need to start recording the output of that filter. Recordings are under the Tasks tab in DataSift.

14. Click Start a Recording.

 

15. Choose the filter that you created in Step 13.

16. Click Start Now and choose an end time for your recording. For this first test, I'd recommend that you don't choose a long duration.

17. Click Continue and then click Start Task.

 

Start the Managed Source

18. Now go back to My Managed Sources and click Start.

Your filter will start to collect data from the source and DataSift will record it automatically.
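
To see why the recording has to be running before you click Start, here is a toy model of the backlog behaviour described in Step 9. The class and method names are hypothetical and are not part of any DataSift library; the sketch only assumes that the backlog is delivered once, on the first start.

```python
class ToySource:
    """Toy stand-in for a Managed Source; not a DataSift API."""

    def __init__(self, backlog):
        self.backlog = list(backlog)   # posts from the past seven days
        self.started_before = False

    def start(self, recording):
        """Deliver the backlog once, on the first start only."""
        if self.started_before:
            return 0
        self.started_before = True
        if recording is None:
            # Nothing is listening, so the backlog is dropped.
            return 0
        recording.extend(self.backlog)
        return len(self.backlog)


# Recording first, then the source: the backlog is captured.
recording = []
captured = ToySource(["post-1", "post-2", "post-3"]).start(recording)

# Source first, with no recording running: the backlog is lost,
# and starting again later does not bring it back.
lost_source = ToySource(["post-1", "post-2", "post-3"])
lost = lost_source.start(None)
retry = lost_source.start([])
```

In the first case all three posts end up in the recording; in the second, none do.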

 

Summary

That's all you need to know to use Managed Sources from the UI. Notice that you didn't even need to write a filter to get started; the platform provided the code for you. And by starting the recording before you started the Managed Source, you made sure that no data was lost.

For production use, there's a powerful Managed Sources API, plus that step-by-step guide that I mentioned at the beginning of this blog.

Jacek Artymiak's picture

Pulling Data with the Pull (Push) Connector

The Pull Connector is the latest addition to our growing family of Push connectors. This new Push connector takes its name from the mechanism used to deliver the interactions you filter for: you pull data from our platform instead of us pushing it to you.

Even though the name of this connector might seem to be out of place for a Push connector, it makes sense to classify it as another Push connector, because it uses the same robust Push subsystem that powers other DataSift Push Connectors.

 

 

We designed it specifically for the clients who are firewalled from the public internet and prefer to keep and process data in house. The Pull Connector provides the following benefits:

  • Firewalls and network security policies are no longer an issue.

    With Pull, there is no need to set up public endpoints. It simplifies firewall and network management on your side.

    For example, you no longer need to ask your operations team to loosen up the firewall rules to enable connections from DataSift to a host that will receive data. They will not have to give up a precious public IP address or think of ways of redirecting traffic to a shared IP address.

    Also, a change of the IP address of the host receiving data does not require a call to /push/update. 
     
  • Data collection and processing at your own pace.

    The Pull Connector uses the Push data queuing subsystem. Your data is stored for an hour in a Push queue, giving you freedom to collect it as often as you want (up to twice per second per Push subscription ID) and to request as much of it as you want, in batches of up to 20MB.
     
  • You can retrieve data again, if necessary.  

    If you need to request data again, you can go back in time for up to an hour using the queue cursor mechanism. It lets you retrieve data from the queue again in case it gets lost. You have up to one hour to retrieve it, which should give you plenty of time to handle technical problems.

When you combine the robust foundations of the Push subsystem, the freedom to collect data at your own pace, and the ease of setting up a data collection and processing system without having to make changes to your organization's network and security setup, the Pull Connector becomes a very attractive solution.

And we saved the best for last: even though the Pull Connector introduces a new endpoint, /pull, for data collection, we implemented it using the same REST API you are already familiar with. You set it up just like any other Push connector and then call /pull to get your data.
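
As a rough illustration of collecting data at your own pace, the sketch below polls no more than twice per second and carries a cursor between requests. It deliberately does not make HTTP calls: `fetch` is a hypothetical callable standing in for your own request to /pull, and the way the cursor is passed and returned here is an assumption, not the documented API.

```python
import time
from typing import Callable, Iterator, Optional, Tuple

MIN_INTERVAL_S = 0.5  # at most two requests per second per subscription

# Hypothetical fetch signature: takes a cursor (or None) and returns
# (batch_of_interactions, next_cursor). It stands in for a call to /pull.
Fetch = Callable[[Optional[str]], Tuple[list, Optional[str]]]


def poll(fetch: Fetch,
         max_requests: int,
         sleep: Callable[[float], None] = time.sleep,
         clock: Callable[[], float] = time.monotonic) -> Iterator[list]:
    """Yield non-empty batches, never calling fetch more than twice a second."""
    cursor = None
    last_call = None
    for _ in range(max_requests):
        now = clock()
        if last_call is not None and now - last_call < MIN_INTERVAL_S:
            # Wait out the remainder of the half-second window.
            sleep(MIN_INTERVAL_S - (now - last_call))
        last_call = clock()
        batch, next_cursor = fetch(cursor)
        if next_cursor is not None:
            cursor = next_cursor  # remember where to resume from
        if batch:
            yield batch
```

Injecting `sleep` and `clock` keeps the loop easy to test; in real use you would leave the defaults and persist the cursor so that you can re-request data within the one-hour window.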

Gareth's picture

Platform Performance Gains with Arista Switches

In late 2012 I wrote about the migration of DataSift's Hadoop cluster to Arista switches, but what I didn't mention was that we also moved our real-time systems over to Arista.

Within the LAN

During our fact-finding trek through the Cisco portfolio we acquired a bunch of 4948 and 3750 switches, which were re-purposed into the live platform. Unfortunately, the live platform (as opposed to Hadoop-sourced historical data) would occasionally experience performance issues because the fan-out design of our distributed architecture amplified the impact of micro-bursts during high-traffic events. Every interaction we receive is augmented with additional metadata such as language detection, sentiment analysis, trend analysis and more. To acquire these values, an interaction is tokenized into the relevant parts (for example, a Twitter user name for Klout score, sentences for sentiment analysis, trigrams for language analysis and so on). Each of those tokens is then dispatched to the service endpoints for processing. A stream of 15,000 interactions a second can instantly become 100,000+ additional pieces of data traversing the network, which puts load on NICs, switch backplanes and core uplinks.

If a particular request fails, precious time is wasted waiting for the reply, processing the failure and then re-processing the request. To combat this you might duplicate calls to service endpoints (speculative execution, in Hadoop parlance), which doubles your chances of success, but those ~100,000 pieces of data would become ~200,000, putting further stress on all your infrastructure.
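
The fan-out arithmetic can be checked in a few lines. The figures come from the text above; treating "100,000+" as exactly 100,000 is a simplifying assumption, so the per-interaction figure is a lower bound.

```python
interactions_per_s = 15_000
fanout_msgs_per_s = 100_000          # "100,000+" in the text, used as a floor

tokens_per_interaction = fanout_msgs_per_s / interactions_per_s
speculative_msgs_per_s = 2 * fanout_msgs_per_s   # every call duplicated

print(f"{tokens_per_interaction:.1f} service calls per interaction")
print(f"{speculative_msgs_per_s:,} calls/s with duplicated requests")
```

That works out to roughly 6.7 service calls per interaction, and about 200,000 calls a second once every call is duplicated.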

At DataSift we discuss internal platform latency in terms of microseconds and throughput in tens of gigabits, so adding an unnecessary callout here or a millisecond extra there isn't acceptable. We want to be as efficient, fast and reliable as possible. When we started looking at ways of improving the performance of the real-time platform, it was clear that many of the arguments that made Arista an obvious choice for Hadoop also made it ideal for our real-time system. The Arista 7050 switches we'd already deployed have some impressive latency statistics, so we needed little more convincing that we were on the right path (although the 1.28 Tbps and 960,000,000 packets per second statistics don't hurt either). For truly low-latency switching at the edge, one would normally look at the 7150 series, but in our testing the 7048 switches were well within the performance threshold we wanted and enabled us to standardize our edge. We made use of our failure-tolerant platform design (detailed further below) to move entire cabs at a time over to the Arista 7048 switches with no interruption of service to customers. Once all cabinets were migrated, and with no other optimizations at that point, we saw an immediate difference in key metrics:
 

 

Simply by deploying Arista switches for our 'real-time' network we decreased augmentation latency from ~15,000µs down to ~2,200µs. Further optimizations to our stack and how we leverage the Linux kernel's myriad options have improved things even more.
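
To put those two latency figures in proportion, here is the same comparison as a quick calculation. Both inputs are the approximate values quoted above, so the results are approximate too.

```python
before_us = 15_000   # augmentation latency before the migration, in µs
after_us = 2_200     # augmentation latency after the migration, in µs

speedup = before_us / after_us
reduction_pct = 100 * (before_us - after_us) / before_us

print(f"about {speedup:.1f}x faster, a {reduction_pct:.0f}% reduction")
```

In other words, latency dropped by roughly 85 percent, or a little under a sevenfold improvement.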

Epic Switches are only half the story

One of the great features of the Arista 7048 switches is their deep buffer architecture, but in certain circumstances another buffer in the path is the last thing you want. Each buffer potentially adds latency to the system before the upstream can detect the congestion and react to it. The stack needs to be free of bottlenecks to prevent the buffers from filling up, and the 7048 switches can provide up to 40Gb/s of throughput to the core, which fits nicely with 40 1u servers in a 44u cabinet. With that said, we're not ones to waste time and bandwidth by leaving the TOR switch if we don't have to. By pooling together resources into 'cells' we can reduce uplink utilization and decrease the RTT of fan-out operations by splitting the workload into per-cabinet pools. 

With intelligent health checks and resource routing, coupled with Arista's non-blocking, full wire-speed forwarding, the processing servers can call out cross-rack with very little penalty if a resource pool suffers failures. 

That's great but I'm on the other side of the Internet

We are confident in our ability to provide low-latency, real-time content that is filtered and augmented. This enables us to publish, on our status site, live latency statistics of a stream being consumed by an EC2 node on the other side of the planet. We can be this confident because we control and manage every aspect of our platform, from influencing how data traverses the Internet to reach us, through our routers and our switches, all the way down to the SSD chipset or SAS drive spindle speed in the servers. (You can't say that if you're on someone's public cloud!)

User Latency
(They could be next door to a social platform DC or over in Antarctica)
10ms - 150ms
+
Source Platform Processing time
(For example, the time taken for Facebook or Twitter to process & send it on)
???ms
+
Trans-Atlantic fiberoptics
(For example, San Jose to our furthest European processing node)
~150ms
+
Trans-Pacific fiberoptics
(For example, from a European processing node to a customer in Japan)
~150ms
=
  ~500ms

When dealing with social data on a global scale there can be a lot of performance uncertainty, with under-sea fiber cuts, carrier issues and entire IX outages, but we can rest assured that once that data hits our edge we can process it with low latencies and high throughput. In conclusion, I've once again been impressed by Arista and would wholeheartedly recommend their switches to anyone else working with high-volume, latency-sensitive data. 

Reading List:

Arista switches were already a joy to work with (access to bash on a switch, what's not to love?) but Gary's insights and advice make it all the better. Arista Warrior - Gary A. Donahue

Even with all the epicness of this hardware, if you're lazy with how you treat the steps your data goes through before it becomes a frame on the switch you're gonna have a bad time so for heavy duty reading The Linux TCP/IP stack book may help. The Linux TCP/IP Stack: Networking for Embedded Systems - Thomas F Herbert

Ed Stenson's picture

New Release of the Query Builder

Have you tried our Query Builder yet? It's a visual tool that makes it easy for newcomers to get started with DataSift quickly, before they even begin to learn our query language, CSDL. Despite its simplicity, the Query Builder offers very nearly all the features on offer in the full language. It includes every CSDL operator and logical operator, together with very nearly all the targets and augmentations. Recently we added the ability to use parentheses and, with the latest release, we've added the NOT logical operator.

Let me give you an example. In CSDL, you can write a filter that includes one keyword and excludes another like this:

NOT imdb.title contains_any "Star"

AND

imdb.title contains_any "Trek"

To date, it has not been possible to perform logical inversion using the Query Builder but now you can do it with a single click:

Adding a NOT in the Query Builder

To create rules that use NOT:

  1. Click Create New Filter.
     
  2. Choose IMDb -> Title.
  3. Choose Contains words as your operator.
     
  4. Type "Star" as the filter keyword.
     
  5. Click Save.
     
  6. Click Create New Filter again and build a second rule that looks for "Trek" in the IMDb title.
     
  7. Click Save and your Query Builder screen will show the two rules like this:


    This will filter for titles that include "Trek" AND "Star". We need to adjust it to filter for "Trek" AND NOT "Star".
     
  8. Click Advanced.

    Notice how the two rules now have numbers?


     
  9. Click NOT.



    The Query Builder adds the NOT in front of rule 1. This is exactly what we want, because rule 1 filters for "Star". If you wanted to apply the NOT to the rule for "Trek" you could drag and drop it in front of the "2".

  10. Click Save and Preview. The Query Builder saves your work and automatically generates code.

 

// JCSDL_MASTER 4bce1d2f67166ea38e0875cc79750c85 !1&2

// JCSDL_VERSION 2.1

NOT

// JCSDL_START 8a1624733bb708222fab239bcb5d8aaf imdb.title,contains_any,25-4 1

imdb.title contains_any "Star"

// JCSDL_END

AND

// JCSDL_START ea352777ca8a4b1e80c3f4cb60e22dfc imdb.title,contains_any,25-4 2

imdb.title contains_any "Trek"

// JCSDL_END

// JCSDL_MASTER_END

 

Several of the lines are commented out because they contain internal information for the Query Builder itself.  If we remove those lines, we're left with the original CSDL that I included at the beginning of this blog.

 

NOT imdb.title contains_any "Star"

AND

imdb.title contains_any "Trek"
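
If you want to do that clean-up programmatically, a minimal sketch follows. It assumes the Query Builder only ever emits whole-line `//` comments, as in the output above; the function name is my own and is not part of the Query Builder.

```python
def strip_jcsdl_comments(generated: str) -> str:
    """Drop whole-line '//' comments and blank lines, keeping only CSDL."""
    kept = []
    for line in generated.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("//"):
            continue
        kept.append(stripped)
    return "\n".join(kept)


generated = """\
// JCSDL_MASTER 4bce1d2f67166ea38e0875cc79750c85 !1&2
// JCSDL_VERSION 2.1
NOT
// JCSDL_START 8a1624733bb708222fab239bcb5d8aaf imdb.title,contains_any,25-4 1
imdb.title contains_any "Star"
// JCSDL_END
AND
// JCSDL_START ea352777ca8a4b1e80c3f4cb60e22dfc imdb.title,contains_any,25-4 2
imdb.title contains_any "Trek"
// JCSDL_END
// JCSDL_MASTER_END
"""

print(strip_jcsdl_comments(generated))
```

Bear in mind that the Query Builder needs those comments to reload a filter for editing, so strip them only from a copy you intend to read, not from the one you save.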

 

Using NOT with parentheses

Let's look at an example that uses parentheses as well as NOT. Suppose you wanted to filter for blogs that mention the NASDAQ, but exclude mentions of Microsoft and Intel, two of the tech-laden index's best-known components:

  1. Click Create New Filter.
     
  2. Choose Blog -> Content.
     
  3. Choose Contains words as the operator.
     
  4. Add "NASDAQ" as the filter keyword.
     
  5. Click Save.
     
  6. Click Create New Filter again and build a second rule that looks for "Microsoft" in the blog content.
     
  7. Finally, build one more rule that looks for "Intel" in the blog content.



    Now we need to add logical operators, parentheses, and a NOT operator.
     
  8. Click Advanced.
     
  9. Select the parentheses.



    By default, the Query Builder places the parentheses around your entire query definition.


     
  10. Drag the left parenthesis so that the parentheses surround rules 2 and 3.
     
  11. Click the second AND operator and it will change to OR.


     
  12. We need to make one final modification to make the query exclude Microsoft and Intel. Click NOT and drag it so that it operates on the clause defined within the parentheses.


     
  13. Click Save and Preview. The Query Builder saves your work and automatically generates code.

 

// JCSDL_MASTER c24e74b9aa6c3aed7c09d36aae51b661 1&!(2|3)

// JCSDL_VERSION 2.1

// JCSDL_START cfc90f784c8274788a2bb3984ba42ee1 blog.content,contains_any,27-6 1

blog.content contains_any "NASDAQ"

// JCSDL_END

AND  NOT (

// JCSDL_START 13a0d4a23c57744568df3c58dde08ec4 blog.content,contains_any,27-9 2

blog.content contains_any "Microsoft"

// JCSDL_END

OR

// JCSDL_START 18b98e1b4d1b7c312d6ac8827be9350c blog.content,contains_any,27-5 3

blog.content contains_any "Intel"

// JCSDL_END

)

// JCSDL_MASTER_END

 

If we strip out the comments, the code becomes easy to read:

 

blog.content contains_any "NASDAQ"

AND  NOT (

blog.content contains_any "Microsoft"

OR

blog.content contains_any "Intel"

)
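
To check the logic of that expression, here is a rough Python equivalent. The `contains_word` helper is a simplified stand-in for contains_any with a single keyword; the real CSDL operator's tokenization and case handling may differ, so treat this purely as a model of the boolean structure.

```python
def contains_word(text: str, word: str) -> bool:
    """Simplified stand-in for contains_any with a single keyword."""
    return word.lower() in text.lower().split()


def matches(content: str) -> bool:
    # Mirrors 1 & !(2 | 3) from the JCSDL_MASTER line.
    return contains_word(content, "NASDAQ") and not (
        contains_word(content, "Microsoft") or contains_word(content, "Intel")
    )


def matches_de_morgan(content: str) -> bool:
    # Equivalent form without parentheses: 1 & !2 & !3.
    return (contains_word(content, "NASDAQ")
            and not contains_word(content, "Microsoft")
            and not contains_word(content, "Intel"))


samples = [
    "NASDAQ closes higher",
    "NASDAQ falls as Intel slides",
    "Microsoft beats estimates",
]
results = [matches(s) for s in samples]
# results == [True, False, False]
```

The second function shows why the parentheses matter: NOT applied to the whole (Microsoft OR Intel) clause is the same as excluding each keyword separately, which is not what you would get if the NOT applied to rule 2 alone.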

 

Summary

To try this new functionality out, you'll need to grab a copy of the Query Builder. It's open source and you can find it on GitHub. We provide full documentation to show you how to embed the Query Builder on your own pages.

Jairam Chandar's picture

Historics - We Keep Making It Better

A year ago, DataSift released Historics, a product that enabled users to access content from the past. Demand for it has grown massively over the past year. We have had to make many optimizations in order to keep up with not just the demand, but the scale of our ever-growing archive.

Our Historics archive is very close to one petabyte in size now and we are adding about two terabytes to it each day. We run over 2,000 Hadoop jobs every month that scan over a total of nine trillion records cumulatively. Hence, we must ensure that every single component involved in extracting information from this archive is as efficient as possible. 

Now, here’s the thing about DataSift: we are never satisfied with our improvements. We always strive to make our platform better and faster. A while back, our Chief Technical Architect, Lorenzo Alberton, wrote a blog on how we optimized our Hadoop jobs. Those optimizations vastly improved the speed at which we scanned and filtered our archive to give users what they want quickly. We mainly concentrated on improving our I/O times, and we achieved that by improving our job scheduling to run multiple user queries in one Hadoop job, thereby reading the data once and filtering it for multiple users.

But still not satisfied with our improvements, we have made two major changes: 

  • Moved our archive from HBase to raw Hadoop Distributed File System (HDFS).
  • Changed our scheduling algorithm in order to give each user a fairer share of the cluster. 

Moving our archive from HBase to raw HDFS

HBase is a brilliant solution for high write-throughput use cases. However, the extra overhead it incurred while querying data through Hadoop was giving us a lot of grief. We hit the ceiling with the I/O throughput we could achieve, mainly because Hadoop over HBase has to go through HBase RegionServers in order to ensure any data still in memory (yet to be flushed to disk) is read as well. The idea behind using HBase was to provide random access to data and we realized that almost all our applications working on the archive were doing streaming reads. The ideal solution for us then was to move our archive to raw HDFS. 

When we ran a Historics query on some of the migrated data in our archive, the job completed about three times faster. Previously, queries took up to 15 hours to read and filter a month’s worth of archive. Now, we are able to achieve the same in close to five hours. This was, of course, for a simple query, but we have seen similar improvements for complex queries too. 

The migration to raw HDFS was more difficult than you would imagine. We couldn’t afford to simply shut down Historics while we migrated the data. So we came up with a solution that could connect to both the old archive on HBase and the migrated archive on HDFS. This solution is still being used as we are in the process of migrating all the archive across to raw HDFS. This also meant that we had to provision new hardware to accommodate for what would be a second archive of the same size as the first, while we still continued to write every new interaction that was coming our way.

New and improved job scheduling 

One of the main concerns for us while designing the Historics system was to ensure all users receive a fair share of our computing cluster. In his blog, Lorenzo explained the original queuing algorithm we used for all Historics queries. We have now updated this algorithm to improve the user experience. 

We have introduced user-based queues where each user gets their own queue of queries. We break these queries into chunks that represent a day’s worth of data from the archive. For example, a Historics query for a week’s worth of data will consist of seven chunks. These chunks are then added to a user queue based on how old the chunk is (oldest first). It is important to note the difference between a job and a chunk. A job refers to the Hadoop job on the cluster, whereas a chunk is simply a day’s worth of work. Multiple chunks can run in the same job. 
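
As a small illustration of the chunking step, the sketch below splits a date range into one chunk per day, oldest first. The function name and the half-open date range are my own choices for the example, and the dates are arbitrary.

```python
from datetime import date, timedelta


def day_chunks(start: date, end: date) -> list:
    """Split [start, end) into one chunk per day, oldest first."""
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days)]


# A query covering one week produces seven chunks.
week = day_chunks(date(2013, 6, 3), date(2013, 6, 10))
```

Here `week` holds seven dates, from 3 June through 9 June, in the order they would be added to the user's queue.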

With the queues prepared, we pick the ‘n’ number of users with the oldest queries in their queues, where ‘n’ is the pre-determined size of the user pool we process at a given time. We then use the round-robin format among these user queues to pick one chunk to run at a time.

For example, two users with two chunks each would have their chunks executed in the following order:

 

               
 

The reason we pick only ‘n’ users to process is because we don’t want to penalize older queries if we suddenly get a lot of queries from users who are not in the user pool yet.

Fig 1: Queue evaluation
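
The round-robin selection across user queues can be sketched as follows. This is my reading of the description above rather than the production scheduler, and the user and chunk labels are placeholders.

```python
from collections import deque


def round_robin(user_queues: dict) -> list:
    """Take one chunk per user in turn until every queue is empty."""
    queues = {user: deque(chunks) for user, chunks in user_queues.items()}
    order = []
    while any(queues.values()):
        for user, queue in queues.items():
            if queue:
                order.append((user, queue.popleft()))
    return order


order = round_robin({"A": ["day1", "day2"], "B": ["day1", "day2"]})
# order == [("A", "day1"), ("B", "day1"), ("A", "day2"), ("B", "day2")]
```

With two users holding two chunks each, the chunks alternate between users, so neither user has to wait for the other's whole query to finish.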

New user queries can enter the user pool in one of the following ways. If the maximum user pool size ‘n’ has not been reached, they are simply added to the user pool. If the maximum user pool size 'n' has been reached, they have to wait. When the wait time is long, we say that the query is 'starved'. In order to address the problem of starvation, we introduced a starvation interval. If the wait time for a user has exceeded the starvation interval, then we increase the user pool size to accommodate this user. However, we always revert to the configured pool size once the starvation load drops. 
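
The admission rules can be modelled in a few lines. This is a sketch under assumptions: the function name, the (user, enqueued_at) representation and the time units are invented for illustration, and shrinking the pool back to its configured size is not modelled.

```python
def admit(pool: list, waiting: list, n: int,
          starvation_interval: float, now: float) -> list:
    """Return the new pool. `waiting` holds (user, enqueued_at) pairs,
    oldest first."""
    pool = list(pool)
    for user, enqueued_at in waiting:
        if len(pool) < n:
            pool.append(user)            # room in the normal pool
        elif now - enqueued_at > starvation_interval:
            pool.append(user)            # starved: grow the pool temporarily
        # otherwise the user keeps waiting
    return pool


# Pool is full (n=2). u3 has waited 100s, u4 only 10s; the interval is 60s.
pool = admit(["u1", "u2"], [("u3", 0.0), ("u4", 90.0)],
             n=2, starvation_interval=60.0, now=100.0)
# pool == ["u1", "u2", "u3"]
```

Only the starved user is let in past the configured size; the more recent arrival waits until a slot frees up or its own wait exceeds the interval.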

Then, the user queues are evaluated and tickets are allocated for all the chunks in the user queues. 

Fig 2: Ticket Evaluation

Once all the tickets are allocated, it's a simple case of picking the chunks with the lowest ticket numbers to run first. At this point, we check to see if there is a chunk without a ticket (a new one) that would query the same time period. If so, it is piggybacked with the chunk we just picked. This ensures we minimize I/O and reduce user wait times.
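
A simplified model of ticketing and piggybacking is shown below. It assumes tickets are handed out in the round-robin order described earlier, which the text does not state explicitly, and all names are hypothetical.

```python
from itertools import count


def allocate_tickets(user_queues: dict) -> dict:
    """Give each (user, day) chunk a ticket, round-robin across users."""
    tickets = {}
    counter = count(1)
    depth = max((len(q) for q in user_queues.values()), default=0)
    for i in range(depth):
        for user, queue in user_queues.items():
            if i < len(queue):
                tickets[(user, queue[i])] = next(counter)
    return tickets


def next_job(tickets: dict, unticketed: list) -> list:
    """Pick the lowest-ticket chunk and piggyback unticketed chunks
    that cover the same day."""
    user, day = min(tickets, key=tickets.get)
    job = [(user, day)]
    job += [(u, d) for u, d in unticketed if d == day]
    return job


tickets = allocate_tickets({"A": ["mon", "tue"], "B": ["mon"]})
job = next_job(tickets, unticketed=[("C", "mon"), ("C", "wed")])
# job == [("A", "mon"), ("C", "mon")]
```

User C's new chunk for Monday rides along with the chunk that was about to run anyway, so Monday's data is read once for both users, while C's Wednesday chunk waits for its own ticket.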

Time estimation

We are now able to estimate how long it will take for a Hadoop job to complete. A user’s Historics query is broken into chunks that are then run as part of Hadoop jobs. The time it will take to complete an entire Historics query is the time it will take to complete the last chunk in its queue. The previous section detailed how tickets are assigned to each chunk. Estimating time then involves iterating over all the chunks with tickets and calculating and accumulating time for each of the chunks.
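
The accumulation described above can be sketched like this. It assumes chunks run one at a time and each takes the same fixed duration, which is a deliberate simplification; the real estimate would vary per chunk.

```python
def estimate_completion(ticketed_chunks: list, per_chunk_hours: float) -> dict:
    """ticketed_chunks: (ticket, query_id) pairs. Returns the estimated
    completion time of each query, in hours from now."""
    elapsed = 0.0
    finish = {}
    for _, query_id in sorted(ticketed_chunks):
        elapsed += per_chunk_hours
        finish[query_id] = elapsed       # overwritten until the last chunk
    return finish


eta = estimate_completion([(1, "q1"), (2, "q2"), (3, "q1"), (4, "q2")], 0.5)
# eta == {"q1": 1.5, "q2": 2.0}
```

Each query's estimate is simply the running total at the point where its last chunk finishes, which is why interleaved queries finish later than they would if they ran alone.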

The estimated time for a job to complete depends on four main factors:

  1. Rule complexity: the more complex the rule(s), the longer the filtering engine will take to process the interactions.
  2. Job cardinality (number of chunks running together): if there are multiple chunks running in a job, it means there are multiple rules that the filtering engine has to load and apply.
  3. Sources being queried: higher volume sources take longer time to complete.
  4. Sample rate: a smaller sample rate means less I/O and fewer interactions to filter and, therefore, shorter time to get executed.

Currently, we lack enough information to take all these factors into account. But we have already introduced additional monitoring so that we can factor in all of the above when estimating the time it will take to complete a job. You can expect the job estimation times to become more accurate going forward.

Summary

While we are happy with the improvements we have made so far, we are also certain there is room for some more! We continue to analyze our Historics jobs, which will help us respond to any abnormalities more quickly and will help us improve our job estimation algorithms. We are also in the process of improving our message queues so that we can move the data through the rest of the pipeline faster. We are tweaking our filtering engine to further reduce the time it takes to get you the data you want. We look forward to making our processes faster and more robust.
