KEMBAR78
6.3. Data - Structure - Pyspark - Ipynb - Exercise | PDF | Function (Mathematics) | Software Engineering
0% found this document useful (0 votes)
64 views6 pages

6.3. Data - Structure - Pyspark - Ipynb - Exercise

Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
64 views6 pages

6.3. Data - Structure - Pyspark - Ipynb - Exercise

Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
You are on page 1/ 6

11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.

ipynb - Colab

keyboard_arrow_down RDD to DataFrame


To convert an RDD to a DataFrame, we can use the SparkSession.createDataFrame() function. Every element in the RDD has be to an Row
object.

Create an RDD

rdd_raw = sc.textFile('./mtcars.csv')
rdd_raw.take(5)

['model,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'mpg').collect()[0]


header[0] = 'model'
header

['model',
'mpg',
'cyl',
'disp',
'hp',
'drat',
'wt',
'qsec',
'vs',
'am',
'gear',
'carb']

keyboard_arrow_down Save the rest to a new RDD

rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'mpg')


rdd.take(2)

[['Mazda RX4',
'21',
'6',
'160',
'110',
'3.9',
'2.62',
'16.46',
'0',
'1',
'4',
'4'],
['Mazda RX4 Wag',
'21',
'6',
'160',
'110',
'3.9',
'2.875',
'17.02',
'0',
'1',
'4',
'4']]

First we define a function which takes a list of column names and a list of values and create a Row of key-value pairs. Since keys in an Row
object are variable names, we can’t simply pass a dictionary to the Row() function. We can think of a dictionary as an argument list and use the
** to unpack the argument list.

See an example.

from pyspark.sql import Row


my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))
Row(**my_dict)

https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 11/16
11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.ipynb - Colab

Row(a=1, b=2, c=3)

keyboard_arrow_down Let’s define the function.

def dict_to_row(keys, values):


row_dict = dict(zip(keys, values))
return Row(**row_dict)

rdd_rows = rdd.map(lambda x: dict_to_row(header, x))


rdd_rows.take(3)

[Row(model='Mazda RX4', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.62', qsec='16.46', vs='0', am='1', gear='4',
carb='4'),
Row(model='Mazda RX4 Wag', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.875', qsec='17.02', vs='0', am='1', gear='4',
carb='4'),
Row(model='Datsun 710', mpg='22.8', cyl='4', disp='108', hp='93', drat='3.85', wt='2.32', qsec='18.61', vs='1', am='1', gear='4',
carb='1')]

# check
type(rdd_rows)

pyspark.rdd.PipelinedRDD

df = spark.createDataFrame(rdd_rows)
df.show(5)

+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
| model| mpg|cyl|disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
| Mazda RX4| 21| 6| 160|110| 3.9| 2.62|16.46| 0| 1| 4| 4|
| Mazda RX4 Wag| 21| 6| 160|110| 3.9|2.875|17.02| 0| 1| 4| 4|
| Datsun 710|22.8| 4| 108| 93|3.85| 2.32|18.61| 1| 1| 4| 1|
| Hornet 4 Drive|21.4| 6| 258|110|3.08|3.215|19.44| 1| 0| 3| 1|
|Hornet Sportabout|18.7| 8| 360|175|3.15| 3.44|17.02| 0| 0| 3| 2|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

Bắt đầu lập trình hoặc tạo mã bằng trí tuệ nhân tạo (AI).

keyboard_arrow_down Merge and split columns


Sometimes we need to merge multiple columns in a Dataframe into one column, or split a column into multiple columns. We can easily achieve
this by converting a DataFrame to RDD, applying map functions to manipulate elements, and then converting the RDD back to a DataFrame.

# adjust first column name


colnames = mtcars.columns
colnames[0] = 'model'
mtcars = mtcars.rdd.toDF(colnames)
mtcars.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| model| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|
| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|
| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|
| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|
|Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

keyboard_arrow_down Merge multiple columns


We convert DataFrame to RDD and then apply the map function to merge values and convert elements to Row objects.

type(mtcars)

https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 12/16
11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.ipynb - Colab

pyspark.sql.dataframe.DataFrame
def __init__(jdf, sql_ctx)

A distributed collection of data grouped into named columns.

A :class:`DataFrame` is equivalent to a relational table in Spark SQL,


and can be created using various functions in :class:`SparkSession`::

people = spark.read.parquet("...")

from pyspark.sql import Row


