count(Materialized.>as("WordCountsStore") Next, we'll create a state store (key-value) for all the computed word counts: groupedByWord groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde)) flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W "))) Firstly, we'll define the processing topology, in our case, the word count algorithm: KStream textLines =īuilder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) įinal KGroupedStream groupedByWord = textLines Let's see an example using interactive queries. With the following entries: "one", "two", "three", "four", "five" Materialized.with(Serdes.String(), Serdes.Long())) aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue newValue.length(), Grouped.with(Serdes.String(), Serdes.String())) groupBy((key, value) -> (value != null
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |