数据流概述
数据流是数据处理自动化和业务规则协同的引擎。支持通过模版创建。它把分散任务整合为连贯流程,通过设定触发条件、流转逻辑,让任务在系统或人员间自动传递,实现业务自动化运作。
例如:在传统业务场景中,人工进行数据各种处理操作既费时又费力,且容易出错,有了自动化数据流,就可以对您的各种数据处理过程自动化运行,例如:数据查询、数据分析、数据存储、数据转换等等。同时,还可以运用模型以及智能体的能力,使数据流更加智能化,提高工作效率。
数据流相关概念
›触发动作:
触发动作是自动化数据流的起点,根据该触发动作的状态变化,来作为自动化流程是否执行的起始判断,例如上述案例中,在知识库上传一个新的文档就是流程的开始。
常见的触发动作有定时触发、事件触发、手动点击触发,下面将带您了解这几种触发动作的基本概念:
- 定时触发:触发动作为循环的时间周期或时间点,例如每x分钟、每x小时、每天或每周等固定周期。在实际工作中,有一些任务需要在特定的时间完成,主要适用于消息提醒类的场景,可以规定数据流运行的时间和频率。示例:文件更新后自动通知到相关人员、每周自动提醒项目负责人更新进展、定期将数据备份到指定的文件夹等。
- 事件触发:通过设置指定操作事件作为触发动作。一般适用于需要对文件和文件夹进行数据新建或变更操作的场景,也就是需要选择目标文件夹,并且在该文件夹下进行上传、复制、移动或删除的动作时,再执行下一步的操作。示例:新建文件夹时,自动将文件夹名称添加到编目模板。
- 手动点击触发:手动触发是在设置完自动化任务后,需要您手动点击运行,才会触发下一步的执行动作。与事件触发不同,手动触发的对象是针对已经上传至某个文件夹下的文件或文件夹,再手动点击,去进行下一步需要执行的动作,主要用于数据同步的场景。示例:在文档中心里存放了一个创建时间过长的文件夹,可设置自动归档到另一个指定的文件夹。
›执行条件:
执行条件也叫逻辑动作,只有满足这个条件,才会往下执行设定的操作。通俗来讲就是数据流中的分支,分支的运行顺序是按照分支从左往右依次匹配,只有当前分支条件满足后,才执行该分支的流程。一个分支操作内的所有分支执行完毕后,再执行分支外的操作。上述案例中,“审核通过→文件上传”可以理解为该流程中的分支1,而如果审核不通过,则可以删除该文件,那么“审核不通过→删除文件”就是分支2。
›执行动作:
执行动作是指前面的触发动作发生,判断条件满足以后,自动让数据流执行指定的操作事件,例如上述案例中,满足审核通过的条件后,“文档在知识库上传发布”就是让数据流执行的操作。
创建数据流方式
- 从空白新建
- 从模版新建
从空白创建数据流步骤
›基本步骤如下:选择数据源→设置执行操作→设置触发器
- 第1步:点击数据流>管道>新建,进入数据流新建页面。

- 第2步:选择数据源,配置需要具体进行处理的数据,可选择非结构化数据、结构化数据、数据视图、填写表单触发和算子数据源。

说明:
-
- 算子数据源:通过选择合适的算子,支持从 RESTful API 获取数据。
-
第3步:设置执行操作
执行操作是指当触发器事件发生后,用于处理数据或传递数据而执行实际的具体操作。同一个流程中,您可以设置一个或多个执行操作。

说明:当选择执行操作的时候,可增加操作节点、设置分支及循环