mtcars_rdd = mtcars.rdd.map(lambda x: Row(model=x[0], values=x[1:]))
mtcars_rdd.take(5)

[Row(model='Mazda RX4', values=(21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4)),


Row(model='Mazda RX4 Wag', values=(21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4)),
Row(model='Datsun 710', values=(22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1)),
Row(model='Hornet 4 Drive', values=(21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1)),
Row(model='Hornet Sportabout', values=(18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2))]

type(mtcars_rdd)

pyspark.rdd.PipelinedRDD

Then we create a new DataFrame from the obtained RDD.

mtcars_df = spark.createDataFrame(mtcars_rdd)
mtcars_df.show(5, truncate=False)

+-----------------+-----------------------------------------------------+
|model |values |
+-----------------+-----------------------------------------------------+
|Mazda RX4 |{21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4} |
|Mazda RX4 Wag |{21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4} |
|Datsun 710 |{22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1} |
|Hornet 4 Drive |{21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1}|
|Hornet Sportabout|{18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2} |
+-----------------+-----------------------------------------------------+
only showing top 5 rows

keyboard_arrow_down Split one column


We use the above DataFrame as our example data. Again, we need to convert the DataFrame to an RDD to achieve our goal.

Let's split the values column into two columns: x1 and x2. The first 4 values will be in column x1 and the remaining values will be in column x2.

mtcars_rdd_2 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:5], x2=x[1][5:]))


# convert RDD back to DataFrame
mtcars_df_2 = spark.createDataFrame(mtcars_rdd_2)
mtcars_df_2.show(5, truncate=False)

+-----------------+---------------------------+--------------------------+
|model |x1 |x2 |
+-----------------+---------------------------+--------------------------+
|Mazda RX4 |{21.0, 6, 160.0, 110, 3.9} |{2.62, 16.46, 0, 1, 4, 4} |
|Mazda RX4 Wag |{21.0, 6, 160.0, 110, 3.9} |{2.875, 17.02, 0, 1, 4, 4}|
|Datsun 710 |{22.8, 4, 108.0, 93, 3.85} |{2.32, 18.61, 1, 1, 4, 1} |
|Hornet 4 Drive |{21.4, 6, 258.0, 110, 3.08}|{3.215, 19.44, 1, 0, 3, 1}|
|Hornet Sportabout|{18.7, 8, 360.0, 175, 3.15}|{3.44, 17.02, 0, 0, 3, 2} |
+-----------------+---------------------------+--------------------------+
only showing top 5 rows

mtcars_rdd_3 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:7], x2=x[1][7:]))


# convert RDD back to DataFrame
mtcars_df_3 = spark.createDataFrame(mtcars_rdd_3)
mtcars_df_3.show(5, truncate=False)

+-----------------+-----------------------------------------+------------+
|model |x1 |x2 |
+-----------------+-----------------------------------------+------------+
|Mazda RX4 |{21.0, 6, 160.0, 110, 3.9, 2.62, 16.46} |{0, 1, 4, 4}|
|Mazda RX4 Wag |{21.0, 6, 160.0, 110, 3.9, 2.875, 17.02} |{0, 1, 4, 4}|
https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 13/16
11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.ipynb - Colab
|Datsun 710 |{22.8, 4, 108.0, 93, 3.85, 2.32, 18.61} |{1, 1, 4, 1}|
|Hornet 4 Drive |{21.4, 6, 258.0, 110, 3.08, 3.215, 19.44}|{1, 0, 3, 1}|
|Hornet Sportabout|{18.7, 8, 360.0, 175, 3.15, 3.44, 17.02} |{0, 0, 3, 2}|
+-----------------+-----------------------------------------+------------+
only showing top 5 rows

mtcars_rdd_4 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:5], x2=x[1][5:8], x3=x[1][8:]))


# convert RDD back to DataFrame
mtcars_df_4 = spark.createDataFrame(mtcars_rdd_4)
mtcars_df_4.show(5, truncate=False)

