Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Apache Flink Stream join using DataStream API not outputting anything

I have two streams created from Kafka topics, and I'm joining them using the DataStream API. I want the result of the join (apply) to be published to another Kafka topic, but I don't see any join results in the output topic.

I have confirmed that I'm publishing proper data to both source topics, so I'm not sure where it is going wrong. Here are the code snippets.

The streams are created as shown below:

DataStream<String> ms1 = env.addSource(new FlinkKafkaConsumer<>("top1", new SimpleStringSchema(), prop))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new AscendingTimestampsWatermarks<>();
            }
            @Override
            public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, timestamp) -> System.currentTimeMillis();
            }
        });
DataStream<String> ms2 = env.addSource(new FlinkKafkaConsumer<>("top2", new SimpleStringSchema(), prop))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new AscendingTimestampsWatermarks<>();
            }
            @Override
            public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, timestamp) -> System.currentTimeMillis();
            }
        });

The join is performed using join-where-equals, as below:

DataStream<CountryData> joinedStreams = ms1.join(ms2)
        .where(o -> o.split("::")[0])
        .equalTo(o -> o.split("::")[0])
        .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
        .apply(new JoinFunction<String, String, CountryData>() {
            @Override
            public CountryData join(String o, String o2) throws Exception {
                String[] tokens1 = o.split("::");
                String[] tokens2 = o2.split("::");
                return new CountryData(tokens1[0], tokens1[1], tokens1[2],
                        Long.parseLong(tokens1[3]) + Long.parseLong(tokens2[3]));
            }
        });

The sink is added as below:

DataStreamSink<CountryData> dataStreamSink = joinedStreams
        .addSink(new FlinkKafkaProducer<CountryData>("localhost:9095", "flink-output", new CustomSchema()));
dataStreamSink.setParallelism(1);
dataStreamSink.name("KAFKA-TOPIC");

Any clue where it is going wrong? I can see the messages arriving in the topology. Thanks!



1 Answer


I think the two FlinkKafkaConsumer streams are missing a proper event-time extractor and watermark configuration: the timestamp assigner returns System.currentTimeMillis(), which is the processing-time wall clock, not the time carried by the event itself.

Since the code is using an event-time window join, it needs time information associated with the data read from Kafka in order to know which time window each event corresponds to.

Without that, events from both streams are probably never close enough in event time to fall into the same session window defined by EventTimeSessionWindows.withGap(Time.seconds(60)).

You also need to configure the watermark to tell Flink when to stop waiting for new data and materialize the output, so that you can see the join result.

Have a look at the Kafka connector's time and watermark configuration for the various time-extraction and watermarking options available.
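As a sketch of what that could look like (assuming each message is laid out as `key::field1::field2::epochMillis`, which is a guess based on the `split("::")` logic in the question; adjust the field index to wherever your messages actually carry their time), the timestamp can be parsed from the record itself rather than taken from the wall clock. The parsing helper below is plain Java; the Flink wiring, with the hypothetical 30-second out-of-orderness bound, is shown in comments:

```java
// Helper that pulls the event time out of a "::"-delimited payload.
// Assumption: the last field is an epoch-millis timestamp.
public class EventTimeParser {
    public static long extractTimestamp(String event) {
        String[] tokens = event.split("::");
        return Long.parseLong(tokens[tokens.length - 1]);
    }

    // In the Flink job this would be wired roughly as follows
    // (WatermarkStrategy.forBoundedOutOfOrderness is the standard built-in
    // strategy; 30s is an illustrative bound, not a recommendation):
    //
    //   env.addSource(new FlinkKafkaConsumer<>("top1", new SimpleStringSchema(), prop))
    //      .assignTimestampsAndWatermarks(
    //          WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    //              .withTimestampAssigner((event, ts) -> EventTimeParser.extractTimestamp(event)));
}
```

With both streams assigned timestamps this way, two records sharing a key end up in the same session window whenever their own timestamps are within the gap, regardless of when they were read from Kafka.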

Finally, make sure you send test data spread over a long enough time period to your application. With event-time processing, only "old enough" data makes it to the output; recent data is always "stuck in transit". For example, with a 60s session window and, say, a 30s watermark delay, you would need at least 90s worth of data before you see anything in the output.
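That back-of-envelope figure can be stated as a tiny helper (the 60s gap and 30s delay are the example values from above, not requirements): the watermark must pass the end of the session before the window fires, so the test data must span at least gap + delay.

```java
// Minimum event-time span of test data before any output can surface:
// the session closes after `sessionGapMs` of silence, and the window only
// fires once the watermark (which lags by `watermarkDelayMs`) passes its end.
public class OutputTiming {
    public static long minDataSpanMs(long sessionGapMs, long watermarkDelayMs) {
        return sessionGapMs + watermarkDelayMs;
    }
}
```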

