Please note that Cloud Data Fusion is an ETL (Extract-Transform-Load) tool, so a pipeline operates on the data inside the files rather than on the files themselves; this makes it difficult to define a simple pipeline that just uploads files to Cloud Storage.
Likewise, since the files have different formats, you would need to define a transformation that routes and processes each file according to its type.
On the other hand, I understand that your usage scenario is:
- Extract files from one or more SFTP servers
- The files have different formats (CSV, JSON, Parquet, and Avro)
- The files need to be uploaded to Cloud Storage
- Each Cloud Storage file is associated with a BigQuery table as an external data source
Based on this, I consider it a better option to use an orchestration tool like Cloud Composer, Google Cloud's managed Apache Airflow service.
Airflow uses a DAG (Directed Acyclic Graph): a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. In your case, the DAG would have these tasks:
- Use the SFTPOperator to fetch the files from the SFTP servers onto the workers
- Once the files are on the workers, use a PythonOperator with the Cloud Storage client library to upload them to a bucket in your project
- Once the files are in Cloud Storage, use a PythonOperator with the BigQuery client library, or a BashOperator running the bq tool, to create a table that uses each file as an external data source
Another benefit is that you no longer need to worry about the file type, since all the formats you mentioned are currently supported for creating external tables directly.
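Since each format has its own `source_format` string in the BigQuery API, one way to handle the mixed file types is a small lookup by extension. The helper below is an illustrative sketch (not part of any Google library); note that JSON maps to the literal string `NEWLINE_DELIMITED_JSON`:

```python
import os

# source_format values BigQuery accepts for the formats in question
SOURCE_FORMATS = {
    ".csv": "CSV",
    ".json": "NEWLINE_DELIMITED_JSON",
    ".parquet": "PARQUET",
    ".avro": "AVRO",
}


def source_format_for(filename: str) -> str:
    """Return the BigQuery source_format for a file, by its extension."""
    ext = os.path.splitext(filename)[1].lower()
    try:
        return SOURCE_FORMATS[ext]
    except KeyError:
        raise ValueError(f"Unsupported file type: {filename}") from None


print(source_format_for("sales.parquet"))  # PARQUET
print(source_format_for("events.json"))    # NEWLINE_DELIMITED_JSON
```

You could call such a helper from the task that creates the external tables, so one code path covers all four formats instead of branching per file type.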