-
- 分支:
-
-
- 分支默认添加两条。满足分支条件走其中一条。分支可设置“且”或者“或”关系。
- 可执行多条分支,不设置分支条件,走所有分支路径。
- 分支默认添加两条。满足分支条件走其中一条。分支可设置“且”或者“或”关系。
-
-
- 循环:设置循环体内节点,循环输出,循环输出结果可后续使用。
- 第4步:详细设置,以选择执行操作-大模型为例,进行介绍。
-
- 基础设置:针对具体的执行动作设置的必要的配置内容
- 高级设置:节点高级配置功能支持用户在创建或编辑数据流节点时,自主设定超时时间、重试次数及重试间隔等运行策略。通过该功能,用户可根据数据量级和业务重要性个性化调整节点运行规则,避免因默认配置僵化导致的流程失败,同时清晰感知节点重试状态,提升数据任务执行的稳定性和透明度。

-
-
- 功能应用场景
大数据量企业客户:当数据处理任务因数据量大、耗时久,默认超时时间(1800 秒)导致流程频繁失败时,可延长节点超时时间,并搭配合理重试策略,确保任务完整执行。
小数据量企业客户:数据处理任务耗时短,无需等待默认超时时间,可缩短超时时间,快速获取任务结果,提升操作效率。 -
- 功能应用场景
-
- 第5步:设置触发器
触发器可以理解为“当某事件发生时,执行后续操作“中的某事件。在一个自动化工作流程中,触发器是自动化流程的开端,一个流程只能配置一个触发器,且不能删除。

说明:当数据源为数据视图时,触发方式支持定时触发和手动触发。
从模版新建数据流步骤
›前提条件
- 构建相关业务知识网络
- 构建将运用到的算子
›基本步骤
- 选择业务知识网络→设置执行操作→设置触发器
›从模版创建场景示例
下文以构建数据流【创建用户时自动更新业务知识网络】为例,进行阐述。此数据流的主要目的是为了实现,当有用户新增时,自动提取用户的相关信息,进行业务知识网络更新。
- 第1步:点击数据流>管道>新建,进入管道新建页面,选择从模版新建

- 第4步:点击更新业务知识网络,选择【AS默认图谱_1】;选择的实体信息,图谱内所有实体信息会显示出来,根据需要进行选择;添加文档库-对文件储存的文件夹进行配置;

- 第5步:引入已经配置好的算子,点击确定;

- 第6步:工作流创建成功;

- 第7步:验证-查看是否工作流新建好后,运行成功。新建用户:点击信息安全管理>用户>用户管理>新建用户;用户新建好后,回到数据流页面,点击运行统计。

- 第8步:选择执行动作-通过此算子输出所有简历内容

- 第9步:工作流应用。此工作流创建好后,会自动运行,当有新增用户时,即更新业务知识网络;当用户在进行超级助手提问时,可以应用最新的相关信息进行问题回答。
重点+难点步骤详解
新建-从空白创建
选择数据源-数据视图

说明:
- 数据视图获取数据的两种方式:
- 全量获取:基于用户设置的时间范围,获取数据视图内全部数据
- 增量获取:基于用户设置的增量字段以及过滤规则来获取视图内的增量数据。其核心逻辑是:通过记录增量字段的最新值,在后续操作中仅获取大于该值的增量数据,并通过自定义SQL的条件进一步进行数据过滤。
- 增量字段的初始值:只有数据型字段可以被设置为增量字段,初始值可根据实际情况进行设置。如果设置为0,代表“初始状态”或“零值起点”,适用于需从零计数的场景(如版本控制、状态标记)。
- 当数据视图的查询语言为DSL时,获取数据的方式不显示,默认为全量
- 过滤规则:支持自定义筛选SQL语句WHERE条件。
- 示例: "分区" = to_char( (CURRENT_DATE - INTERVAL '1 day') ,'yyyymmdd' ) and "区域" = '275'
- 批大小:单次处理的最大数据量阈值。系统会按批次循环处理数据,直到所有数据被覆盖。输入范围为1000-10000。
- 数据预览:数据源配置好后,可进行数据预览。如下图所示:

选择数据源-算子数据源

