One of the most outstanding new features of Airflow 2.3.0 is Dynamic Task Mapping. This feature makes it possible to create tasks dynamically at runtime, so the number of such tasks in our DAG can change based on the data handled during an execution. The result is similar to a for loop, where a task is invoked for each element.

To perform mapping, simply use the `expand()` method of a task and pass it a list or a dictionary. It is also possible to set constant arguments using the `partial()` method:

```python
from airflow.decorators import task


@task
def add(x, n):
    return x + n


# n is fixed at 2; one task instance is created per element of x
add.partial(n=2).expand(x=[1, 2, 3])
# Result: task1 -> 3, task2 -> 4, task3 -> 5
```

## Limitations

Currently it is only possible to expand a list or a dictionary, passed either as an input parameter of our DAG or as the result of a previous task saved in XCom. If a different data type is passed, an `UnmappableXComTypePushed` exception is thrown at runtime (a sketch of this failure mode is shown at the end of this post). On the other hand, if a single dynamic task fails, the whole set of tasks is marked as failed. It is possible that future updates will add functionality to improve error handling in dynamic tasks.

## Simple example

A field that can potentially benefit from this new functionality is log processing, due to its high variability. Below is a DAG that reads a log file (randomly generated for this example), filters the lines with error status and performs a mapping over this set to process each one of them in a different task. The listing is a sketch: the log format (the process id in the sixth space-separated field), the error filter and the DAG boilerplate are assumptions.

```python
import time
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

logs_file = Path("/data/dummy_logs/random_logs.log")


def get_process_id(line: str) -> str:
    return line.split(" ")[5]  # assumed: the process id is the sixth field


@task
def get_failed_processes(log_file: Path) -> list:
    failed_processes = []
    for line in Path(log_file).read_text().splitlines():
        if "ERROR" in line:  # assumed: keep only the error-status lines
            failed_processes.append(get_process_id(line))
    return failed_processes


@task
def repair_process(process_id: str):
    time.sleep(0.5)  # Simulate computation time


@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def repair_processes_dag():
    # One repair_process instance is created per failed process id
    repair_process.expand(process_id=get_failed_processes(logs_file))


repair_processes_dag()
```

Creating mappings to distribute tasks and allow their parallel execution is very powerful, but if we also chain the dynamically generated nodes to another task we can perform map-reduce operations. To do this, we simply need the post-mapping task to take as an argument a list or dictionary that matches the output of the mapped tasks.

The following example shows a DAG that counts the number of error messages in several log files. Each of these files must be processed in a different task in order to obtain the maximum possible parallelism. For this, the first task returns the list of files in a log folder; then a task is created dynamically for each of these files, responsible for counting the number of error logs in its respective input; finally, a task uses the sum operation to reduce the results of the previous dynamic tasks. As before, the file pattern, the error filter and the DAG boilerplate in this sketch are assumptions.

```python
import glob
import time
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

logs_folder = Path("/data/dummy_logs/random_logs")


@task
def get_log_files(log_folder: Path | str) -> list:
    return glob.glob(str(log_folder) + "/*.log")  # assumed file pattern


@task
def get_num_of_failed_processes(log_file: str) -> int:
    time.sleep(0.05)  # Simulate computation time
    # assumed: count the error-status lines in this file
    return sum(1 for line in Path(log_file).read_text().splitlines() if "ERROR" in line)


@task
def sum_processes(failed_counts: list) -> int:
    return sum(failed_counts)  # reduce step over all mapped results


@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def my_dag():
    sum_processes(get_num_of_failed_processes.expand(log_file=get_log_files(logs_folder)))


repair_logs_dummy = my_dag()
```

## Graphical interface

In the grid view (the tree view has been removed in Airflow 2.3.0), dynamic tasks are shown as if they were a single task, although their name is followed by a pair of square brackets. In the graph view it also looks like a single task; however, if it has been executed correctly, you can see the number of dynamic tasks that have been executed.
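To close, here is a minimal sketch of the limitation described above (the task and DAG names are hypothetical): expanding over the output of a task that pushes a plain integer, rather than a list or a dictionary, fails at runtime with `UnmappableXComTypePushed`.

```python
from datetime import datetime

from airflow.decorators import dag, task


@task
def get_batch_size() -> int:
    return 42  # a plain int is not a mappable XCom type


@task
def process(x):
    return x


@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def unmappable_dag():
    # The DAG parses fine, but at runtime the upstream task raises
    # UnmappableXComTypePushed: only lists and dicts can be expanded over.
    process.expand(x=get_batch_size())


unmappable_dag()
```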