Storm Spout and Bolt Example


Storm Spout and Bolt Example

A spout is the component that generates data, acting as the source of a stream. Typically, a spout implements the IRichSpout interface.

A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts typically implement the IRichBolt interface.

Add the following libraries to your Java project's build path:




/usr/local/storm/lib/*

FakeCallLogReaderSpout.java




import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that generates fake call-log records for the example topology.
 *
 * <p>Each emitted tuple has the fields {@code from}, {@code to} and {@code duration}:
 * two different phone numbers picked at random from {@link #MOBILE_NUMBERS} and a random
 * duration in the range [0, {@value #MAX_DURATION}). At most {@value #BATCH_SIZE} tuples are
 * emitted per call to {@link #nextTuple()} and at most {@value #MAX_TUPLES} over the spout's
 * lifetime; after that {@code nextTuple()} emits nothing.
 *
 * <p>Tuples are emitted without a message id, so they are not tracked for reliability and
 * {@link #ack(Object)} / {@link #fail(Object)} are no-ops.
 */
public class FakeCallLogReaderSpout implements IRichSpout {
	private static final long serialVersionUID = 1L;

	/** Total number of tuples this spout emits before going quiet. */
	private static final int MAX_TUPLES = 1000;

	/** Upper bound on the number of tuples emitted by a single nextTuple() call. */
	private static final int BATCH_SIZE = 100;

	/** Exclusive upper bound for a call's random duration (units are not specified here). */
	private static final int MAX_DURATION = 60;

	/** Pool of phone numbers used as call endpoints. */
	private static final String[] MOBILE_NUMBERS = {
			"1234123401", "1234123402", "1234123403", "1234123404" };

	/** Collector used to emit tuples downstream; set in open(). */
	private SpoutOutputCollector collector;

	/** Topology context supplied by Storm in open(); kept for reference only. */
	private TopologyContext context;

	/** Source of randomness for endpoints and durations. */
	private final Random randomGenerator = new Random();

	/** Number of tuples emitted so far, across all nextTuple() calls. */
	private int emitted = 0;

	@Override
	public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.context = context;
		this.collector = collector;
	}

	/**
	 * Emits up to {@value #BATCH_SIZE} random call records, stopping once
	 * {@value #MAX_TUPLES} records have been emitted in total.
	 */
	@Override
	public void nextTuple() {
		for (int i = 0; i < BATCH_SIZE && emitted < MAX_TUPLES; i++) {
			String fromMobileNumber = randomMobileNumber();
			String toMobileNumber = randomMobileNumber();
			// A call needs two different parties; compare by value, not by reference.
			while (fromMobileNumber.equals(toMobileNumber)) {
				toMobileNumber = randomMobileNumber();
			}
			int duration = randomGenerator.nextInt(MAX_DURATION);
			collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
			emitted++;
		}
	}

	/** Returns a phone number chosen uniformly at random from {@link #MOBILE_NUMBERS}. */
	private String randomMobileNumber() {
		return MOBILE_NUMBERS[randomGenerator.nextInt(MOBILE_NUMBERS.length)];
	}

	/** Declares the output schema: {@code (from, to, duration)}. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("from", "to", "duration"));
	}

	// The remaining interface methods need no behavior for this example.
	@Override
	public void close() {
	}

	/**
	 * Legacy method kept from older Storm examples; it is not annotated with
	 * {@code @Override}, so it is presumably not part of the IRichSpout interface in use.
	 */
	public boolean isDistributed() {
		return false;
	}

	@Override
	public void activate() {
	}

	@Override
	public void deactivate() {
	}

	@Override
	public void ack(Object msgId) {
	}

	@Override
	public void fail(Object msgId) {
	}

	/** Returns {@code null}: this spout supplies no component-specific configuration. */
	@Override
	public Map getComponentConfiguration() {
		return null;
	}
}


CallLogCreatorBolt.java




import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that turns a raw call record into a keyed call entry.
 *
 * <p>Input tuples are expected to be {@code (from, to, duration)} as declared by the reader
 * spout. For each input it emits {@code (call, duration)}, where {@code call} is the string
 * {@code from + " - " + to}. The output is anchored to the input, and the input is acked
 * explicitly, because an {@code IRichBolt} does not ack automatically.
 */
public class CallLogCreatorBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;

	/** Collector used to emit and ack tuples; set in prepare(). */
	private OutputCollector collector;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Emits {@code ("<from> - <to>", duration)} for the incoming call record and acks it.
	 *
	 * @param tuple input of the form {@code (String from, String to, Integer duration)}
	 */
	@Override
	public void execute(Tuple tuple) {
		String from = tuple.getString(0);
		String to = tuple.getString(1);
		Integer duration = tuple.getInteger(2);
		// Anchor to the input so the emission joins the input's tuple tree (if it is tracked).
		collector.emit(tuple, new Values(from + " - " + to, duration));
		collector.ack(tuple);
	}

	@Override
	public void cleanup() {
	}

	/** Declares the output schema: {@code (call, duration)}. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("call", "duration"));
	}

	/** Returns {@code null}: this bolt supplies no component-specific configuration. */
	@Override
	public Map getComponentConfiguration() {
		return null;
	}
}


CallLogCounterBolt.java




import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt that counts how many calls were seen for each {@code call} key.
 *
 * <p>Input tuples are expected to be {@code (call, duration)} as emitted by the creator bolt;
 * only {@code call} is used. Counts are kept in memory and printed to standard output in
 * {@link #cleanup()}. The bolt emits nothing, even though it declares a {@code call} field.
 */
public class CallLogCounterBolt implements IRichBolt {

	private static final long serialVersionUID = 1L;

	/** Number of calls seen so far, keyed by the incoming {@code call} value. */
	Map<String, Integer> counterMap;

	/** Collector used to ack tuples; set in prepare(). */
	private OutputCollector collector;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			OutputCollector collector) {
		this.counterMap = new HashMap<String, Integer>();
		this.collector = collector;
	}

	/**
	 * Increments the count for the tuple's {@code call} key and acks the tuple.
	 *
	 * @param tuple input whose first field is the {@code call} string
	 */
	@Override
	public void execute(Tuple tuple) {
		String call = tuple.getString(0);
		Integer count = counterMap.get(call);
		// The map never stores null values, so null means "first occurrence".
		counterMap.put(call, count == null ? 1 : count + 1);
		collector.ack(tuple);
	}

	/** Prints every {@code call : count} pair to standard output. */
	@Override
	public void cleanup() {
		for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
			System.out.println(entry.getKey() + " : " + entry.getValue());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("call"));
	}

	/** Returns {@code null}: this bolt supplies no component-specific configuration. */
	@Override
	public Map getComponentConfiguration() {
		return null;
	}
}


LogAnalyserStorm.java




import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * Entry point: wires the call-log topology together and runs it on an in-process
 * {@link LocalCluster} for a fixed amount of time.
 *
 * <p>Topology: reader spout -> creator bolt (shuffle grouping) -> counter bolt (fields
 * grouping on {@code call}, so each call pair is always counted by the same task).
 */
public class LogAnalyserStorm {
	/** Name under which the topology is submitted. */
	private static final String TOPOLOGY_NAME = "LogAnalyserStorm";
	private static final String READER_SPOUT_ID = "call-log-reader-spout";
	private static final String CREATOR_BOLT_ID = "call-log-creator-bolt";
	private static final String COUNTER_BOLT_ID = "call-log-counter-bolt";

	/** How long the topology is allowed to run before the cluster is shut down. */
	private static final long RUN_MILLIS = 10000L;

	public static void main(String[] args) throws Exception {
		// Configuration passed to the topology at submission time.
		Config config = new Config();
		config.setDebug(true);

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(READER_SPOUT_ID, new FakeCallLogReaderSpout());
		builder.setBolt(CREATOR_BOLT_ID, new CallLogCreatorBolt())
				.shuffleGrouping(READER_SPOUT_ID);
		builder.setBolt(COUNTER_BOLT_ID, new CallLogCounterBolt())
				.fieldsGrouping(CREATOR_BOLT_ID, new Fields("call"));

		LocalCluster cluster = new LocalCluster();
		try {
			cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
			// Let the topology run for a fixed time before tearing everything down.
			Thread.sleep(RUN_MILLIS);
		} finally {
			// Stop the in-process cluster even if submission or the sleep fails,
			// so it is not left running.
			cluster.shutdown();
		}
	}
}


Step 1 - Start ZooKeeper. Open a new terminal (CTRL + ALT + T) and start the ZooKeeper server.




$ /usr/local/zookeeper/bin/zkServer.sh start

Step 2 - Open a new terminal (CTRL + ALT + T) and change to the /usr/local/storm directory.




$ cd /usr/local/storm

Step 3 - Start the Nimbus daemon.




$ ./bin/storm nimbus

Step 4 - Open another new terminal (CTRL + ALT + T) and change to the /usr/local/storm directory.




$ cd /usr/local/storm

Step 5 - Start the Supervisor daemon.




$ ./bin/storm supervisor
FakeCallLogReaderSpout.java
CallLogCreatorBolt.java
CallLogCounterBolt.java
LogAnalyserStorm.java


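Step 6 - Compile all of the above programs and run LogAnalyserStorm. (Because LogAnalyserStorm submits the topology to a LocalCluster, which simulates a Storm cluster inside the same JVM, the example itself does not use the ZooKeeper, Nimbus and Supervisor processes started in steps 1–5.)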




$ javac -cp "/usr/local/storm/lib/*" *.java
$ java -cp "/usr/local/storm/lib/*":. LogAnalyserStorm


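Sample output (the individual counts vary between runs because call pairs are chosen at random, but they always add up to the 1000 records emitted by the spout):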




1234123402 - 1234123401 : 93
1234123404 - 1234123403 : 72
1234123404 - 1234123402 : 83
1234123404 - 1234123401 : 99
1234123402 - 1234123404 : 82
1234123402 - 1234123403 : 86
1234123403 - 1234123404 : 77
1234123401 - 1234123404 : 82
1234123401 - 1234123403 : 78
1234123401 - 1234123402 : 80
1234123403 - 1234123401 : 96
1234123403 - 1234123402 : 72

Have any Question or Comment?

Leave a Reply

Your email address will not be published. Required fields are marked *