Azure Databricks 客户端库提供了一个方便的接口,用于通过 Azure Databricks REST API 自动化管理 Azure Databricks 工作区。
该库的实现基于REST API 2.0及更高版本。
主分支用于版本 2。版本 1.1(稳定版)位于 releases/1.1 分支中。
您必须拥有个人访问令牌(PAT)或 Azure Active Directory 令牌(AAD 令牌)才能访问 Databricks REST API。
| REST API | 版本 | 描述 |
|---|---|---|
| 集群 | 2.0 | 集群 API 允许您创建、启动、编辑、列出、终止和删除集群。 |
| 作业 | 2.1 | 作业 API 允许您通过编程方式管理 Azure Databricks 作业。 |
| DBFS | 2.0 | DBFS API 是一个 Databricks API,它让您可以轻松地与各种数据源交互,而无需在每次读取文件时都提供您的凭据。 |
| 机密 | 2.0 | 机密(Secrets)API 允许您管理机密、机密范围和访问权限。 |
| 组 | 2.0 | 组API允许您管理用户组。 |
| 库 | 2.0 | 库API允许您安装和卸载库,并在集群上获取库的状态。 |
| 令牌 | 2.0 | 令牌API允许您创建,列出和撤销可用于身份验证和访问Azure Databricks REST API的令牌。 |
| 工作区 | 2.0 | 工作区 API 允许您列出、导入、导出和删除笔记本和文件夹。 |
| InstancePool | 2.0 | 实例池 API 允许您创建、编辑、删除和列出实例池。 |
| 权限 | 2.0 | 权限 API 允许您管理令牌、集群、池、作业、Delta Live Tables 管道、笔记本、目录、MLflow 实验、MLflow 注册模型、SQL 仓库、存储库(Repos)和集群策略的权限。 |
| 集群策略 | 2.0 | 集群策略API允许您创建,列出和编辑群集策略。 |
| 全局初始化脚本 | 2.0 | 全局初始化脚本 API 使 Azure Databricks 管理员能够以安全且受控的方式添加全局集群初始化脚本。 |
| SQL仓库 | 2.0 | SQL仓库API允许您管理计算资源,使您可以在Databricks SQL中的数据对象上运行SQL命令。 |
| 存储库 | 2.0 | 存储库 API 允许用户管理其 Git 存储库。用户可以使用该 API 访问其拥有管理权限的所有存储库。 |
| 管道(Delta Live Tables) | 2.0 | Delta Live Tables API 允许您创建、编辑、删除、启动管道,并查看有关管道的详细信息。 |
查看示例项目以获取更多详细的用法。
在以下示例中,应将 baseUrl 变量设置为工作区的基础 URL(形如 https://adb-<workspace-id>.<random-number>.azuredatabricks.net),并将 token 变量设置为 Databricks 个人访问令牌。
// Create the API client. The using statement disposes the client when the
// block exits, so calls made through it belong inside the block.
using (var client = DatabricksClient.CreateClient(baseUrl, token))
{
    // ...
}

// Build the cluster definition with the fluent configuration helpers.
// NOTE(review): the arguments of WithAutoScale(3, 7) are presumably the
// minimum/maximum worker counts and WithAutoTermination(30) is presumably in
// minutes - confirm against the library documentation.
var clusterConfig = ClusterAttributes
    .GetNewClusterConfiguration("Sample cluster")
    .WithRuntimeVersion(RuntimeVersions.Runtime_10_4)
    .WithAutoScale(3, 7)
    .WithAutoTermination(30)
    .WithClusterLogConf("dbfs:/logs/")
    .WithNodeType(NodeTypes.Standard_D3_v2)
    .WithClusterMode(ClusterMode.SingleNode);

// Submit the definition; the call returns the id of the created cluster.
var clusterId = await client.Clusters.Create(clusterConfig);

// Alias used by the WaitForCluster helper below. In a real source file this
// directive must be placed with the other using directives at the top.
using Policy = Polly.Policy;
/// <summary>
/// Polls a cluster until it reports the RUNNING, ERROR or TERMINATED state.
/// </summary>
/// <param name="clusterClient">Clusters API client used to query the cluster.</param>
/// <param name="clusterId">Id of the cluster to poll.</param>
/// <param name="pollIntervalSeconds">Delay, in seconds, between two consecutive polls.</param>
/// <returns>A task that completes once the cluster has reached one of the states above.</returns>
/// <remarks>
/// The policy retries forever: there is no timeout, so the returned task only
/// completes when one of the awaited states is observed or a non-handled
/// exception is thrown.
/// </remarks>
static async Task WaitForCluster(IClustersApi clusterClient, string clusterId, int pollIntervalSeconds = 15)
{
    var retryPolicy = Policy.Handle<WebException>()
        .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.BadGateway)
        .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.InternalServerError)
        // The error code is matched against the raw JSON fragment carried in the exception message.
        .Or<ClientApiException>(e => e.Message.Contains("\"error_code\":\"TEMPORARILY_UNAVAILABLE\""))
        // Retry only when the cancellation was not requested through the exception's token.
        .Or<TaskCanceledException>(e => !e.CancellationToken.IsCancellationRequested)
        // Any state other than the three awaited ones counts as "not done yet".
        .OrResult<ClusterInfo>(info => info.State is not (ClusterState.RUNNING or ClusterState.ERROR or ClusterState.TERMINATED))
        .WaitAndRetryForeverAsync(
            _ => TimeSpan.FromSeconds(pollIntervalSeconds),
            (delegateResult, _) =>
            {
                // Only failures are logged here; successful polls are logged by the delegate below.
                if (delegateResult.Exception is not null)
                {
                    Console.WriteLine($"[{DateTime.UtcNow:s}] Failed to query cluster info - {delegateResult.Exception}");
                }
            });

    await retryPolicy.ExecuteAsync(async () =>
    {
        var info = await clusterClient.Get(clusterId);
        Console.WriteLine($"[{DateTime.UtcNow:s}] Cluster:{clusterId}\tState:{info.State}\tMessage:{info.StateMessage}");
        return info;
    });
}
// Wait for the cluster to settle, then terminate it.
await WaitForCluster(client.Clusters, clusterId);
await client.Clusters.Terminate(clusterId);

