apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows


Rework the TriggererJobRunner to run triggers in a process without DB access (#46677)

This uses a similar approach to the DAG Parser -- the subprocess runs the async
Triggers (i.e. user code) in a process and sends messages back and forth to the
supervisor/parent to perform CRUD operations on the DB.
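The subprocess/supervisor message passing described above can be sketched as follows. This is an illustrative, self-contained sketch only, not Airflow's actual implementation: it uses a thread in place of the real subprocess, and the function names and message shapes (`trigger_process`, `supervisor`, `"op"`, `"set_state"`) are hypothetical. The key idea it demonstrates is that the trigger side holds no DB connection and instead sends line-delimited JSON requests over a socket for the parent to execute.

```python
import json
import socket
import threading

def trigger_process(sock: socket.socket) -> None:
    """Stands in for the trigger subprocess: it has no DB handle and asks
    the supervisor to perform DB CRUD via line-delimited JSON messages."""
    wfile = sock.makefile("w")
    rfile = sock.makefile("r")
    # Hypothetical message shape: ask the parent to update a trigger's state.
    wfile.write(json.dumps({"op": "set_state", "trigger_id": 42, "state": "running"}) + "\n")
    wfile.flush()
    reply = json.loads(rfile.readline())
    assert reply["ok"]
    wfile.write(json.dumps({"op": "done"}) + "\n")
    wfile.flush()

def supervisor(sock: socket.socket) -> list:
    """Parent side: receives CRUD requests and performs them against the DB
    (simulated here by simply recording each request)."""
    handled = []
    rfile = sock.makefile("r")
    wfile = sock.makefile("w")
    for line in rfile:
        msg = json.loads(line)
        if msg["op"] == "done":
            break
        handled.append(msg)  # a real supervisor would hit the DB here
        wfile.write(json.dumps({"ok": True}) + "\n")
        wfile.flush()
    return handled

parent_sock, child_sock = socket.socketpair()
worker = threading.Thread(target=trigger_process, args=(child_sock,))
worker.start()
requests = supervisor(parent_sock)
worker.join()
```

The point of the split is isolation: user trigger code never touches the DB session directly, so a misbehaving trigger cannot hold or corrupt DB state in the parent.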

I have also massively reworked how per-trigger logging works, to greatly
simplify it. I hope @dstandish will approve. The main simplification is that,
with the switch to the TaskSDK, _all_ (100%! Really) of the logs are sent as
JSON over a socket to the parent process; everything in the subprocess logs to
this one output, so there is no differentiation needed in stdlib logging, no
custom handlers, etc. And by making use of structlog's automatic context vars
we can include a `trigger_id` field -- if we find one, we route the output to
the right trigger-specific log file.

This is all now so much simpler with structlog in the mix.

Logging from the async process works as follows:
- stdlib logging is configured to send messages via structlog as JSON
- As part of the stdlib->structlog processing chain we include structlog's
  bound contextvars
- When a trigger coroutine starts it binds `trigger_id` as a contextvar
- When the Supervisor receives a log message (which arrives as line-delimited
  JSON over a dedicated socket channel) it parses the JSON, and if it finds a
  `trigger_id` key in there it redirects the message to the per-trigger log
  file, otherwise it prints it.
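The supervisor-side routing in the last step can be sketched like this. It is a minimal, hypothetical sketch: the function name `route_log_line` and the record fields are illustrative (real structlog JSON records carry more keys), and `io.StringIO` stands in for the actual per-trigger log files and stdout. What it shows is the one decision the supervisor makes: records carrying a `trigger_id` go to that trigger's log, everything else goes to the default output.

```python
import io
import json

def route_log_line(line: str, trigger_files: dict, default_stream) -> None:
    """Parse one line-delimited JSON log record and send it to the
    per-trigger log if it carries a trigger_id, else to the default stream."""
    record = json.loads(line)
    trigger_id = record.get("trigger_id")
    if trigger_id is not None:
        # One stream per trigger, created lazily on first sight of its id.
        trigger_files.setdefault(trigger_id, io.StringIO()).write(record["event"] + "\n")
    else:
        default_stream.write(record["event"] + "\n")

trigger_files = {}
stdout = io.StringIO()
for raw in [
    '{"event": "triggerer heartbeat"}',
    '{"event": "deferred task woke up", "trigger_id": 7}',
]:
    route_log_line(raw, trigger_files, stdout)

# stdout now holds "triggerer heartbeat"; trigger 7's stream holds
# "deferred task woke up".
```

Because the `trigger_id` arrives inside the record itself (bound as a contextvar by the coroutine), the subprocess needs no per-trigger handlers at all; routing is entirely the parent's job.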

Of note: I haven't allowed triggers to directly access or set XComs, Variables,
etc. We can add that in future if there is demand.
Ash Berlin-Taylor committed
18396ed8083f38eebe7db062110d8c569cb4fa3d
Parent: 8fbdf75
Committed by GitHub <noreply@github.com> on 2/18/2025, 9:39:31 AM