说明:
- 使用算子数据源首先需要在行动配置好相关算子,配置好的数据源算子会自动加载
- 算子数据源的算子增加数据源标识字段, 仅该标识的算子允许在数据源节点使用
- 数据源算子必须是同步算子
- 数据源节点仅允许使用全局变量,如 用户token
选择执行操作
.
› 写入业务知识网络
可对内置实体包括文件、用户、组织进行写入知识网络操作;也可以对自定义实体写入指定的业务知识网络。
选择业务知识网络和对应的实体类。属性值可以是变量或者自定义的值。

›Python代码执行
通过Python脚本执行复杂逻辑的自动化操作,例如:数据处理等
- 输入变量:string、int、array、object类型可选择,可以是之前节点对应类型的变量,或者是自定义的值

- 输出变量:可输出string、int、array、object类型

- 编写代码示例,输出用return,不可print。
说明:已经内置的库如下图,如果需要导入外部库,参考在数据流中使用Python节点。

›JSON
用与处理JSON数据的自动化操作。
1.根据Key获取值
- 核心作用:数据提取器。从复杂的 JSON 对象中精准挖取所需的值。
- 工作流程:输入一个 Key→ 在输入的 JSON 中查找 → 输出对应的 Value


2.根据Key设置值
- 核心作用:数据编辑器/构造器。修改或创建一个新的 JSON 对象。
- 工作流程:输入一个 JSON → 指定要设置或修改的 Key 和 Value → 输出一个全新的、修改后的 JSON


3.模板转换
- 核心作用:数据格式化渲染器。将 JSON 数据填充到一个预定义的文本模板中,生成最终所需的文本格式。
- 工作流程:输入一个 JSON 数据源 + 一个包含占位符的文本模板 → 引擎将数据注入模板 → 输出渲染后的完整文本。


›AI能力:
通过运用大模型、小模型、智能体来进行自动化操作。

说明:
1.大模型

- 大模型添加附件:附件非必填,可添加多个
- 文件类型:
- 图片类,默认选中
- 视频类,视觉模型限制只能选择一个视频,当有任意一个附件已选择视频类时,其他附件的视频类选择需灰化处理,悬停灰化出现文字提示灰化原因:视觉模型只支持选择一个视频
- 文件:默认通过直接选择文件或选择文件变量的形式,可切换为输入url或选择url变量的形式
- 文件类型:
-
-
- 当选择”直接选择文件或选择文件变量的形式“,可指定文件的版本,非必须指定;当选择”输入url或选择url变量的形式“,无需指定文件版本
-
- 节点输出:
- answer: 文本
- json:从文本中解析的 json 对象
2.小模型-Embedding

- 小模型-Embedding 模型作用
核心任务:将任何类型的信息(文本、图像、音频等)转换为数值形式的向量(即一组数字)。这个向量试图在数学空间中捕捉原始信息的语义含义。
典型应用:
检索(Retrieval):向量数据库搜索、语义搜索、推荐系统。用户输入一个问题,系统将其转换为向量,然后在数据库中查找最相近的向量(即最相关的内容)。
聚类(Clustering):将内容相似的文本自动分组。
分类(Classification):作为特征输入给分类器。
去重(Deduplication):通过计算向量相似度来发现重复或高度相似的内容。
- 配置及输出说明
-
input: 文本数组,待embedding 内容,至少包含一项
- 输出:data:embedding 结果数组,和 input 一一对应
-
3.小模型-Rerank模型

- 小模型-Rerank模型作用
核心任务:对一组已经初步筛选出的候选结果(例如,通过Embedding模型检索到的Top-100个文档)进行精细化重新排序,选出最相关的那几个。
典型应用:
检索增强生成(RAG)系统:在向量检索后,对Top-K个结果进行重排序,将最相关的几个上下文提供给大模型,以生成更准确的答案。
搜索引擎:对初步搜索结果进行重新排序,提升用户体验。
- 配置及输出说明
- query:文本,不能为空
- documents:候选文本数组,至少包含一项
- 输出:
- results: rerank 结果
- documents:按 relevance_score 排序后的文本数组
›写入索引库

