Working with Custom Flume Interceptors & Multiplexing

This article describes how to use Apache Flume to modify incoming data (say, from an HTTP or NetCat source) and redirect it into multiple HBase tables based on some criterion. To achieve this, you need custom Flume interceptors and multiplexing. I haven't come across an article that details and connects these concepts as a single use case (with working sample code), so I thought I'd kick-start my blog with one. I hope Flume/HBase beginners, if not everyone, will find it helpful. 🙂

Before moving further, please make sure you have

  • Installed and configured all Hadoop services on your physical/virtual machine. If you haven't, I suggest downloading the Cloudera QuickStart VM, which comes pre-configured with everything.
  • A basic understanding of the architecture of Apache Flume and HBase (including how to create tables in HBase using Hue)

Let's start with the basic Flume configuration (flume.conf); we will keep modifying and extending it as we go. Here we use the AsyncHBaseSink sink type (table = table1, column family = data).

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100

# You can also use netcat as the source type
agent1.sources.source1.type = org.apache.flume.source.http.HTTPSource
agent1.sources.source1.port = 5140
agent1.sources.source1.channels = channel1

agent1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.table = table1
agent1.sinks.sink1.columnFamily = data
agent1.sinks.sink1.batchSize = 5000
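
With this much in place you can already smoke-test the pipeline. Flume's HTTPSource uses JSONHandler by default, which expects a JSON array of events, each with "headers" and "body" fields. The commands below are a sketch; adjust paths, ports, and hostnames to your setup.

# Start the agent (point --conf at your Flume configuration directory)
flume-ng agent --conf conf --conf-file flume.conf --name agent1 -Dflume.root.logger=INFO,console

# Post a single test event in the default JSONHandler format
curl -X POST -H "Content-Type: application/json" \
  -d '[{"headers": {}, "body": "{\"location\": \"US\"}"}]' \
  http://localhost:5140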

Let's move on to the custom Flume interceptor. Say the body of the incoming event is simply a JSON string and we want to flatten it first (i.e., convert it into unique key-value pairs) before passing it on. Below is the code that does that.

//CustomInterceptor.java
package com.flumetest;

import java.util.Iterator;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomInterceptor implements Interceptor {

  @Override
  public void close()
  {
     // no resources to release
  }

  @Override
  public void initialize()
  {
     // no setup required
  }

  @Override
  public Event intercept(Event event)
  {
     byte[] eventBody = event.getBody();

     // FlattenJSON turns the JSON body into unique key-value pairs
     // (a sketch of this helper follows the class)
     String flattenedJson = FlattenJSON(new String(eventBody));

     event.setBody(flattenedJson.getBytes());

     return event;
  }

  @Override
  public List<Event> intercept(List<Event> events)
  {
     for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();)
     {
       Event next = intercept(iterator.next());
       if (next == null)
       {
          iterator.remove();
       }
     }

     return events;
  }

  public static class Builder implements Interceptor.Builder
  {
     @Override
     public void configure(Context context) {
        // no configuration needed for this interceptor
     }

     @Override
     public Interceptor build() {
        return new CustomInterceptor();
     }
   }
}
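
The FlattenJSON helper was omitted from the class above for brevity. Here's a minimal sketch of what it might look like, using the same Jackson 1.x (org.codehaus.jackson) API as the rest of this post; joining nested keys with a '.' is my convention, not a requirement.

// Add to CustomInterceptor.java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

public static String FlattenJSON(String json)
{
   ObjectMapper mapper = new ObjectMapper();
   try {
      JsonNode root = mapper.readTree(json);
      Map<String, String> flat = new LinkedHashMap<String, String>();
      flatten("", root, flat);
      return mapper.writeValueAsString(flat);
   } catch (IOException e) {
      // Malformed JSON: pass the body through unchanged.
      return json;
   }
}

// Recursively walk the tree; arrays and scalars are treated as leaf values.
private static void flatten(String prefix, JsonNode node, Map<String, String> flat)
{
   if (node.isObject()) {
      Iterator<Map.Entry<String, JsonNode>> fields = node.getFields();
      while (fields.hasNext()) {
         Map.Entry<String, JsonNode> field = fields.next();
         String key = prefix.isEmpty() ? field.getKey() : prefix + "." + field.getKey();
         flatten(key, field.getValue(), flat);
      }
   } else {
      flat.put(prefix, node.asText());
   }
}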

As the code shows, one needs to implement org.apache.flume.interceptor.Interceptor to work on the incoming event headers/body. The JSON body of a single event is modified in intercept(Event), with the actual flattening done by the FlattenJSON helper sketched above. The second intercept() overload simply applies the first to a list of events. The following lines register the interceptor in the Flume configuration.

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = com.flumetest.CustomInterceptor$Builder
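
For Flume to pick the interceptor up, compile it against the Flume libraries, package it as a jar, and put the jar on the agent's classpath. Something along these lines (paths are illustrative; the plugins.d directory works as well):

javac -cp "$FLUME_HOME/lib/*" com/flumetest/CustomInterceptor.java
jar cf custom-interceptor.jar com/flumetest/*.class
cp custom-interceptor.jar $FLUME_HOME/lib/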

Now we want to redirect events to different HBase tables based on some criterion. Say the JSON event body contains a key "location" and we want to store each event based on the value of this key; in short, we want to store data in different tables based on the geographic location of the event.
Let's say we are considering three geographic locations: US, UK, and INDIA.
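
For instance, an incoming event body might look like this (every field except "location" is hypothetical):

{"location": "UK", "city": "London", "temperature": "18"}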

First, we create three tables in HBase (say table1, table2, and table3, each with a column family named 'data'). We then modify the interceptor to extract the location value from the event body and include it in the event headers. Here's the new code (to be merged with the code above):

//CustomInterceptor.java (additions)
import java.util.Map;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;

public class CustomInterceptor implements Interceptor {

  private final String _header = "location";
  private final String _defaultLocation = "US";

  @Override
  public Event intercept(Event event)
  {
     String location = _defaultLocation;

     byte[] eventBody = event.getBody();

     try {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode rootNode = mapper.readTree(new String(eventBody));
        location = rootNode.get("location").asText();
     } catch (JsonProcessingException e) {
        // Malformed JSON: fall back to the default location.
     } catch (Exception e) {
        // Missing "location" key (or any other failure): use the default.
     }

     Map<String, String> headers = event.getHeaders();
     headers.put(_header, location);

     event.setHeaders(headers);

     return event;
  }
}

In the code above, we extract the location value from the JSON event body using the Jackson JSON processor and add it to the event headers (header name = 'location'). All we need to do now is map each event to an HBase sink based on this header and its possible values (viz. US, UK, and INDIA). Here's the complete Flume configuration, set up for multiplexing:

agent1.sources = source1
agent1.channels = channel1 channel2 channel3
agent1.sinks = sink1 sink2 sink3

# For each source, channel, and sink, set
# standard properties.
agent1.channels.channel1.type   = memory
agent1.channels.channel1.capacity = 100
agent1.channels.channel2.type   = memory
agent1.channels.channel2.capacity = 100
agent1.channels.channel3.type   = memory
agent1.channels.channel3.capacity = 100

agent1.sources.source1.type = org.apache.flume.source.http.HTTPSource
agent1.sources.source1.port = 5140
agent1.sources.source1.channels = channel1 channel2 channel3

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = com.flumetest.CustomInterceptor$Builder

agent1.sources.source1.selector.type = multiplexing
agent1.sources.source1.selector.header = location
agent1.sources.source1.selector.mapping.US = channel1
agent1.sources.source1.selector.mapping.UK = channel2
agent1.sources.source1.selector.mapping.INDIA = channel3
agent1.sources.source1.selector.default = channel1

agent1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.table = table1
agent1.sinks.sink1.columnFamily = data

agent1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent1.sinks.sink2.channel = channel2
agent1.sinks.sink2.table = table2
agent1.sinks.sink2.columnFamily = data

agent1.sinks.sink3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent1.sinks.sink3.channel = channel3
agent1.sinks.sink3.table = table3
agent1.sinks.sink3.columnFamily = data
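
To verify the routing, post events with different location values and check that each one lands in its mapped table (again assuming the default JSONHandler event format):

curl -X POST -d '[{"headers": {}, "body": "{\"location\": \"UK\"}"}]' http://localhost:5140
curl -X POST -d '[{"headers": {}, "body": "{\"location\": \"INDIA\"}"}]' http://localhost:5140
# No "location" key: the interceptor falls back to US, so this lands in table1
curl -X POST -d '[{"headers": {}, "body": "{}"}]' http://localhost:5140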

We can go further and use a custom serializer (one that implements org.apache.flume.sink.hbase.AsyncHbaseEventSerializer) to customize the data that gets put into HBase.
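
Here's a minimal sketch of such a serializer, assuming the Flume 1.x AsyncHbaseEventSerializer interface; the timestamp-based row key and the "payload" column qualifier are placeholders of mine, not anything the sink mandates.

//SimpleAsyncHbaseSerializer.java
package com.flumetest;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

public class SimpleAsyncHbaseSerializer implements AsyncHbaseEventSerializer {

  private byte[] table;
  private byte[] columnFamily;
  private Event currentEvent;

  @Override
  public void initialize(byte[] table, byte[] cf)
  {
     this.table = table;
     this.columnFamily = cf;
  }

  @Override
  public void setEvent(Event event)
  {
     this.currentEvent = event;
  }

  @Override
  public List<PutRequest> getActions()
  {
     List<PutRequest> actions = new ArrayList<PutRequest>();
     // Placeholder row key: current time in millis; replace with your own scheme.
     byte[] rowKey = String.valueOf(System.currentTimeMillis()).getBytes();
     actions.add(new PutRequest(table, rowKey, columnFamily,
           "payload".getBytes(), currentEvent.getBody()));
     return actions;
  }

  @Override
  public List<AtomicIncrementRequest> getIncrements()
  {
     // This sketch maintains no counters.
     return new ArrayList<AtomicIncrementRequest>();
  }

  @Override
  public void cleanUp()
  {
     table = null;
     columnFamily = null;
     currentEvent = null;
  }

  @Override
  public void configure(Context context) { }

  @Override
  public void configure(ComponentConfiguration conf) { }
}

It would then be wired in per sink, e.g.

agent1.sinks.sink1.serializer = com.flumetest.SimpleAsyncHbaseSerializer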

Hope this helped 🙂


