Error when creating the Google Dataflow template

The error is raised by the cancel function once the waiting time has elapsed, and it appears to be harmless.

To prove it, I reproduced your exact issue from my virtual machine with Python 3.5. The template is created at the path given by --template_location and can be used to run jobs. Note that I had to make some changes to your code to get it to actually work in Dataflow.

In case it is of any use to you, I ended up using this pipeline code:

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import datetime

# Fill in these values to use them as defaults
# Note that the table in BQ needs to have the column names message_body and publish_time

Table = 'projectid:datasetid.tableid'
schema = 'message_body:STRING, publish_time:TIMESTAMP'
TOPIC = "projects/<projectid>/topics/<topicname>"

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output_table", default=Table)
    args, beam_args = parser.parse_known_args(argv)
    # save_main_session needs to be True because modules imported at the top
    # of the file (mostly datetime) are used inside the DoFn.
    # Add service_account_email='email' to the options to use a custom service account.
    options = PipelineOptions(beam_args, save_main_session=True, region='us-central1')
    p = beam.Pipeline(options=options)

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=args.input_topic).with_output_types(bytes)
        | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(args.output_table, schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    # Warning: cancel() does not work properly when creating a template
    result.wait_until_finish(duration=3000)  # duration is in milliseconds
    result.cancel()   # Cancel the streaming pipeline after a while to avoid consuming more resources

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
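
If you want to check the timestamp string that AddTimestamps writes without launching a job, the conversion can be tried on its own. This is only a sketch: format_publish_time is a name I made up, and it uses datetime.fromtimestamp with an explicit UTC timezone instead of utcfromtimestamp (which newer Python versions deprecate). The resulting string should be the same.

```python
import datetime


def format_publish_time(epoch_seconds):
    """Format seconds since the Unix epoch as a UTC 'YYYY-MM-DD HH:MM:SS.ffffff' string."""
    # Hypothetical helper, not part of Beam: it mirrors the strftime call in AddTimestamps.
    moment = datetime.datetime.fromtimestamp(
        float(epoch_seconds), tz=datetime.timezone.utc
    )
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f")


if __name__ == "__main__":
    print(format_publish_time(0))  # 1970-01-01 00:00:00.000000
```

The output matches the format BigQuery accepts for a TIMESTAMP column, which is why the pipeline can pass it as a plain string.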

Afterwards I ran the following commands:

# Fill accordingly
PROJECT="MYPROJECT-ID"
BUCKET="MYBUCKET"
TEMPLATE_NAME="TRIAL"

# create the template
python3 -m templates.template-pubsub-bigquery \
  --runner DataflowRunner \
  --project $PROJECT \
  --staging_location gs://$BUCKET/staging \
  --temp_location gs://$BUCKET/temp \
  --template_location gs://$BUCKET/templates/$TEMPLATE_NAME \
  --streaming

to create the template (this yields the error you mentioned, but the template is still created), and

# Fill job-name and gcs location accordingly
# Uncomment and fill the parameters should you want to use your own

gcloud dataflow jobs run <job-name> \
        --gcs-location "gs://<MYBUCKET>/templates/<TEMPLATE_NAME>"
   #     --parameters input_topic="",output_table=""

to run the pipeline.

As I said, the template was created correctly and the pipeline ran without problems.


Edit

Indeed, the cancel function does not work properly in the template. It seems to need the job ID, which of course does not exist yet at template creation time, so the cancel call is omitted.
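
One way to keep the error out of the template build would be to skip the wait and cancel steps whenever --template_location is passed. The following is a rough sketch under that assumption, not something I ran against Dataflow: is_template_build and finish are made-up helper names, and result stands for whatever p.run() returns.

```python
import argparse


def is_template_build(argv):
    """Return True when --template_location is among the pipeline arguments."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--template_location", default=None)
    known, _ = parser.parse_known_args(argv)
    return known.template_location is not None


def finish(result, argv, wait_ms=3000):
    """Wait and cancel only for a direct run; do nothing when staging a template.

    Hypothetical helper: `result` is expected to offer wait_until_finish()
    and cancel(), like the object returned by p.run().
    """
    if is_template_build(argv):
        return "skipped"
    result.wait_until_finish(duration=wait_ms)  # milliseconds
    result.cancel()
    return "cancelled"
```

In main() this would replace the last two lines, e.g. finish(result, beam_args).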

I found this other post that handles extracting the job ID in the pipeline. I tried some tweaks to make it work within the template code itself, but I don't think that is necessary. Given that you want to schedule the executions, I would go for the easier option: launch the streaming pipeline template at a certain time (e.g. 9:01 GMT) and cancel the pipeline with this script

import logging
import os
import re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def retrieve_job_id():
  #Fill as needed
  project = '<project-id>'
  job_prefix = "<job-name>"
  location = '<location>'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    # result has no 'jobs' key when the project has no jobs in this region
    for job in result.get('jobs', []):
      if re.search(re.escape(job_prefix), job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception:
    logging.error("Error retrieving Job ID")
    raise


os.system('gcloud dataflow jobs cancel {}'.format(retrieve_job_id()))

at another time (e.g. 9:05 GMT). This script assumes you launch the job with the same job name each time; it takes the latest job with that name and cancels it. I tried it several times and it works fine.
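
If you would rather not rely on the order in which the jobs come back, the selection step can be made explicit. This is a minimal sketch that assumes each job entry carries the name, id, currentState and createTime fields, and that the createTime strings share one UTC format so they sort chronologically as text. pick_job_id and cancel_command are names I made up, and the sample jobs are invented.

```python
def pick_job_id(jobs, job_prefix):
    """Return the id of the newest running job whose name starts with job_prefix, or None."""
    candidates = [
        job for job in jobs
        if job.get("name", "").startswith(job_prefix)
        and job.get("currentState") == "JOB_STATE_RUNNING"
    ]
    if not candidates:
        return None
    # Assumption: createTime strings compare chronologically as plain text.
    newest = max(candidates, key=lambda job: job.get("createTime", ""))
    return newest["id"]


def cancel_command(job_id, location):
    """Build the gcloud call as an argument list, e.g. for subprocess.run."""
    return ["gcloud", "dataflow", "jobs", "cancel", job_id, "--region", location]


if __name__ == "__main__":
    # Invented sample data shaped like the 'jobs' list of the API response.
    sample_jobs = [
        {"id": "a1", "name": "myjob", "currentState": "JOB_STATE_CANCELLED",
         "createTime": "2020-10-01T09:01:00Z"},
        {"id": "b2", "name": "myjob", "currentState": "JOB_STATE_RUNNING",
         "createTime": "2020-10-02T09:01:00Z"},
    ]
    job_id = pick_job_id(sample_jobs, "myjob")
    if job_id is not None:
        print(" ".join(cancel_command(job_id, "us-central1")))
```

Returning None when nothing matches lets the caller skip the cancel call instead of passing a placeholder ID to gcloud.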
