Flink Word Count Java Example

The following code shows the WordCount implementation from the Quickstart, which processes some text lines with two operators (FlatMap and Reduce) and emits the resulting words and counts (in this example, to an output file).
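
Before looking at the Flink version, it may help to see the same two steps in plain Java. The sketch below is only an illustration and does not use Flink: the class name WordCountSketch and the methods toPairs and sumByWord are made up for this example. It turns a line into (word, 1) pairs and then adds up the second field for each distinct word.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink needed) of the two WordCount steps.
// All names in this class are hypothetical and exist only for illustration.
final class WordCountSketch {

  // Step 1, like FlatMap: one line in, zero or more (word, 1) pairs out.
  static List<Map.Entry<String, Integer>> toPairs(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String token : line.toLowerCase().split("\\W+")) {
      if (token.length() > 0) {
        pairs.add(new SimpleEntry<>(token, 1));
      }
    }
    return pairs;
  }

  // Step 2, like group-and-sum: add up the second field for each distinct word.
  static Map<String, Integer> sumByWord(List<Map.Entry<String, Integer>> pairs) {
    Map<String, Integer> counts = new TreeMap<>(); // TreeMap keeps the words sorted
    for (Map.Entry<String, Integer> pair : pairs) {
      counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Same sample line as input.txt later in this article.
    Map<String, Integer> counts = sumByWord(toPairs("hi hello hi hello nagaraju deshmane"));
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      System.out.println(e.getKey() + " " + e.getValue());
    }
  }
}
```

Flink runs the same logic, but distributes the pairs across parallel tasks and groups them by key for you.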

Step 1 - Add JARs (Libraries)

Add the Flink JARs to your Java project build path. You can find these JAR files in the lib directory of Flink.





import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the input file line by line
    DataSet<String> text = env.readTextFile("/home/hduser/Desktop/input.txt");

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1);

    // emit result as "word count" lines; the writeAsCsv sink and the output path
    // are assumed from the output.txt listing in this article
    // (parallelism 1 so the sink writes a single file, not a directory of part files)
    counts.writeAsCsv("/home/hduser/Desktop/output.txt", "\n", " ").setParallelism(1);

    // execute program
    env.execute("WordCount Example");
  }
}



import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  private static final long serialVersionUID = 1L;

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line into words
    String[] tokens = value.toLowerCase().split("\\W+");
    // emit the pairs, skipping empty tokens
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}
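
The token.length() > 0 check in LineSplitter is easy to overlook, so here is a small stand-alone sketch of why it is needed. It reuses the same toLowerCase().split("\\W+") expression; the class TokenizeDemo and the method rawTokens are hypothetical names used only here. When a line starts with a separator (spaces, punctuation), Java's split returns an empty string as the first token.

```java
import java.util.Arrays;

// Shows what the tokenizer expression from LineSplitter returns for a messy line.
// TokenizeDemo and rawTokens are illustrative names, not part of Flink.
final class TokenizeDemo {

  // Same expression as in LineSplitter.flatMap: lowercase, then split on runs of non-word characters.
  static String[] rawTokens(String line) {
    return line.toLowerCase().split("\\W+");
  }

  public static void main(String[] args) {
    // The line starts with a separator, so the first token is the empty string.
    System.out.println(Arrays.toString(rawTokens("  Hi, hello... HI!")));
    // prints [, hi, hello, hi]
  }
}
```

Without the length check, that empty token would be emitted as a ("", 1) pair and show up as a blank word in the result.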

input.txt, which is saved as /home/hduser/Desktop/input.txt:


hi hello hi hello nagaraju deshmane

Step 2 - Run WordCount.java

output.txt, which is saved in the directory /home/hduser/Desktop/:


deshmane 1
hello 2
hi 2
nagaraju 1

You can also read input files that are stored in HDFS (Hadoop Distributed File System), and store the result there, by passing an hdfs:// path:


DataSet<String> text = env.readTextFile("hdfs://localhost:9000/user/hduser/input.txt");
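
To make the parts of such a path easier to see, the sketch below takes the HDFS path apart with java.net.URI and derives an output path in the same HDFS directory. This is only an illustration: the class HdfsPathSketch, the helper sibling, and the output file name output.txt on HDFS are assumptions, and localhost:9000 must match the NameNode address of your own Hadoop setup.

```java
import java.net.URI;

// Sketch: inspect an hdfs:// path and derive an output path next to it.
// HdfsPathSketch and sibling are hypothetical names used only for illustration.
final class HdfsPathSketch {

  // Replaces the last path segment of hdfsPath with fileName.
  static String sibling(String hdfsPath, String fileName) {
    return URI.create(hdfsPath).resolve(fileName).toString();
  }

  public static void main(String[] args) {
    URI input = URI.create("hdfs://localhost:9000/user/hduser/input.txt");
    System.out.println(input.getScheme());                       // hdfs
    System.out.println(input.getHost() + ":" + input.getPort()); // localhost:9000
    System.out.println(input.getPath());                         // /user/hduser/input.txt

    // Assumed output location in the same HDFS directory.
    System.out.println(sibling(input.toString(), "output.txt"));
    // prints hdfs://localhost:9000/user/hduser/output.txt
  }
}
```

The resulting string can be passed to the job's output sink in the same way the local path is.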

