I have two streams created from Kafka topics, and I'm joining them with the DataStream API. I want the results of the join (apply) to be published to another Kafka topic, but I don't see any join results in the output topic.
I have confirmed that valid data is being published to both source topics, so I'm not sure where it goes wrong. Here is the code snippet.

The streams are created as shown below:

    DataStream<String> ms1 = env
            .addSource(new FlinkKafkaConsumer<>("top1", new SimpleStringSchema(), prop))
            .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
                @Override
                public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new AscendingTimestampsWatermarks<>();
                }

                @Override
                public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    // Each record is stamped with the wall-clock time at which it is read
                    return (event, timestamp) -> System.currentTimeMillis();
                }
            });

    DataStream<String> ms2 = env
            .addSource(new FlinkKafkaConsumer<>("top2", new SimpleStringSchema(), prop))
            .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
                @Override
                public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new AscendingTimestampsWatermarks<>();
                }

                @Override
                public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return (event, timestamp) -> System.currentTimeMillis();
                }
            });

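
One thing worth checking with this setup: the join is a two-input operator, and Flink advances an operator's event-time clock only to the minimum of the watermarks arriving on its inputs (the same rule applies across parallel upstream subtasks, so a source subtask that never receives data holds everything back). The plain-Java model below imitates that rule without any Flink dependency; the WatermarkModel class is hypothetical, and it assumes an ascending-timestamp generator whose watermark is the highest timestamp seen minus 1. It is a simplification: real Flink emits watermarks periodically rather than on every record.

```java
import java.util.Arrays;

// Hypothetical plain-Java model (no Flink dependency) of how watermarks combine.
// Assumption: mirrors Flink's rule that an operator with several inputs uses the
// MINIMUM of its incoming watermarks as its own event-time clock.
class WatermarkModel {
    // Long.MIN_VALUE means "no watermark yet", like Flink's initial watermark.
    private final long[] inputWatermarks;

    WatermarkModel(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Ascending-timestamp generator: watermark = highest timestamp seen - 1.
    // (Real Flink emits this periodically; here it is updated on every event.)
    void onEvent(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp - 1);
    }

    // The operator only advances as far as its slowest input.
    long combinedWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

Reading the two inputs as ms1 and ms2 (or as two parallel source subtasks): as long as one of them has seen no records, the combined watermark stays at Long.MIN_VALUE and no event-time window downstream can fire. Flink's WatermarkStrategy.withIdleness(Duration) is the built-in way to let idle inputs be ignored.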
The streams are joined with join/where/equalTo, as below:

    DataStream<CountryData> joinedStreams = ms1.join(ms2)
            // Key both sides by the first "::"-separated token
            .where(o -> o.split("::")[0])
            .equalTo(o -> o.split("::")[0])
            .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
            .apply(new JoinFunction<String, String, CountryData>() {
                @Override
                public CountryData join(String o, String o2) throws Exception {
                    String[] tokens1 = o.split("::");
                    String[] tokens2 = o2.split("::");
                    // Keep the left record's first three fields and sum the fourth field of both
                    return new CountryData(tokens1[0], tokens1[1], tokens1[2],
                            Long.parseLong(tokens1[3]) + Long.parseLong(tokens2[3]));
                }
            });

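
The session window adds a second condition. With EventTimeSessionWindows.withGap(Time.seconds(60)), a session for a key closes only after 60 seconds of event-time inactivity, and its result is emitted once the watermark reaches the session's last millisecond. The plain-Java model below sketches that rule without any Flink dependency; the SessionWindowModel class is hypothetical and assumes Flink's behaviour of merging per-element windows [ts, ts + gap) that overlap or touch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of event-time session windows (no Flink dependency).
// Assumption: each element opens [ts, ts + gap); windows that overlap or touch are
// merged; a merged window fires when the watermark reaches end - 1.
class SessionWindowModel {
    static final class Session {
        final long start;
        final long end; // exclusive

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    // Merges ascending timestamps into sessions separated by more than gapMs of inactivity.
    static List<Session> sessions(long[] sortedTimestamps, long gapMs) {
        List<Session> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!result.isEmpty() && ts <= result.get(result.size() - 1).end) {
                Session last = result.remove(result.size() - 1);
                result.add(new Session(last.start, Math.max(last.end, ts + gapMs)));
            } else {
                result.add(new Session(ts, ts + gapMs));
            }
        }
        return result;
    }

    // A session is emitted only once the event-time clock reaches its last instant.
    static boolean hasFired(Session s, long watermark) {
        return watermark >= s.maxTimestamp();
    }
}
```

Because the timestamps here come from System.currentTimeMillis() and the watermark only moves when records arrive, a session is emitted only after both topics have delivered a later record at least 60 seconds past that session's last element; if the test data is sent in a single burst, no join result is emitted.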
The sink is added as below:

    DataStreamSink<CountryData> dataStreamSink = joinedStreams.addSink(
            new FlinkKafkaProducer<CountryData>("localhost:9095", "flink-output", new CustomSchema()));
    dataStreamSink.setParallelism(1);
    dataStreamSink.name("KAFKA-TOPIC");

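
CustomSchema is not shown in the question. To check what should land in flink-output, here is a plain-Java sketch of the join's output and one possible serializer. CountryData's field names, the CustomSchemaSketch class, and the "::"-delimited output format are all assumptions; in Flink the serializer would implement org.apache.flink.api.common.serialization.SerializationSchema<CountryData>, whose method is byte[] serialize(CountryData element).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the question's CountryData: only the shape
// (three Strings and a long) comes from the join code; field names are assumed.
class CountryData {
    final String country;
    final String field1;
    final String field2;
    final long total;

    CountryData(String country, String field1, String field2, long total) {
        this.country = country;
        this.field1 = field1;
        this.field2 = field2;
        this.total = total;
    }
}

// Hypothetical sketch; a real Flink version would implement SerializationSchema<CountryData>.
class CustomSchemaSketch {
    // Same logic as the JoinFunction above: keep the left record's first
    // three fields and add the fourth field of both records.
    static CountryData join(String left, String right) {
        String[] t1 = left.split("::");
        String[] t2 = right.split("::");
        return new CountryData(t1[0], t1[1], t1[2], Long.parseLong(t1[3]) + Long.parseLong(t2[3]));
    }

    // Assumed wire format: the fields joined with "::" and encoded as UTF-8.
    byte[] serialize(CountryData d) {
        String line = String.join("::", d.country, d.field1, d.field2, Long.toString(d.total));
        return line.getBytes(StandardCharsets.UTF_8);
    }
}
```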
Any clue where it's going wrong? I can see messages arriving in the topology.
Thanks