
By Gareth

Platform Performance Gains with Arista Switches

In late 2012 I wrote about the migration of DataSift's Hadoop cluster to Arista switches, but what I didn't mention was that we also moved our real-time systems over to Arista.

Within the LAN

During our fact-finding trek through the Cisco portfolio we acquired a bunch of 4948 and 3750 switches, which were re-purposed into the live platform. Unfortunately, the live platform (as opposed to Hadoop-sourced historical data) would occasionally experience performance issues because the fan-out design of our distributed architecture amplifies the impact of micro-bursts during high-traffic events. Every interaction we receive is augmented with additional metadata such as language detection, sentiment analysis, trend analysis and more. To acquire these values, an interaction is tokenized into the relevant parts (for example, a Twitter user name for Klout score, sentences for sentiment analysis, trigrams for language analysis and so on). Each of those tokens is then dispatched to the relevant service endpoints for processing. A stream of 15,000 interactions a second can instantly become 100,000+ additional pieces of data traversing the network, which puts load on NICs, switch backplanes and core uplinks.

If a particular request fails, precious time is wasted waiting for the reply, processing the failure and then re-processing the request. To combat this you might duplicate calls to service endpoints (speculative execution, in Hadoop parlance), which improves your chances of a timely response but means those ~100,000 streams become ~200,000, putting further stress on all of your infrastructure.
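
To make that trade-off concrete, here is a minimal Python sketch of fanning a tokenized interaction out to augmentation services and speculatively duplicating each call. It is illustrative only: the endpoint URLs, the HTTP transport and the token types are assumptions for the example, not a description of our internal services or protocols.

import concurrent.futures
import requests  # assumed HTTP transport, for illustration only

# Hypothetical replica pairs for two augmentation services.
ENDPOINTS = {
    "sentence": ["http://sentiment-a.internal/score", "http://sentiment-b.internal/score"],
    "username": ["http://klout-a.internal/score", "http://klout-b.internal/score"],
}

def speculative_call(pool, urls, token):
    # Issue the same request to every replica and keep the first reply that succeeds.
    futures = [pool.submit(requests.post, url, json={"token": token}, timeout=0.05)
               for url in urls]
    for future in concurrent.futures.as_completed(futures):
        try:
            return future.result().json()
        except Exception:
            continue  # that replica failed or timed out; wait for the duplicate
    raise RuntimeError("all replicas failed")

def augment(tokens):
    # Fan-out: each (kind, token) pair becomes two requests on the wire.
    # For simplicity the tokens are handled one after another here; a real
    # system would overlap them as well.
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as pool:
        return [(kind, speculative_call(pool, ENDPOINTS[kind], token))
                for kind, token in tokens]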

At DataSift we discuss internal platform latency in terms of microseconds and throughput in tens of gigabits, so adding an unnecessary callout here or an extra millisecond there isn't acceptable. We want to be as efficient, fast and reliable as possible. When we started looking at ways of improving the performance of the real-time platform, it was obvious that many of the arguments that made Arista an obvious choice for Hadoop also made it ideal for our real-time system. The Arista 7050 switches we'd already deployed have some impressive latency figures, so we needed little more convincing that we were on the right path (the 1.28 Tbps and 960,000,000 packets-per-second figures don't hurt either). For truly low-latency switching at the edge one would normally look at the 7150 series, but in our testing the 7048 switches were well within the performance threshold we wanted and enabled us to standardize our edge. We made use of our failure-tolerant platform design (detailed further below) to move entire cabinets at a time over to the Arista 7048 switches with no interruption of service to customers. Once all cabinets were migrated, and with no other optimizations at that point, we saw an immediate difference in key metrics:

[Chart: key platform metrics before and after the migration]

Simply by deploying Arista switches for our 'real-time' network we decreased augmentation latency from ~15,000µs down to ~2,200µs. Further optimizations to our stack and how we leverage the Linux kernel's myriad options have improved things even more.

Epic Switches are only half the story

One of the great features of the Arista 7048 switches is their deep buffer architecture, but in certain circumstances another buffer in the path is the last thing you want. Each buffer potentially adds latency to the system before the upstream can detect the congestion and react to it. The stack needs to be free of bottlenecks to prevent the buffers from filling up, and the 7048 switches can provide up to 40Gb/s of throughput to the core, which fits nicely with 40 1U servers in a 44U cabinet. That said, we're not ones to waste time and bandwidth by leaving the top-of-rack switch if we don't have to. By pooling resources into 'cells' we can reduce uplink utilization and decrease the RTT of fan-out operations by splitting the workload into per-cabinet pools.

With intelligent health checks and resource routing, coupled with the Aristas' non-blocking, full wire-speed forwarding, the processing servers can call out cross-rack with very little penalty in the event of a resource pool suffering failures.

That's great but I'm on the other side of the Internet

We are confident in our ability to provide low-latency, real-time content that is filtered and augmented. That confidence is why we publish, on our status site, live latency statistics for a stream being consumed by an EC2 node on the other side of the planet. We can be this confident because we control and manage every aspect of our platform, from influencing how data traverses the Internet to reach us, through our routers and switches, all the way down to the SSD chipset or SAS drive spindle speed in the servers. (You can't say that if you're on someone else's public cloud!)

User latency (they could be next door to a social platform's data center or over in Antarctica): 10ms - 150ms
+ Source platform processing time (for example, the time taken for Facebook or Twitter to process an interaction and send it on): ???ms
+ Trans-Atlantic fiber optics (for example, San Jose to our furthest European processing node): ~150ms
+ Trans-Pacific fiber optics (for example, from a European processing node to a customer in Japan): ~150ms
= ~500ms

When dealing with social data on a global scale there can be a lot of performance uncertainty, with undersea fiber cuts, carrier issues and entire IX outages, but we can rest assured that once the data hits our edge we can process it with low latency and high throughput. In conclusion, I've once again been impressed by Arista and would wholeheartedly recommend their switches to anyone working with high-volume, latency-sensitive data.

Reading List:

Arista switches were already a joy to work with (access to bash on a switch, what's not to love?), but Gary's insights and advice make them all the better. Arista Warrior - Gary A. Donahue

Even with all the epicness of this hardware, if you're lazy about the steps your data goes through before it becomes a frame on the switch, you're going to have a bad time, so for some heavy-duty reading this book may help. The Linux TCP/IP Stack: Networking for Embedded Systems - Thomas F. Herbert

By Ed Stenson

New Release of the Query Builder

Have you tried our Query Builder yet? It's a visual tool that makes it easy for newcomers to get started with DataSift quickly, before they even begin to learn our query language, CSDL. Despite its simplicity, the Query Builder offers very nearly all the features of the full language: every CSDL operator and logical operator, together with very nearly all the targets and augmentations. Recently we added the ability to use parentheses and, with the latest release, we've added the NOT logical operator.

Let me give you an example. In CSDL, you can write a filter that includes one keyword and excludes another like this:

 
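NOT imdb.title contains_any "Star"
AND
imdb.title contains_any "Trek"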

Until now, it has not been possible to perform logical inversion using the Query Builder, but you can now do it with a single click:

Adding a NOT in the Query Builder

To create rules that use NOT:

  1. Click Create New Filter.
     
  2. Choose IMDb -> Title.
  3. Choose Contains words as your operator.
     
  4. Type "Star" as the filter keyword.
     
  5. Click Save.
     
  6. Click Create New Filter again and build a second rule that looks for "Trek" in the IMDb title.
     
  7. Click Save and your Query Builder screen will show the two rules like this:


    This will filter for titles that include "Trek" AND "Star". We need to adjust it to filter for "Trek" AND NOT "Star".
     
  8. Click Advanced.

    Notice how the two rules now have numbers?


     
  9. Click NOT.



    The Query Builder adds the NOT in front of rule 1. This is exactly what we want, because rule 1 filters for "Star". If you wanted to apply the NOT to the rule for "Trek" you could drag and drop it in front of the "2".

  10. Click Save and Preview. The Query Builder saves your work and automatically generates code.

 

// JCSDL_MASTER 4bce1d2f67166ea38e0875cc79750c85 !1&2
// JCSDL_VERSION 2.1
NOT
// JCSDL_START 8a1624733bb708222fab239bcb5d8aaf imdb.title,contains_any,25-4 1
imdb.title contains_any "Star"
// JCSDL_END
AND
// JCSDL_START ea352777ca8a4b1e80c3f4cb60e22dfc imdb.title,contains_any,25-4 2
imdb.title contains_any "Trek"
// JCSDL_END
// JCSDL_MASTER_END

 

Several of the lines are commented out because they contain internal information for the Query Builder itself. If we remove those lines, we're left with the original CSDL that I included at the beginning of this post.

 

NOT imdb.title contains_any "Star"
AND
imdb.title contains_any "Trek"

 

Using NOT with parentheses

Let's look at an example that uses parentheses as well as NOT. Suppose you wanted to filter for blogs that mention the NASDAQ but exclude mentions of Microsoft and Intel, two of the tech-laden index's best-known components:

  1. Click Create New Filter.
     
  2. Choose Blog -> Content.
     
  3. Choose Contains words as the operator.
     
  4. Add "NASDAQ" as the filter keyword.
     
  5. Click Save.
     
  6. Click Create New Filter again and build a second rule that looks for "Microsoft" in the blog content.
     
  7. Finally, build one more rule that looks for "Intel" in the blog content.



    Now we need to add logical operators, parentheses, and a NOT operator.
     
  8. Click Advanced.
     
  9. Select the parentheses.



    By default, the Query Builder places the parentheses around your entire query definition.


     
  10. Drag the left parenthesis so that the parentheses surround rules 2 and 3.
     
  11. Click the second AND operator and it will change to OR.


     
  12. We need to make one final modification to make the query exclude Microsoft and Intel. Click NOT and drag it so that it operates on the clause defined within the parentheses.


     
  13. Click Save and Preview. The Query Builder saves your work and automatically generates code.

 

// JCSDL_MASTER c24e74b9aa6c3aed7c09d36aae51b661 1&!(2|3)
// JCSDL_VERSION 2.1
// JCSDL_START cfc90f784c8274788a2bb3984ba42ee1 blog.content,contains_any,27-6 1
blog.content contains_any "NASDAQ"
// JCSDL_END
AND NOT (
// JCSDL_START 13a0d4a23c57744568df3c58dde08ec4 blog.content,contains_any,27-9 2
blog.content contains_any "Microsoft"
// JCSDL_END
OR
// JCSDL_START 18b98e1b4d1b7c312d6ac8827be9350c blog.content,contains_any,27-5 3
blog.content contains_any "Intel"
// JCSDL_END
)
// JCSDL_MASTER_END

 

If we strip out the comments, the code becomes easy to read:

 

blog.content contains_any "NASDAQ"
AND NOT (
blog.content contains_any "Microsoft"
OR
blog.content contains_any "Intel"
)

 

Summary

To try this new functionality out, you'll need to grab a copy of the Query Builder. It's Open Source and you can find it on GitHub. We provide full documentation to show you how to embed the Query Builder on your own pages.

By Jairam Chandar

Historics - We Keep Making It Better

A year ago, DataSift released Historics, a product that enables users to access content from the past. Demand for it has grown massively over the past year, and we have had to make many optimizations in order to keep up with not just that demand, but also the scale of our ever-growing archive.

Our Historics archive is now very close to one petabyte in size, and we are adding about two terabytes to it each day. We run over 2,000 Hadoop jobs every month that scan a cumulative total of nine trillion records. Hence, we must ensure that every single component involved in extracting information from this archive is as efficient as possible.

Now, here's the thing about DataSift: we are never satisfied with our improvements. We always strive to make our platform better and faster. A while back, our Chief Technical Architect, Lorenzo Alberton, wrote a blog post on how we optimized our Hadoop jobs. Those optimizations vastly improved the speed at which we scan and filter our archive to give users what they want quickly. We mainly concentrated on improving our I/O times, and we achieved that by improving our job scheduling to run multiple user queries in one Hadoop job, thereby reading the data once and filtering it for multiple users.
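
The read-once, filter-for-many idea can be sketched as a Hadoop Streaming-style mapper in Python. This is purely illustrative: the toy predicates, record fields and the use of Streaming are assumptions for the example; the real jobs run our filtering engine against compiled CSDL rules, not these lambdas.

import json
import sys

# Hypothetical compiled filters, one per user query sharing this job.
FILTERS = {
    "query_1": lambda rec: "nasdaq" in rec.get("content", "").lower(),
    "query_2": lambda rec: rec.get("language") == "en",
}

def map_records(lines):
    # Each archived record is deserialized once and tested against every
    # query in the job, so the expensive I/O is shared between users.
    for line in lines:
        record = json.loads(line)
        for query_id, matches in FILTERS.items():
            if matches(record):
                yield query_id, record

if __name__ == "__main__":
    for query_id, record in map_records(sys.stdin):
        # Key the output by query so results can be split per user downstream.
        sys.stdout.write("%s\t%s\n" % (query_id, json.dumps(record)))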

But still not satisfied with our improvements, we have made two major changes: 

  • Moved our archive from HBase to raw Hadoop Distributed File System (HDFS).
  • Changed our scheduling algorithm in order to give each user a fairer share of the cluster. 

Moving our archive from HBase to raw HDFS

HBase is a brilliant solution for high write-throughput use cases. However, the extra overhead it incurred when querying data through Hadoop was giving us a lot of grief. We hit a ceiling on the I/O throughput we could achieve, mainly because Hadoop over HBase has to go through the HBase RegionServers to ensure that any data still in memory (yet to be flushed to disk) is read as well. The idea behind using HBase was to provide random access to the data, but we realized that almost all our applications working on the archive were doing streaming reads. The ideal solution for us, then, was to move our archive to raw HDFS.

When we ran a Historics query on some of the migrated data in our archive, we saw roughly a threefold improvement in job completion times. Previously it could take a query up to 15 hours to read and filter a month's worth of archive; now we can do the same in close to five hours. This was, of course, for a simple query, but we have seen similar improvements for complex queries too.

The migration to raw HDFS was more difficult than you might imagine. We couldn't afford to simply shut down Historics while we migrated the data, so we came up with a solution that can connect to both the old archive on HBase and the migrated archive on HDFS. This solution is still in use while we finish migrating the whole archive across to raw HDFS. It also meant we had to provision new hardware to accommodate what would be a second archive the same size as the first, while we continued to write every new interaction coming our way.

New and improved job scheduling 

One of the main concerns for us while designing the Historics system was to ensure all users receive a fair share of our computing cluster. In his blog, Lorenzo explained the original queuing algorithm we used for all Historics queries. We have now updated this algorithm to improve the user experience. 

We have introduced user-based queues where each user gets their own queue of queries. We break these queries into chunks that represent a day’s worth of data from the archive. For example, a Historics query for a week’s worth of data will consist of seven chunks. These chunks are then added to a user queue based on how old the chunk is (oldest first). It is important to note the difference between a job and a chunk. A job refers to the Hadoop job on the cluster, whereas a chunk is simply a day’s worth of work. Multiple chunks can run in the same job. 

With the queues prepared, we pick the 'n' users with the oldest queries in their queues, where 'n' is the pre-determined size of the user pool we process at any given time. We then round-robin among these user queues, picking one chunk to run at a time.

For example, two users with two chunks each would have their chunks executed in the following order: user A's first chunk, user B's first chunk, user A's second chunk, then user B's second chunk.

The reason we pick only 'n' users to process is that we don't want to penalize older queries if we suddenly get a lot of queries from users who are not yet in the user pool.

Fig 1: Queue evaluation

New user queries enter the user pool in one of two ways. If the maximum user pool size 'n' has not been reached, they are simply added to the pool. If it has, they have to wait; when the wait is long, we say the query is 'starved'. To address starvation, we introduced a starvation interval: if a user's wait time exceeds the starvation interval, we increase the user pool size to accommodate that user. However, we always revert to the configured pool size once the starvation load drops.

Then, the user queues are evaluated and tickets are allocated for all the chunks in the user queues. 

Fig 2: Ticket evaluation

Once all the tickets are allocated, it's a simple case of picking the chunks with the lowest ticket numbers to run first. At this point, we check whether there is a chunk without a ticket (a new one) that queries the same time period; if so, it is piggybacked with the chunk we just picked. This minimizes I/O and reduces user wait times.
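
To tie the pieces together, here is a minimal Python sketch of the queue evaluation described above: per-user queues, a pool of 'n' users chosen by oldest work, and round-robin chunk selection. The data structures are hypothetical, and ticket piggybacking and starvation handling are left out for brevity; it illustrates the idea rather than our actual scheduler.

from collections import deque

def build_schedule(user_queues, pool_size):
    # user_queues maps a user id to a deque of chunks, each chunk being a
    # (submitted_at, archive_day) pair, oldest first.
    # Pick the 'n' users whose head-of-queue chunk is oldest.
    pool = sorted(user_queues, key=lambda user: user_queues[user][0][0])[:pool_size]

    queues = {user: deque(user_queues[user]) for user in pool}
    schedule = []                      # chunks in the order they get tickets
    while any(queues.values()):
        for user in pool:              # round-robin: one chunk per user per pass
            if queues[user]:
                schedule.append((user, queues[user].popleft()))
    return schedule

# Two users with two chunks each are interleaved: A1, B1, A2, B2.
example = {
    "user_a": deque([(1, "2013-01-01"), (1, "2013-01-02")]),
    "user_b": deque([(2, "2013-01-01"), (2, "2013-01-02")]),
}
print(build_schedule(example, pool_size=2))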

Time estimation

We are now able to estimate how long it will take for a Hadoop job to complete. A user's Historics query is broken into chunks that are run as part of Hadoop jobs, and the time it takes to complete an entire Historics query is the time it takes to complete the last chunk in its queue. The previous section detailed how tickets are assigned to each chunk; estimating time then involves iterating over all the ticketed chunks and accumulating the estimated time for each one.
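
As a rough sketch, if each ticketed chunk already has an estimated duration (derived from the factors listed below), the estimate for a query is simply the point at which its last chunk finishes. This is an illustration only, and the sequential accumulation is a simplification; in practice several chunks can run inside one Hadoop job.

def estimate_query_completion(ticket_order, chunk_estimates, query_chunks):
    # ticket_order:    chunk ids in the order the scheduler will run them
    # chunk_estimates: assumed per-chunk durations in seconds, derived elsewhere
    # query_chunks:    the chunk ids belonging to the query we are estimating
    elapsed = 0.0
    finish = 0.0
    for chunk in ticket_order:
        elapsed += chunk_estimates[chunk]
        if chunk in query_chunks:
            finish = elapsed          # the query finishes when its last chunk does
    return finish

# Example: the second and fourth chunks belong to our query.
print(estimate_query_completion(
    ticket_order=["c1", "c2", "c3", "c4"],
    chunk_estimates={"c1": 1800, "c2": 3600, "c3": 1200, "c4": 2400},
    query_chunks={"c2", "c4"},
))  # -> 9000.0 seconds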

The estimated time for a job to complete depends on four main factors:

  1. Rule complexity: the more complex the rule(s), the longer the filtering engine takes to process the interactions.
  2. Job cardinality (the number of chunks running together): if multiple chunks run in one job, the filtering engine has to load and apply multiple rules.
  3. Sources being queried: higher-volume sources take longer to complete.
  4. Sample rate: a smaller sample rate means less I/O and fewer interactions to filter and, therefore, a shorter execution time.

Currently, we lack enough information to take all these factors into account, but we have already introduced additional monitoring so that we can factor in all of the above when estimating the time it will take to complete a job. You can expect job time estimates to become more accurate going forward.

Summary

While we are happy with the improvements we have made so far, we are also certain there is room for some more! We continue to analyze our Historics jobs, which will help us respond to any abnormalities more quickly and will help us improve our job estimation algorithms. We are also in the process of improving our message queues so that we can move the data through the rest of the pipeline faster. We are tweaking our filtering engine to further reduce the time it takes to get you the data you want. We look forward to making our processes faster and more robust.

By Jacek Artymiak

Writing CSDL in Vim

Like any developer-friendly company, DataSift has its fair share of fans of the good old Vim editor, working for us and with us. Since we spend so much time inside Vim, it's no wonder that we use it to write CSDL too, which is why today I'm especially happy to announce that CSDL syntax highlighting has been added to the Vim source code repository and should soon ship with all major operating systems worth using. (An OS worth using is one that ships with Vim, of course.)

In the meantime, if you absolutely cannot wait to try it but don't want to build Vim from source, grab a copy of the datasift-vim repository, unpack it, and follow the instructions. (You must have Vim installed first, of course.)

Vim will highlight CSDL automatically when you edit files with the .csdl filename extension. If you want to force CSDL syntax highlighting while you are editing, do the following:

  • Press Esc
  • Type :syn on
  • Press Enter
  • Press Esc
  • Type :set syntax=csdl
  • Press Enter

To achieve the same effect in gVim or MacVim, select Syntax -> Show filetypes in menu and then select Syntax -> C -> CSDL.

So, there you have it. If you like Vim and you like CSDL, the two are best pals now. Enjoy our syntax file, and if you spot any problems with it, let us know. The datasift-vim project is Open Source and we welcome patches, comments, and suggestions.

And if you like Vim, don't forget the good cause that Vim has been promoting for years. 

PS. I'd like to thank Bram Moolenaar for adding my patches to Vim. It means a lot.

By Jacek Artymiak

Split-Second Social Media Analysis with DataSift and Redis

Social media gives us a way to sample trends and sentiment in real time, so it is very important that the analysis of that data also happens in real time. And we want to help, because here at DataSift we want our platform to be the Swiss Army knife of social media analysis tools. We try to be flexible and do as much of the hard work as possible so that you can focus on analyzing the data instead of having to think about how to feed it into your processing pipeline.

We strive to achieve that goal with our advanced Push data delivery system and its ever-growing set of connectors, which can deliver the data you filter for to a variety of destinations. These could be third-party cloud storage services such as Amazon DynamoDB, or an instance of CouchDB running on your own server. If there is a way to connect to a machine via the internet, we want to be able to deliver data to it.

When time is of the essence and you absolutely must be able to start analyzing data as soon as you receive it, keeping data in RAM will help you shorten the time needed to access and process it. One popular tool for managing data in memory is Redis, an Open Source key-value store, and today we are very happy to announce the immediate availability of our new Redis connector, which will deliver the data you filter for straight to your Redis instance.

Getting started with Redis

It is your responsibility to set up your own instance of Redis and make sure it can be reached via the internet; if you have never used Redis before, we have help to get you started. Then it is just a matter of setting up a subscription via our Push API, and the data will be delivered straight into your Redis server, ready for processing.

At your end, you will need a way to connect to your Redis server, and you can do that with one of the many Redis clients available; you should be able to find one that fits your needs quite easily.

The client alone is just part of the equation. You will also need software that can unpack the interactions you get from DataSift from JSON and look for the answers to your questions. Like Redis, JSON is very well supported, and many programming languages include appropriate libraries by default. As for data analysis tools, you will be the best judge of their usefulness, and it is always a good idea to ask your community for suggestions when you are not sure.
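
To give a feel for how little code the consuming side needs, here is a minimal sketch using the redis-py client in Python. The connection details, key name and the blocking-list-pop approach are assumptions for illustration; check your Push subscription's output parameters to see exactly how the connector writes to your instance.

import json
import redis  # the redis-py client: pip install redis

# Hypothetical connection details and key name; use whatever you configured.
client = redis.Redis(host="localhost", port=6379, db=0)
LIST_KEY = "datasift.interactions"

def consume():
    while True:
        # Block for up to 30 seconds waiting for the next delivered item.
        item = client.blpop(LIST_KEY, timeout=30)
        if item is None:
            continue                   # nothing arrived yet; keep waiting
        _, payload = item
        interaction = json.loads(payload)
        # Toy "analysis": print the author of each interaction, if present.
        author = interaction.get("interaction", {}).get("author", {}).get("username")
        print("received interaction from", author)

if __name__ == "__main__":
    consume()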

Please remember that you will be more likely to get reliable results if you start your analysis with a well-defined data set. That is where a well-written set of CSDL filters can help you pick out the most relevant interactions for further processing.

Those pesky limits (and how to cheat around them)

Keeping data in RAM lets you avoid delays caused by slow disk reads and writes, but that convenience comes at a cost: RAM is volatile, expensive, and usually not available in large quantities even on high-end servers. Fortunately, you can architect a solution that reads data from a Redis store and saves it to disk, and you can also rent servers with 110GB of RAM or more on an hourly basis, which can be a very cost-effective alternative to buying them or leasing them on a long-term contract. Amazon EC2 High-Memory instances are one such solution.

The issue of volatility matters when you do not want to lose data. You can avoid problems by storing multiple copies of the data on two or more servers, either by replicating it yourself or by creating two or more Push subscriptions based on the same stream hash. You can also back up the data held in memory to disk.

And if you do lose data, you can retrieve it again using Historics. There will be a delay in receiving it, which may render it no longer relevant, but keep in mind that there is a way to "replay" your analysis, albeit at the additional cost of running a Historics query.

RAM size constraints are also fairly easy to overcome. If the data you want to analyze does not fit inside the physical memory installed in your machine, you will need to add RAM, get a machine with more RAM, or use a piece of software that can manage a farm of Redis servers, such as the Redis LightCloud manager.

Go mad!

If your social media analysis business needs to work in real time, our Redis connector will help you get further ahead of your competition. Go mad, build something amazing, and let us know how else we can help you achieve your goals!

This post was written by Jacek Artymiak with valuable input from Ollie Parsley, the developer of the Redis Push Connector.