+-----------------+---------------------------+-----------------+---------+
|model |x1 |x2 |x3 |
+-----------------+---------------------------+-----------------+---------+
|Mazda RX4 |{21.0, 6, 160.0, 110, 3.9} |{2.62, 16.46, 0} |{1, 4, 4}|
|Mazda RX4 Wag |{21.0, 6, 160.0, 110, 3.9} |{2.875, 17.02, 0}|{1, 4, 4}|
|Datsun 710 |{22.8, 4, 108.0, 93, 3.85} |{2.32, 18.61, 1} |{1, 4, 1}|
|Hornet 4 Drive |{21.4, 6, 258.0, 110, 3.08}|{3.215, 19.44, 1}|{0, 3, 1}|
|Hornet Sportabout|{18.7, 8, 360.0, 175, 3.15}|{3.44, 17.02, 0} |{0, 3, 2}|
+-----------------+---------------------------+-----------------+---------+
only showing top 5 rows

keyboard_arrow_down Exercise
1. Split mtcars_df into 5 columns given that the first column is model , call the new dataframe as mtcars_df_5 .
2. Merge all columns (except the first colums model ) of mtcars_df_5 to one column X . Call this new dataframe as mtcars_df_6
3. Split mtcars_df into some columns given that the first column is model . From the second column and more, each column has three
values, while the last column contains the remining values. Then we call the new dataframe as mtcars_df_7 .

# Exercise 1 Part 1: Split `mtcars_df` into 5 columns, named `mtcars_df_5`


mtcars_rdd_5 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:3], x2=x[1][3:5], x3=x[1][5:7], x4=x[1][7:9], x5=x[1][9:]))
mtcars_df_5 = spark.createDataFrame(mtcars_rdd_5)
mtcars_df_5.show(truncate=False)

+-------------------+----------------+-----------+--------------+------+------+
|model |x1 |x2 |x3 |x4 |x5 |
+-------------------+----------------+-----------+--------------+------+------+
|Mazda RX4 |{21.0, 6, 160.0}|{110, 3.9} |{2.62, 16.46} |{0, 1}|{4, 4}|
|Mazda RX4 Wag |{21.0, 6, 160.0}|{110, 3.9} |{2.875, 17.02}|{0, 1}|{4, 4}|
|Datsun 710 |{22.8, 4, 108.0}|{93, 3.85} |{2.32, 18.61} |{1, 1}|{4, 1}|
|Hornet 4 Drive |{21.4, 6, 258.0}|{110, 3.08}|{3.215, 19.44}|{1, 0}|{3, 1}|
|Hornet Sportabout |{18.7, 8, 360.0}|{175, 3.15}|{3.44, 17.02} |{0, 0}|{3, 2}|
|Valiant |{18.1, 6, 225.0}|{105, 2.76}|{3.46, 20.22} |{1, 0}|{3, 1}|
|Duster 360 |{14.3, 8, 360.0}|{245, 3.21}|{3.57, 15.84} |{0, 0}|{3, 4}|
|Merc 240D |{24.4, 4, 146.7}|{62, 3.69} |{3.19, 20.0} |{1, 0}|{4, 2}|
|Merc 230 |{22.8, 4, 140.8}|{95, 3.92} |{3.15, 22.9} |{1, 0}|{4, 2}|
|Merc 280 |{19.2, 6, 167.6}|{123, 3.92}|{3.44, 18.3} |{1, 0}|{4, 4}|
|Merc 280C |{17.8, 6, 167.6}|{123, 3.92}|{3.44, 18.9} |{1, 0}|{4, 4}|
|Merc 450SE |{16.4, 8, 275.8}|{180, 3.07}|{4.07, 17.4} |{0, 0}|{3, 3}|
|Merc 450SL |{17.3, 8, 275.8}|{180, 3.07}|{3.73, 17.6} |{0, 0}|{3, 3}|
|Merc 450SLC |{15.2, 8, 275.8}|{180, 3.07}|{3.78, 18.0} |{0, 0}|{3, 3}|
|Cadillac Fleetwood |{10.4, 8, 472.0}|{205, 2.93}|{5.25, 17.98} |{0, 0}|{3, 4}|
|Lincoln Continental|{10.4, 8, 460.0}|{215, 3.0} |{5.424, 17.82}|{0, 0}|{3, 4}|
|Chrysler Imperial |{14.7, 8, 440.0}|{230, 3.23}|{5.345, 17.42}|{0, 0}|{3, 4}|
|Fiat 128 |{32.4, 4, 78.7} |{66, 4.08} |{2.2, 19.47} |{1, 1}|{4, 1}|
|Honda Civic |{30.4, 4, 75.7} |{52, 4.93} |{1.615, 18.52}|{1, 1}|{4, 2}|
|Toyota Corolla |{33.9, 4, 71.1} |{65, 4.22} |{1.835, 19.9} |{1, 1}|{4, 1}|
+-------------------+----------------+-----------+--------------+------+------+
only showing top 20 rows

