Storm Trident Java Example


Storm Trident Java Example

Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main reason behind developing Trident is to provide a high-level abstraction on top of Storm along with stateful stream processing and low latency distributed querying.

Trident uses spout and bolt, but these low-level components are auto-generated by Trident before execution. Trident has functions, filters, joins, grouping, and aggregation.

Add these libraries to your java project build path.

/usr/local/storm/lib/*

FormatCall.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
	private static final long serialVersionUID = 1L;

	/**
	 * Joins the caller number (field 0) and the receiver number (field 1)
	 * into a single "from - to" call key and emits it as a one-field tuple.
	 *
	 * @param tuple     input tuple carrying the two phone-number strings
	 * @param collector sink that receives the combined call key
	 */
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String caller = tuple.getString(0);
		String receiver = tuple.getString(1);
		// Same output format as concatenation with " - " in between.
		String callKey = String.join(" - ", caller, receiver);
		collector.emit(new Values(callKey));
	}
}

CSVSplit.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
	private static final long serialVersionUID = 1L;

	/**
	 * Splits the comma-separated string in field 0 and emits one
	 * single-field tuple per non-empty token.
	 *
	 * @param tuple     input tuple whose first field is a CSV string
	 * @param collector sink that receives one tuple per token
	 */
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String csv = tuple.getString(0);
		String[] tokens = csv.split(",");
		for (int i = 0; i < tokens.length; i++) {
			String token = tokens[i];
			// Skip empty tokens (e.g. from consecutive commas).
			if (!token.isEmpty()) {
				collector.emit(new Values(token));
			}
		}
	}
}

LogAnalyserTrident.java

import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.MemoryMapState;
import org.apache.storm.shade.com.google.common.collect.*;

public class LogAnalyserTrident {
	/**
	 * Builds and runs a local Trident topology that counts calls per
	 * "from - to" pair, feeds it synthetic call records, and then issues
	 * two DRPC queries against the persisted counts before shutting down.
	 *
	 * @param args unused command-line arguments
	 * @throws Exception if topology submission or DRPC execution fails
	 */
	public static void main(String[] args) throws Exception {
		System.out.println("Log Analyser Trident");

		TridentTopology tridentTopology = new TridentTopology();
		FeederBatchSpout feederSpout = new FeederBatchSpout(ImmutableList.of(
				"fromMobileNumber", "toMobileNumber", "duration"));

		// Count calls per "from - to" key, persisted in an in-memory map state.
		TridentState callCountState = tridentTopology
				.newStream("fixed-batch-spout", feederSpout)
				.each(new Fields("fromMobileNumber", "toMobileNumber"),
						new FormatCall(), new Fields("call"))
				.groupBy(new Fields("call"))
				.persistentAggregate(new MemoryMapState.Factory(), new Count(),
						new Fields("count"));

		LocalDRPC localDrpc = new LocalDRPC();

		// DRPC stream: look up the count for a single "from - to" key.
		tridentTopology.newDRPCStream("call_count", localDrpc).stateQuery(
				callCountState, new Fields("args"), new MapGet(),
				new Fields("count"));

		// DRPC stream: split a CSV of keys, look up each count, drop
		// missing entries, and sum the remaining counts.
		tridentTopology.newDRPCStream("multiple_call_count", localDrpc)
				.each(new Fields("args"), new CSVSplit(), new Fields("call"))
				.groupBy(new Fields("call"))
				.stateQuery(callCountState, new Fields("call"), new MapGet(),
						new Fields("count"))
				.each(new Fields("call", "count"), new Debug())
				.each(new Fields("count"), new FilterNull())
				.aggregate(new Fields("count"), new Sum(), new Fields("sum"));

		Config config = new Config();
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("trident", config, tridentTopology.build());

		// Feed ten batches of four synthetic call records with random durations.
		Random random = new Random();
		for (int batch = 0; batch < 10; batch++) {
			feederSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123402", random.nextInt(60))));
			feederSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123403", random.nextInt(60))));
			feederSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123404", random.nextInt(60))));
			feederSpout.feed(ImmutableList.of(new Values("1234123402",
					"1234123403", random.nextInt(60))));
		}

		System.out.println("DRPC : Query starts");
		System.out.println(localDrpc
				.execute("call_count", "1234123401 - 1234123402"));
		System.out.println(localDrpc.execute("multiple_call_count",
				"1234123401 - 1234123402,1234123401 - 1234123403"));
		System.out.println("DRPC : Query ends");

		localCluster.shutdown();
		localDrpc.shutdown();
		// To query a remote DRPC server instead of the in-process one:
		// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
	}
}

Step 1 – Start ZooKeeper. Open a new terminal (CTRL + ALT + T) and start zookeeper.

$ /usr/local/zookeeper/bin/zkServer.sh start

Step 2 – Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 3 – Start nimbus

$ ./bin/storm nimbus

Step 4 – Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 5 – Start supervisor

$ ./bin/storm supervisor
FormatCall.java
CSVSplit.java
LogAnalyserTrident.java

Step 6 – Compile all of the above programs and execute LogAnalyserTrident.java

$ javac -cp "/usr/local/storm/lib/*" *.java
$ java -cp "/usr/local/storm/lib/*":. LogAnalyserTrident

Output

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

Have any Question or Comment?

Leave a Reply

Your email address will not be published. Required fields are marked *