|
| 1 | +# Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +# or more contributor license agreements. See the NOTICE file |
| 3 | +# distributed with this work for additional information |
| 4 | +# regarding copyright ownership. The ASF licenses this file |
| 5 | +# to you under the Apache License, Version 2.0 (the |
| 6 | +# "License"); you may not use this file except in compliance |
| 7 | +# with the License. You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, |
| 12 | +# software distributed under the License is distributed on an |
| 13 | +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +# KIND, either express or implied. See the License for the |
| 15 | +# specific language governing permissions and limitations |
| 16 | +# under the License. |
| 17 | + |
| 18 | +""" |
| 19 | +The Suppliers Who Kept Orders Waiting query identifies suppliers, for a given nation, whose product |
| 20 | +was part of a multi-supplier order (with current status of 'F') where they were the only supplier |
| 21 | +who failed to meet the committed delivery date. |
| 22 | +""" |
| 23 | + |
| 24 | +from datafusion import SessionContext, col, lit, functions as F |
| 25 | + |
# Name of the nation whose suppliers this query reports on.
NATION_OF_INTEREST = "SAUDI ARABIA"

# Read every table the query touches, then narrow each one to the columns used further down.

ctx = SessionContext()

df_orders = ctx.read_parquet("data/orders.parquet")
df_orders = df_orders.select_columns("o_orderkey", "o_orderstatus")

df_lineitem = ctx.read_parquet("data/lineitem.parquet")
df_lineitem = df_lineitem.select_columns(
    "l_orderkey", "l_receiptdate", "l_commitdate", "l_suppkey"
)

df_supplier = ctx.read_parquet("data/supplier.parquet")
df_supplier = df_supplier.select_columns("s_suppkey", "s_name", "s_nationkey")

df_nation = ctx.read_parquet("data/nation.parquet")
df_nation = df_nation.select_columns("n_nationkey", "n_name")
| 44 | + |
# Build the list of suppliers located in the nation of interest: keep the nation rows whose name
# matches, then attach every supplier whose nation key equals the nation's key.
df_suppliers_of_interest = df_nation.filter(
    col("n_name") == lit(NATION_OF_INTEREST)
).join(df_supplier, (["n_nationkey"], ["s_nationkey"]), "inner")
| 51 | + |
# Keep only the orders whose status is 'F' and pair each of them with all of its line items.
df_status_f_orders = df_orders.filter(col("o_orderstatus") == lit("F"))
df = df_lineitem.join(df_status_f_orders, (["l_orderkey"], ["o_orderkey"]), "inner")

# Mark the line items that missed their commitment: "failed_supp" carries the supplier key when
# the item was received after its commit date. The case expression has no other branch, so the
# remaining line items get no supplier key in this column.
is_late = col("l_receiptdate") > col("l_commitdate")
failed_supp = F.case(is_late).when(lit(True), col("l_suppkey")).end()
df = df.with_column("failed_supp", failed_supp)
| 64 | + |
# This could be solved in other ways, but the point of this example is to work with rows in which
# an element is an array of values. For every order we build two array columns:
#   * all_suppliers    - every supplier that contributed a line item to the order
#   * failed_suppliers - every supplier that missed its commitment on the order
# An order has multiple suppliers when the first array holds more than one entry, and exactly one
# supplier failed when the second array holds exactly one entry. Both aggregations below use
# distinct=True so that several line items from the same supplier are reported only once in
# either array.
unique_suppliers = F.array_agg(col("l_suppkey"), distinct=True).alias("all_suppliers")
unique_failed = F.array_agg(col("failed_supp"), distinct=True).alias("failed_suppliers")
df = df.aggregate([col("o_orderkey")], [unique_suppliers, unique_failed])

# array_agg also returns the null entries produced for line items that were not late. Strip them
# so the array length reflects only the suppliers that actually failed.
failed_without_nulls = F.array_remove(col("failed_suppliers"), lit(None))
df = df.with_column("failed_suppliers", failed_without_nulls)

# Apply the check described above: a single failed supplier within a multiple supplier order.
exactly_one_failed = F.array_length(col("failed_suppliers")) == lit(1)
several_suppliers = F.array_length(col("all_suppliers")) > lit(1)
df = df.filter(exactly_one_failed).filter(several_suppliers)

# The failed_suppliers array is now known to hold exactly one element, so pull that single value
# out into a plain "suppkey" column.
sole_failed_supplier = F.array_element(col("failed_suppliers"), lit(1)).alias("suppkey")
df = df.select(col("o_orderkey"), sole_failed_supplier)
| 97 | + |
# Keep only the failing suppliers that belong to the nation of interest.
df = df.join(df_suppliers_of_interest, (["suppkey"], ["s_suppkey"]), "inner")

# Count, per supplier name, the orders for which that supplier was the only one that failed.
df = df.aggregate([col("s_name")], [F.count(col("o_orderkey")).alias("numwait")])

# Return the highest counts first, breaking ties by supplier name in ascending order. Without the
# tie-breaker, suppliers sharing the same numwait have no defined relative order, so the rows kept
# by the limit below (and the order in which they are shown) could differ between runs.
df = df.sort(
    col("numwait").sort(ascending=False),
    col("s_name").sort(ascending=True),
)

# Only the first 100 suppliers are reported.
df = df.limit(100)

df.show()
0 commit comments