// Wait for the cluster to settle again before deleting it.
await WaitForCluster(client.Clusters, clusterId);
await client.Clusters.Delete(clusterId);

// Job schedule
var schedule = new CronSchedule
{
    QuartzCronExpression = "0 0 9 ? * MON-FRI",
    TimezoneId = "Europe/London",
    PauseStatus = PauseStatus.UNPAUSED
};

// Run with a job cluster
var newCluster = ClusterAttributes.GetNewClusterConfiguration()
    .WithClusterMode(ClusterMode.SingleNode)
    .WithNodeType(NodeTypes.Standard_D3_v2)
    .WithRuntimeVersion(RuntimeVersions.Runtime_10_4);

// Create job settings
var jobSettings = new JobSettings
{
    MaxConcurrentRuns = 1,
    Schedule = schedule,
    Name = "Sample Job"
};

// Adding 3 tasks to the job settings.
var task1 = jobSettings.AddTask("task1", new NotebookTask { NotebookPath = SampleNotebookPath })
    .WithDescription("Sample Job - task1")
    .WithNewCluster(newCluster);

var task2 = jobSettings.AddTask("task2", new NotebookTask { NotebookPath = SampleNotebookPath })
    .WithDescription("Sample Job - task2")
    .WithNewCluster(newCluster);

// task3 receives task1 and task2 as its third argument.
// NOTE(review): presumably these are the tasks it depends on - confirm.
jobSettings.AddTask("task3", new NotebookTask { NotebookPath = SampleNotebookPath }, new[] { task1, task2 })
    .WithDescription("Sample Job - task3")
    .WithNewCluster(newCluster);

// Create the job.
Console.WriteLine("Creating new job");
var jobId = await client.Jobs.Create(jobSettings);
Console.WriteLine("Job created: {0}", jobId);

// Start the job and retrieve the run id.
Console.WriteLine("Run now: {0}", jobId);
var runId = await client.Jobs.RunNow(jobId);

// Alias used by the WaitForRun helper below. In a real source file this
// directive must be placed with the other using directives at the top.
using Policy = Polly.Policy;
/// <summary>
/// Polls a job run until its life-cycle state is no longer PENDING, RUNNING or TERMINATING.
/// </summary>
/// <param name="jobClient">Jobs API client used to query the run.</param>
/// <param name="runId">Id of the run to poll.</param>
/// <param name="pollIntervalSeconds">Delay, in seconds, between two consecutive polls.</param>
/// <returns>A task that completes once the run has left the three in-progress states.</returns>
/// <remarks>
/// The policy retries forever: there is no timeout, so the returned task only
/// completes when the run leaves the in-progress states or a non-handled
/// exception is thrown.
/// </remarks>
static async Task WaitForRun(IJobsApi jobClient, long runId, int pollIntervalSeconds = 15)
{
    var retryPolicy = Policy.Handle<WebException>()
        .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.BadGateway)
        .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.InternalServerError)
        // The error code is matched against the raw JSON fragment carried in the exception message.
        .Or<ClientApiException>(e => e.Message.Contains("\"error_code\":\"TEMPORARILY_UNAVAILABLE\""))
        // Retry only when the cancellation was not requested through the exception's token.
        .Or<TaskCanceledException>(e => !e.CancellationToken.IsCancellationRequested)
        // Keep polling while the run is still in one of the in-progress states.
        .OrResult<RunState>(state =>
            state.LifeCycleState is RunLifeCycleState.PENDING or RunLifeCycleState.RUNNING
                or RunLifeCycleState.TERMINATING)
        .WaitAndRetryForeverAsync(
            _ => TimeSpan.FromSeconds(pollIntervalSeconds),
            (delegateResult, _) =>
            {
                // Only failures are logged here; successful polls are logged by the delegate below.
                if (delegateResult.Exception is not null)
                {
                    Console.WriteLine(
                        $"[{DateTime.UtcNow:s}] Failed to query run - {delegateResult.Exception}");
                }
            });

    await retryPolicy.ExecuteAsync(async () =>
    {
        var (run, _) = await jobClient.RunsGet(runId);
        Console.WriteLine(
            $"[{DateTime.UtcNow:s}] Run:{runId}\tLifeCycleState:{run.State.LifeCycleState}\tResultState:{run.State.ResultState}\tCompleted:{run.IsCompleted}"
        );
        return run.State;
    });
}
// Block until the run has left the in-progress states.
await WaitForRun(client.Jobs, runId);

