garagelobi.blogg.se

Airflow 2.0 dag example









My core use case is re-running DAGs that are not currently running. So I have a DAG with max_active_runs=1 and catchup=True. This DAG depends on the previous DagRuns being successful. I implement this logic via a sensor that senses the success of the last task of the previous DagRun. If the previous DagRun failed, the current one will keep sensing into the abyss. Then I might have n DagRuns that I want to re-run.

As with previous airflow versions, I would expect that the cleared tasks get scheduled again, which they don't. Instead, the scheduler logs:

    Scheduler_1 | INFO - DAG mydag already has 1 active runs, not queuing any tasks for run 05:00:00+00:00

Thus, t=0 never finishes, t=1 never senses the finished run, and any t=n with n>1 has no chance of ever succeeding either. One alternative would be to remove the max_active_runs constraint, but that is not feasible, as this would create hundreds of DAG runs at once and that is a complete and total performance killer.

Tl;dr: Ultimately, this happens because airflow uses TI instead of DR in the query that builds the set of active dag runs. _do_scheduling() runs _schedule_dag_run() once for each dag_id, and gives the set of active dag runs as arg. The tasks that should be queued are not queued because their dag runs are not in the abovementioned set of active dag runs. This is in spite of the fact that they are running. This is because the query looks at all TaskInstances of that dagrun and their execution date instead of looking at the DagRuns, and since the tasks were successful or failed and then cleared, they are filtered out in the query. If you replace TI with DR in that query, this should work perfectly fine, fixing this issue without breaking anything that currently works.

You don't need to have the sensor logic I described above to reproduce this behavior. While I didn't do this, the following should reproduce the behavior:

  • Create a DAG mydag with catchup=True and max_active_runs=1.
  • Just have a dummy task or something, and let it run a couple of times so you have a couple of successful DAG states.
  • Clear a couple of tasks in dag runs that were successful.
  • Run the query with TI and with DR, respectively, to compare the results.

I did some more reading (mainly #1442). I see now that using TI was entirely on purpose. Currently, it is expected to ignore the DagRuns which were re-set to running by the clearing of tasks in order to avoid a violation of max_active_runs, until such a violation is avoided.

  • The scheduler does not schedule tasks in DagRuns which are, in fact, running.
  • When a user clears tasks, the user would want these tasks to be scheduled. Therefore I think the violation of max_active_runs, as is the case in my use case, is on purpose and a feature, not a bug. But from #1442 I can see that a user might also want to just have specific tasks run, but have them run across a large number of DagRuns, while only executing tasks in max_active_runs or fewer DagRuns.

Arguably, when using backfill or just generally catchup=True, I would expect that I can rely on the tasks being executed ordered by execution_date, because if I just have my airflow installation running, that is also the order in which the tasks are being run. Thus, I think a second alternative is an approach where we keep the abovementioned logic but adjust it so that only tasks in the first max_active_runs DagRuns are run, ordered by execution_date. I will create a second PR with this alternative approach.
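
To make the TI-versus-DR difference concrete, here is a toy model in plain Python. It is not Airflow's scheduler code: the classes, the helper names active_runs_via_ti and active_runs_via_dr, the dates, and the choice to represent a cleared task with state None are all assumptions made for illustration. It only mirrors the behavior described in this post, where cleared task instances are filtered out of a TI-based lookup while the DagRun itself is running.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set

# Assumed terminal task states for this toy model.
FINISHED = {"success", "failed"}


@dataclass
class TaskInstance:
    task_id: str
    state: Optional[str]  # None stands in for a cleared task (assumption)


@dataclass
class DagRun:
    execution_date: str
    state: str  # "running", "success" or "failed"
    task_instances: List[TaskInstance] = field(default_factory=list)


def active_runs_via_ti(runs: List[DagRun]) -> Set[str]:
    """TI-style lookup: a run counts as active only if at least one of its
    task instances has a state that is set and not finished. Cleared tasks
    (state None) drop out of the result."""
    return {
        run.execution_date
        for run in runs
        if any(
            ti.state is not None and ti.state not in FINISHED
            for ti in run.task_instances
        )
    }


def active_runs_via_dr(runs: List[DagRun]) -> Set[str]:
    """DR-style lookup: a run counts as active if the run itself is running."""
    return {run.execution_date for run in runs if run.state == "running"}


runs = [
    # Previously finished run whose task was cleared; the run is running again.
    DagRun("2021-01-01", "running", [TaskInstance("dummy", None)]),
    # Ordinary run with a task that is currently executing.
    DagRun("2021-01-02", "running", [TaskInstance("dummy", "running")]),
    # Untouched successful run.
    DagRun("2021-01-03", "success", [TaskInstance("dummy", "success")]),
]

print(sorted(active_runs_via_ti(runs)))  # ['2021-01-02']
print(sorted(active_runs_via_dr(runs)))  # ['2021-01-01', '2021-01-02']
```

In this sketch the cleared run for 2021-01-01 is missing from the TI-based set even though the run is marked as running, which is the situation in which its tasks never get queued.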











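
The second alternative, where only tasks in the first max_active_runs DagRuns are run, ordered by execution_date, could look roughly like the sketch below. The function schedulable_runs and the example dates are hypothetical, and the loop simply pretends that each selected run finishes before the next selection. Nothing here is taken from the actual PR.

```python
from datetime import datetime
from typing import Iterable, List


def schedulable_runs(running: Iterable[datetime], max_active_runs: int) -> List[datetime]:
    """Pick the earliest max_active_runs execution dates among running DagRuns."""
    return sorted(running)[:max_active_runs]


# Execution dates of DagRuns that were set back to running by clearing tasks.
cleared = [datetime(2021, 1, 3), datetime(2021, 1, 1), datetime(2021, 1, 2)]

order: List[datetime] = []
pending = list(cleared)
while pending:
    # Only the earliest run(s) may have their tasks scheduled.
    batch = schedulable_runs(pending, max_active_runs=1)
    order.extend(batch)
    # Pretend the selected runs finished, then select again.
    pending = [d for d in pending if d not in batch]

print([d.date().isoformat() for d in order])
```

With max_active_runs=1 the cleared runs are worked through one at a time, oldest first, so a sensor waiting on the previous DagRun always sees that run handled before its own.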