Storm with Twitter Java Example


Storm with Twitter Java Example

Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read them. A hashtag categorizes tweets by keyword; it is formed by putting # in front of the relevant keyword. Let us now take a real-time scenario: finding the most-used hashtags per topic.

Add these libraries to your Java project's build path.




/usr/local/storm/lib/*
twitter4j-async-4.0.4.jar
twitter4j-core-4.0.4.jar
twitter4j-media-support-4.0.4.jar
twitter4j-stream-4.0.4.jar


TwitterSampleSpout.java




import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Storm spout that reads tweets from the Twitter streaming API through twitter4j
 * and emits each {@link Status} as a single-field tuple named "tweet".
 *
 * <p>twitter4j delivers tweets on its own listener thread; they are buffered in a
 * bounded queue (capacity {@value #QUEUE_CAPACITY}) and drained by
 * {@link #nextTuple()}. When the queue is full, newly arriving tweets are dropped.
 */
@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
	/** Capacity of the buffer between the twitter4j listener and nextTuple(). */
	private static final int QUEUE_CAPACITY = 1000;

	SpoutOutputCollector _collector;
	// Typed so that poll() returns a Status (a raw queue returns Object and does not compile).
	LinkedBlockingQueue<Status> queue = null;
	TwitterStream _twitterStream;
	String consumerKey;
	String consumerSecret;
	String accessToken;
	String accessTokenSecret;
	String[] keyWords;

	/**
	 * Creates a spout with OAuth credentials and the keywords to track.
	 *
	 * @param consumerKey OAuth consumer key
	 * @param consumerSecret OAuth consumer secret
	 * @param accessToken OAuth access token
	 * @param accessTokenSecret OAuth access token secret
	 * @param keyWords keywords to filter on; null or empty selects the sample stream
	 */
	public TwitterSampleSpout(String consumerKey, String consumerSecret,
			String accessToken, String accessTokenSecret, String[] keyWords) {
		this.consumerKey = consumerKey;
		this.consumerSecret = consumerSecret;
		this.accessToken = accessToken;
		this.accessTokenSecret = accessTokenSecret;
		this.keyWords = keyWords;
	}

	/**
	 * Creates a spout with no credentials and no keywords. Credentials stay null,
	 * and open() falls back to the unfiltered sample stream.
	 */
	public TwitterSampleSpout() {
	}

	/**
	 * Connects to the Twitter stream. Uses the sample endpoint when no keywords
	 * are configured, and a keyword filter otherwise.
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		queue = new LinkedBlockingQueue<Status>(QUEUE_CAPACITY);
		_collector = collector;
		_twitterStream = new TwitterStreamFactory(buildConfiguration().build())
				.getInstance();
		_twitterStream.addListener(createListener());
		// Guard against null: the no-arg constructor never sets keyWords.
		if (keyWords == null || keyWords.length == 0) {
			_twitterStream.sample();
		} else {
			_twitterStream.filter(new FilterQuery().track(keyWords));
		}
	}

	/** Builds the twitter4j configuration from the stored OAuth credentials. */
	private ConfigurationBuilder buildConfiguration() {
		ConfigurationBuilder cb = new ConfigurationBuilder();
		cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
				.setOAuthConsumerSecret(consumerSecret)
				.setOAuthAccessToken(accessToken)
				.setOAuthAccessTokenSecret(accessTokenSecret);
		return cb;
	}

	/** Creates the listener that forwards incoming tweets into {@link #queue}. */
	private StatusListener createListener() {
		return new StatusListener() {
			@Override
			public void onStatus(Status status) {
				// offer() never blocks the twitter4j thread; the tweet is dropped if the queue is full.
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception ex) {
				// Report stream errors instead of discarding them, so connection problems are visible.
				System.err.println("Twitter stream error: " + ex);
			}

			@Override
			public void onStallWarning(StallWarning arg0) {
			}
		};
	}

	/** Emits one buffered tweet, or sleeps briefly when none is available. */
	@Override
	public void nextTuple() {
		Status ret = queue.poll();
		if (ret == null) {
			Utils.sleep(50);
		} else {
			// Emitted without a message id, so Storm does not track or replay it.
			_collector.emit(new Values(ret));
		}
	}

	/** Shuts down the Twitter stream if open() created one. */
	@Override
	public void close() {
		if (_twitterStream != null) {
			_twitterStream.shutdown();
		}
	}

	/**
	 * Restricts this spout to a single task (presumably so that only one stream
	 * connection is opened per set of credentials).
	 */
	@Override
	public Map getComponentConfiguration() {
		Config ret = new Config();
		ret.setMaxTaskParallelism(1);
		return ret;
	}

	/** No-op: tuples are emitted without message ids, so there is nothing to ack. */
	@Override
	public void ack(Object id) {
	}

	/** No-op: tuples are emitted without message ids, so nothing is replayed. */
	@Override
	public void fail(Object id) {
	}

	/** Declares the single output field "tweet" holding a twitter4j Status. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tweet"));
	}
}


HashtagReaderBolt.java




import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Storm bolt that extracts hashtags from tweets.
 *
 * <p>For each input tuple carrying a twitter4j {@link Status} in field "tweet",
 * prints each hashtag's text and emits it as a tuple with the single field "hashtag".
 */