# Exercise 1 Part 2: Merge all columns (except `model`) of `mtcars_df_5` to one column `X`, creating `mtcars_df_6`
mtcars_df_6 = mtcars_df_5.rdd.map(lambda x: Row(model=x[0], X=x.x1 + x.x2 + x.x3 + x.x4 + x.x5)).toDF()
mtcars_df_6.show(truncate=False)

+-------------------+-----------------------------------------------------+
|model |X |
+-------------------+-----------------------------------------------------+
|Mazda RX4 |{21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4} |
|Mazda RX4 Wag |{21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4} |
|Datsun 710 |{22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1} |
|Hornet 4 Drive |{21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1}|
|Hornet Sportabout |{18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2} |
|Valiant |{18.1, 6, 225.0, 105, 2.76, 3.46, 20.22, 1, 0, 3, 1} |

https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 14/16
11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.ipynb - Colab
|Duster 360 |{14.3, 8, 360.0, 245, 3.21, 3.57, 15.84, 0, 0, 3, 4} |
|Merc 240D |{24.4, 4, 146.7, 62, 3.69, 3.19, 20.0, 1, 0, 4, 2} |
|Merc 230 |{22.8, 4, 140.8, 95, 3.92, 3.15, 22.9, 1, 0, 4, 2} |
|Merc 280 |{19.2, 6, 167.6, 123, 3.92, 3.44, 18.3, 1, 0, 4, 4} |
|Merc 280C |{17.8, 6, 167.6, 123, 3.92, 3.44, 18.9, 1, 0, 4, 4} |
|Merc 450SE |{16.4, 8, 275.8, 180, 3.07, 4.07, 17.4, 0, 0, 3, 3} |
|Merc 450SL |{17.3, 8, 275.8, 180, 3.07, 3.73, 17.6, 0, 0, 3, 3} |
|Merc 450SLC |{15.2, 8, 275.8, 180, 3.07, 3.78, 18.0, 0, 0, 3, 3} |
|Cadillac Fleetwood |{10.4, 8, 472.0, 205, 2.93, 5.25, 17.98, 0, 0, 3, 4} |
|Lincoln Continental|{10.4, 8, 460.0, 215, 3.0, 5.424, 17.82, 0, 0, 3, 4} |
|Chrysler Imperial |{14.7, 8, 440.0, 230, 3.23, 5.345, 17.42, 0, 0, 3, 4}|
|Fiat 128 |{32.4, 4, 78.7, 66, 4.08, 2.2, 19.47, 1, 1, 4, 1} |
|Honda Civic |{30.4, 4, 75.7, 52, 4.93, 1.615, 18.52, 1, 1, 4, 2} |
|Toyota Corolla |{33.9, 4, 71.1, 65, 4.22, 1.835, 19.9, 1, 1, 4, 1} |
+-------------------+-----------------------------------------------------+
only showing top 20 rows

# Exercise 1 Part 3: Split `mtcars_df` so each additional column has three values, last column holds remaining values, named `mtcars_df_7`
mtcars_rdd_7 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:3], x2=x[1][3:6], x3=x[1][6:9], x4=x[1][9:]))
mtcars_df_7 = spark.createDataFrame(mtcars_rdd_7)
mtcars_df_7.show(truncate=False)

