If the source (e.g.,
FlinkKafkaConsumer) isn’t providing the timestamps you want to work with, then you need to provide a
TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:
FlinkKafkaConsumer<Event> kafkaData = new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p); WatermarkStrategy<Event> wmStrategy = WatermarkStrategy .<Event>forMonotonousTimestamps() .withTimestampAssigner((event, timestamp) -> event.getTime()); DataStream<Event> stream = env.addSource( kafkaData.assignTimestampsAndWatermarks(wmStrategy));
(Note: this won’t quite work, since your
getTime() method returns a String. You’ll need to parse the string and return a long — typically it will be a long representing milliseconds since the epoch.)
The cases involving a
TimestampAssignerSupplier.Context or a
WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That’s not necessary in this case.
CLICK HERE to find out more related problems solutions.