public class HashtagReaderBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;
	private OutputCollector collector;

	/** Stores the collector used to emit and ack tuples. */
	@Override
	public void prepare(Map conf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Emits one "hashtag" tuple per hashtag entity of the incoming tweet, then
	 * acks the input tuple.
	 */
	@Override
	public void execute(Tuple tuple) {
		Status tweet = (Status) tuple.getValueByField("tweet");
		HashtagEntity[] hashtags = tweet.getHashtagEntities();
		if (hashtags != null) {
			for (HashtagEntity hashtag : hashtags) {
				String text = hashtag.getText();
				System.out.println("Hashtag: " + text);
				// Anchor to the input tuple so the emitted tuple joins its tuple tree.
				this.collector.emit(tuple, new Values(text));
			}
		}
		// IRichBolt does not ack automatically; ack explicitly so the tuple is not left pending.
		this.collector.ack(tuple);
	}

	@Override
	public void cleanup() {
	}

	/** Declares the single output field "hashtag". */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("hashtag"));
	}

	/** No component-specific configuration. */
	@Override
	public Map getComponentConfiguration() {
		return null;
	}
}


HashtagCounterBolt.java




import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Storm bolt that counts occurrences of each hashtag received in field 0 of its
 * input tuples, and prints the totals when the bolt is cleaned up.
 *
 * <p>Counts are kept per task in memory. Storm only guarantees that cleanup()
 * runs in local mode, so the printed results are reliable there only.
 */
public class HashtagCounterBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;

	// Typed map: with a raw Map, counterMap.get(key) + 1 does not compile.
	Map<String, Integer> counterMap;
	private OutputCollector collector;

	/** Initializes the per-task counters and stores the collector. */
	@Override
	public void prepare(Map conf, TopologyContext context,
			OutputCollector collector) {
		this.counterMap = new HashMap<String, Integer>();
		this.collector = collector;
	}

	/** Increments the count for the hashtag in field 0 and acks the tuple. */
	@Override
	public void execute(Tuple tuple) {
		String key = tuple.getString(0);
		Integer current = counterMap.get(key);
		counterMap.put(key, current == null ? 1 : current + 1);
		collector.ack(tuple);
	}

	/** Prints every hashtag with its final count. */
	@Override
	public void cleanup() {
		for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
			System.out.println("Result: " + entry.getKey() + " : "
					+ entry.getValue());
		}
	}

	/** Declares the field "hashtag" (this bolt does not currently emit tuples). */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("hashtag"));
	}

	/** No component-specific configuration. */
	@Override
	public Map getComponentConfiguration() {
		return null;
	}
}


TwitterHashtagStorm.java




import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Builds the tweet spout → hashtag reader → hashtag counter topology and runs
 * it for a fixed time on an in-process LocalCluster.
 */
public class TwitterHashtagStorm {

	/** How long the local topology runs before the cluster is shut down, in milliseconds. */
	private static final long RUN_MILLIS = 10000;