+-------------------+----------------+------------------+-------------+------+
|model |x1 |x2 |x3 |x4 |
+-------------------+----------------+------------------+-------------+------+
|Mazda RX4 |{21.0, 6, 160.0}|{110, 3.9, 2.62} |{16.46, 0, 1}|{4, 4}|
|Mazda RX4 Wag |{21.0, 6, 160.0}|{110, 3.9, 2.875} |{17.02, 0, 1}|{4, 4}|
|Datsun 710 |{22.8, 4, 108.0}|{93, 3.85, 2.32} |{18.61, 1, 1}|{4, 1}|
|Hornet 4 Drive |{21.4, 6, 258.0}|{110, 3.08, 3.215}|{19.44, 1, 0}|{3, 1}|
|Hornet Sportabout |{18.7, 8, 360.0}|{175, 3.15, 3.44} |{17.02, 0, 0}|{3, 2}|
|Valiant |{18.1, 6, 225.0}|{105, 2.76, 3.46} |{20.22, 1, 0}|{3, 1}|
|Duster 360 |{14.3, 8, 360.0}|{245, 3.21, 3.57} |{15.84, 0, 0}|{3, 4}|
|Merc 240D |{24.4, 4, 146.7}|{62, 3.69, 3.19} |{20.0, 1, 0} |{4, 2}|
|Merc 230 |{22.8, 4, 140.8}|{95, 3.92, 3.15} |{22.9, 1, 0} |{4, 2}|
|Merc 280 |{19.2, 6, 167.6}|{123, 3.92, 3.44} |{18.3, 1, 0} |{4, 4}|
|Merc 280C |{17.8, 6, 167.6}|{123, 3.92, 3.44} |{18.9, 1, 0} |{4, 4}|
|Merc 450SE |{16.4, 8, 275.8}|{180, 3.07, 4.07} |{17.4, 0, 0} |{3, 3}|
|Merc 450SL |{17.3, 8, 275.8}|{180, 3.07, 3.73} |{17.6, 0, 0} |{3, 3}|
|Merc 450SLC |{15.2, 8, 275.8}|{180, 3.07, 3.78} |{18.0, 0, 0} |{3, 3}|
|Cadillac Fleetwood |{10.4, 8, 472.0}|{205, 2.93, 5.25} |{17.98, 0, 0}|{3, 4}|
|Lincoln Continental|{10.4, 8, 460.0}|{215, 3.0, 5.424} |{17.82, 0, 0}|{3, 4}|
|Chrysler Imperial |{14.7, 8, 440.0}|{230, 3.23, 5.345}|{17.42, 0, 0}|{3, 4}|
|Fiat 128 |{32.4, 4, 78.7} |{66, 4.08, 2.2} |{19.47, 1, 1}|{4, 1}|
|Honda Civic |{30.4, 4, 75.7} |{52, 4.93, 1.615} |{18.52, 1, 1}|{4, 2}|
|Toyota Corolla |{33.9, 4, 71.1} |{65, 4.22, 1.835} |{19.9, 1, 1} |{4, 1}|
+-------------------+----------------+------------------+-------------+------+
only showing top 20 rows

keyboard_arrow_down Exercise
Do the same thing for titanic dataset.

from google.colab import files


files.upload()

from pyspark.sql import SparkSession


from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType,ArrayType

# Load the Titanic CSV file with similar parameters


# Define explicit schema for Titanic data
titanic_df = spark.read.csv(
path='/content/titanic.csv',
sep=',',
encoding='UTF-8',
header=True,
inferSchema=True
)

# Display the first 5 rows of the Titanic DataFrame


titanic_df.show(n=5, truncate=False)

https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 15/16
11/3/24, 8:59 PM Bản sao của 6.3. data_structure_pyspark.ipynb - Colab

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+
|PassengerId|Survived|Pclass|Name |Sex |Age |SibSp|Parch|Ticket |Fare |Cabin|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+
|1 |0 |3 |Braund, Mr. Owen Harris |male |22.0|1 |0 |A/5 21171 |7.25 |null |
|2 |1 |1 |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1 |0 |PC 17599 |71.2833|C85 |
|3 |1 |3 |Heikkinen, Miss. Laina |female|26.0|0 |0 |STON/O2. 3101282|7.925 |null |
|4 |1 |1 |Futrelle, Mrs. Jacques Heath (Lily May Peel) |female|35.0|1 |0 |113803 |53.1 |C123 |
|5 |0 |3 |Allen, Mr. William Henry |male |35.0|0 |0 |373450 |8.05 |null |
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+
only showing top 5 rows

# adjust first column name


colnames = titanic_df.columns
colnames[0] = 'id'
titanic_df = titanic_df.rdd.toDF(colnames)
titanic_df.show(5)

+---+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| id|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+---+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
+---+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows

from pyspark.sql import Row


titanic_df_rdd = titanic_df.rdd.map(lambda x: Row(id=x[0], values=x[1:]))
titanic_df_rdd.take(5)

[Row(id=1, values=(0, 3, 'Braund, Mr. Owen Harris', 'male', 22.0, 1, 0, 'A/5 21171', 7.25, None, 'S')),
R (id 2 l (1 1 'C i M J h B dl (Fl B i Th )' 'f l ' 38 0 1 0 'PC 17599' 71 2833 'C85'

https://colab.research.google.com/drive/1L6KEcp0M345BVe0nqrXAHivuOlh1Qdzk#printMode=true 16/16

You might also like