DLC PySpark

Last updated: 2024-11-01 16:26:14
    Note:
    You need to bind a DLC engine first. Currently, DLC PySpark supports only the Spark job engine. For engine kernel details, see DLC Engine Kernel Version.

    Feature Overview

    Create a DLC PySpark task in WeData and submit it to the WeData scheduling platform; the task is then executed on the DLC engine.

    Task Parameter Description

    In the task properties of a DLC PySpark task, you can configure the data access policy, entry parameters, dependent resources, Spark conf parameters, task image, and resource configuration.
    Data access policy
    Required. The security policy used to access COS data during task execution. For details, see DLC Configuration Data Access Policy.
    Entry parameters
    Optional. Entry parameters passed to the program. Multiple parameters are supported and must be separated by spaces (see the sketch after this table).
    Dependent resources
    Optional. Supports --py-files, --files, and --archives. Each resource type accepts multiple COS paths, separated by commas (,).
    Conf parameters
    Optional. Parameters starting with spark., in k=v format. Separate multiple parameters with new lines. Example: spark.network.timeout=120s.
    Task image
    The image used to run the task. If the task requires a specific image, you can choose between the DLC built-in image and a custom image.
    Resource configuration
    Use cluster resource configuration: use the cluster's default resource configuration parameters.
    Custom: customize the task's resource usage, including executor size, driver size, and number of executors.
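
    As an illustration only, the following minimal sketch shows how a script might see the entry parameters and conf parameters configured above at runtime. It assumes entry parameters arrive as ordinary command-line arguments and that conf values such as spark.network.timeout can be read back from the Spark configuration; the names used here are examples, not fixed by DLC.

    import sys

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Read Task Params Example") \
            .getOrCreate()
        # Entry parameters: the space-separated values configured in the task
        # properties arrive as command-line arguments (sys.argv[0] is the
        # script itself; user parameters start at index 1).
        entry_params = sys.argv[1:]
        print("entry parameters:", entry_params)
        # Conf parameters: k=v lines such as spark.network.timeout=120s can
        # be read back from the Spark configuration.
        timeout = spark.sparkContext.getConf().get("spark.network.timeout", "not set")
        print("spark.network.timeout =", timeout)
        spark.stop()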

    Sample Code

    from os.path import abspath

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Operate DB Example") \
            .getOrCreate()
        # 1. Create a database
        spark.sql("CREATE DATABASE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py` COMMENT 'demo test' ")
        # 2. Create an internal table
        spark.sql("CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`test`(`id` int,`name` string,`age` int) ")
        # 3. Write data to the internal table
        spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`test` VALUES (1,'Andy',12),(2,'Justin',3) ")
        # 4. Query the internal table
        spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`test` ").show()
        # 5. Create an external table
        spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`ext_test`(`id` int, `name` string, `age` int) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'cosn://cos-bucket-name/ext_test' ")
        # 6. Write data to the external table
        spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` VALUES (1,'Andy',12),(2,'Justin',3) ")
        # 7. Query the external table
        spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` ").show()
        spark.stop()
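
    Note that cosn://cos-bucket-name/ext_test in step 5 is a placeholder LOCATION: replace cos-bucket-name with your own COS bucket, and make sure the data access policy selected in the task properties grants access to that path.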
    