	/**
	 * Entry point.
	 *
	 * <p>Usage: {@code TwitterHashtagStorm [consumerKey consumerSecret accessToken
	 * accessTokenSecret [keyword ...]]}. Passing credentials on the command line
	 * avoids editing secrets into the source. With no arguments, the values
	 * declared below are used.
	 *
	 * @throws IllegalArgumentException if 1 to 3 arguments are given
	 */
	public static void main(String[] args) throws Exception {
		// Example values only; replace them or pass your own on the command line.
		String consumerKey = "bVd3fwceBGCvjghPqjVF6A2jW";
		String consumerSecret = "86EPCj7ByjPpPTx4vNN1nTYqOsdjN0v7ZsainjEgjGY6KzwjFV";
		String accessToken = "******************-0NpAbHQt1WW2NM5njFieh6xVA0BwedG";
		String accessTokenSecret = "lUcbFDxu08lRE6uIISHE9fgAsEdZXKCh6MTpJqbplYUXy";
		String[] keyWords = {"tweet","hello","hadoop"};

		if (args.length > 0 && args.length < 4) {
			throw new IllegalArgumentException("Usage: TwitterHashtagStorm "
					+ "[consumerKey consumerSecret accessToken accessTokenSecret [keyword ...]]");
		}
		if (args.length >= 4) {
			consumerKey = args[0];
			consumerSecret = args[1];
			accessToken = args[2];
			accessTokenSecret = args[3];
			if (args.length > 4) {
				keyWords = Arrays.copyOfRange(args, 4, args.length);
			}
		}

		Config config = new Config();
		config.setDebug(true);
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
				consumerSecret, accessToken, accessTokenSecret, keyWords));
		builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
				.shuffleGrouping("twitter-spout");
		// fieldsGrouping on "hashtag" routes every occurrence of a hashtag to the same counter task.
		builder.setBolt("twitter-hashtag-counter-bolt",
				new HashtagCounterBolt()).fieldsGrouping(
				"twitter-hashtag-reader-bolt", new Fields("hashtag"));
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("TwitterHashtagStorm", config,
				builder.createTopology());
		Thread.sleep(RUN_MILLIS);
		cluster.shutdown();
	}
}


Step 1 - Replace the consumer key, consumer secret, access token and access token secret in TwitterHashtagStorm.java with your own Twitter credentials. The credentials shown below are examples only and will not work.

Step 2 - Change the keywords to the ones you want to track.




String consumerKey = "bVd3fwceBGCvjghPqjVF6A2jW";
String consumerSecret = "86EPCj7ByjPpPTx4vNN1nTYqOsdjN0v7ZsainjEgjGY6KzwjFV";
String accessToken = "******************-0NpAbHQt1WW2NM5njFieh6xVA0BwedG";
String accessTokenSecret = "lUcbFDxu08lRE6uIISHE9fgAsEdZXKCh6MTpJqbplYUXy";
String[] keyWords = {"tweet","hello","hadoop"};


Step 3 - Start ZooKeeper. Open a new terminal (CTRL + ALT + T) and start ZooKeeper.




$ /usr/local/zookeeper/bin/zkServer.sh start

Step 4 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/storm.




$ cd /usr/local/storm

Step 5 - Start nimbus




$ ./bin/storm nimbus

Step 6 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/storm.




$ cd /usr/local/storm

Step 7 - Start supervisor




$ ./bin/storm supervisor
TwitterSampleSpout.java
HashtagReaderBolt.java
HashtagCounterBolt.java
TwitterHashtagStorm.java


Step 8 - Compile all of the above programs and run TwitterHashtagStorm.




$ javac -cp "/usr/local/storm/lib/*":"/twitter4j-4.0.4/lib/*" *.java

$ java -cp "/usr/local/storm/lib/*":"/twitter4j-4.0.4/lib/*":. TwitterHashtagStorm 


Output




Result: ROBLOX : 2
Result: restaurantsneednotapply : 1
Result: ARIASONEDIRECTION : 1
Result: TuitUtil : 1
Result: EVOLution : 1
Result: YouAreMySundayAtLFF : 1


NOTE

Your output may differ: it depends on the keywords you configured in the program and on which tweets are posted while the topology is running.

Make sure you have an internet connection before running the program.



Have any Question or Comment?

Leave a Reply

Your email address will not be published. Required fields are marked *