SparkETLManager Class Documentation

Bases: ManagerClass

Puts together all Spark features provided by the sparksnake library.

This class provides an easy and fast way for users to improve and enhance the development of their Apache Spark applications. It can be considered a central point of contact for users who want to use all features (attributes and methods) provided by sparksnake, whether the Spark application is running locally or in a supported AWS service such as AWS Glue.

To configure this class and start using all its features, users just need to set up an "operation mode" represented by the "mode" class attribute. The operation mode can be chosen based on where the Spark application will run. Currently there are two available options:

  • mode="default" enables features do enhance the development of Spark applications anywhere
  • mode="glue" enables features to enhance the development of Spark applications deployed as Glue jobs in AWS. In this case, a class inheritance process is applied in order to enable users to use awsglue modules in a Glue environment.
"Setting up the operation mode within SparkETLManager class"
# Importing the class
from sparksnake.manager import SparkETLManager

# Creating a spark manager object to develop Spark apps anywhere
spark_manager = SparkETLManager(
    mode="default"
)

# Creating a spark manager object to develop Spark apps on AWS Glue
spark_manager = SparkETLManager(
    mode="glue",
    argv_list=[],  # A list of Glue job arguments
    data_dict={}   # A dictionary with all data sources for the job
)

A special note about sparksnake's operation mode: each deployment environment demands a different behavior from the class in order to work properly. In other words, when choosing "glue" as the operation mode while creating a SparkETLManager object, users need to check which additional attributes must be passed to the class so the Glue custom features can be made available in their Spark application.

A basic usage example of class SparkETLManager with mode="glue"
# Importing packages
from sparksnake.manager import SparkETLManager
from datetime import datetime

# Defining job arguments
ARGV_LIST = ["JOB_NAME", "S3_OUTPUT_PATH"]

# Defining dictionary of data sources to be used on job
DATA_DICT = {
    "orders": {
        "database": "ra8",
        "table_name": "orders",
        "transformation_ctx": "dyf_orders"
    },
    "customers": {
        "database": "ra8",
        "table_name": "customers",
        "transformation_ctx": "dyf_customers",
        "push_down_predicate": "anomesdia=20221201",
        "create_temp_view": True,
        "additional_options": {
            "compressionType": "lzo"
        }
    }
}

# Creating a class object on initializing a glue job
spark_manager = SparkETLManager(
    mode="glue",
    argv_list=ARGV_LIST,
    data_dict=DATA_DICT
)

spark_manager.init_job()

# Getting all Spark DataFrames based on the data_dict provided
dfs_dict = spark_manager.generate_dataframes_dict()

# Indexing a DataFrame from the dictionary
df_orders = dfs_dict["orders"]

# Dropping a partition on S3 (if exists)
spark_manager.drop_partition(
    s3_partition_uri="s3://some-bucket-name/some-table-name/partition/"
)

# Adding a partition column into the DataFrame
df_orders_partitioned = spark_manager.add_partition_column(
    partition_name="anomesdia",
    partition_value=int(datetime.now().strftime("%Y%m%d"))
)

# Applying a repartition method for storage optimization
df_orders_repartitioned = spark_manager.repartition_dataframe(
    df=df_orders_partitioned,
    num_partitions=10
)

# Writing data on S3 and cataloging it on Data Catalog
spark_manager.write_and_catalog_data(df=df_orders_repartitioned)

# Job commit
spark_manager.job.commit()

Parameters:

Name Type Description Default
mode string

Operation mode for the class. It drives inheritance from other classes in this library so the SparkETLManager class can expand its features for Spark application development in specific scenarios. Acceptable values are: "default", "glue".

required
The "mode" attribute may not be the only one.

As stated before, the SparkETLManager class provides a "mode" attribute that can be used to set special class configuration according to where users intend to develop their Spark applications. Technically, this happens through class inheritance.

In other words, when users set mode="glue" in order to develop their Spark applications as Glue jobs on AWS, all Glue features that is needed to provide such environment is inherited by another class inside the sparksnake library. This class is the GlueJobManager and its source code is available on the glue.py library module.

By saying that the "mode" attribute may not be the only one, it is said that those class inheritance processes may demands the input of some other attributes. For example, to initialize an object from the GlueJobManager class, users need to pass two more attributes named argv_list and data_dict, each one with their special purposes. So, in this situation, anyone who needs to use sparksnake in the Glue ops mode may pass those two mode class attributes in the SparkETLManager class.

To be aware of which additional attributes are needed to start the SparkETLManager class in any available mode, you can always check the source code of the class to be inherited. The table below lists all operation modes and the inherited classes:

Operation Mode Inherited Class
default None
glue GlueJobManager
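
As a minimal, hypothetical sketch of the validation described above (based on the __init__ source shown below), initializing the class in Glue mode without the required attributes raises a TypeError:

# Hypothetical sketch: mode="glue" requires argv_list and data_dict
from sparksnake.manager import SparkETLManager

try:
    # Intentionally omitting argv_list and data_dict
    spark_manager = SparkETLManager(mode="glue")
except TypeError as error:
    print(error)  # the message asks for both attributes to be provided
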
Source code in sparksnake/manager.py
class SparkETLManager(ManagerClass):
    """Puts together all Spark features provided by sparksnake library.

    This class provides an easy and fast way for users to improve and enhance
    the development of their Apache Spark applications. It can be considered
    a central point of contact for users who want to use all features
    (attributes and methods) provided by sparksnake, whether the Spark
    application is running locally or in a supported AWS service such as
    AWS Glue.

    To configure this class and start using all its features, users just need
    to set up an "operation mode" represented by the "mode" class attribute.
    The operation mode can be chosen based on where the Spark application will
    run. Currently there are two available options:

    - `mode="default"` enables features do enhance the development of Spark
        applications anywhere
    - `mode="glue"` enables features to enhance the development of Spark
        applications deployed as Glue jobs in AWS. In this case, a class
        inheritance process is applied in order to enable users to use
        `awsglue` modules in a Glue environment.

    Example: "Setting up the operation mode within `SparkETLManager` class"
        ```python
        # Importing the class
        from sparksnake.manager import SparkETLManager

        # Creating a spark manager object to develop Spark apps anywhere
        spark_manager = SparkETLManager(
            mode="default"
        )

        # Creating a spark manager object to develop Spark apps on AWS Glue
        spark_manager = SparkETLManager(
            mode="glue",
            argv_list=[],  # A list of Glue job arguments
            data_dict={}   # A dictionary with all data sources for the job
        )
        ```

    A special note about sparksnake's operation mode: each deployment
    environment demands a different behavior from the class in order to work
    properly. In other words, when choosing "glue" as the operation mode while
    creating a `SparkETLManager` object, users need to check which additional
    attributes must be passed to the class so the Glue custom features can be
    made available in their Spark application.

    Example: A basic usage example of class `SparkETLManager` with mode="glue"
        ```python
        # Importing packages
        from sparksnake.manager import SparkETLManager
        from datetime import datetime

        # Defining job arguments
        ARGV_LIST = ["JOB_NAME", "S3_OUTPUT_PATH"]

        # Defining dictionary of data sources to be used on job
        DATA_DICT = {
            "orders": {
                "database": "ra8",
                "table_name": "orders",
                "transformation_ctx": "dyf_orders"
            },
            "customers": {
                "database": "ra8",
                "table_name": "customers",
                "transformation_ctx": "dyf_customers",
                "push_down_predicate": "anomesdia=20221201",
                "create_temp_view": True,
                "additional_options": {
                    "compressionType": "lzo"
                }
            }
        }

        # Creating a class object on initializing a glue job
        spark_manager = SparkETLManager(
            mode="glue",
            argv_list=ARGV_LIST,
            data_dict=DATA_DICT
        )

        spark_manager.init_job()

        # Getting all Spark DataFrames based on the data_dict provided
        dfs_dict = spark_manager.generate_dataframes_dict()

        # Indexing a DataFrame from the dictionary
        df_orders = dfs_dict["orders"]

        # Dropping a partition on S3 (if exists)
        spark_manager.drop_partition(
            s3_partition_uri="s3://some-bucket-name/some-table-name/partition/"
        )

        # Adding a partition column into the DataFrame
        df_orders_partitioned = spark_manager.add_partition_column(
            partition_name="anomesdia",
            partition_value=int(datetime.now().strftime("%Y%m%d"))
        )

        # Applying a repartition method for storage optimization
        df_orders_repartitioned = spark_manager.repartition_dataframe(
            df=df_orders_partitioned,
            num_partitions=10
        )

        # Writing data on S3 and cataloging it on Data Catalog
        spark_manager.write_and_catalog_data(df=df_orders_repartitioned)

        # Job commit
        spark_manager.job.commit()
        ```

    Args:
        mode (string):
            Operation mode for the class. It drives inheritance from other
            classes in this library so the `SparkETLManager` class can
            expand its features for Spark application development in
            specific scenarios.
            Acceptable values are: "default", "glue".

    Tip: The "mode" attribute may not be the only one.
        As stated before, the `SparkETLManager` class provides a "mode"
        attribute that can be used to set special class configuration
        according to where users intend to develop their Spark applications.
        Technically, this happens through class inheritance.

        In other words, when users set `mode="glue"` in order to develop their
        Spark applications as Glue jobs on AWS, all Glue features needed to
        provide such an environment are inherited from another class inside
        the sparksnake library. This class is the `GlueJobManager` and its
        source code is available in the `glue.py` library module.

        By saying that the "mode" attribute may not be the only one, it is said
        that those class inheritance processes may demands the input of some
        other attributes. For example, to initialize an object from the
        `GlueJobManager` class, users need to pass two more attributes named
        `argv_list` and `data_dict`, each one with their special purposes. So,
        in this situation, anyone who needs to use sparksnake in the Glue ops
        mode may pass those two mode class attributes in the `SparkETLManager`
        class.

        To be aware of which additional attributes are needed to start the
        `SparkETLManager` class in any available mode, you can always check
        the source code of the class to be inherited. The table below lists
        all operation modes and the inherited classes:

        | Operation Mode | Inherited Class |
        | :-- | :-- |
        | default | None |
        | glue | GlueJobManager |
    """

    def __init__(self, mode: str, **kwargs) -> None:
        # Cleaning up the mode string attribute to apply validations
        self.mode = mode.strip().lower()

        # Glue operation mode: applying validations and obtaining Glue features
        if self.mode == "glue":
            # Checking if required args for GlueJobManager were passed
            if "argv_list" not in kwargs or "data_dict" not in kwargs:
                raise TypeError("The operation mode was set as 'glue' but "
                                "'argv_list' and/or 'data_dict' required "
                                "attributes weren't set properly. Please "
                                "provide both attributes during class "
                                "start up in order to use sparksnake with "
                                "glue operation mode.")

            # Collecting required args for mode="glue"
            argv_list = kwargs["argv_list"]
            data_dict = kwargs["data_dict"]

            # Applying class inheritance for this mode
            try:
                ManagerClass.__init__(self, argv_list=argv_list,
                                      data_dict=data_dict)
            except TypeError:
                raise TypeError("Error on inherting class GlueJobManager. "
                                "Check if your environment has the awsglue "
                                "libraries and try again. If you don't have "
                                "awsglue libs available, you probably want to "
                                "run sparksnake in a default operation mode. "
                                "If this is the case, change the mode "
                                "attribute to 'default'")

            # Logging initialization message
            logger.info("Sucessfully initialized sparksnake with Glue "
                        "operation mode. You know have some special AWS Glue "
                        "features to improve your Glue job.")

        # Default operation mode:
        elif self.mode == "default":
            # Getting or creating a SparkSession object as a class attribute
            logger.info("Creating a SparkSession object (or getting one if it "
                        "already exists)")
            self.spark = SparkSession.builder.getOrCreate()

            # Logging initialization message
            logger.info("Successfully initialized sparksnake with default "
                        "operation mode. You can know use the sparksnake "
                        "features to improve your Spark application.")

        # None of acceptable operation modes
        else:
            raise ValueError(f"Invalid value for operation mode (mode={mode})."
                             "Acceptable values are 'default' and 'glue'.")

    @staticmethod
    def date_transform(
        df: DataFrame,
        date_col: str,
        date_col_type: str = "date",
        date_format: str = "yyyy-MM-dd",
        cast_string_to_date: bool = True,
        **kwargs
    ) -> DataFrame:
        """Extracting date attributes from a Spark DataFrame date column.

        This method makes it possible to extract multiple date attributes from
        a Spark DataFrame column that represents a date or timestamp value.
        The date attributes are extracted using all available Apache Spark date
        functions such as year(), month(), dayofmonth() and many others that
        can be found on the official pyspark documentation page.

        So, the given date column (date_col argument) should have a DATE or
        a TIMESTAMP data type. If the column isn't already one of those types,
        it should be a string that can be parsed into a date type object. This
        is the condition to extract date attributes using pyspark date
        functions.

        The main idea behind this method is to provide users an easy way to
        enhance their data analysis by extracting multiple date attributes
        from a date column. This can be a huge improvement on analytics
        processes and DataFrames enrichment.

        Examples:
            ```python
            # Extracting date attributes from a date column in a Spark df
            df_date_prep = spark_manager.date_transform(
                df=df_raw,
                date_col="order_date",
                date_col_type="timestamp",
                year=True,
                month=True,
                dayofmonth=True
            )

            # In the above example, the method will return a new DataFrame
            # with additional columns based on the order_date column, such as:
            # year_order_date, month_order_date and dayofmonth_order_date
            ```

        Args:
            df (pyspark.sql.DataFrame):
                A target Spark DataFrame for applying the transformation.

            date_col (str):
                A date column name (or parseable string as date) to be used in
                the date extraction process.

            date_col_type (str):
                Reference for data type of `date_col` argument. Acceptable
                values are "date" or "timestamp".

            date_format (str):
                Date format applied in an optional string-to-date casting.
                It's applicable only if `cast_string_to_date=True`

            cast_string_to_date (bool):
                Enables an automatic casting of the `date_col` column reference
                into a given `date_format`.

        Keyword Args:
            year (bool): Extracts the year of target date column
            quarter (bool): Extracts the quarter of target date column
            month (bool): Extracts the month of target date column
            dayofmonth (bool): Extracts the dayofmonth of target date column
            dayofweek (bool): Extracts the dayofweek of target date column
            weekofyear (bool): Extracts the weekofyear of target date column

        Raises:
            ValueError: Exception raised if the `date_col_type` argument is\
            passed with a non acceptable value (i.e. anything different from\
            "date" or "timestamp").

        Returns:
            Spark DataFrame with new date columns extracted.
        """

        try:
            # Creating casting expressions based on data type of date_col arg
            date_col_type = date_col_type.strip().lower()
            if cast_string_to_date:
                if date_col_type == "date":
                    casting_expr = f"to_date({date_col},\
                        '{date_format}') AS {date_col}_{date_col_type}"
                elif date_col_type == "timestamp":
                    casting_expr = f"to_timestamp({date_col},\
                        '{date_format}') AS {date_col}_{date_col_type}"
                else:
                    raise ValueError("Invalid data type of date_col_type "
                                     "argument. Acceptable values are 'date' "
                                     "or 'timestamp'")

                # Applying a select expression for casting data if applicable
                df = df.selectExpr(
                    "*",
                    casting_expr
                ).drop(date_col)\
                    .withColumnRenamed(f"{date_col}_{date_col_type}", date_col)

        except ValueError as ve:
            logger.error(ve)
            raise ve

        except AnalysisException as ae:
            logger.error("Analysis error on trying to cast the column "
                         f"{date_col} using the expression {casting_expr}. "
                         "Maybe this column doesn't exist on the DataFrame. "
                         f"Check the error traceback for more details: {ae}")
            raise ae

        # Creating a list of all possible date attributes to be extracted
        possible_date_attribs = ["year", "quarter", "month", "dayofmonth",
                                 "dayofweek", "dayofyear", "weekofyear"]

        # Iterating over all possible attributes and extracting date attribs
        for attrib in possible_date_attribs:
            # Add a new column only if attrib is in kwargs
            if attrib in kwargs and bool(kwargs[attrib]):
                df = df.withColumn(
                    f"{attrib}_{date_col}", expr(f"{attrib}({date_col})")
                )

        return df

    @staticmethod
    def agg_data(
        spark_session: SparkSession,
        df: DataFrame,
        agg_col: str,
        group_by: str or list,
        round_result: bool = False,
        n_round: int = 2,
        **kwargs
    ) -> DataFrame:
        """Extracting statistical attributes based on a group by operation.

        This method makes it possible to run complex aggregations using a
        single method call. To use this feature, users can follow the steps
        below:

        1. Provide an aggregation column (agg_col argument)
        2. Provide a single column reference or a list of columns to be
        grouped by (group_by argument)
        3. Provide the aggregation functions on **kwargs

        The aggregation functions mentioned on the third step are represented
        by almost any available pyspark function, such as `sum()`, `mean()`,
        `max()`, `min()` and many others.

        Examples:
            ```python
            # Creating a new special and aggregated DataFrame
            df_stats = spark_manager.agg_data(
                spark_session=spark,
                df=df_orders,
                agg_col="order_value",
                group_by=["order_id", "order_year"],
                sum=True,
                mean=True,
                max=True,
                min=True
            )

            # In the example above, the method will return a new DataFrame with
            # the following columns:
            # order_id and order_year (group by)
            # sum_order_value (sum of order_value column)
            # mean_order_value (average of order_value column)
            # max_order_value (max value of order_value column)
            # min_order_value (min value of order_value column)
            ```

        Args:
            spark_session (pyspark.sql.SparkSession):
                A SparkSession object to be used to run SparkSQL query for
                grouping data

            df (pyspark.sql.DataFrame):
                A target Spark DataFrame for applying the transformation

            agg_col (str):
                A column reference on the target DataFrame to be used as
                target of aggregation process

            group_by (str or list):
                A column name or a list of columns used as group categories
                on the aggregation process

            round_result (bool):
                Enables rounding aggregation results on each new column

            n_round (int):
                Defines the round number on rounding. Applied only if
                `round_result=True`

        Tip: About keyword arguments
            In order to provide a feature capable of putting together the
            extraction of multiple statistical attributes in a single
            line of code, a special list of pyspark functions was selected
            as acceptable functions to be called on the aggregation process.

            It means that if users want to apply an aggregation on the
            target DataFrame and extract the sum, the mean, the minimum and
            the maximum value of a given numeric column, they must pass
            keyword arguments as follows: `sum=True`, `mean=True`,
            `min=True` and `max=True`.

            All acceptable keyword arguments (pyspark functions) can be
            found right below:

        Keyword Args:
            sum (bool): Extracts the sum of a given numeric column
            mean (bool): Extracts the mean of a given numeric column
            max (bool): Extracts the max of a given numeric column
            min (bool): Extracts the min of a given numeric column
            countDistinct (bool): Extracts the count distinct value\
                of a given numeric column
            variance (bool): Extracts the variance of a given numeric column
            stddev (bool): Extracts the standard deviation of a given numeric\
                column
            kurtosis (bool): Extracts the kurtosis of a given numeric column
            skewness (bool): Extracts the skewness of a given numeric column

        Returns:
            A new Spark DataFrame with new statistical columns based on the\
            aggregation configured by user on method call.

        Raises:
            Exception: Generic exception raised when the SparkSQL query used\
            to extract the stats from the DataFrame fails to execute.
        """

        # Joining all group by columns in a single string to make agg easier
        if type(group_by) == list and len(group_by) > 1:
            group_by = ",".join(group_by)

        # Creating a Spark temporary table for grouping data using SparkSQL
        df.createOrReplaceTempView("tmp_extract_aggregate_statistics")

        possible_functions = ["sum", "mean", "max", "min", "count",
                              "variance", "stddev", "kurtosis", "skewness"]
        try:
            # Iterating over the attributes to build a single aggregation expr
            agg_query = ""
            for f in possible_functions:
                if f in kwargs and bool(kwargs[f]):
                    if round_result:
                        agg_function = f"round({f}({agg_col}), {n_round})"
                    else:
                        agg_function = f"{f}({agg_col})"

                    agg_query += f"{agg_function} AS {f}_{agg_col},"

            # Dropping the last comma on the expression
            agg_query = agg_query[:-1]

            # Building the final query to be executed
            final_query = f"""
                SELECT
                    {group_by},
                    {agg_query}
                FROM tmp_extract_aggregate_statistics
                GROUP BY
                    {group_by}
            """

            return spark_session.sql(final_query)

        except AnalysisException as ae:
            logger.error("Error on trying to aggregate data from DataFrame "
                         f"using the following query:\n {final_query}. "
                         "Possible reasons are: missing to pass group_by "
                         "parameter or the agg_col argument doesn't exists"
                         f" on the DataFrame. Exception: {ae}")
            raise ae

    @staticmethod
    def add_partition_column(
        df: DataFrame,
        partition_name: str,
        partition_value
    ) -> DataFrame:
        """Adding a "partition" column on a Spark DataFrame.

        This method is responsible for adding a new column on a target Spark
        DataFrame to be considered as a table partition. In essence, this
        method uses the native pyspark `.withColumn()` method for adding a
        new column to the DataFrame using a name (partition_name) and a value
        (partition_value).

        The idea behind this method is to provide users a clearer way to
        add a partition column in their Spark DataFrames and make it very
        explicit to whoever is reading the code.

        Examples:
            ```python
            # Defining partition information
            partition_name = "anomesdia"
            partition_value = int(datetime.now().strftime('%Y%m%d'))

            # Adding a partition column to the DataFrame
            df_partitioned = spark_manager.add_partition_column(
                df=df_orders,
                partition_name=partition_name,
                partition_value=partition_value
            )

            # The method returns a new DataFrame with a new column
            # referenced by "anomesdia" and its value referenced by
            # the datetime library
            ```

        Args:
            df (pyspark.sql.DataFrame): A target Spark DataFrame.
            partition_name (str): Column name to be added on the DataFrame.
            partition_value (Any): Value for the new column to be added.

        Returns:
            A Spark DataFrame with the new column added.

        Raises:
            Exception: A generic exception raised when the `.withColumn()`\
            method fails to add the new column.
        """

        logger.info("Adding a new partition column to the DataFrame "
                    f"({partition_name}={str(partition_value)})")
        try:
            df_partitioned = df.withColumn(partition_name,
                                           lit(partition_value))
            return df_partitioned

        except Exception as e:
            logger.error("Error on adding a partition colum to the DataFrame "
                         f"using the .withColumn() method. Exception: {e}")
            raise e

    @staticmethod
    def repartition_dataframe(df: DataFrame, num_partitions: int) -> DataFrame:
        """Repartitioning a Spark DataFrame in order to optimize storage.

        This method applies the repartition process in a Spark DataFrame in
        order to optimize its storage on S3. The method has some important
        checks based on each pyspark method to use for repartitioning the
        DataFrame. Take a look at the below tip to learn more.

        Tip: Additional details on method behavior
            The method `repartition_dataframe()` works as follows:

            1. The current number of partitions in the target DataFrame
            is checked
            2. The desired number of partitions passed as a parameter
            is checked

            * If the desired number is LESS than the current number, then the
            method `coalesce()` is executed
            * If the desired number is GREATER than the current one, then the
            method `repartition()` is executed

        Examples:
            ```python
            # Repartitioning a Spark DataFrame
            df_repartitioned = spark_manager.repartition_dataframe(
                df=df_orders,
                num_partitions=10
            )
            ```

        Args:
            df (pyspark.sql.DataFrame): A target Spark Dataframe.
            num_partitions (int): Desired number of partitions.

        Returns:
            A new Spark DataFrame gotten after the repartition process.

        Raises:
            Exception: A generic exception is raised on a failed attempt to\
            run the repartition method (`coalesce()` or `repartition()`) in\
            the given Spark DataFrame.
        """

        # Casting arg to integer to avoid exceptions
        num_partitions = int(num_partitions)

        # Getting the current number of partitions of the given DataFrame
        logger.info("Getting the current number of partition of the DataFrame")
        try:
            current_partitions = df.rdd.getNumPartitions()

        except Exception as e:
            logger.error("Failed to collect the current number of partitions"
                         "using the df.rdd.getNumPartitions method. "
                         f"Exception: {e}")
            raise e

        # If the desired partition number is equal to the current number, skip
        if num_partitions == current_partitions:
            logger.warning(f"The current number of partitions "
                           f"({current_partitions}) is equal to the target "
                           f"number ({num_partitions}). There is no need to "
                           "run any Spark repartition method")
            sleep(0.01)
            return df

        # If the desired number is LESS THAN the current, use coalesce()
        elif num_partitions < current_partitions:
            logger.info("Initializing the repartition process using "
                        f"coalesce(), changing the number of DataFrame "
                        f"partitions from {current_partitions} to "
                        f"{num_partitions}")
            try:
                df_repartitioned = df.coalesce(num_partitions)

            except Exception as e:
                logger.warning("Failed to repartition using coalesce(). "
                               "The repartition process won't be executed and "
                               "the target DataFrame will be returned without "
                               f"any changes. Exception: {e}")
                return df

        # If the desired number is GREATER THAN the current, use repartition()
        elif num_partitions > current_partitions:
            logger.warning(f"The target partition number ({num_partitions}) "
                           f"is greater than the current one "
                           f"({current_partitions}) and, because of that, "
                           "the repartitioning operation will be executed "
                           "using the repartition() method which can be "
                           "expensive under the application perspective. As a "
                           "suggestion, check if this is really the expected "
                           "behavior for this operation.")
            try:
                df_repartitioned = df.repartition(num_partitions)

            except Exception as e:
                logger.warning("Failed to repartition using repartition(). "
                               "The repartition process won't be executed and "
                               "the target DataFrame will be returned without "
                               f"any changes. Exception: {e}")
                return df

        return df_repartitioned

    @staticmethod
    def run_spark_sql_pipeline(
        spark_session: SparkSession,
        spark_sql_pipeline: list
    ) -> DataFrame:
        """Providing a way to run multiple SparkSQL queries in sequence.

        This method allows users to define a sequence of SparkSQL queries to
        build transformation DAGs in order to transform DataFrames in a Spark
        application using only SQL. The core idea behind this method is that
        users can define a sequence of SparkSQL statements using a predefined
        list of dictionaries (`spark_sql_pipeline` argument) that will be
        handled by the method as the main piece for sequentially running
        the queries and providing the desired result as a Spark DataFrame.

        As said before, everything around this method takes place in the
        `spark_sql_pipeline` argument definition. In essence, this argument
        can be defined as a list with multiple dictionaries, where each
        inner dictionary in this list can have elements that describe the
        execution of a SparkSQL query (including the query itself).

        Examples:
        ```python
        # Defining a list with all SparkSQL steps to be executed
        spark_sql_pipeline = [
            {
                "step": 1,
                "query": '''
                    SELECT
                        order_id,
                        order_status,
                        order_purchase_ts

                    FROM tbl_orders
                ''',
                "create_temp_view": True,
                "temp_view_name": "auto"
            },
            {
                "step": 2,
                "query": '''
                    SELECT
                        order_id,
                        sum(payment_value) AS sum_payment_value

                    FROM tbl_payments

                    GROUP BY order_id
                ''',
                "create_temp_view": True,
                "temp_view_name": "auto"
            },
            {
                "step": 3,
                "query": '''
                    SELECT
                        step_1.order_id,
                        step_1.order_status,
                        step_1.order_purchase_ts,
                        step_2.sum_payment_value

                    FROM step_1

                    LEFT JOIN step_2
                        ON step_1.order_id = step_2.order_id
                ''',
                "create_temp_view": True,
                "temp_view_name": "auto"
            }
        ]

        # Running the SparkSQL pipeline
        df_prep = run_spark_sql_pipeline(
            spark_session=spark_manager.spark,
            spark_sql_pipeline=spark_sql_pipeline
        )
        ```

        Tip: About the spark_sql_pipeline definition
            As stated before, the `spark_sql_pipeline` method argument can be
            defined as a Python list where each element is a Python dictionary
            with all information needed to run SparkSQL statements in sequence.

            First of all, it's important to say that the inner dictionaries
            must be defined with some acceptable keys:

            - `"step"` (required): defines an integer number to inform the
                method in which order the query in the given dictionary should
                be executed. The value passed on the "step" inner dictionary
                key is used in a sorting process that defines the execution
                order of the SparkSQL statements.

            - `"query"` (required): well, this is the SparkSQL query itself
                that will be executed by the method. It can be defined as a
                Python string directly on the dictionary or even by reading
                some external JSON or SQL file in the project directory.

            - `"create_temp_view"` (optional, default=True): defines a boolean
                flag that handles the creation of a new temporary view for
                each executed step. By default, it's set as True, meaning
                that after each execution, a new temporary view will be
                available for further SparkSQL statements.

            - `"temp_view_name"` (optional, default="auto"): defines the name
                of the temporary view created after executing the SparkSQL
                query in a given step. It's applicable only if
                "create_temp_view" is True for the step. By default, it's value
                is set as "auto", meaning that the name of the intermediate
                temporary view will be set as "step_N", where N is the integer
                that defines the step. For example, in the first inner
                dictionary of the `spark_sql_pipeline` list (for instance,
                "step": 1), a query will be executed and, if there is no an
                explicit "create_temp_view": False in the dictionary, then a
                new temporary view with query result will be created and named
                as "step_1". So, any further SparkSQL statements that selects
                data from any "step_1" table will be pointing to the
                intermediate results of the first step of the pipeline. By the
                other hand, users can define a specific name for the step
                temporary view by filling this key with any desired string.

            If users don't explicitly define the keys "create_temp_view" and
            "temp_view_name", the method will use their default values. In
            other words, if an inner dictionary of the `spark_sql_pipeline`
            list doesn't have any of the mentioned keys, a temporary view
            will be created after running the step's query and it will be
            named "step_N", where N is the integer that identifies the step.

        Args:
            spark_session (pyspark.sql.SparkSession):
                A SparkSession object to be used to run SparkSQL query for
                grouping data

            spark_sql_pipeline (list):
                A list made by dictionaries that defines details of the steps
                to be executed using SparkSQL queries. Check the tip above for
                more details on how passing this argument

        Returns:
            A Spark DataFrame that is the result of the execution of the last\
            step (query) defined in the spark_sql_pipeline list.

        Raises:
            ValueError: An exception raised in two different situations:\
            first, if any dictionary in the `spark_sql_pipeline` list doesn't\
            have the required keys ("step" and "query"). Second, if any\
            dictionary in the `spark_sql_pipeline` list has a value for the\
            "step" key that isn't an integer. Those validations are important\
            to ensure that users are correctly defining the arguments for\
            the method.
        """

        # Applying some validations on the sql pipeline argument
        required_keys = ["step", "query"]

        # Getting a list with keys of all inner dicts from sql pipeline list
        inner_pipe_keys = [list(pipe.keys()) for pipe in spark_sql_pipeline]

        # Checking if the required arguments are in all inner dict keys
        for inner_key in inner_pipe_keys:
            if not all(key in inner_key for key in required_keys):
                raise ValueError("Invalid value for spark_sql_pipeline "
                                 "argument. Please, check if all inner "
                                 "dictionaries have the 'step' and the "
                                 "'query' required keys defined accordingly. "
                                 "The main reason of this validation is "
                                 "that without the 'step' key, the method "
                                 "won't be able to sort and set the execution "
                                 "order. Without the 'query' key, the method "
                                 "won't be able to run any SparkSQL query")

        # Checking if all step keys are integers
        step_types_validation = list(set(
            [isinstance(pipe["step"], int) for pipe in spark_sql_pipeline]
        ))[0]
        if not step_types_validation:
            raise ValueError("Invalid value for spark_sql_pipeline "
                             "argument. Please check if all inner "
                             "dictionaries have the 'step' key defined as "
                             "integer values. If any 'step' key for a given "
                             "step is defined with a non integer number, the "
                             "method won't be able to sort the steps and set "
                             "the execution order accordingly")

        # Going to the method: sorting the steps in an ascending order
        sorted_spark_sql_pipeline = sorted(
            spark_sql_pipeline,
            key=lambda x: x["step"]
        )

        # Iterating over the pipeline elements to execute the statements
        for pipe in sorted_spark_sql_pipeline:
            # Executing the queries in sequence
            df_step = spark_session.sql(pipe["query"])

            # Creating a temporary view with the step result (if applicable)
            if "create_temp_view" not in pipe\
                    or bool(pipe["create_temp_view"]):
                # Assigning a name for result temporary view
                if "temp_view_name" not in pipe\
                        or pipe["temp_view_name"].strip().lower() == "auto":
                    temp_view_name = "step_" + str(pipe["step"])
                else:
                    temp_view_name = pipe["temp_view_name"]

                # Creating the temporary view
                df_step.createOrReplaceTempView(temp_view_name)

        # Returning the DataFrame from the last step
        return df_step

date_transform(df, date_col, date_col_type='date', date_format='yyyy-MM-dd', cast_string_to_date=True, **kwargs) staticmethod

Extracting date attributes from a Spark DataFrame date column.

This method makes it possible to extract multiple date attributes from a Spark DataFrame column that represents a date or timestamp value. The date attributes are extracted using all available Apache Spark date functions such as year(), month(), dayofmonth() and many others that can be found on the official pyspark documentation page.

So, the given date column (date_col argument) should have a DATE or a TIMESTAMP data type. If the column isn't already one of those types, it should be a string that can be parsed into a date type object. This is the condition to extract date attributes using pyspark date functions.

The main idea behind this method is to provide users an easy way to enhance their data analysis by extracting multiple date attributes from a date column. This can be a huge improvement on analytics processes and DataFrames enrichment.

Examples:

# Extracting date attributes from a date column in a Spark df
df_date_prep = spark_manager.date_transform(
    df=df_raw,
    date_col="order_date",
    date_col_type="timestamp",
    year=True,
    month=True,
    dayofmonth=True
)

# In the above example, the method will return a new DataFrame
# with additional columns based on the order_date column, such as:
# year_order_date, month_order_date and dayofmonth_order_date

Parameters:

Name Type Description Default
df pyspark.sql.DataFrame

A target Spark DataFrame for applying the transformation.

required
date_col str

A date column name (or parseable string as date) to be used in the date extraction process.

required
date_col_type str

Reference for data type of date_col argument. Acceptable values are "date" or "timestamp".

'date'
date_format str

Date format applied in an optional string-to-date casting. It's applicable only if cast_string_to_date=True

'yyyy-MM-dd'
cast_string_to_date bool

Enables an automatic casting of the date_col column reference into a given date_format.

True

Other Parameters:

Name Type Description
year bool

Extracts the year of target date column

quarter bool

Extracts the quarter of target date column

month bool

Extracts the month of target date column

dayofmonth bool

Extracts the dayofmonth of target date column

dayofweek bool

Extracts the dayofweek of target date column

weekofyear bool

Extracts the weekofyear of target date column

Raises:

Type Description
ValueError

Exception raised if the date_col_type argument is passed with a non acceptable value (i.e. anything different from "date" or "timestamp").

Returns:

Type Description
DataFrame

Spark DataFrame with new date columns extracted.
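
As a complementary, minimal sketch (assuming a hypothetical df_raw DataFrame with a string column "order_date" holding values such as "2022-12-01"), the string-to-date casting described above can be combined with a few attribute flags:

# Minimal sketch: casting a string column to date before extracting attributes
df_date_prep = spark_manager.date_transform(
    df=df_raw,
    date_col="order_date",
    date_col_type="date",
    date_format="yyyy-MM-dd",
    cast_string_to_date=True,
    year=True,
    quarter=True,
    weekofyear=True
)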

Source code in sparksnake/manager.py
@staticmethod
def date_transform(
    df: DataFrame,
    date_col: str,
    date_col_type: str = "date",
    date_format: str = "yyyy-MM-dd",
    cast_string_to_date: bool = True,
    **kwargs
) -> DataFrame:
    """Extracting date attributes from a Spark DataFrame date column.

    This method makes it possible to extract multiple date attributes from
    a Spark DataFrame column that represents a date or timestamp value.
    The date attributes are extracted using all available Apache Spark date
    functions such as year(), month(), dayofmonth() and many others that
    can be found on the official pyspark documentation page.

    So, the given date column (date_col argument) should have a DATE or
    a TIMESTAMP data type. If the column isn't already one of those types,
    it should be a string that can be parsed into a date type object. This
    is the condition to extract date attributes using pyspark date
    functions.

    The main idea behind this method is to provide users an easy way to
    enhance their data analysis by extracting multiple date attributes
    from a date column. This can be a huge improvement on analytics
    processes and DataFrames enrichment.

    Examples:
        ```python
        # Extracting date attributes from a date column in a Spark df
        df_date_prep = spark_manager.date_transform(
            df=df_raw,
            date_col="order_date",
            date_col_type="timestamp",
            year=True,
            month=True,
            dayofmonth=True
        )

        # In the above example, the method will return a new DataFrame
        # with additional columns based on the order_date column, such as:
        # year_order_date, month_order_date and dayofmonth_order_date
        ```

    Args:
        df (pyspark.sql.DataFrame):
            A target Spark DataFrame for applying the transformation.

        date_col (str):
            A date column name (or parseable string as date) to be used in
            the date extraction process.

        date_col_type (str):
            Reference for data type of `date_col` argument. Acceptable
            values are "date" or "timestamp".

        date_format (str):
            Date format applied in an optional string-to-date casting.
            It's applicable only if `cast_string_to_date=True`

        cast_string_to_date (bool):
            Enables an automatic casting of the `date_col` column reference
            into a given `date_format`.

    Keyword Args:
        year (bool): Extracts the year of target date column
        quarter (bool): Extracts the quarter of target date column
        month (bool): Extracts the month of target date column
        dayofmonth (bool): Extracts the dayofmonth of target date column
        dayofweek (bool): Extracts the dayofweek of target date column
        weekofyear (bool): Extracts the weekofyear of target date column

    Raises:
        ValueError: Exception raised if the `date_col_type` argument is\
        passed with a non acceptable value (i.e. anything different from\
        "date" or "timestamp").

    Returns:
        Spark DataFrame with new date columns extracted.
    """

    try:
        # Creating casting expressions based on data type of date_col arg
        date_col_type = date_col_type.strip().lower()
        if cast_string_to_date:
            if date_col_type == "date":
                casting_expr = f"to_date({date_col},\
                    '{date_format}') AS {date_col}_{date_col_type}"
            elif date_col_type == "timestamp":
                casting_expr = f"to_timestamp({date_col},\
                    '{date_format}') AS {date_col}_{date_col_type}"
            else:
                raise ValueError("Invalid data type of date_col_type "
                                 "argument. Acceptable values are 'date' "
                                 "or 'timestamp'")

            # Applying a select expression for casting data if applicable
            df = df.selectExpr(
                "*",
                casting_expr
            ).drop(date_col)\
                .withColumnRenamed(f"{date_col}_{date_col_type}", date_col)

    except ValueError as ve:
        logger.error(ve)
        raise ve

    except AnalysisException as ae:
        logger.error("Analysis error on trying to cast the column "
                     f"{date_col} using the expression {casting_expr}. "
                     "Maybe this column doesn't exist on the DataFrame. "
                     f"Check the error traceback for more details: {ae}")
        raise ae

    # Creating a list of all possible date attributes to be extracted
    possible_date_attribs = ["year", "quarter", "month", "dayofmonth",
                             "dayofweek", "dayofyear", "weekofyear"]

    # Iterating over all possible attributes and extracting date attribs
    for attrib in possible_date_attribs:
        # Add a new column only if attrib is in kwargs
        if attrib in kwargs and bool(kwargs[attrib]):
            df = df.withColumn(
                f"{attrib}_{date_col}", expr(f"{attrib}({date_col})")
            )

    return df

agg_data(spark_session, df, agg_col, group_by, round_result=False, n_round=2, **kwargs) staticmethod

Extracting statistical attributes based on a group by operation.

This method makes it possible to run complex aggregations using a single method call. To use this feature, users can follow the steps below:

  1. Provide an aggregation column (agg_col argument)
  2. Provide a single column reference or a list of columns to be grouped by (group_by argument)
  3. Provide the aggregation functions on **kwargs

The aggregation functions mentioned on the third step are represented by almost any available pyspark function, such as sum(), mean(), max(), min() and many others.

Examples:

# Creating a new special and aggregated DataFrame
df_stats = spark_manager.agg_data(
    spark_session=spark,
    df=df_orders,
    agg_col="order_value",
    group_by=["order_id", "order_year"],
    sum=True,
    mean=True,
    max=True,
    min=True
)

# In the example above, the method will return a new DataFrame with
# the following columns:
# order_id and order_year (group by)
# sum_order_value (sum of order_value column)
# mean_order_value (average of order_value column)
# max_order_value (max value of order_value column)
# min_order_value (min value of order_value column)

Parameters:

  • spark_session (pyspark.sql.SparkSession, required): A SparkSession object to be used to run the SparkSQL query for grouping data
  • df (pyspark.sql.DataFrame, required): A target Spark DataFrame for applying the transformation
  • agg_col (str, required): A column reference on the target DataFrame to be used as target of the aggregation process
  • group_by (str or list, required): A column name or a list of columns used as group categories on the aggregation process
  • round_result (bool, default=False): Enables rounding aggregation results on each new column
  • n_round (int, default=2): Defines the number of decimal places used on rounding. Applied only if round_result=True
About keyword arguments

In order to make it possible to extract multiple statistical attributes with a single line of code, a special list of pyspark functions was selected as acceptable functions to be called on the aggregation process.

It means that if users want to apply an aggregation on the target DataFrame and extract the sum, the mean, the minimum and the maximum value of a given numeric column, they must pass keyword arguments as follows: sum=True, mean=True, min=True and max=True.

All acceptable keyword arguments (pyspark functions) can be found right below:

Other Parameters:

  • sum (bool): Extracts the sum of a given numeric column
  • mean (bool): Extracts the mean of a given numeric column
  • max (bool): Extracts the max of a given numeric column
  • min (bool): Extracts the min of a given numeric column
  • count (bool): Extracts the count of a given numeric column
  • variance (bool): Extracts the variance of a given numeric column
  • stddev (bool): Extracts the standard deviation of a given numeric column
  • kurtosis (bool): Extracts the kurtosis of a given numeric column
  • skewness (bool): Extracts the skewness of a given numeric column

Returns:

DataFrame: A new Spark DataFrame with new statistical columns based on the aggregation configured by the user on the method call.

Raises:

Exception: Generic exception raised when the SparkSQL query for extracting the stats from the DataFrame fails to execute.
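
As an illustration of the keyword arguments and rounding options described above, the sketch below combines several statistics in a single rounded aggregation. This is only a usage sketch: the df_payments DataFrame and the payment_value / payment_type columns are hypothetical names, not part of the library.

# Hypothetical usage: rounded aggregation with extra statistics
df_payment_stats = spark_manager.agg_data(
    spark_session=spark,
    df=df_payments,              # assumed DataFrame with a payment_value column
    agg_col="payment_value",
    group_by="payment_type",
    round_result=True,
    n_round=1,
    mean=True,
    stddev=True,
    variance=True
)

# Expected new columns: mean_payment_value, variance_payment_value and
# stddev_payment_value, all rounded to 1 decimal place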

Source code in sparksnake/manager.py
@staticmethod
def agg_data(
    spark_session: SparkSession,
    df: DataFrame,
    agg_col: str,
    group_by: str or list,
    round_result: bool = False,
    n_round: int = 2,
    **kwargs
) -> DataFrame:
    """Extracting statistical attributes based on a group by operation.

    This method makes it possible to run complex aggregations using a
    single method call. To use this feature, users can follow the steps
    below:

    1. Provide an aggregation column (agg_col argument)
    2. Provide a single column reference or a list of columns to be
    grouped by (group_by argument)
    3. Provide the aggregation functions on **kwargs

    The aggregation functions mentioned on the third step are represented
    by almost any available pyspark function, such as `sum()`, `mean()`,
    `max()`, `min()` and many others.

    Examples:
        ```python
        # Creating a new special and aggregated DataFrame
        df_stats = spark_manager.agg_data(
            spark_session=spark,
            df=df_orders,
            agg_col="order_value",
            group_by=["order_id", "order_year"],
            sum=True,
            mean=True,
            max=True,
            min=True
        )

        # In the example above, the method will return a new DataFrame with
        # the following columns:
        # order_id and order_year (group by)
        # sum_order_value (sum of order_value column)
        # mean_order_value (average of order_value column)
        # max_order_value (max value of order_value column)
        # min_order_value (min value of order_value column)
        ```

    Args:
        spark_session (pyspark.sql.SparkSession):
            A SparkSession object to be used to run SparkSQL query for
            grouping data

        df (pyspark.sql.DataFrame):
            A target Spark DataFrame for applying the transformation

        agg_col (str):
            A column reference on the target DataFrame to be used as
            target of aggregation process

        group_by (str or list):
            A column name or a list of columns used as group categories
            on the aggregation process

        round_result (bool):
            Enables rounding aggregation results on each new column

        n_round (int):
            Defines the round number on rounding. Applied only if
            `round_result=True`

    Tip: About keyword arguments
        In order to make it possible to extract multiple statistical
        attributes with a single line of code, a special list of pyspark
        functions was selected as acceptable functions to be called on
        the aggregation process.

        It means that if users want to apply an aggregation on the
        target DataFrame and extract the sum, the mean, the minimum and
        the maximum value of a given numeric column, they must pass
        keyword arguments as follows: `sum=True`, `mean=True`,
        `min=True` and `max=True`.

        All acceptable keyword arguments (pyspark functions) can be
        found right below:

    Keyword Args:
        sum (bool): Extracts the sum of a given numeric column
        mean (bool): Extracts the mean of a given numeric column
        max (bool): Extracts the max of a given numeric column
        min (bool): Extracts the min of a given numeric column
        count (bool): Extracts the count of a given numeric column
        variance (bool): Extracts the variance of a given numeric column
        stddev (bool): Extracts the standard deviation of a given numeric\
            column
        kurtosis (bool): Extracts the kurtosis of a given numeric column
        skewness (bool): Extracts the skewness of a given numeric column

    Returns:
        A new Spark DataFrame with new statistical columns based on the\
        aggregation configured by user on method call.

    Raises:
        Exception: Generic exception raised when failed to execute the\
        SparkSQL query for extracting the stats from the DataFrame.
    """

    # Joining all group by columns in a single string to make agg easier
    if type(group_by) == list and len(group_by) > 1:
        group_by = ",".join(group_by)

    # Creating a Spark temporary table for grouping data using SparkSQL
    df.createOrReplaceTempView("tmp_extract_aggregate_statistics")

    possible_functions = ["sum", "mean", "max", "min", "count",
                          "variance", "stddev", "kurtosis", "skewness"]
    try:
        # Iterating over the attributes to build a single aggregation expr
        agg_query = ""
        for f in possible_functions:
            if f in kwargs and bool(kwargs[f]):
                if round_result:
                    agg_function = f"round({f}({agg_col}), {n_round})"
                else:
                    agg_function = f"{f}({agg_col})"

                agg_query += f"{agg_function} AS {f}_{agg_col},"

        # Dropping the last comma on the expression
        agg_query = agg_query[:-1]

        # Building the final query to be executed
        final_query = f"""
            SELECT
                {group_by},
                {agg_query}
            FROM tmp_extract_aggregate_statistics
            GROUP BY
                {group_by}
        """

        return spark_session.sql(final_query)

    except AnalysisException as ae:
        logger.error("Error on trying to aggregate data from DataFrame "
                     f"using the following query:\n {final_query}. "
                     "Possible reasons are: missing to pass group_by "
                     "parameter or the agg_col argument doesn't exists"
                     f" on the DataFrame. Exception: {ae}")
        raise ae

add_partition_column(df, partition_name, partition_value) staticmethod

Adding a "partition" column on a Spark DataFrame.

This method is responsible for adding a new column on a target Spark DataFrame to be considered as a table partition. In essence, this method uses the native pyspark .withColumn() method for adding a new column to the DataFrame using a name (partition_name) and a value (partition_value).

The idea behind this method is to provide users a clearer way to add a partition column in their Spark DataFrames and make it explicit to whoever is reading the code.

Examples:

# Defining partition information
partition_name = "anomesdia"
partition_value = int(datetime.now().strftime('%Y%m%d'))

# Adding a partition column to the DataFrame
df_partitioned = spark_manager.add_partition_column(
    df=df_orders,
    partition_name=partition_name,
    partition_value=partition_value
)

# The method returns a new DataFrame with a new column
# referenced by "anomesdia" and its value referenced by
# the datetime library
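
Under the hood this is a thin wrapper around .withColumn() with a literal value, so the call above is roughly equivalent to the raw pyspark sketch below (the column name and literal value are illustrative):

# Roughly equivalent raw pyspark call (illustrative values)
from pyspark.sql.functions import lit

df_partitioned = df_orders.withColumn("anomesdia", lit(20221201))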

Parameters:

  • df (pyspark.sql.DataFrame, required): A target Spark DataFrame.
  • partition_name (str, required): Column name to be added on the DataFrame.
  • partition_value (Any, required): Value for the new column to be added.

Returns:

DataFrame: A Spark DataFrame with the new column added.

Raises:

Exception: A generic exception raised when the .withColumn() call fails to add the new column.
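
Since the new column is meant to act as a table partition, a typical follow-up (not part of this method) is to use it as the partition key when writing the DataFrame. A minimal sketch using standard pyspark write options, assuming an illustrative S3 path:

# Writing the partitioned DataFrame to S3 using the new column (illustrative path)
df_partitioned.write.mode("overwrite")\
    .partitionBy("anomesdia")\
    .parquet("s3://some-bucket-name/some-table-name/")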

Source code in sparksnake/manager.py
@staticmethod
def add_partition_column(
    df: DataFrame,
    partition_name: str,
    partition_value
) -> DataFrame:
    """Adding a "partition" column on a Spark DataFrame.

    This method is responsible for adding a new column on a target Spark
    DataFrame to be considered as a table partition. In essence, this
    method uses the native pyspark `.withColumn()` method for adding a
    new column to the DataFrame using a name (partition_name) and a value
    (partition_value).

    The idea behind this method is to provide users a clearer way to
    add a partition column in their Spark DataFrames and make it
    explicit to whoever is reading the code.

    Examples
        ```python
        # Defining partition information
        partition_name = "anomesdia"
        partition_value = int(datetime.now().strftime('%Y%m%d'))

        # Adding a partition column to the DataFrame
        df_partitioned = spark_manager.add_partition_column(
            df=df_orders,
            partition_name=partition_name,
            partition_value=partition_value
        )

        # The method returns a new DataFrame with a new column
        # referenced by "anomesdia" and its value referenced by
        # the datetime library
        ```

    Args:
        df (pyspark.sql.DataFrame): A target Spark DataFrame.
        partition_name (str): Column name to be added on the DataFrame.
        partition_value (Any): Value for the new column to be added.

    Returns:
        A Spark DataFrame with the new column added.

    Raises:
        Exception: A generic exception raised on failed to execute the\
        method `.withColumn()` for adding the new column.
    """

    logger.info("Adding a new partition column to the DataFrame "
                f"({partition_name}={str(partition_value)})")
    try:
        df_partitioned = df.withColumn(partition_name,
                                       lit(partition_value))
        return df_partitioned

    except Exception as e:
        logger.error("Error on adding a partition colum to the DataFrame "
                     f"using the .withColumn() method. Exception: {e}")
        raise e

repartition_dataframe(df, num_partitions) staticmethod

Repartitioning a Spark DataFrame in order to optimize storage.

This method applies the repartition process in a Spark DataFrame in order to optimize its storage on S3. The method performs some important checks to decide which pyspark method to use for repartitioning the DataFrame. Take a look at the tip below to learn more.

Additional details on method behavior

The method repartition_dataframe() works as follows:

  1. The current number of partitions in the target DataFrame is checked
  2. The desired number of partitions passed as a parameter is checked
  3. If the desired number is LESS than the current number, then the method coalesce() is executed
  4. If the desired number is GREATER than the current one, then the method repartition() is executed

Examples:

# Repartitioning a Spark DataFrame
df_repartitioned = spark_manager.repartition_dataframe(
    df=df_orders,
    num_partitions=10
)
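
Because the method decides between coalesce() and repartition() by comparing the desired number with the current one, it can be handy to inspect the current partition count first. A small sketch (targeting half of the current partitions is just an example, not a recommendation):

# Checking the current number of partitions before choosing a target
current_partitions = df_orders.rdd.getNumPartitions()

# Shrinking to half of the current value, so coalesce() will be used internally
df_repartitioned = spark_manager.repartition_dataframe(
    df=df_orders,
    num_partitions=max(1, current_partitions // 2)
)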

Parameters:

  • df (pyspark.sql.DataFrame, required): A target Spark DataFrame.
  • num_partitions (int, required): Desired number of partitions.

Returns:

DataFrame: A new Spark DataFrame obtained after the repartition process.

Raises:

Exception: A generic exception raised on a failed attempt to run the repartition method (coalesce() or repartition()) on the given Spark DataFrame.

Source code in sparksnake/manager.py
@staticmethod
def repartition_dataframe(df: DataFrame, num_partitions: int) -> DataFrame:
    """Repartitioning a Spark DataFrame in order to optimize storage.

    This method applies the repartition process in a Spark DataFrame in
    order to optimize its storage on S3. The method performs some
    important checks to decide which pyspark method to use for
    repartitioning the DataFrame. Take a look at the tip below to learn
    more.

    Tip: Additional details on method behavior
        The method `repartition_dataframe()` works as follows:

        1. The current number of partitions in the target DataFrame
        is checked
        2. The desired number of partitions passed as a parameter
        is checked

        * If the desired number is LESS than the current number, then the
        method `coalesce()` is executed
        * If the desired number is GREATER than the current one, then the
        method `repartition()` is executed

    Examples:
        ```python
        # Repartitioning a Spark DataFrame
        df_repartitioned = spark_manager.repartition_dataframe(
            df=df_orders,
            num_partitions=10
        )
        ```

    Args:
        df (pyspark.sql.DataFrame): A target Spark Dataframe.
        num_partitions (int): Desired number of partitions.

    Returns:
        A new Spark DataFrame gotten after the repartition process.

    Raises:
        Exception: A generic exception is raised on a failed attempt to\
        run the repartition method (`coalesce()` or `repartition()`) in\
        the given Spark DataFrame.
    """

    # Casting arg to integer to avoid exceptions
    num_partitions = int(num_partitions)

    # Getting the current number of partitions of the given DataFrame
    logger.info("Getting the current number of partition of the DataFrame")
    try:
        current_partitions = df.rdd.getNumPartitions()

    except Exception as e:
        logger.error("Failed to collect the current number of partitions"
                     "using the df.rdd.getNumPartitions method. "
                     f"Exception: {e}")
        raise e

    # If the desired partition number is equal to the current number, skip
    if num_partitions == current_partitions:
        logger.warning(f"The current number of partitions "
                       f"({current_partitions}) is equal to the target "
                       f"number ({num_partitions}). There is no need to "
                       "run any Spark repartition method")
        sleep(0.01)
        return df

    # If the desired number is LESS THAN the current, use coalesce()
    elif num_partitions < current_partitions:
        logger.info("Initializing the repartition process using "
                    f"coalesce(), changing the number of DataFrame "
                    f"partitions from {current_partitions} to "
                    f"{num_partitions}")
        try:
            df_repartitioned = df.coalesce(num_partitions)

        except Exception as e:
            logger.warning("Failed to repartition using coalesce(). "
                           "The repartition process won't be executed and "
                           "the target DataFrame will be returned without "
                           f"any changes. Exception: {e}")
            return df

    # If the desired number is GREATER THAN the current, use repartition()
    elif num_partitions > current_partitions:
        logger.warning(f"The target partition number ({num_partitions}) "
                       f"is greater than the current one "
                       f"({current_partitions}) and, because of that, "
                       "the repartitioning operation will be executed "
                       "using the repartition() method which can be "
                       "expensive under the application perspective. As a "
                       "suggestion, check if this is really the expected "
                       "behavior for this operation.")
        try:
            df_repartitioned = df.repartition(num_partitions)

        except Exception as e:
            logger.warning("Failed to repartition using repartition(). "
                           "The repartition process won't be executed and "
                           "the target DataFrame will be returned without "
                           f"any changes. Exception: {e}")
            return df

    return df_repartitioned

run_spark_sql_pipeline(spark_session, spark_sql_pipeline) staticmethod

Providing a way to run multiple SparkSQL queries in sequence.

This method allows users to define a sequence of SparkSQL queries to build transformation DAGs in order to transform DataFrames in a Spark application using only SQL. The core idea behind this method is that users can define a sequence of SparkSQL statements using a predefined list of dictionaries (the spark_sql_pipeline argument) that will be handled by the method as the main piece for sequentially running the queries and providing the desired result as a Spark DataFrame.

As said before, everything around this method takes place around the spark_sql_pipeline argument definition. In essence, this argument can be defined as a list of dictionaries, where each inner dictionary in this list can have elements that describe the execution of a SparkSQL query (including the query itself).

Examples:

# Defining a list with all SparkSQL steps to be executed
spark_sql_pipeline = [
    {
        "step": 1,
        "query": '''
            SELECT
                order_id,
                order_status,
                order_purchase_ts

            FROM tbl_orders
        ''',
        "create_temp_view": True,
        "temp_view_name": "auto"
    },
    {
        "step": 2,
        "query": '''
            SELECT
                order_id,
                sum(payment_value) AS sum_payment_value

            FROM tbl_payments

            GROUP BY order_id
        ''',
        "create_temp_view": True,
        "temp_view_name": "auto"
    },
    {
        "step": 3,
        "query": '''
            SELECT
                step_1.order_id,
                step_1.order_status,
                step_1.order_purchase_ts,
                step_2.sum_payment_value

            FROM step_1

            LEFT JOIN step_2
                ON step_1.order_id = step_2.order_id
        ''',
        "create_temp_view": True,
        "temp_view_name": "auto"
    }
]

# Running the SparkSQL pipeline
df_prep = spark_manager.run_spark_sql_pipeline(
    spark_session=spark_manager.spark,
    spark_sql_pipeline=spark_sql_pipeline
)
About the spark_sql_pipeline definition

As stated before, the spark_sql_pipeline method argument can be defined as a Python list where each element is a Python dictionary with all information needed to run SparkSQL statements in sequence.

First of all, it's important to say that the inner dictionaries must be defined with some acceptable keys:

  • "step" (required): defines an integer number to inform the method in which order the query in the given dictionary should be executed. The value passed on the "step" inner dictionary key is used in a sorting proccess that defines the execution order of the SparkSQL statements.

  • "query" (required): well, this is the SparkSQL query itself that will be executed by the method. This could be defines as a Python string directly on the dictionary or even by reading some external JSON or SQL file in the project directory.

  • "create_temp_view" (optional, default=True): defines a boolean flag that handles the creation of a new temporary view for each executed step. By the default, it's set as True, meaning that after each execution, a new temporary view will be available for further SparkSQL statements.

  • "temp_view_name" (optional, default="auto"): defines the name of the temporary view created after executing the SparkSQL query in a given step. It's applicable only if "create_temp_view" is True for the step. By default, it's value is set as "auto", meaning that the name of the intermediate temporary view will be set as "step_N", where N is the integer that defines the step. For example, in the first inner dictionary of the spark_sql_pipeline list (for instance, "step": 1), a query will be executed and, if there is no an explicit "create_temp_view": False in the dictionary, then a new temporary view with query result will be created and named as "step_1". So, any further SparkSQL statements that selects data from any "step_1" table will be pointing to the intermediate results of the first step of the pipeline. By the other hand, users can define a specific name for the step temporary view by filling this key with any desired string.

If users don't explicitly define the keys "create_temp_view" and "temp_view_name", the method will consider their default values. In other words, if an inner dictionary of the spark_sql_pipeline list doesn't have any of the mentioned keys, a temporary view will be created after running the step's query and it will be named "step_N", where N is the integer that identifies the step.
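
To make the defaults above concrete, the short sketch below overrides the temporary view name in the first step and skips the view creation in the last one. The table and column names (tbl_orders, order_id, order_status) are illustrative and follow the example above.

# Two-step sketch with a custom view name (illustrative tables and columns)
spark_sql_pipeline = [
    {
        "step": 1,
        "query": "SELECT order_id, order_status FROM tbl_orders",
        "temp_view_name": "orders_slim"  # overrides the default "step_1" name
    },
    {
        "step": 2,
        "query": '''
            SELECT order_status, count(*) AS qty
            FROM orders_slim
            GROUP BY order_status
        ''',
        "create_temp_view": False  # last step: no temporary view is needed
    }
]

df_status_counts = spark_manager.run_spark_sql_pipeline(
    spark_session=spark_manager.spark,
    spark_sql_pipeline=spark_sql_pipeline
)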

Parameters:

  • spark_session (pyspark.sql.SparkSession, required): A SparkSession object to be used to run the SparkSQL queries of the pipeline
  • spark_sql_pipeline (list, required): A list made of dictionaries that defines details of the steps to be executed using SparkSQL queries. Check the tip above for more details on how to pass this argument

Returns:

DataFrame: A Spark DataFrame that is the result of the execution of the last step (query) defined in the spark_sql_pipeline list.

Raises:

ValueError: An exception raised in two different situations: first, if any dictionary in the spark_sql_pipeline list doesn't have the required keys ("step" and "query"); second, if any dictionary in the spark_sql_pipeline list has a value for the "step" key that isn't an integer. Those validations are important to ensure that users are correctly defining the arguments for the method.
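
As an illustration of the validations mentioned above, a pipeline entry without the required "step" key is rejected before any query runs. The example below is intentionally invalid and only meant to show the error path:

# This pipeline raises ValueError: the inner dictionary has no "step" key
bad_pipeline = [
    {"query": "SELECT 1 AS dummy"}
]

spark_manager.run_spark_sql_pipeline(
    spark_session=spark_manager.spark,
    spark_sql_pipeline=bad_pipeline
)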

Source code in sparksnake/manager.py
@staticmethod
def run_spark_sql_pipeline(
    spark_session: SparkSession,
    spark_sql_pipeline: list
) -> DataFrame:
    """Providing a way to run multiple SparkSQL queries in sequence.

    This method allows users to define a sequence of SparkSQL queries to
    build transformation DAGs in order to transform DataFrames in a Spark
    application using only SQL. The core idea behind this method is that
    users can define a sequence of SparkSQL statements using a predefined
    list of dictionaries (`spark_sql_pipeline` argument) that will be
    handled by the method as the main piece for sequentially running
    the queries and providing the desired result as a Spark DataFrame.

    As said before, everything around this method takes place around the
    `spark_sql_pipeline` argument definition. In essence, this argument
    can be defined as a list with multiple dictionaries, where each
    inner dictionary in this list can have elements that describe the
    execution of a SparkSQL query (including the query itself).

    Examples:
    ```python
    # Defining a list with all SparkSQL steps to be executed
    spark_sql_pipeline = [
        {
            "step": 1,
            "query": '''
                SELECT
                    order_id,
                    order_status,
                    order_purchase_ts

                FROM tbl_orders
            ''',
            "create_temp_view": True,
            "temp_view_name": "auto"
        },
        {
            "step": 2,
            "query": '''
                SELECT
                    order_id,
                    sum(payment_value) AS sum_payment_value

                FROM tbl_payments

                GROUP BY order_id
            ''',
            "create_temp_view": True,
            "temp_view_name": "auto"
        },
        {
            "step": 3,
            "query": '''
                SELECT
                    step_1.order_id,
                    step_1.order_status,
                    step_1.order_purchase_ts,
                    step_2.sum_payment_value

                FROM step_1

                LEFT JOIN step_2
                    ON step_1.order_id = step_2.order_id
            ''',
            "create_temp_view": True,
            "temp_view_name": "auto"
        }
    ]

    # Running the SparkSQL pipeline
    df_prep = spark_manager.run_spark_sql_pipeline(
        spark_session=spark_manager.spark,
        spark_sql_pipeline=spark_sql_pipeline
    )
    ```

    Tip: About the spark_sql_pipeline definition
        As stated before, the `spark_sql_pipeline` method argument can be
        defined as a Python list where each element is a Python dictionary
        with all information needed to run SparkSQL statements in sequence.

        First of all, it's important to say that the inner dictionaries
        must be defined with some acceptable keys:

        - `"step"` (required): defines an integer number to inform the
            method in which order the query in the given dictionary should
            be executed. The value passed on the "step" inner dictionary
            key is used in a sorting process that defines the execution
            order of the SparkSQL statements.

        - `"query"` (required): well, this is the SparkSQL query itself
            that will be executed by the method. This could be defines as a
            Python string directly on the dictionary or even by reading
            some external JSON or SQL file in the project directory.

        - `"create_temp_view"` (optional, default=True): defines a boolean
            flag that handles the creation of a new temporary view for
            each executed step. By default, it's set as True, meaning
            that after each execution, a new temporary view will be
            available for further SparkSQL statements.

        - `"temp_view_name"` (optional, default="auto"): defines the name
            of the temporary view created after executing the SparkSQL
            query in a given step. It's applicable only if
            "create_temp_view" is True for the step. By default, it's value
            is set as "auto", meaning that the name of the intermediate
            temporary view will be set as "step_N", where N is the integer
            that defines the step. For example, in the first inner
            dictionary of the `spark_sql_pipeline` list (for instance,
            "step": 1), a query will be executed and, if there is no an
            explicit "create_temp_view": False in the dictionary, then a
            new temporary view with query result will be created and named
            as "step_1". So, any further SparkSQL statements that selects
            data from any "step_1" table will be pointing to the
            intermediate results of the first step of the pipeline. On the
            other hand, users can define a specific name for the step
            temporary view by filling this key with any desired string.

        If users don't explicitly define the keys "create_temp_view" and
        "temp_view_name", the method will consider their default values. In
        other words, if an inner dictionary of the `spark_sql_pipeline`
        list doesn't have any of the mentioned keys, a temporary view will
        be created after running the step's query and it will be named
        "step_N", where N is the integer that identifies the step.

    Args:
        spark_session (pyspark.sql.SparkSession):
            A SparkSession object to be used to run the SparkSQL queries
            of the pipeline

        spark_sql_pipeline (list):
            A list made by dictionaries that defines details of the steps
            to be executed using SparkSQL queries. Check the tip above for
            more details on how to pass this argument

    Returns:
        A Spark DataFrame that is the result of the execution of the last\
        step (query) defined in the spark_sql_pipeline list.

    Raises:
        ValueError: An exception raised in two different situations:\
        first, if any dictionary in the `spark_sql_pipeline` list doesn't\
        have the required keys ("step" and "query"); second, if any\
        dictionary in the `spark_sql_pipeline` list has a value for the\
        "step" key that isn't an integer. Those validations are important\
        to ensure that users are correctly defining the arguments for\
        the method.
    """

    # Applying some validations on the sql pipeline argument
    required_keys = ["step", "query"]

    # Getting a list with keys of all inner dicts from sql pipeline list
    inner_pipe_keys = [list(pipe.keys()) for pipe in spark_sql_pipeline]

    # Checking if the required arguments are in all inner dict keys
    for inner_key in inner_pipe_keys:
        if not all(key in inner_key for key in required_keys):
            raise ValueError("Invalid value for spark_sql_pipeline "
                             "argument. Please, check if all inner "
                             "dictionaries have the 'step' and the "
                             "'query' required keys defined accordingly. "
                             "The main reason of this validation is "
                             "that without the 'step' key, the method "
                             "won't be able to sort and set the execution "
                             "order. Without the 'query' key, the method "
                             "won't be able to run any SparkSQL query")

    # Checking if all step keys are integers
    step_types_validation = list(set(
        [isinstance(pipe["step"], int) for pipe in spark_sql_pipeline]
    ))[0]
    if not step_types_validation:
        raise ValueError("Invalid value for spark_sql_pipeline "
                         "argument. Please check if all inner "
                         "dictionaries have the 'step' key defined as "
                         "integer values. If any 'step' key for a given "
                         "step is defined with a non integer number, the "
                         "method won't be able to sort the steps and set "
                         "the execution order accordingly")

    # Going to the method: sorting the steps in an ascending order
    sorted_spark_sql_pipeline = sorted(
        spark_sql_pipeline,
        key=lambda x: x["step"]
    )

    # Iterating over the pipeline elements to execute the statements
    for pipe in sorted_spark_sql_pipeline:
        # Executing the queries in sequence
        df_step = spark_session.sql(pipe["query"])

        # Creating a temporary view with the step result (if applicable)
        if "create_temp_view" not in pipe\
                or bool(pipe["create_temp_view"]):
            # Assigning a name for result temporary view
            if "temp_view_name" not in pipe\
                    or pipe["temp_view_name"].strip().lower() == "auto":
                temp_view_name = "step_" + str(pipe["step"])
            else:
                temp_view_name = pipe["temp_view_name"]

            # Creating the temporary view
            df_step.createOrReplaceTempView(temp_view_name)

    # Returning the DataFrame from the last step
    return df_step