跳到主要内容
版本: 0.7

Open API

Dinky 平台提供 OpenAPI 能力,通过调用 Dinky 的 OpenAPI 可以使用 Dinky 的功能,实现应用和 Dinky 的集成和交互。

背景

Dinky 的O penAPI 提供了 10 多种 API 功能。通过调用 API,可以快速进行操作和系统集成对接等工作,提高数据开发效率,满足企业定制化需求。您还可以通过开放平台,轻松获取 OpenAPI 的使用情况。

当前支持的 OpenAPI 包括如下:

序号执行模式类型名称作用
1explainSqlcancel调用取消 FlinkSQL 作业
2explainSqlstatement调用 statement 语句
3executeJaryarn-application调用执行 Yarn-Application jar包
4executeSqlkubernetes-session调用执行作业在 K8ssession 上运行
5executeSqllocal调用执行作业在 Local 上运行
6executeSqlyarn-per-job调用执行作业在 Yarn-per-job 上运行
7executeSqlstandalone调用执行作业在 Standalone 上运行
8executeSqlyarn-session调用执行作业在 Yarn-session 上运行
9getJobData
10getJobPlanstatement调用获取执行计划
11getStreamGraphstatement调用获取 Flink DAG 图
12savepoint调用手动保存 Checkpoints
13savepointTask调用并触发 Savepoint
14submitTask作业调度

示例

explainSql

explainSql 包括 FlinkSQL 作业取消和 statement 语句执行

FlinkSQL 作业 cancel:

explainSql

explainSql 包括 cancel

http://127.0.0.1:8888/openapi/explainSql
{
/* required-start */
"jobId":"195352b0a4518e16699983a13205f059",
/* required-end */
/* custom-start */
"address":"127.0.0.1:8081",
"gatewayConfig":{
"clusterConfig":{
"appId":"application_1637739262398_0032",
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/* custom-start */
}

statement 语句执行:

 http://127.0.0.1:8888/openapi/explainSql
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

executeJar

Yarn-application jar 包调用并提交

 http://127.0.0.1:8888/openapi/executeJar
{
/* required-start */
"type":"yarn-application",
"gatewayConfig":{
"clusterConfig":{
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
},
"appConfig":{
"userJarPath":"hdfs:///flink12/jar/currencyAppJar.jar",
"userJarParas":["--id","2774,2775,2776"," --type","dwd"],
"userJarMainAppClass":"com.app.MainApp"
},
"flinkConfig": {
"configuration":{
"parallelism.default": 1
}
}
},
/* required-end */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843"
/* custom-end */
}

executeSql

executeSql 提交执行作业包括 Local,Kubernetes-session,Yarn-per-job,Standalone,Yarn-session

Local:

 http://127.0.0.1:8888/openapi/executeSql 
{
/* required-start */
"type":"local",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useChangeLog":false,
"useAutoCancel":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

Kubernetes-session:

 http://127.0.0.1:8888/openapi/executeSql
{
/* required-start */
"type":"kubernetes-session",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

Yarn-per-job:

http://127.0.0.1:8888/openapi/executeSql 
{
/* required-start */
"type":"yarn-per-job",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
"gatewayConfig":{
"clusterConfig":{
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
},
"flinkConfig": {
"configuration":{
"parallelism.default": 1
}
}
},
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

Yarn-session:

 http://127.0.0.1:8888/openapi/executeSql 
{
/* required-start */
"type":"yarn-session",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

Standalone:

 http://127.0.0.1:8888/openapi/executeSql
{
/* required-start */
"type":"standalone",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

getJobData

 http://127.0.0.1:8888/openapi/getJobData?jobId=195352b0a4518e16699983a13205f059

getJobPlan

 http://127.0.0.1:8888/openapi/getJobPlan
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

getStreamGraph

 http://127.0.0.1:8888/openapi/getStreamGraph 
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}

savepoint

 http://127.0.0.1:8888/openapi/savepoint
{
/* required-start */
"jobId":"195352b0a4518e16699983a13205f059",
"savePointType":"trigger", // trigger | stop | cancel
/* required-end */
/* custom-start */
"savePoint":"195352b0a4518e16699983a13205f059",
"address":"127.0.0.1:8081",
"gatewayConfig":{
"clusterConfig":{
"appId":"application_1637739262398_0032",
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/* custom-start */
}

savepointTask

http://127.0.0.1:8888/openapi/savepointTask
{
/* required-start */
"taskId":"1",
/* required-end */
/* custom-start */
"type":"trigger" // trigger | stop | cancel | canceljob
/* custom-start */
}

submitTask

http://127.0.0.1:8888/openapi/submitTask?id=1 
说明

OpenAPI 包括元数据、数据开发、数据集成、运维中心、数据质量、数据服务等。其中数据开发已经开发完成,其他根据版本发布会逐步实现。如果您在数据开发过程中需要使用 Dinky OpenAPI,使用方式请参见示例。