// Fetch the run, then export and preview the views of each of its tasks.
var (run, _) = await client.Jobs.RunsGet(runId);
foreach (var runTask in run.Tasks)
{
    var viewItems = await client.Jobs.RunsExport(runTask.RunId);
    foreach (var viewItem in viewItems)
    {
        Console.WriteLine($"Exported view item from run {runTask.RunId}, task \"{runTask.TaskKey}\", view \"{viewItem.Name}\"");
        Console.WriteLine("====================");
        // Preview at most the first 200 characters; clamping the range end
        // avoids an ArgumentOutOfRangeException when the content is shorter.
        Console.WriteLine(viewItem.Content[..Math.Min(200, viewItem.Content.Length)] + "...");
        Console.WriteLine("====================");
    }
}
创建机密范围
const string scope = "SampleScope";
await client.Secrets.CreateScope(scope, null);
创建文本机密
var secretName = "secretkey.text";
await client.Secrets.PutSecret("secret text", scope, secretName);
创建二进制机密
var secretName = "secretkey.bin";
await client.Secrets.PutSecret(new byte[] { 0x01, 0x02, 0x03, 0x04 }, scope, secretName);
clusters/create、jobs/run-now 和 jobs/runs/submit API 支持幂等性令牌(idempotency token)。它是一个可选令牌,用于保证请求的幂等性。如果已经存在带有该令牌的资源(集群或运行),则该请求不会创建新资源,而是返回现有资源的 ID。
如果您指定了幂等性令牌,则在请求失败后可以不断重试,直到请求成功。Databricks 保证使用该幂等性令牌只会启动一个资源。
以下代码演示了在请求失败时,如何借助 Polly 并使用 idempotency_token 重试该请求。
using Polly;

double retryIntervalSec = 15;

// The same token is sent on every attempt, so a retry after a failed or
// timed-out request passes the identical idempotency token to Create.
string idempotencyToken = Guid.NewGuid().ToString();

var clusterInfo = ClusterAttributes.GetNewClusterConfiguration("my-cluster")
    .WithNodeType("Standard_D3_v2")
    .WithNumberOfWorkers(25)
    .WithRuntimeVersion(RuntimeVersions.Runtime_7_3);

// Retry forever, waiting retryIntervalSec seconds between attempts, on the
// exceptions listed below.
var retryPolicy = Policy.Handle<WebException>()
    .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.BadGateway)
    .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.InternalServerError)
    .Or<ClientApiException>(e => e.StatusCode == HttpStatusCode.ServiceUnavailable)
    // The error code is matched against the raw JSON fragment carried in the exception message.
    .Or<ClientApiException>(e => e.Message.Contains("\"error_code\":\"TEMPORARILY_UNAVAILABLE\""))
    // Retry only when the cancellation was not requested through the exception's token.
    .Or<TaskCanceledException>(e => !e.CancellationToken.IsCancellationRequested)
    .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(retryIntervalSec));

var clusterId = await retryPolicy.ExecuteAsync(async () => await client.Clusters.Create(clusterInfo, idempotencyToken));
该库的 v2 版本面向 .NET 6 运行时。
重新设计了Jobs API,以与REST API的2.1版对齐。
在上一个版本中,作业API仅支持每个作业的单个任务。新的作业API支持每个作业的多个任务,其中任务表示为DAG。
新版本支持另外两种类型的任务:Python Wheel任务和Delta Live Tables Pipeline Task。
该项目欢迎贡献和建议。大多数贡献要求您同意贡献者许可协议(CLA),声明您有权并且确实授予我们使用您的贡献的权利。有关详细信息,请访问 Microsoft 贡献者许可协议(CLA)页面。
当您提交拉取请求(Pull Request)时,CLA 机器人将自动确定您是否需要提供 CLA,并为 PR 添加相应的标记(例如标签、评论)。只需按照机器人提供的说明操作即可。对于所有使用我们 CLA 的存储库,您只需执行一次此操作。
该项目采用了 Microsoft 开源行为准则。有关更多信息,请参阅《行为准则常见问题解答》;如有其他问题或意见,请联系 opencode@microsoft.com。