说明:
- 请提前创建好索引库,索引库创建路径:多模态数据湖 >索引 >索引库管理
- 索引节点:选择索引库,索引内容是json格式,可以是引用数据源变量或json字符串
›SQL写入

- SQL写入节点源数据:支持array或者json,可以输入或者选择变量
- 批量写入大小:批量写入的批次大小
- 连接类型:数据连接类型,支持mariadb ,mysql ,dameng, postgresql, sqlserver, oracle,人大金仓
- 连接名称:数据连接名称
- 目标表:新建目标表后续需要自定义表名,添加字段映射(如果“源数据“是json类型,可以自动映射自动,不需要手动输入表字段)
- 选择已有目标表:已有表名和字段映射不可更改
- 表名:符合数据库表名添加规范
- 表概述:非必填
- 字段映射:源数据是json类型,可以自动映射,不需要手动输入表字段。可以手动维护表字段
›文档库写入
- 新建文件:支持docx文件、xlsx文件、pdf文件、markdown文件
- 更新文件:支持docx文件、xlsx文件、markdown文件

›内容处理

- 提取文件中的所有文本
- version: 文件版本,选填,默认最新版本
- 支持格式:.docx .dotx .dot .doc .odt .wps .docm .dotm .dwg .xlsx .xlsm .xlsb .xls .et .xla .xlam .xltm .xltx .xlt .ods .csv .pptx .ppt .pot .pps .ppsx .dps .potm .ppsm .potx .pptm .odp .pdf .txt .html .md .markdown
- 节点输出结果:
- 提取结果(文本)
- 提取结果(url)
注意:复制url下载文件后,需要编辑文件名称加.txt后缀,才可以正常查看
- 提取图片中的所有文本
- version: 文件版本,选填,默认最新版本
注意:安装dataflow大包,需要配置ocr,具体安装配置信息,可参考ADP更新指导。
- 提取音频中的所有文本
- version: 文件版本,选填,默认最新版本
-
支持的格式:.mp3 .wav .m4a .mp4
- 节点输出结果:提取结果(文本)
注意:安装dataflow大包,需要配置音视频服务,具体安装配置信息,可参考ADP更新指导。
- 文件转PDF
- version: 文件版本,选填,默认最新版本
-
支持的格式:.txt .cpp .h .go .py .cs .php .ini .yaml .css .doc .docx .odt .dotx .wps .dotm .docm .dot .ott .xls .xlsx .xlsb .xlsm .et .xltx .xltm .xml .xlt .csv .ods .ppt .pptx .odp .potm .potx .pps .ppsm .ppsx .pptm .dps .pot .pdf .ofd .jpg .jpeg .png .gif .bmp .tif .tiff .ai .psd .psb .heic
- 节点输出结果:url: 下载链接
注意:复制url下载文件后,需要编辑文件名称加.pdf后缀,才可以正常使用该pdf文件
- 文件解析
- 文件:可选文件或变量。选择框下方显示支持的音频格式:支持PDF格式的文件
- version:非必填,默认获取文件最新版本。可输入版本号或选择变量
- 解析配置:
- 单选,默认选中:切片且向量化,选中时需指定一个向量模型(不能为空),下拉框点击后展示系统内已配置的向量模型。切换到其他选项无需配置
- 节点输出:
- 解析结果(PDF 内容列表)
- 解析结果(Markdown)
- 解析结果(切片和Embedding)
- 解析结果(切片)
注意:安装dataflow大包,需要配置文档结构化解析服务,具体安装配置信息,可参考ADP更新指导。
›文本处理

- 文本分割
-
- 核心功能:将单个文本按规则拆分成多个片段
- 典型应用:长文档分块、日志文件解析、数据预处理
- 分割方式:支持基于分隔符的自动分割


- 文本合并
-
- 核心功能:将多个文本片段合并为完整文档
- 典型应用:多源数据整合、分片文档重组、结果汇总
- 输出形式:生成单一连贯的文本内容


