Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
313 views
in Technique[技术] by (71.8m points)

python 3.x - Airflow s3 connection using UI

I've been trying to use Airflow to schedule a DAG. One of the DAG includes a task which loads data from s3 bucket.

For the purpose above I need to setup s3 connection. But UI provided by airflow isn't that intutive (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections). Any one succeeded setting up the s3 connection if so are there any best practices you folks follow?

Thanks.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

EDIT: This answer stores your secret key in plain text which can be a security risk and is not recommended. The best way is to put access key and secret key in the login/password fields, as mentioned in other answers below. END EDIT

It's hard to find references, but after digging a bit I was able to make it work.

TLDR

Create a new connection with the following attributes:

Conn Id: my_conn_S3

Conn Type: S3

Extra:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

Long version, setting up UI connection:

  • On Airflow UI, go to Admin > Connections
  • Create a new connection with the following attributes:
  • Conn Id: my_conn_S3
  • Conn Type: S3
  • Extra: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • Leave all the other fields (Host, Schema, Login) blank.

To use this connection, below you can find a simple S3 Sensor Test. The idea of this test is to set up a sensor that watches files in S3 (T1 task) and once below condition is satisfied it triggers a bash command (T2 task).

Testing

  • Before running the DAG, ensure you've an S3 bucket named 'S3-Bucket-To-Watch'.
  • Add below s3_dag_test.py to airflow dags folder (~/airflow/dags)
  • Start airflow webserver.
  • Go to Airflow UI (http://localhost:8383/)
  • Start airflow scheduler.
  • Turn on 's3_dag_test' DAG on the main DAGs view.
  • Select 's3_dag_test' to show the dag details.
  • On the Graph View you should be able to see it's current state.
  • 'check_s3_for_file_in_s3' task should be active and running.
  • Now, add a file named 'file-to-watch-1' to your 'S3-Bucket-To-Watch'.
  • First tasks should have been completed, second should be started and finish.

The schedule_interval in the dag definition is set to '@once', to facilitate debugging.

To run it again, leave everything as it's, remove files in the bucket and try again by selecting the first task (in the graph view) and selecting 'Clear' all 'Past','Future','Upstream','Downstream' .... activity. This should kick off the DAG again.

Let me know how it went.

s3_dag_test.py ;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)

Main References:

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...