在 Python 中构建生成式 AI 处理器

liftword3个月前 (02-09)技术文章56

为什么不为 Apache NiFi 2.0.0 创建一个 Python 处理器?在本教程中,了解这样做的挑战是容易还是困难。

当我开始做这件事时,那是一个下雪天。我看到了 IBM WatsonX Python SDK,并意识到我需要连接我的 Gen AI 模型 (LLM),以便从 Slack 发送我的上下文增强提示。为什么不为 Apache NiFi 2.0.0 创建一个 Python 处理器?我想这并不难。这很容易!

IBM WatsonXAI 有大量强大的基础模型可供您选择,只是不要选择那些 v1 模型,因为它们将在几个月内被删除。

  • GitHub,IBM/watsonxdata-python-sdk:用于 wastonx.data Python SDK。

在我们选择了一个模型后,我在 WatsonX 的 Prompt Lab 中对其进行了测试。然后我把它移植到一个简单的 Python 程序中。一旦成功,我就开始添加属性和转换方法等功能。就是这样。

源代码

这是源代码的链接。

现在,我们可以将新的 LLM 调用处理器放入流中,并将其用作任何其他内置处理器。例如,Python API 要求 Python 3.9+ 在托管 NiFi 的计算机上可用。

包级依赖项

添加到requirements.txt。

Python 处理器的基本格式

您需要从库中导入各种内容。然后,设置您的类 .您需要包含类定义,其中包括 NiFi 、 、 a 和 一些 .
nifiapiCallWatsonXAIJavaProcessDetailsversiondependenciesdescriptiontags

class ProcessorDetails:
        version = '0.0.1-SNAPSHOT',
        dependencies = ['pandas']


定义处理器的所有属性


您需要为包含 、 、 等内容的每个属性设置 s。
PropertyDescriptornamedescriptionrequiredvalidatorsexpression_language_scope

Transform Main 方法

在这里,我们包括所需的导入。您可以通过 访问属性。然后,您可以设置输出的属性,如 所示。然后,我们设置流文件输出。最后,对于所有指南来说,哪个指南是 。您应该添加一些内容来处理错误。我需要补充一下。
context.getPropertyattributescontentsrelationshipsuccess

如果需要,请重新部署、调试或修复某些内容。

虽然您可以在 NiFi 停止时删除整个目录,但这样做可能会导致 NiFi 下次启动所需的时间要长得多,因为它必须从 PyPI 获取所有扩展的依赖项,并扩展所有 Java 扩展的 NAR 文件。work

  • 请参阅:NiFi Python 开发人员指南

因此,要部署它,我们只需要将 Python 文件复制到
nifi-2.0.0/python/extensions
目录,并可能重新启动您的 NiFi 服务器。我会开始使用本地 GitHub 构建或 Docker 在您的笔记本电脑上进行本地开发。

现在我们已经编写了一个处理器,让我们在实时流数据管道应用程序中使用它。

应用实例

基于我们之前接收 Slack 消息的应用程序,我们将获取这些 Slack 查询,将它们发送到 PineCone 或 Chroma 向量数据库,并获取该上下文并将其与我们对 IBM 的 WatsonX AI REST API for Generative AI (LLM) 的调用一起发送。

您可以在此处找到之前的详细信息:

  • 使用生成式 AI 构建实时 Slackbot
  • 具有 Chroma Vector DB 和 Apache NiFi 的无代码生成式 AI 管道
  • 使用 Apache NiFi 流式处理 LLM (HuggingFace)
  • 使用实时上下文增强和丰富 LLM


NiFi 流

  1. Listen HTTP:在端口 9518/slack 上;NiFi 是一个通用的 REST 端点
  2. QueryRecord:JSON 清理
  3. SplitJSON: $.*
  4. EvalJSONPath:$.inputs 的输出属性
  5. QueryChroma使用 ONNX 模型在端口 9776 上调用服务器,导出 25 行
  6. QueryRecordJSON->JSON;限制 1
  7. SplitRecordJSON->JSON;成 1 行
  8. EvalJSONPath从中导出上下文 $.document
  9. ReplaceText将上下文设置为新的流文件
  10. UpdateAttribute更新输入
  11. CallWatsonX:我们调用 IBM 的 Python 处理器
  12. SplitRecord1 条记录,JSON -> JSON
  13. EvalJSONPath添加属性
  14. AttributesToJSON从属性创建新的 Flow 文件
  15. QueryRecord:验证 JSON
  16. UpdateRecord:添加生成的文本、输入、ts、UUID
  17. Kafka 路径,:将结果发送到 Kafka。PublishKafkaRecord_2_6
  18. Kafka 路径:如果 Apache Kafka 发送失败,请重试。RetryFlowFile
  19. 松弛路径, :拆分为 1 条记录进行显示。SplitRecord
  20. 松弛路径,:拉出要显示的字段。EvaluateJSONPath
  21. Slack 路径, :将格式化的消息发送到 #chat 群组。PutSlack

这是一个利用 ChromaDB 的成熟检索增强生成 (RAG) 应用程序。(NiFi 流也可以使用松果。接下来,我正在开发 Milvus、SOLR 和 OpenSearch。

享受将 Python 代码添加到分布式 NiFi 应用程序是多么容易。


原文标题:Building a Generative AI Processor in Python

原文链接:
https://dzone.com/articles/building-a-generative-ai-processor-in-python

作者:Tim Spann

编译:LCR

相关文章

C# 中调用 Python 代码的实现方式

一、引言在软件开发中,经常会遇到需要结合不同编程语言优势的场景。C# 和 Python 作为两种广泛应用的语言,各自在某些领域有着独特的优势。C# 以其高性能和强大的类型系统在企业级应用中占据重要地位...

PySpark源码解析,用Python调用高效Scala接口,搞定大规模数据分析

机器之心专栏作者:汇量科技-陈绪相较于Scala语言而言,Python具有其独有的优势及广泛应用性,因此Spark也推出了PySpark,在框架上提供了利用Python语言的接口,为数据科学家使用该框...

python之requests介绍和案例_python怎么安装requests库

requests功能: 用于发送 HTTP 请求,适合与 REST API 交互。实际案例:调用 API 获取数据: 从监控系统 API 获取服务器状态。import requestsdef get_...

[Python] FastAPI基础:Query查询参数用法全面解析与实例

1. HTTP 请求格式在开始写 Restful API 前,先简单的了解一下常规 HTTP 请求的格式,以便后续遇到问题知道从哪里排查,各部分的含义。下面是一个常规的 POST 请求的例子,各部分含...

如何用python语言调用身份证实名认证接口

  身份证实名认证接口是一种在线服务,允许企业和开发者通过API集成来验证用户提供的身份信息是否真实有效。这类接口通常通过第三方平台来提供,如翔云API,实时对接权威数据源,以确保验证结果的准确性。而...

7 招教你轻松搭建以图搜图系统_以图搜图开源项目

作者 | 小龙责编 | 胡巍巍当您听到“以图搜图”时,是否首先想到了百度、Google 等搜索引擎的以图搜图功能呢?事实上,您完全可以搭建一个属于自己的以图搜图系统:自己建立图片库;自己选择一张图片到...