- 提取文本内容
-
- 核心功能:从文本中定位并抽取值信息
- 典型应用:实体识别、关键信息捕获、数据清洗
- 提取方式:基于规则或模式的智能提取


›变量
支持自定义变量和变量更新。

- 变量定义
-
- 核心功能:支持通过此节点显式定义变量,可选择 string、number、object、array 四种类型,设置与类型匹配的初始值
- 典型应用:工作流执行前提前配置变量基础信息,为后续节点提供可引用的初始变量,解决无显式变量定义的痛点
- 输出形式:变量存储后的具体值

说明:支持对变量定义的编辑,如下图所示

- 变量赋值
-
- 核心功能:支持通过此节点对已定义变量(含嵌套字段)进行赋值修改
- 典型应用:工作流运行中根据业务逻辑动态更新变量值,灵活调整变量内容以适配分支、循环等复杂流程需求
- 输出形式:变量修改结果
- 核心功能:支持通过此节点对已定义变量(含嵌套字段)进行赋值修改

变量语法规范
部分文本框支持变量输入, 包括JSON、AI能力、写入索引库;以下说明为进行变量输入的语法规范。变量输入框示例如下图:

- 基本语法
变量使用双花括号 {{}} 包裹,例如:{{__1}} - 变量命名规则
1.变量包含全局变量和节点输出变量,全局变量命名规则为 __g_var, 例如 __g_authorization
2.节点输出变量名称为 __ID, 例如 __1 - 访问链语法
使用点号 . 访问对象属性或数组元素:
{{__1.outputs}}
{{__2.输出}} // 属性名支持中文
{{ __3.result }} // 忽略花括号内的首尾空格
- 特殊字符处理
对于包含特殊字符(如点号、空格)的属性名,使用引号包裹:
1.单引号 '
2.双引号 "
示例:
{{__0.fields.input.'a.b'}} // 包含点号
{{__0.fields.input.'a b'}} // 包含空格
- 数字处理
解析器会自动识别纯数字为数字类型:
{{__1.array.0}} // 数字0
{{__1.object.'0'}} // 字符串"0"
- 三花括号转义
使用三花括号 {{{}}} 表示原始文本内容:
{{{text}}} // 输出 "text"
{{{{text}}}} // 输出 "{text}"
{{{{{text}}}}} // 输出 "{{text}}"
- 未定义处理
{{undefined}} // 未定义变量返回 null
{{defined.undefined}} // 未定义属性返回 null - 文本拼接
花括号外的内容(包括空格)会保留:
{{var1}} {{var2}} {{var3}} // 保留前后空格
数据流导入导出
数据流导入
点击算子管理>导入,进入数据流导入页面。

说明:
- 需导入本地文件
- 仅支持JSON格式
- 单次仅支持导入1个文件,大小不超过20M
- 需具备新建权限
- 若导入时包含算子,但此算子在系统内并不存在,则可导入成功,导入后的数据流配置中该算子节点为空
- 若导入的过程中,发现数据流名称与已有数据流同名,则自动重命名,名称后缀加(1)、(2)、(3)……以此类推
数据流导出
点击数据流,进入数据流管理页面,勾选预导出的数据流, 点击导出即可。如下图所示:

说明:
- 数据流导出仅支持单选
- 用户需具备流程查看权限
- 导出为json/zip格式文件
- 若导出的数据流中包含算子,但用户无算子的导出权限,则导出的数据流中该算子节点为空
数据流查看
运行统计
勾选目标数据流,点击查看>运行统计,即可查看目标数据流的运行统计信息。

点击查看日志,即可查看该运行目标的详细日志信息。

在搜索框中输入运行目标,即可查看具体运行目标的相关信息。
版本信息
勾选目标数据流,点击查看>版本信息,即可查看目标数据流的版本相关信息。

点击查看,可跳转至目标数据流的编辑页面。
