Airflow
Frequently used snippets for Airflow-related tasks.
Guidelines
By using only DockerOperator/KubernetesPodOperator, we avoid the technical debt that would accrue from relying on many different operators, as well as the dependency on Airflow's own development of those operators (more on this).
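For example, a minimal sketch of this pattern (Airflow 1.10-era import path; the image, command, and ids are illustrative):
from datetime import datetime
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator

dag = DAG("docker_only_pipeline", start_date=datetime(2019, 1, 1), schedule_interval="@daily")

# All business logic lives inside the container image, so the DAG never needs
# service-specific operators or their upstream maintenance.
run_job = DockerOperator(
    task_id="run_job",
    image="my-registry/my-job:latest",
    command="python run_job.py",
    dag=dag,
)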
Recipes
Activate the execution of a DAG
airflow unpause dag_id  # Airflow 2.x: airflow dags unpause dag_id
Setting it up on an Amazon Linux AMI
#!/bin/bash
# SLUGIFY_USES_TEXT_UNIDECODE avoids pulling in the GPL-licensed unidecode dependency
SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow[s3,postgres]
# become root (sudo su) or prefix the system commands with sudo, as below
sudo yum install -y postgresql postgresql-contrib postgresql-server
# the init-script name depends on the installed PostgreSQL version (postgresql92 here)
sudo /etc/init.d/postgresql92 initdb
sudo /etc/init.d/postgresql92 start
AIRFLOW_USER=user
AIRFLOW_PASS=pass
AIRFLOW_DB=airflow
AIRFLOW_HOME=~/airflow
AIRFLOW_SQL_ALCHEMY_CONN=postgresql+psycopg2://"$AIRFLOW_USER":"$AIRFLOW_PASS"@localhost:5432/"$AIRFLOW_DB"
echo "CREATE USER ${AIRFLOW_USER} PASSWORD '${AIRFLOW_PASS}'; CREATE DATABASE ${AIRFLOW_DB}; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ${AIRFLOW_USER}; ALTER ROLE ${AIRFLOW_USER} SUPERUSER; ALTER ROLE ${AIRFLOW_USER} CREATEDB; ALTER ROLE ${AIRFLOW_USER} WITH LOGIN;" | sudo -u postgres psql
# In /var/lib/pgsql92/data/pg_hba.conf (Ubuntu: /etc/postgresql/10/main/pg_hba.conf),
# change the IPv4 local-connection line to:
#   host all all 0.0.0.0/0 trust
sudo -u postgres sed -i "s|127.0.0.1/32|0.0.0.0/0|" /var/lib/pgsql92/data/pg_hba.conf
# In /var/lib/pgsql92/data/postgresql.conf, set:
#   listen_addresses = '*'
sudo -u postgres sed -i "s|#listen_addresses = 'localhost'|listen_addresses = '*'|" /var/lib/pgsql92/data/postgresql.conf
# restart PostgreSQL so the changes take effect
sudo /etc/init.d/postgresql92 restart
# note: 'sudo echo ... >> file' fails because the redirection runs unprivileged; use tee instead
echo "AIRFLOW_HOME=${AIRFLOW_HOME}" | sudo tee -a /etc/environment
# apache-airflow was already installed above; inside a virtualenv, install it there instead:
# pip install -U pip setuptools wheel psycopg2 Cython pytz pyOpenSSL ndg-httpsclient pyasn1 psutil apache-airflow[postgres]
# the first initdb run generates $AIRFLOW_HOME/airflow.cfg (against the default SQLite backend)
airflow initdb
# In $AIRFLOW_HOME/airflow.cfg, set:
#   executor = LocalExecutor
#   sql_alchemy_conn = postgresql+psycopg2://af_user:af_pass@localhost:5432/airflow
sed -i "s|executor = .*|executor = LocalExecutor|g" "$AIRFLOW_HOME"/airflow.cfg
sed -i "s|sql_alchemy_conn = .*|sql_alchemy_conn = $AIRFLOW_SQL_ALCHEMY_CONN|g" "$AIRFLOW_HOME"/airflow.cfg
sed -i "s|load_examples = .*|load_examples = False|g" "$AIRFLOW_HOME"/airflow.cfg
# re-initialize the metadata DB, now against Postgres
airflow initdb
# always start the webserver first, then the scheduler; -D daemonizes so each command returns
airflow webserver -D
airflow scheduler -D
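To verify the configuration took effect, a quick sanity check from Python (a sketch; expected values match the sed edits above):
# print the settings Airflow actually loaded from $AIRFLOW_HOME/airflow.cfg
from airflow import configuration as conf
print(conf.get("core", "executor"))          # expect: LocalExecutor
print(conf.get("core", "sql_alchemy_conn"))  # expect the postgresql+psycopg2 URI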
Triggering DAGs from a Python client
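A minimal sketch using the experimental API's local client from Airflow 1.10 (the dag_id, run_id, and conf payload are illustrative):
from airflow.api.client.local_client import Client

# (None, None) falls back to the API settings in airflow.cfg
client = Client(None, None)
client.trigger_dag(dag_id="example_dag", run_id="manual_run_1", conf={"param": "value"})
In Airflow 2.x, the stable REST API (POST /api/v1/dags/{dag_id}/dagRuns) is the recommended route instead.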
Set up a cluster
Prerequisites
The following nodes are available, with the given host names and roles:
master1: Web Server, Scheduler
master2: Web Server
worker1: Worker
worker2: Worker
A queuing service is running (RabbitMQ, AWS SQS, etc.). You can install RabbitMQ by following these instructions:
Installing RabbitMQ
If you're using RabbitMQ, it is recommended to also set it up as a cluster for high availability, with a load balancer proxying requests to the RabbitMQ instances.
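With the queue in place, every node points at the same metadata database and broker. A minimal airflow.cfg sketch (host names, credentials, and backend URIs are illustrative):
# $AIRFLOW_HOME/airflow.cfg, identical on masters and workers
# [core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://af_user:af_pass@master1:5432/airflow
# [celery]
broker_url = amqp://guest:guest@rabbitmq-lb:5672/
result_backend = db+postgresql://af_user:af_pass@master1:5432/airflow
Then master1 runs airflow webserver and airflow scheduler, master2 runs airflow webserver, and each worker runs airflow worker.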
Comments
Scale workers vertically by raising celeryd_concurrency, which caps how many tasks a single worker runs in parallel.
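For example (the value 16 is illustrative; in Airflow 1.10+ the option was renamed to worker_concurrency):
# $AIRFLOW_HOME/airflow.cfg, [celery] section
celeryd_concurrency = 16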