tencent cloud

文档反馈

Python 访问

最后更新时间:2024-07-31 17:34:04
    数据湖计算 DLC 提供符合 DBAPI 2.0 标准的工具。您可以通过 Python 连接到 DLC 的 Presto/Spark 引擎,可以方便地使用 SQL 的方式操作 DLC 库表。

    环境准备

    1. Python 3.9及以上版本。
    2. 安装tencentcloud-dlc-connector。
    pip install -i https://mirrors.tencent.com/pypi/simple/ tencentcloud-dlc-connector

    使用示例

    步骤一:连接到引擎

    代码:
    import tdlc_connector
    import datetime
    from tdlc_connector import constants
    
    conn = tdlc_connector.connect(region="<REGION>",
    secret_id="<SECRET_ID>",
    secret_key="<SECRET_KEY>",
    token=None,
    endpoint=None,
    catalog=constants.Catalog.DATALAKECATALOG,
    engine="<ENGINE>",
    engine_type=constants.EngineType.AUTO,
    result_style=constants.ResultStyles.LIST,
    download=False,
    mode=constants.Mode.LASY,
    database='',
    config={},
    callback=None,
    callback_events=None,
    )
    参数说明:
    参数
    说明
    region
    引擎所在地域,如 ap-nanjing,ap-beijing,ap-guangzhou,ap-shanghai, ap-chengdu,ap-chongqing,na-siliconvalley,ap-singapore, ap-hongkong
    secret_id
    腾讯云 SecretID
    secret_key
    腾讯云 SecretKey
    token
    (可选)临时密钥 Token
    endpoint
    (可选)连接服务节点
    engine
    使用的引擎名称,例如 “test_python”
    engine_type
    (可选)引擎类型:对应引擎名称的引擎类型, 默认值 constants.EngineType.AUTO
    例如:AUTO、PRESTO、SPARK、SPARK_BATCH
    result_style
    (可选)返回结果的格式,可选 LIST/DICT
    download
    (可选)是否直接下载数据 True/False,详见下载模式说明
    mode
    (可选)模式。支持 ALL/LASY/STREAM
    database
    (可选)默认数据库
    config
    (可选)提交到集群配置
    callback
    (可选)回调函数,函数签名 def cb(statement_id, status)
    callback_events
    (可选)回调触发事件,同callback配合使用,详见回调机制说明
    driver_size
    (可选)Driver 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效)
    可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
    executor_size
    (可选)Executor 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效) 可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
    executor_num
    (可选)Executor 节点数量, 默认值1 (仅SPARK_BATCH 集群有效)
    executor_max_num
    (可选)Executor 最大节点数量,若不等于 executor_num, 则开启资源动态分配 (仅 SPARK_BATCH 集群有效)
    
    下载模式说明
    序号
    download
    mode
    说明
    1
    False
    ALL
    从接口获取所有数据,获取完成后才能 fetch 到数据
    2
    False
    LASY
    从接口获取数据,根据 fetch 的数据量延迟到服务端获取数据
    3
    False
    STREAM
    同 LASY 模式
    4
    True
    ALL
    从 COS 下载所有结果(需具备 COS 读权限)占用本地临时存储,数据量较大时推荐使用
    5
    True
    LASY
    从 COS 下载结果(需具备 COS 读权限),根据 fetch 数据量延迟下载文件
    6
    True
    STREAM
    实时的从 COS 读取结果流(需具备 COS 读权限),性能较慢,本地内存磁盘占用率极低

    步骤二:执行 SQL

    代码:
    # 基本操作
    
    cursor = conn.cursor()
    count = cursor.execute("SELECT 1")
    print(cursor.fetchone()) # 读取一行数据
    for row in cursor.fetchall(): # 读取剩余多行数据
    print(row)
    
    # 使用参数 pyformat 格式
    
    cursor.execute("SELECT * FROM dummy WHERE date < %s", datetime.datetime.now())
    cursor.execute("SELECT * FROM dummy WHERE status in %s", (('SUCCESS', 'INIT', 'FAIL'),))
    cursor.execute("SELECT * FROM dummy WHERE date < %(date)s AND status = %(status)s", {'date': datetime.datetime.now(), 'status': 'SUCCESS'})
    
    # 使用BULK方式
    
    cursor.executemany("INSERT INTO dummy VALUES(%s, %s)", [('张三', 18), ('李四', 20)])

    基本操作流程

    上述代码流程如下:
    1. 通过conn.cursor() 创建了一个游标对象。
    2. 通过cursor.execute("SELECT 1") 执行了一条 SQL 查询语句,并将结果赋值给变量count。
    3. 通过调用cursor.fetchone() 方法读取了一行数据,并将其打印出来。
    4. 在一个循环中调用了cursor.fetchall() 方法来读取剩余的多行数据,并逐行打印出来。

    参数传递方式

    支持两种不同的参数传递方式:
    使用 pyformat 格式:在执行 SQL 语句时,可以使用%s作为占位符,并将实际参数以元组或字典形式传入。
    使用 BULK 方式:通过调用executemany() 方法可以批量插入多个记录到数据库表中。

    特性功能

    回调机制使用说明

    import tdlc_connector
    import datetime
    from tdlc_connector import constants
    
    
    def tdlc_connector_callback(statement_id, state):
    '''
    parmas: statement_id 任务id
    params: state 任务状态,枚举值 参考constants.TaskStatus
    '''
    print(statement_id, state)
    
    
    conn = tdlc_connector.connect(region="<REGION>",
    secret_id="<SECRET_ID>",
    secret_key="<SECRET_KEY>",
    engine="<ENGINE>",
    engine_type=constants.EngineType.SPARK,
    result_style=constants.ResultStyles.LIST,
    callback=tdlc_connector_callback,
    callback_events=[constants.CallbackEvent.ON_INIT, constants.CallbackEvent.ON_SUCCESS]
    )
    
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    cursor.fetchone()
    
    # callback 函数会在任务初始化 和 任务成功时调用
    

    提交任务到作业集群

    当前已支持提交任务到 Spark 作业集群,具体请参考如下示例。
    from tdlc_connector import constants
    
    conn = tdlc_connector.connect(region="<REGION>",
    secret_id="<SECRET_ID>",
    secret_key="<SECRET_KEY>",
    engine="<ENGINE>", # 请选择 spark 作业引擎
    result_style=constants.ResultStyles.LIST,
    driver_size=constants.PodSize.SMALL, # 选择 Driver 规格
    executor_size=constants.PodSize.SMALL, # 选择 Executor 规格
    executor_num=1, # 设置 Executor 数量
    executor_max_num=1, # 设置 Executor 最大数量, 如果 不等于{executor_num},则开启动态资源分配
    )
    说明:
    如需使用该特性,请将 connector 升级至 >= 1.1.0。

    自动推断引擎类型

    当前指定使用引擎后无需再指定引擎类型,connector 会自动推断,具体请参考如下示例。
    from tdlc_connector import constants
    
    conn = tdlc_connector.connect(region="<REGION>",
    secret_id="<SECRET_ID>",
    secret_key="<SECRET_KEY>",
    engine="<ENGINE>",
    engine_type=constants.EngineType.AUTO # 可设置成AUTO 或者 不传入该参数,驱动自动推断
    )
    说明:
    如需使用该特性,请将 connector 升级至 >= 1.1.0。

    空值转换

    当前结果集基于CSV格式存储,引擎默认将空值转换成空字符串,如果需要区分空值,请指定空值表示符, 例如 “\\1” ,引擎查询结果会将空值转换成 “\\1”,同时驱动会将 “\\1”字段转换成 None,具体请参考如下示例。
    from tdlc_connector import constants, formats
    
    formats.FORMAT_STRING_NULL = '\\1'
    
    conn = tdlc_connector.connect(region="<REGION>",
    secret_id="<SECRET_ID>",
    secret_key="<SECRET_KEY>",
    engine="<ENGINE>",
    result_style=constants.ResultStyles.LIST
    )
    说明:
    空值转换目前仅支持 SparkSQL集群,请将 connector 升级至 >= 1.1.3。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持