If the source (e.g., FlinkKafkaConsumer) isn’t providing the timestamps you want to work with, then you need to provide a TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:
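// The diamond operator avoids constructing a raw-typed consumer.
FlinkKafkaConsumer<Event> kafkaData =
    new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);
// Assumes timestamps never decrease; each event's timestamp comes from getTime().
WatermarkStrategy<Event> wmStrategy =
    WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> event.getTime());
// Attach the strategy to the Kafka consumer before adding it as a source.
DataStream<Event> stream = env.addSource(
    kafkaData.assignTimestampsAndWatermarks(wmStrategy));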
(Note: this won’t quite work, since your getTime() method returns a String. You’ll need to parse the string and return a long; typically it will be a long representing milliseconds since the epoch.)
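The parsing step mentioned above could be sketched roughly as follows. This is only an illustration: the class name TimestampParsing and the method toEpochMillis are hypothetical, and it assumes the String returned by getTime() is an ISO-8601 instant such as 2021-03-01T12:00:00Z. If your events use a different format, the parsing would need a matching DateTimeFormatter instead.

```java
import java.time.Instant;

public class TimestampParsing {

    // Hypothetical helper: converts an ISO-8601 instant string
    // (an assumed format, e.g. "2021-03-01T12:00:00Z") into
    // milliseconds since the epoch, the unit Flink expects for timestamps.
    static long toEpochMillis(String time) {
        // Instant.parse throws DateTimeParseException on malformed input.
        return Instant.parse(time).toEpochMilli();
    }

    public static void main(String[] args) {
        // One second after the epoch.
        System.out.println(toEpochMillis("1970-01-01T00:00:01Z")); // prints 1000
    }
}
```

With a helper like this, the assigner would become something like .withTimestampAssigner((event, timestamp) -> TimestampParsing.toEpochMillis(event.getTime())).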
The cases involving a TimestampAssignerSupplier.Context or a WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That’s not necessary in this case.