Flink – How to use withTimestampAssigner getting time from Event Payload (without using Kafka timestamps)

If the source (e.g., FlinkKafkaConsumer) isn’t providing the timestamps you want to work with, then you need to provide a TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy = 
        WatermarkStrategy
          .<Event>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.getTime());

DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

(Note: this won’t quite work, since your getTime() method returns a String. You’ll need to parse the string and return a long — typically it will be a long representing milliseconds since the epoch.)

The cases involving a TimestampAssignerSupplier.Context or a WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That’s not necessary in this case.

CLICK HERE to find out more related problems solutions.

Leave a Comment

Your email address will not be published.

Scroll to Top