Note:
You need to start the Hive and Spark component services in the EMR cluster.
1. The current user has permissions in the EMR cluster.
2. The corresponding database and table have been created in Hive, for example wedata_demo_db in this document.
3. PySpark tasks are automatically submitted by the system in cluster mode.
Sample Code
# Example 1: create a DataFrame from local data and display it
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("WeDataApp").getOrCreate()

# Define the schema of the DataFrame
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("user_name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Build the DataFrame from sample data and print it
data = [(1, "Alice", 25), (2, "Bob", 30)]
df = spark.createDataFrame(data, schema=schema)
df.show()
# Example 2: read the Hive table created earlier and count its rows
from pyspark.sql import SparkSession

# Enable Hive support so that Hive databases and tables can be queried
spark = SparkSession.builder.appName("WeDataApp").enableHiveSupport().getOrCreate()

# Query the table in the wedata_demo_db database
df = spark.sql("SELECT * FROM wedata_demo_db.user_demo")

# Count the rows in the DataFrame
count = df.count()

# Print the result
print("The number of rows in the dataframe is:", count)
Parameter description
| Python Version | Python2 and Python3 are supported. |
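To confirm which Python version actually executes a task, you can print the interpreter version from inside the PySpark code. This is a minimal sketch; the app name PythonVersionCheck is only an illustration:
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonVersionCheck").getOrCreate()

# Print the Python version of the driver process; executor-side details can be
# checked in the task run logs.
print("Driver Python version:", sys.version)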
Use the Python environment of the scheduling resource group in the PySpark task
Install Python libraries in the scheduling resource group
1. Go to Project Management > Execution Resource Group > Standard Scheduling Resource Group, click Resource Details, and open the resource operations and maintenance interface.
2. In the Resource Operations interface, click Python Package Installation to install built-in Python libraries. It is recommended to install the Python3 version.
3. Currently, the platform only supports the installation of built-in libraries. Here, install the sklearn and pandas libraries. After the installation is complete, you can use the View Python Packages feature to view the installed Python libraries.
Edit PySpark task
1. Create a task and select the scheduling resource group that has installed the Python package.
2. Write PySpark code using the Python libraries; here, pandas and sklearn are used.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pandas as pd
import sklearn

spark = SparkSession.builder.appName("WeDataApp-1").getOrCreate()

# Define the schema and build a small sample DataFrame
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("user_name", StringType(), True),
    StructField("age", IntegerType(), True)
])
data = [(1, "Alice", 25), (2, "Bob", 30)]
df = spark.createDataFrame(data, schema=schema)

# Convert the Spark DataFrame to a pandas DataFrame (collected to the driver)
pandas_df = df.toPandas()

df.show()
print(pandas_df.head(10))

# Print the sklearn version to verify that the installed library is used
print(sklearn.__version__)
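If the pandas result needs to be processed in Spark again, it can be converted back to a Spark DataFrame. This is a minimal sketch and assumes pandas_df is small enough to fit in driver memory:
# Convert the pandas DataFrame back into a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()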
Debug the PySpark task
1. Click Debug Runs to view the debug logs and results.
Example: In the logs, you can see that the Python environment of the scheduling resource group was used as the task execution environment.
spark.yarn.dist.archives=file:///usr/local/python3/python3.zip
2. View the log results to see that the installed pandas library was used and the correct version of the installed sklearn library was printed.
Periodic Scheduling PySpark task
After the task runs on its periodic schedule, view the run logs and results. In the logs, you can see that the Python environment of the scheduling resource group was used as the task execution environment.