Blog

Lorenzo Alberton
Updated on Thursday, 10 January, 2013 - 12:43

Bitly, the #1 link sharing platform, powering 75 percent of the world’s largest media companies and half of the Fortune 500 companies, with over 20,000 white-labeled domains, generates a lot of traffic: 80M new links a day and 200M clicks/day. In other words, on average, over 2,000 people click on a bitly link every second.

When we started exploring a partnership with bitly, we had to do some capacity planning to support their stream in our DataSift platform. We also took the opportunity to revise how we ingest high-volume streams and how we resolve links.

Firehose? More like raging rivers!

Luckily, we already had custom-built libraries to handle huge HTTP streams, which are split from one pipe into many more-manageable sub-streams, sliced and diced into single messages and fed into high-performance, persistent queues, ready to be processed by the rest of the platform. We learnt a lot from our experience with handling the Twitter Firehose, so it was just a matter of provisioning a few more machines and configuring the new “pipes”.

 

Links Resolution Architecture

Given that the bitly stream consists entirely of link data, we also got very excited about how we could use it. Until now, DataSift has been using the API of its own predecessor, TweetMeme, to fully resolve links embedded in Tweets and other data sources, but with the increased volume of link traffic coming from Twitter, we were close to hitting TweetMeme’s maximum capacity of 40M links/day.

What better opportunity to rebuild our links resolution service from the ground up? Now, resolving hundreds of links every second, with predictable latency and accuracy, is no easy task: 5 years of experience with TweetMeme proved to be an invaluable starting point. Taking it to the next level was a matter of redesigning the whole pipeline, keeping all the good parts and recoding where necessary.

The resulting architecture is summarized in the following diagram:

Every new link is cleaned and normalized, looked up in our cache, and immediately returned if found. If not, we try to follow a single hop, and we repeat the procedure until we reach the target page. Every hop is cleaned, normalized and cached, so we can resolve links sharing part of the chain much faster. At every hop, we evaluate the domain and we select specialized URL resolvers, which allow us to rewrite URLs on the fly and/or improve our cache hit ratio. We have stampede-protection systems in place, and watchdogs whose goal is to guarantee the lowest possible latency. Thanks to the new architecture, tens of workers can now resolve links in parallel to spread out the workload across different processes and different servers, so we can keep scaling out as the number of inbound links increases.
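
To make the flow above concrete, here is a minimal sketch of the hop-by-hop resolution loop in Python. It is illustrative only: the dictionary stands in for a shared cache, the normalization rules are simplified, and the specialized per-domain resolvers, stampede protection and watchdogs are omitted.

# Illustrative sketch of hop-by-hop link resolution with a shared cache.
# The dictionary cache, the normalization rules and the hop limit are
# simplified stand-ins for the real components described above.
from urllib.parse import urlsplit, urlunsplit
import requests

cache = {}       # in production this would be a shared, distributed cache
MAX_HOPS = 10    # never follow more than a fixed number of hops

def normalize(url):
    """Clean and normalize a URL so equivalent links share one cache entry."""
    scheme, netloc, path, query, _ = urlsplit(url.strip())
    return urlunsplit((scheme.lower(), netloc.lower(), path or "/", query, ""))

def resolve(url):
    """Follow one redirect at a time, caching every hop in the chain."""
    chain = []
    current = normalize(url)
    for _ in range(MAX_HOPS):
        if current in cache:                  # cache hit: reuse earlier work
            current = cache[current]
            break
        chain.append(current)
        resp = requests.head(current, allow_redirects=False, timeout=2)
        location = resp.headers.get("Location")
        if resp.is_redirect and location:     # relative redirects ignored for brevity
            current = normalize(location)     # follow a single hop, then repeat
        else:
            break                             # reached the target page
    for hop in chain:                         # every hop now points at the target
        cache[hop] = current
    return current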

With these changes, we managed to reduce the (already low) failure rate, and vastly increase the number of successfully resolved links, instantly quadrupling the throughput on the same number of nodes. At the time of writing, we’re successfully resolving over 200M links/day.

Metadata > Data

The real power of DataSift is to tame huge volumes of traffic and to go well beyond what we receive from our data source, by augmenting each and every message with extra metadata, to add context, insights, and real quality to our inputs. So it was a natural evolution to extend the new and more powerful link resolution engine with extra features. We started with extracting the title of the target page, the language, the content type (we’re now able to resolve links to images and videos!), description, and keywords. These metadata fields are already adding a lot of value to the original message, which might not have any context otherwise. However, we didn’t stop there.

We decided to extract OpenGraph data, Twitter Cards data, and Google News keywords from every page too: almost 100 new properties about the page, the publication, the author, images and videos associated with a post, classification information, description, Twitter handles, information about entities (companies, people, places) mentioned, and so on. Enough information to make any data scientist drool. When we showed a preview to our internal data science team, they jumped around, happy for the early Christmas gifts.

Clicks + page metadata = Information heaven

By combining information about each click (user agent, geo location, domains, for example) and rich metadata about the target page, the number and quality of insights about what people visit and talk about has reached an entirely new level, and can really transform the meaning of social data mining. And with DataSift, accessing all this information has never been easier.

Ed Stenson
Updated on Thursday, 15 November, 2012 - 16:28

Today we announce our new data source, the bitly input stream. With 200M clicks a day, it provides an excellent augmentation to the links embedded in Tweets and in messages from other data sources. In the past, we could see which content was being shared; we can now see which links are actually being clicked. In practical terms, DataSift can now reveal activity that, formerly, was hidden. We can show the real reach of content like never before, providing the complete picture, not just one side of it.

The new data source is unquestionably useful by itself, but here at DataSift we’re always trying to find ways to add more information to the input data, making it richer and more structured. Every time our filtering engine sees an interaction that contains a link, it resolves that link all the way to the original target page, even if the link has been shortened several times. Then it examines the target page, looking for metadata in Open Graph or Twitter Cards format in the page's HTML header. Any metadata that it finds, it adds to the interaction. We believe the result makes the click stream ten times more valuable to our users, so let’s explore in more depth the data that our platform can deliver.

Data, metadata, and embedded content

A simple but immensely significant change has arrived in the world of social media, as two apparently separate elements, embedded content and metadata, have come together in a fascinating way. At DataSift, the effect is already impacting about 30 percent of the content passing through our servers, and the trend shows healthy growth.

What are Open Graph and Twitter Cards?

Let's define our terms first: embedded content on a web page consists, for example, of videos or static images such as photographs.

Meanwhile, metadata is nothing more than data that describes other data. If that data happens to be a piece of embedded content, a press photograph, for instance, it might be accompanied by metadata such as:

  • a title
  • a description
  • a URL that points to the photograph
  • the width and height of the photograph

... plus as many other nuggets of relevant information as the image's creator chose to supply.

Key technologies here are Facebook’s Open Graph protocol (now an open standard that anyone can use) and, more recently, Twitter Cards. Given the volume of content being shared on Facebook and Twitter, these two platforms decided to propose a set of metadata properties that content creators could use to influence the way their content is previewed (“embedded”) when shared on Facebook and Twitter.

As an example, the New York Times (one of the more than 2,000 newspapers already using Open Graph and Twitter Card metadata) might specify, for each article, the title, the description, the author, the canonical URL, and the image that should be used in the preview on Twitter or Facebook.
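
To make that concrete, here is a small, self-contained Python sketch showing the kind of <meta> tags such a page might carry and how they can be collected from the HTML head. The sample tags and the toy parser are illustrative; they are not the New York Times' actual markup or our production extractor.

# Illustrative only: the sample <head> shows the kind of Open Graph and
# Twitter Card properties a publisher might set; the parser is a toy,
# not our production metadata extractor.
from html.parser import HTMLParser

SAMPLE_HEAD = """
<head>
  <meta property="og:title"       content="Example article headline" />
  <meta property="og:description" content="A one-paragraph summary of the story." />
  <meta property="og:url"         content="http://www.example.com/2012/11/story.html" />
  <meta property="og:image"       content="http://www.example.com/images/story.jpg" />
  <meta name="twitter:card"       content="summary" />
  <meta name="twitter:site"       content="@examplepublication" />
</head>
"""

class MetaExtractor(HTMLParser):
    """Collect og:* and twitter:* <meta> tags from a page's <head>."""
    def __init__(self):
        super().__init__()
        self.properties = {}

    def handle_starttag(self, tag, attrs):
        if tag != "meta":
            return
        attrs = dict(attrs)
        key = attrs.get("property") or attrs.get("name")
        if key and (key.startswith("og:") or key.startswith("twitter:")):
            self.properties[key] = attrs.get("content")

parser = MetaExtractor()
parser.feed(SAMPLE_HEAD)
print(parser.properties)   # {'og:title': 'Example article headline', ...}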

Why are Open Graph and Twitter Cards significant?

Open Graph and Twitter Cards allow Facebook and Twitter to present rich content, and these ideas are producing an extraordinary, explosive effect because they benefit so many participants in social media:

  • Creators benefit because they now have a way to determine what happens to their content after release. By defining metadata for any creation, whether it's a 3,000-word blog, a photograph, a video, an audio clip, or something brand new on the web, creators can name, annotate, and classify their work.
  • Syndicators benefit because metadata makes their lives easier. In the old days, a newspaper article mentioning Hewlett-Packard might have been about $HPQ common stock, or it might have been about inventory shortages of the latest HP server. The only way to be sure was to read the article, or to use natural-language processing to analyze it. Metadata takes that problem away: to describe the article, the syndicator can simply republish the description found in the metadata. If it comes from a trusted creator, it will be good. The quality and amount of metadata can be impressive, spanning classification, summary, domain, author information, and more.
     
  • Consumers benefit from metadata because they get a better experience on Twitter and Facebook, by having a compelling, visual preview of the target page embedded in their timeline, and not just a link, so they can immediately make up their mind whether it’s worth following the link to the full article or not.

 

Statistics

According to our statistics, more than one-third of all the links we receive point to a page with Open Graph metadata, and about 10 percent also have Twitter Cards (it’s a lower percentage because Twitter Cards is a younger protocol and less generic), so a really significant portion of the links will contain a wealth of information attached.

 

Examples
 

[Example previews shown here: a Twitter Card and a Facebook Open Graph preview]

DataSift

We believe that we're the only company able to filter against Open Graph and Twitter Card data, offering you an opportunity to gain unique insight. Here are a few possible use cases:

  • In real time, monitor clicks on bitly links to your site or check out bitly links going to your competitors' sites.
     
  • For stories about TV shows featured on the websites of America's top-five newspapers, which ones are shared in links the most?
     
  • For Tweets that were heavily retweeted, filter for those that contained heavily clicked links.
     
  • For stories on Superbowl Sunday, exclude the ones that do not have Google News keyword metadata. Stories with Google News keywords will be amongst those most widely read.

 

Shruti Desai
Updated on Wednesday, 18 December, 2013 - 16:34

The DataSift UI has a new tab. You can now use the Tasks tab to manage recording and exporting of the output data received from your streams. You can also use it to record and track Historic queries.

Recordings can be scheduled or you can start a recording and manually stop it. When a recording has been running for over an hour, you can export it. You can export a recording in parts or export it whole.

Log in to DataSift to try it out.

Recording a stream

You can record a stream by clicking on the Streams tab and selecting a stream to record.
 




Alternatively, you can use the Tasks tab to start a new recording.
 

  1. Click the Tasks tab at the top of the screen.


     

    This page displays links to access and manage recordings and data exports, as well as Historic queries.
     

  2. Click Start a Recording.


     

  3. Select a stream.


     

  4. Check the Start now box or enter a start time.
     
  5. Enter a finish time. Alternatively, you can leave the Finish field blank if you want to manually record your streams.
     
  6. Give the task a name.

     
     
  7. Click Create.

    A summary page displays details such as the name of the Stream, the Timeframe and the Processing Cost.



     

  8. Click Confirm & Start Recording to continue.


     

    Your recording has been created. You can stop the recording any time by clicking Stop Task.
     

  9. Select your recording to view how many interactions have been recorded by DataSift.

      
     

Exporting a stream

  1. Click the Tasks tab at the top of the screen.

        

    Your recordings and Historic queries are displayed on this page. Find the stream you want to export from the list of Recordings and click Export Data for that stream.
     


     
  2. Enter a name for your new export.
     
  3. Select a format: JSON or CSV.
     
  4. Choose the start date and time. By default, the export begins at the start of the recording; but you can change it to start anywhere between the start and finish time of your recording.
     
  5. Choose a finish date and time.
     
  6. Select a destination for your export: DataSift Storage or Amazon S3.

    When you select Amazon S3 as the destination for your export, you need to provide access credentials for your Amazon S3 account: the Access Key ID and the Secret Key. You also need to provide the name of the bucket where you want to store your export.



    For more information on using Amazon S3 for storage, please see the Addendum below.
     

  7. Uncheck the All checkbox for Filter Columns to select which targets to include and which to exclude. By default the export selects all the targets that DataSift has recorded for you. 


     

  8. Click Create.

    You can also track the progress of your export. Once the export is ready, a Download link is available. If you selected DataSift as the storage destination for your export, the download link will expire in 7 days.
     

          

Deleting a recording

  1. Click the Tasks tab at the top of the screen.
     


     

  2. Select the Recordings tab on the left. Your recordings are listed on this page.

  3. To delete a recording, click Delete Task for the stream.

 

Output formats

Recordings can be lengthy and hence DataSift compresses the data. The export files are downloaded in GZIP format. You must unpack them before you can use them.

You can receive the data in comma-separated value (CSV) format or JavaScript Object Notation (JSON) format.

  • CSV is easy to parse, but the simplest way to look at a CSV file is to import it into a spreadsheet.
     
  • JSON is an easy-to-read, lightweight format for data exchange. Objects are sent in text format, one after another. In raw form it can be difficult to read, but a good free formatter is jsonlint.com. Try formatting this sample of raw JSON in jsonlint.com; a minimal sketch for reading an export programmatically follows below.


    {"company name": "DataSift", "sector": "big data software", "location" : "San Francisco", "products": ["DataSift","TweetMeme"]}
     


 

Addendum

Please note that when you use the Amazon S3 storage service to store your exports, certain restrictions apply to the name of the bucket where you store the export and to your Amazon Secret Key. These restrictions are:

  • You cannot use a bucket with an underscore ( _ ) in the bucket name. Any restrictions Amazon places on naming an S3 bucket also apply; for the naming conventions, please refer to the Amazon S3 documentation.
  • You cannot use an Amazon Secret Key that includes a forward slash ( / ). If your Amazon Secret Key includes a ( / ), use DataSift storage to download the export first, then upload it into your Amazon S3 storage.

If these restrictions are ignored, they are known to cause problems within our API and the export will fail. Make sure you use the alternative methods suggested above to work around them.

Please also note that these restrictions do not apply to Push connectors.

Gareth
Updated on Thursday, 13 March, 2014 - 15:26

One of the really attractive aspects of working at a startup like DataSift is managing the challenges that come with the rapidity of organic growth. Our experiences with the networking aspects of Hadoop are an excellent example of this.

When we started working with Hadoop in mid-2011, we very quickly started to find all the little gotchas in networking kit that you knew about academically but never thought you’d hit.

Our initial Hadoop network was very simple. At the core were two Cisco 3750s stacked together for redundancy with multiple Cisco 2960s LACP’d to create 4Gb uplinks.

Almost immediately, Cacti showed that the busiest servers were experiencing unacceptable volumes of discards. The 2960s are only rated for 2,700,000 packets per second, with switching / forwarding bandwidth of 16 / 32Gbps respectively, and this didn’t seem to be sufficient.

After following the traffic patterns and confirming our suspicions, we experimented by upgrading some of the TOR switches to the Cisco 3560 platform, which is lab-rated at 38,700,000 packets per second with a similar switching / forwarding capacity of 32Gbps.

Packet loss dropped but was still in the thousands per second for certain hosts. Attempts were made to tweak the buffers to make better use of what buffer space we had but it just couldn’t keep up.

At this point we’d reached the limit of the Cisco Enterprise Access layer / Campus LAN portfolio and were heading into what they term ‘Core and Distribution’. Doubling the 3560s packet per second throughput was the Cisco 4948 with a healthy 72,000,000 packets per second throughput and a 96Gbs switching capacity. The 4948 is effectively a 4500 without the modularity.

The 4948s had cracked it: intra-switch transit was flawless. Unfortunately, we now had core-grade switches at our access layer being bottlenecked by the uplinks to the ‘core’, which were still suffering heavily from discards.

The Hadoop cluster was growing and the network needed to be ready so we took the opportunity to redesign from scratch.

Designing the Network

When you’re creating a network that needs to scale as fast as your platform, and you need upwards of 20Gb/s at the aggregation layer, things can get a little complicated.

CHALLENGE #1 – BUFFERS

One of the primary reasons we were suffering such high discards was the inability of the switches to allocate enough buffer memory to absorb the volumes of data that we could pile into the platform (Cisco has some documentation on the issue). Couple that with head-of-line blocking when all the servers in the cab need to use the uplinks back to the core, and there are plenty of things to optimize and check.

CHALLENGE #2 – UPLINK OVER-SUBSCRIPTION

Ideally there should be enough bandwidth on the uplinks to allow the servers to utilize the full extent of their individual 1Gb NICs without contention.

CHALLENGE #3 – EXTENSIBILITY

The volume of data we process climbs every day and there’s no point designing something without anticipating how it’ll scale and how long for.

Several designs were considered: the Mesh, the Chassis, and the traditional Core + Access.

The Mesh

Since we already had some 4948s, which have Layer 3 capabilities, and because Hadoop is cab-aware and therefore likes to talk between cabs, one idea was to build a meshed ring using 2Gb/4Gb LACP links. Creating links to each and every other switch and routing with OSPF allows the switches to route traffic directly to the switch that contains the target subnet, rather than sending all data up to a core just to immediately egress again on what is likely to be a heavily over-subscribed link.

Unfortunately this design does not scale well.

As soon as the network hits 13 cabinets, it starts using more ports for routing than it has left for hosts, the cost per host port just keeps climbing, and there is still the issue that if all hosts need to talk to one particular rack, that rack’s links are over-subscribed.
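
The back-of-the-envelope arithmetic, assuming 48-port TOR switches and the 2Gb/4Gb (two- or four-port) LACP bundles mentioned above, looks something like the sketch below; the figures are illustrative rather than an exact model of our cabling.

# Rough port budget for the full-mesh design: with 48-port TOR switches and an
# n-port LACP bundle to every other cabinet, the ports left for hosts shrink
# quickly as cabinets are added. Assumed figures, for illustration only.
TOTAL_PORTS = 48

def mesh_port_budget(cabinets, ports_per_bundle):
    mesh_ports = (cabinets - 1) * ports_per_bundle   # one bundle to every other cab
    host_ports = max(TOTAL_PORTS - mesh_ports, 0)
    return mesh_ports, host_ports

for cabs in (6, 10, 13, 16):
    for bundle in (2, 4):                            # 2Gb or 4Gb LACP uplinks
        mesh, hosts = mesh_port_budget(cabs, bundle)
        print(f"{cabs:>2} cabs, {bundle}-port bundles: "
              f"{mesh} ports for routing, {hosts} left for hosts")

At 13 cabinets with four-port bundles the mesh alone consumes every port on the switch, leaving nothing for hosts.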

The mesh was dead in the water.

The Cisco Chassis

If you’re dealing with distribution grade connectivity the mainstay is usually something along the lines of a Cisco 6513 filled with eleven 48 port 10/100/1000 line cards.

 

 

With the 2T supervisors you’d be looking at around 720,000,000 packets per second of performance, with 80Gbps of inter-blade transit. Unfortunately, a chassis switch brings several complications aside from performance concerns.

With 577 potential ports, that’s a lot of Cat6a to consolidate into a single cab; even with decent cable management, it was going to be a lot of cable to run to one place.

 

 

Another disadvantage of the chassis is that instead of having x TOR switches with y PSUs each, the entire infrastructure relies on just two PSUs on two divergent power feeds, and it doesn’t take a fancy formula to see that this reduces redundancy.

Additional issues arise from consolidating switching: you’d want to make full use of each line card, so the loss of a single line card could affect twice as many servers as losing a TOR switch serving just 20 servers, and there is also the question of how to cross-connect additional chassis without over-subscription once there were more than 500 servers.

During our investigations it turned out that even the 6513 suffers from head-of-line blocking and has insufficient per-port buffers to fully handle what we’re planning to do with the platform.

So when you take into consideration the reduced redundancy, the larger impact of a failed line card, the allocation of almost an entire cab footprint, and the impact of a failed fan tray / backplane, the chassis was out of the running too.

 

Core and Distribution a.k.a Leaf and Spine

So far we’d dismissed two designs due to the lack of scalability, resilience, redundancy, uplink subscription, head-of-line blocking, port buffering, color scheme of the chassis and a bunch of other factors it was time to pull something out of the hat.

The 4948 comes in several flavors, some with 10Gb XENPAK slots and some without. We had the ones without, which meant either trading them in and finding a compatible core too, or looking at a completely new vendor.

Switching vendors is a daunting task: it comes with risks around reliability, training, costs, interoperability with existing infrastructure, and scalability, and if you’re really unlucky the new kit uses some crazy cable and serial config, so it pays to stand on the shoulders of giants. In this case our closest documented peers were StumbleUpon and Benoit Sigoure’s presentation at a Hadoop user group in 2011. We got in touch with Benoit, who provided more information than was in the slides, and after a few emails back and forth we were sold on the idea of using Arista as our new vendor.

After contacting Arista we acquired an initial batch of four 7048s and two 7050s with enough SFPs to make a cool little pile on my desk.

 

 

I then took over one of our meeting rooms to create a mini network lab. Luckily our CTO likes shiny kit as much as we in the Operations team do.

 

 

 

Within hours I’d resolved most of the initial reservations about switching vendors; with dual PSUs, hot swap fans, an OS (named EOS) that is similar enough to IOS to not present any training problems, up to four 10Gb links per 7048 and even a normal serial cable in ‘Cisco blue’.

I was confident that the Arista gear was the way forward, and a couple of days later it was racked and ready for the transition:

 

 

 

This article won’t cover the migration of a multi petabyte Hadoop cluster onto an entirely new infrastructure but I will tell you that the first time we tested a HDFS rebalance it resulted in throughput that exceeded 24Gb/s without a single dropped packet and we’ve been running for some time since without a single issue:

Ethernet49 is up, line protocol is up (connected)
  Hardware is Ethernet, address is 001c.7316.4bd0
  Description: ECMP 10Gbit Fibre Uplink
  Internet address is xx.xx.xx.xx/yy
  Broadcast address is 255.255.255.255
  Address determined by manual configuration
  MTU 1500 bytes, BW 10000000 Kbit
  Full-duplex, 10Gb/s, auto negotiation: off
  Last clearing of "show interface" counters never
  5 minutes input rate 456 Mbps (4.6% with framing), 48972 packets/sec
  5 minutes output rate 1.16 Gbps (11.7% with framing), 100369 packets/sec
     108948784171 packets input, 141933737051584 bytes
     Received 1183 broadcasts, 2155053 multicast
     0 runts, 0 giants
     0 input errors, 0 CRC, 0 alignment, 0 symbol
     0 PAUSE input
     142359795981 packets output, 190129640181753 bytes
     Sent 2 broadcasts, 136398 multicast
     0 output errors, 0 collisions
     0 late collision, 0 deferred
     0 PAUSE output

Ethernet50 is up, line protocol is up (connected)
  Hardware is Ethernet, address is 001c.7316.4bd0
  Description: ECMP 10Gbit Fibre Uplink
  Internet address is xx.xx.xx.xx/yy
  Broadcast address is 255.255.255.255
  Address determined by manual configuration
  MTU 1500 bytes, BW 10000000 Kbit
  Full-duplex, 10Gb/s, auto negotiation: off
  Last clearing of "show interface" counters never
  5 minutes input rate 499 Mbps (5.1% with framing), 51724 packets/sec
  5 minutes output rate 1.13 Gbps (11.4% with framing), 102552 packets/sec
     108946562727 packets input, 139989778742013 bytes
     Received 14 broadcasts, 2155115 multicast
     0 runts, 0 giants
     0 input errors, 0 CRC, 0 alignment, 0 symbol
     0 PAUSE input
     141614511135 packets output, 189850408828125 bytes
     Sent 2 broadcasts, 136409 multicast
     0 output errors, 0 collisions
     0 late collision, 0 deferred
     0 PAUSE output

That was August; it’s now October and we’re already at 12 switches powering hundreds of Hadoop servers, and with more data sources and augmentations being added the cluster will only grow further.

So, if you need to run Hadoop at scale I can’t recommend Arista highly enough.

Reading List

Whilst none of these books has specific information on this topic, I would strongly recommend giving them a read-through:

Hadoop – The Definitive Guide
Designing Large Scale LANs
Storage Networks

Did I mention we’re hiring?

 

Originally posted on NetworksAreMadeOfString.

Lorenzo Alberton
Updated on Wednesday, 15 August, 2012 - 12:32

At DataSift we are in the enviable position of receiving the full Twitter Firehose in real time (currently 400 million messages/day), plus many other data sources and augmentations. So far, we've been offering the ability to filter the messages and deliver the custom output stream in real time. Some time ago we also started recording everything on our internal Hadoop cluster, since we wanted to run analytics and offer filtering on past data too. At the current rate, we store about 2 TeraBytes of compressed data every day. As a data store, we chose HBase on HDFS because we needed random access to individual records, combined with the ability to scan sorted sets and run complex Map/Reduce jobs.

Whilst our cluster size isn't really comparable with the largest Hadoop deployments in the medical or physics fields, at just under 1 PetaByte and doubling in size every 3 months, it's probably a lot larger than the average amount of data most companies have to deal with. Since the announcement of our closed alpha, the demand for historical Twitter access has been huge, so we knew early on that we couldn't release a service that would collapse with just a couple of queries or that took forever to process data: if you start a query now, you don't want results in 15 days.

Getting closer to the data

Scanning the data in HDFS is quite fast, however I/O is still the biggest bottleneck, especially with HBase. Since we always strive to offer top performance, we started thinking about ways of pushing our batch Map/Reduce jobs faster and faster. The first big improvement was to turn our filtering engine from a service into a C++ library, so we could execute custom queries locally on each node of the cluster, from within the Map task, instead of constantly moving data from the cluster to the remote service. This change alone, combined with a better serialization format, translated into a 20x speed boost, and of course should scale more or less linearly with the number of nodes in the cluster.
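
Conceptually, the change looks like the Hadoop Streaming-style mapper sketched below: the filters are evaluated inside the map task, next to the data, instead of shipping every record to a remote service. The trivial keyword predicates and field names are a toy stand-in for our real C++ library and compiled filter definitions.

#!/usr/bin/env python3
# Conceptual Hadoop Streaming mapper: filter evaluation happens inside the map
# task, next to the data, rather than via a network round trip per record.
# The FILTERS dict (and its field names) is a toy stand-in for the real
# C++ filtering library and compiled filter definitions.
import sys
import json

FILTERS = {
    "filter-1": lambda i: "bitly" in i.get("interaction", {}).get("content", "").lower(),
    "filter-2": lambda i: i.get("language", {}).get("tag") == "en",
}

def main():
    for line in sys.stdin:
        try:
            interaction = json.loads(line)
        except ValueError:
            continue                          # skip malformed records
        for filter_id, predicate in FILTERS.items():
            if predicate(interaction):        # evaluated locally, no remote call
                # emit (filter_id, record) pairs for the reduce phase
                sys.stdout.write(filter_id + "\t" + line.strip() + "\n")

if __name__ == "__main__":
    main()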

 

Improving parallelization

Filtering closer to the data was an improvement, but we didn't stop there. Scanning billions of records on each node means I/O still trumps CPU load by orders of magnitude, meaning we weren't making very good use of our filtering engine, which was designed to handle thousands of concurrent filters. So we had another hard look at the system to see whether we could squeeze more cycles out of those CPUs.

The idea we came up with solved several problems at once:

  • parallel execution of sub-tasks
  • fair queuing of scheduled jobs
  • dynamic prioritization and allocation of jobs for maximum throughput
  • ability to give higher priority to certain jobs

Here's what we did.

Each historical query can be submitted at any time, and each one of them could be very different in length (some could span weeks/months of archived data, others could be much shorter and limited to a few hours/days), making it difficult to have an optimal schedule. Once a query is started, it has to complete, and newly submitted queries cannot be executed in parallel in the same process with already running ones, even if they overlap. A long-running query could also easily occupy most of the available resources for a long time, thus starving the pending queries.

In order to maximize the execution parallelism, we needed a strategy that maximized the overlap of pending queries, and avoided allocating Map/Reduce slots to any single query for too long. The solution to both problems was to split each query into smaller chunks, whose size was big enough to minimize the M/R job start-up overhead, but small enough to be executed within a predictable time. The query chunks are added to a queue of pending M/R tasks, and as soon as the cluster has some capacity to run more M/R jobs, the system selects the most promising chunks off the queue.

Each job could be partitioned into chunks in many ways. The first attempt (chunking strategy #1) could be to chunk queries only if - at the time the pending queue is evaluated - there's an overlap, but this would likely lead to lots of chunks of widely different sizes, and to unpredictable execution times as a consequence. A better approach (chunking strategy #2) is to divide the timeline into slots of equal, predetermined size, and then to partition the queries according to the slot boundaries they fall into. Of course the overlap of chunks on a slot thus determined is not always perfect, but it's easy to skip the records outside the chunk range when the query is executed, whilst still enjoying the benefits of an easier partitioning strategy and an upper bound on the amount of work for each chunk.

Let me explain the two strategies with a diagram.

Chunking strategy #1

Suppose for simplicity that we can only run one M/R job at a time, and that it takes 1 hour to scan a single segment of the archive (e.g. the segment AB or BC). If at time t0 we receive the query Q1 and the job tracker is free, Q1 is immediately executed (as a single job), and it will take 3 hours to complete (AB + BC + CD). At time (t0 + 30m) we receive Q2 and at time (t0 + 45m) we receive Q3, but both remain in the pending queue until the first job is done. At time (t0 + 3h) we can start processing the segment BC for Q2 and Q3 together; at time (t0 + 4h) we can run the segment AB for query Q2; at time (t0 + 5h) we can run the segment CE for query Q3. The total process requires 7 hours.

Chunking strategy #2

If instead we chunk every query into slots of equal size, even when there's no overlap at submission time, here's what happens with the same queries as above. At time t0 only query Q1 is available: its segment AB is started. At time (t0 + 1h) we can run the segment BC for all three queries. At time (t0 + 2h) we can run segment CD for queries Q1 and Q3. At time (t0 + 3h) we run segment AB for query Q2, and at time (t0 + 4h) we run the last segment DE for query Q3. The total process now only requires 5 hours, thanks to the more dynamic scheduling and to unconditional chunking into slots of predictable execution time.
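
A minimal sketch of the slot-aligned chunking (strategy #2) in Python; the one-hour slot size and the timestamps are illustrative, not our actual configuration.

# Sketch of chunking strategy #2: partition each query's time range on fixed,
# pre-determined slot boundaries, so overlapping queries produce identical
# chunks that a single scan can serve. Slot size and timestamps are examples.
SLOT = 60 * 60   # one slot = one hour of archive, in seconds

def chunk(query_id, start_ts, end_ts, slot=SLOT):
    """Yield (query_id, slot_start, slot_end) chunks aligned to slot boundaries."""
    slot_start = (start_ts // slot) * slot        # snap down to a slot boundary
    while slot_start < end_ts:
        yield (query_id, slot_start, slot_start + slot)
        slot_start += slot

# Two overlapping queries share the identical chunk (7200, 10800), so one scan
# of that slot serves both; records outside a query's exact range are simply
# skipped when the chunk is executed.
print(list(chunk("Q1", 3600, 10800)))
print(list(chunk("Q2", 7200, 12600)))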

Dynamic scheduling

The last interesting characteristic of this system is the order in which chunks are chosen for execution, which can be summarized as a priority queue with three parameters:

 

  • the cardinality of each slot (i.e. how many chunks happen to be in the same slot: the higher the slot cardinality, the higher the priority given to the chunks in that slot)
  • the age of each query (because we don't want to starve old queries with a low overlap cardinality)
  • custom priorities (so we can tweak the priority of a single query if we need to execute it immediately)

The beauty of this strategy is that the execution priority of the pending chunks is re-evaluated every time Hadoop has spare capacity (as determined by the job monitor) to execute new jobs, for all pending chunks (and not just for pending queries as a whole) and can be maintained to a near-optimal level of parallelism at any time.
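
As an illustration, the selection step could look something like the sketch below: each pending chunk gets a score combining the three parameters, and the best chunks are picked whenever the job monitor reports spare capacity. The weights and field names are arbitrary placeholders, not our actual scheduler.

# Illustrative chunk selection: combine slot cardinality, query age and a
# manual priority override into one score, then pick the top chunks whenever
# Hadoop has spare capacity. Weights and field names are placeholders.
import heapq
import time

def chunk_score(slot_cardinality, submitted_at, custom_priority=0, now=None):
    now = now if now is not None else time.time()
    age_hours = (now - submitted_at) / 3600.0     # avoid starving old queries
    return slot_cardinality * 10 + age_hours + custom_priority

def next_chunks(pending, capacity):
    """Return the highest-scoring pending chunks, up to the spare capacity."""
    scored = [(-chunk_score(c["cardinality"], c["submitted_at"], c.get("priority", 0)), i, c)
              for i, c in enumerate(pending)]
    heapq.heapify(scored)                         # negated score => max first
    return [heapq.heappop(scored)[2] for _ in range(min(capacity, len(scored)))]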

There's still some extra complexity in joining chunks back together to determine when the original query is fully executed, but handling this is quite trivial. The result is that we can now run hundreds of concurrent queries, optimizing data scans and taking full advantage of our massively-parallel filtering engine, and the data is scanned and delivered up to 100x faster than real-time.

Data sampling

With such a large volume of data being produced every day, many people are not interested in receiving every single message matching a filter: a random sample of the results is more than enough, and it comes with a significant saving in licensing cost. For live streams we already offer a special predicate to select a percentage of the results; for the archive we likewise offer a 1%, 10% or 100% sample of the results.

The way we implemented it is actually quite interesting too. As I mentioned at the beginning, we need to store the data in sequential order, so we can scan sorted time ranges. This write pattern usually creates hot spots in any cluster, be it HBase or Cassandra or another distributed database. All the data for a time range usually has to be stored in the same region to keep it sorted, and to make it possible to do range scans.

To avoid hot spots, we played a simple trick: we added a random byte in front of the row key, effectively sharding the data in 256 buckets (~20 groups). The rest of the row key contains a special UUID which is strictly time-ordered, keeping the data sorted in each bucket. When reading, we have to start 256 Map tasks for each date range we need to scan/filter, but each task is much more efficient, as the data is better distributed across all nodes in the cluster and not just in one single region.

Since each write is randomly sent to one of the 256 buckets, each bucket contains a truly random 1/256 sample of the given time range. When you choose a sample over the archived data, we can harness this property and only read data from one or more buckets, depending on the sampling rate, with obvious gains in speed and efficiency: we don't need to scan all the data, and we don't need to start 256 map tasks. Thanks to this, we're happy to pass on to you both the speed gains in producing the results and a significant saving in processing cost :-)
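
Here is a rough sketch of the salted row-key idea and of why sampling becomes cheap. The exact key layout differs (as mentioned above, we use a strictly time-ordered UUID); this simplified version is for illustration only.

# Simplified sketch of the salted row-key scheme: a random leading byte spreads
# writes across 256 buckets, while the rest of the key keeps records
# time-ordered within each bucket. The real keys use a strictly time-ordered
# UUID; this layout is illustrative only.
import os
import struct
import uuid

def make_row_key(timestamp_ms):
    bucket = os.urandom(1)                        # random shard prefix, 0-255
    time_part = struct.pack(">Q", timestamp_ms)   # big-endian => sorts by time
    return bucket + time_part + uuid.uuid4().bytes

# Reading a 1% or 10% sample only needs to scan a few buckets (each bucket is
# roughly a 1/256 random sample); a full scan starts one Map task per prefix.
def scan_prefixes(buckets_to_read=256):
    return [bytes([b]) for b in range(buckets_to_read)]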

Improving delivery

A big engineering effort also went into optimizing the delivery itself: as a teaser, we parted quite drastically from the “traditional” Map/Reduce execution, improved the network layer, and implemented a much better API and many new connectors. But I'll leave the details to another blog post; there's a lot to talk about :-)

Conclusion

The last three months at work have been really intense, and as you now know they involved a lot of re-engineering and re-architecting of the DataSift platform. However, we're super-excited about what we achieved (I'm constantly humbled by working with such a talented team of engineers) and about finally making this service generally available to all customers. If you've been waiting a long time for historical Twitter access, we're confident it was totally worth the wait. Happy data sifting!

 
