深入学习和理解LangChain Runnables

深入学习和理解LangChain Runnables

📅 2024-10-13 | 🖱️
🔖 aigc

1.LangChain Runnable介绍 #

LangChain的Runnable对象是一种协议(protocol),它简化创建自定义链(chain)的过程。Runable是使用LangChain开发LLM应用程序的开发人员必须学习的一种抽象。

使用Ruanable协议可以将一系列的任务串联起来,将一个调用的输出作为输入提供给下一个,形成一个完整的流程。

Runnable协议通过提供一种标准化的方式使不同组件相互作用,简化了构建复杂工作流程的过程,使得通过一个函数调用,就可以执行一个链(chain)上的操作。

一个Runnable对象由以下部分组成:

  • 方法:是Runnable对象可以执行的函数。Runnable对象的标准接口包括方法如invokebatchstream及其对应的异步方法ainvokeabatchastream。这些方法允许我们定义自定义链,并且可以以标准方式调用它们。
  • 输入和输出schema:所有Runnable对象都给出了输入schema和输出schema,允许我们检查并理解Runnable对象期望的输入类型和它产生的输出类型。Runnable对象对象通过input_schema属性、output_schema属性和config_schema方法暴露有关其输入、输出和配置的模式信息。

LangChain可 Runnable对象提供了三种类型的方法来执行和与链交互:

  • invoke, ainvoke:使用单个输入执行可运行对象,通常用于处理单个数据片段时。
  • batch, abatch:允许并行处理多个输入。当有一系列输入并希望同时通过链运行它们时,此方法很有用。
  • stream, astream:将输入数据作为流进行处理,一次处理一个数据片段并提供可用结果。此方法非常适合处理实时数据处理或要增量处理的大型数据集的流式输出。

许多LangChain组件实现了Runnable协议,包括聊天模型chat models、LLMs、输出解析器output parsers、检索器retrievers、提示模板prompt templates等等。

可以使用管道|运算符将实现Runnable协议的组件连接起来,就可以创建一个复杂的自定义链,这让我们可以将LangChain的各个组件链接在一起以构建复杂的流程。

Runnable对象还对并发提供了支持,它的接口包括async方法,可以与asyncio的await语法一起用于并发。这允许我们同时运行多个任务,提高应用程序的效率和性能。

2.Runnable, RunnableLambda和Python Callable #

在理解LangChain的Runnable前,先看一下Python中的Callable

2.1 Python中的Callable #

在Python中,Callable 是一个类型提示,用于表示可以被调用的对象。简单来说,任何可以调用的对象都是 Callable,包括函数、方法、类的实例(如果类定义了 __call__ 方法),甚至内置函数。

  • 函数:Python 中的函数和方法都是 Callable,因为它们可以通过 () 来调用。

    1 def greet():
    2     print("Hello!")
    3
    4
    5 # greet 是一个 Callable
    6 greet()
    7 print(callable(greet))  # True
    
  1. 类和对象:如果一个类实现了 __call__ 方法,那么它的实例也会成为 Callable 对象。

    1 class Greeter:
    2     def __call__(self):
    3         print("Hello from instance!")
    4
    5     # 这个类的实例也是一个 Callable
    6
    7 greeter = Greeter()
    8 greeter()
    9 assert callable(greeter)
    
  2. 类型提示中的 Callable:在类型提示中,Callable 可以用来标注任何可以被调用的对象。通常,Callable 会用作类型注解,尤其是在函数的参数或返回值类型中。

     1 from collections.abc import Callable
     2
     3 def execute_function(func: Callable[[], None]) -> None:
     4     func()
     5
     6 def say_hello():
     7     print("Hello!")
     8
     9 # say_hello 是一个 Callable
    10 execute_function(say_hello)
    11 assert callable(say_hello)
    

    在上面的例子中,Callable[[], None] 表示一个不接受任何参数并且返回 None 的函数。

  3. collections.abc.Callable[[参数类型1, 参数类型2], 返回类型] 可以指定函数的参数和返回类型。

     1from collections.abc import Callable
     2
     3def add(x: int, y: int) -> int:
     4    return x + y
     5
     6def operate(func: Callable[[int, int], int], a: int, b: int) -> int:
     7    return func(a, b)
     8
     9# add 是一个 Callable
    10result = operate(add, 5, 3)  
    11print(result)
    12assert callable(add) # 8
    
  4. lambda表达式创建匿名函数也是Callable对象

    1from collections.abc import Callable
    2
    3def execute_function(func: Callable[[], None]) -> None:
    4    func()
    5
    6# 使用 lambda 表达式,lambda 也是一个 Callable
    7execute_function(lambda: print("Hello from lambda!"))
    

2.2 使Python的Callable成为LangChain的Runnable #

在 LangChain中,Runnable可以是任何Python中的Callable对象,但是,不能直接传递这些Callable对象,需要将它们包装在一个RunnableLambda对象中。

RunnableLambda将Python Callable对象转换为LangChain Runnable。

将Callable对象包装在RunnableLambda中使得该Callable对象可以在同步或异步上下文中使用。

RunnableLambda可以像其他Runnable对象一样进行组合,并提供与LangChain tracing的无缝集成。

RunnableLambda最适合不需要支持流式处理的代码。如果需要支持流式处理(即能够对输入块进行操作并产生输出块),需要使用RunnableGenerator
 1from langchain_core.runnables import RunnableLambda
 2
 3
 4def add_one(x: int) -> int:
 5    return x + 1
 6
 7
 8async def add_one_async(x: int) -> int:
 9    return x + 1
10
11
12async def main():
13    runnable = RunnableLambda(add_one)
14
15    print(runnable.invoke(1))  # 2
16    print(runnable.batch([1, 2, 3]))  # [2, 3, 4]
17
18    # Async is supported by default by delegating to the sync implementation
19    print(await runnable.ainvoke(1))  # 2
20    print(await runnable.abatch([1, 2, 3]))  # [2, 3, 4]
21
22    # Alternatively, can provide both synd and async implementations
23    runnable = RunnableLambda(add_one, afunc=add_one_async)
24    print(runnable.invoke(1))  # Uses add_one
25    print(await runnable.ainvoke(1))  # Uses add_one_async
26
27
28import asyncio
29
30if __name__ == "__main__":
31    asyncio.run(main())

从上面的示例可以看出可以从Callable对象、异步Callable对象或两者一起创建RunnableLambda。

2.3 使用@chain装饰器创建Runnable对象 #

还可以通过@chain装饰器将任意函数转换为链。这在功能上等同于用RunnableLambda包装。

下面的示例,使用@chain装饰器创建一个自定义链,它结合了多个组件,例如提示、模型和输出解析器:

 1from langchain_core.output_parsers import StrOutputParser
 2from langchain_core.prompts import ChatPromptTemplate
 3from langchain_core.runnables import chain
 4from langchain_openai import ChatOpenAI
 5
 6prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
 7prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")
 8
 9@chain
10def custom_chain(text):
11    prompt_val1 = prompt1.invoke({"topic": text})
12    output1 = ChatOpenAI().invoke(prompt_val1)
13    parsed_output1 = StrOutputParser().invoke(output1)
14    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
15    return chain2.invoke({"joke": parsed_output1})

custom_chain 现在是一个可运行对象,这意味可以对其使用 invoke

1custom_chain.invoke("bears")

3.RunnableSequence和"|“运算符 #

RunnableSequence是LangChain中最重要的组合运算符,因为它几乎在每个链中都会使用。

RunnableSequence可以直接实例化,或者更常见的是通过使用|运算符,其中左操作数或右操作数(或两者)必须是一个Runnable对象。

所有RunnableSequence对象都支持同步sync、异步async和批处理batch。batchabatch的默认实现是利用线程池和asyncio gather,对于IO密集型的可运行对象来说,会比单纯调用invokeainvoke更快。

batch是通过按顺序调用RunnableSequence的每个组件的batch方法来实现的。

RunnableSequence本身也是一个Runnable行对象,可以继续被管道化。

下面是一个RunnableSequence的基本使用示例:

 1from langchain_core.runnables import RunnableLambda
 2from langchain_core.runnables import RunnableSequence
 3
 4
 5def add_one(x: int) -> int:
 6    return x + 1
 7
 8
 9def mul_two(x: int) -> int:
10    return x * 2
11
12
13async def main():
14    runnable_1 = RunnableLambda(add_one)
15    runnable_2 = RunnableLambda(mul_two)
16    sequence = runnable_1 | runnable_2
17    # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
18    print(sequence.invoke(1))  # 4
19    print(await sequence.ainvoke(1))  # 4
20
21    print(sequence.batch([1, 2, 3]))
22    print(await sequence.abatch([1, 2, 3]))
23
24
25import asyncio
26
27if __name__ == "__main__":
28    asyncio.run(main())

这里有一个使用LLM生成的JSON输出流的示例:

 1from langchain_core.prompts import PromptTemplate
 2from langchain_core.output_parsers.json import SimpleJsonOutputParser
 3from langchain_openai import ChatOpenAI
 4
 5
 6async def main():
 7    prompt = PromptTemplate.from_template(
 8        "In JSON format, give me a list of {topic} and their "
 9        "corresponding names in French, Spanish and in a "
10        "Cat Language."
11    )
12
13    model = ChatOpenAI(model="gpt-4o-mini")
14    chain = prompt | model | SimpleJsonOutputParser()
15
16    async for chunk in chain.astream({"topic": "colors"}):
17        print("-")  # noqa: T201
18        print(chunk, sep="", flush=True)  # noqa: T201
19
20
21import asyncio
22
23if __name__ == "__main__":
24    asyncio.run(main())

“|“运算符是如何实现的 #

LangChain 中的Runnable对象是通过实现__or__运算符来支持|语法的。这在Runnable的源码中可以找到:

 1class Runnable(Generic[Input, Output], ABC):
 2...
 3    def __or__(
 4        self,
 5        other: Union[
 6            Runnable[Any, Other],
 7            Callable[[Any], Other],
 8            Callable[[Iterator[Any]], Iterator[Other]],
 9            Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
10        ],
11    ) -> RunnableSerializable[Input, Other]:
12        """Compose this Runnable with another object to create a RunnableSequence."""
13        return RunnableSequence(self, coerce_to_runnable(other))
14...

4.RunnableParallel #

RunnableParallel并行运行Runnables对象的字典,并返回其输出的字典。

RunnableParallel是LCEL的两个主要组合原语之一,另一个是前面介绍的RunnableSequence。

RunnableParallel并行调用Runnables对象,并为每个Runnables对象提供相同的输入。RunnableParallel可以直接实例化,也可以通过在RunnableSequence中使用字典字面量来实例化。RunnableSequence中的Mapping[str, Runnable[Input, Any]]会被转换为一个RunnableParallel。

 1from langchain_core.runnables import RunnableLambda
 2
 3
 4def add_one(x: int) -> int:
 5    return x + 1
 6
 7
 8def mul_two(x: int) -> int:
 9    return x * 2
10
11
12def mul_three(x: int) -> int:
13    return x * 3
14
15
16async def main():
17    runnable_1 = RunnableLambda(add_one)
18    runnable_2 = RunnableLambda(mul_two)
19    runnable_3 = RunnableLambda(mul_three)
20
21    sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
22        "mul_two": runnable_2,
23        "mul_three": runnable_3,
24    }
25    # Or equivalently:
26    # sequence = runnable_1 | RunnableParallel(
27    #     {"mul_two": runnable_2, "mul_three": runnable_3}
28    # )
29    # Also equivalently:
30    # sequence = runnable_1 | RunnableParallel(
31    #     mul_two=runnable_2,
32    #     mul_three=runnable_3,
33    # )
34
35    print(sequence.invoke(1))  # {'mul_two': 4, 'mul_three': 6}
36    print(await sequence.ainvoke(1))  # {'mul_two': 4, 'mul_three': 6}
37
38    print(
39        sequence.batch([1, 2, 3])
40    )  # [{'mul_two': 4, 'mul_three': 6}, {'mul_two': 6, 'mul_three': 9}, {'mul_two': 8, 'mul_three': 12}]
41    print(
42        await sequence.abatch([1, 2, 3])
43    )  # [{'mul_two': 4, 'mul_three': 6}, {'mul_two': 6, 'mul_three': 9}, {'mul_two': 8, 'mul_three': 12}]
44
45
46import asyncio
47
48if __name__ == "__main__":
49    asyncio.run(main())

RunnableParallel使并行Runnable变得简单。在下面的示例中,我们同时从两个不同的Runnable流式传输输出:

 1from langchain_core.prompts import ChatPromptTemplate
 2from langchain_core.runnables import RunnableParallel
 3from langchain_openai import ChatOpenAI
 4
 5model = ChatOpenAI(model="gpt-4o-mini")
 6joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
 7poem_chain = (
 8    ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
 9)
10
11runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)
12
13# Display stream
14output = {key: "" for key, _ in runnable.output_schema()}
15print(output)
16for chunk in runnable.stream({"topic": "bear"}):
17    for key in chunk:
18        output[key] = output[key] + chunk[key].content
19    print(output)  # noqa: T201

5.RunnablePassthrough #

RunnablePassthrough也是一个Runnable对象,它不会改变输入,但可以在输入是字典时,向输出添加额外的键。它经常与RunnableParallel一起使用,这允许我们在保持原始输入不变的同时添加一些额外信息。

示例1:

1from langchain_core.runnables import RunnablePassthrough, RunnableParallel
2
3
4runnable = RunnableParallel(
5    {"context": lambda c: c.upper(), "question": RunnablePassthrough()}
6)
7
8print(runnable.invoke("what?"))  # {'context': 'WHAT?', 'question': 'what?'}

示例2:

 1from langchain_core.runnables import (
 2    RunnableLambda,
 3    RunnableParallel,
 4    RunnablePassthrough,
 5)
 6
 7runnable = RunnableParallel(origin=RunnablePassthrough(), modified=lambda x: x + 1)
 8
 9print(runnable.invoke(1))  # {'origin': 1, 'modified': 2}
10
11
12def fake_llm(prompt: str) -> str:  # Fake LLM for the example
13    return "completion"
14
15
16chain = RunnableLambda(fake_llm) | {
17    "original": RunnablePassthrough(),  # Original LLM output
18    "parsed": lambda text: text[::-1],  # Parsing logic
19}
20
21print(chain.invoke("hello"))  # {'original': 'completion', 'parsed': 'noitelpmoc'}

在某些情况下,可能需要在将输入传递的同时向输出添加一些键。在这种情况下,您可以使用 assign 方法:

 1from langchain_core.runnables import RunnablePassthrough
 2
 3
 4def fake_llm(prompt: str) -> str:  # Fake LLM for the example
 5    return "completion"
 6
 7
 8runnable = {
 9    "llm1": fake_llm,
10    "llm2": fake_llm,
11} | RunnablePassthrough.assign(
12    total_chars=lambda inputs: len(inputs["llm1"] + inputs["llm2"])
13)
14
15print(
16    runnable.invoke("hello")
17)  # {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}

理解下面的代码:

 1...
 2# Retrieve and generate using the relevant snippets of the blog.
 3retriever = vectorstore.as_retriever()
 4prompt = hub.pull("rlm/rag-prompt")
 5# print(prompt)
 6
 7def format_docs(docs):
 8    return "\n\n".join(doc.page_content for doc in docs)
 9from langchain_core.runnables import  RunnableParallel
10rag_chain = (
11    RunnableParallel({"context": retriever | format_docs, "question": RunnablePassthrough()})
12    | prompt
13    | llm
14    | StrOutputParser()
15)
16
17rag_chain.invoke("What is Task Decomposition?")

上面代码中prompt = hub.pull("rlm/rag-prompt")期望的输入是一个包含"context"和"question"键的字典,rag_chain中的RunnableParallel会从链的输入"What is Task Decomposition?"计算出这个字典。

6.RunnableGenerator #

RunnableGenerator用于运行生成器函数。RunnableGenerator可以直接实例化,或者通过在RunnableSequence中使用生成器来实例化。

RunnableGenerator可以用来实现自定义行为,比如自定义输出解析器,同时保持流式处理能力。给定一个签名为Iterator[A] -> Iterator[B]的生成器函数,将其包装在RunnableGenerator中可以让它在从上一步流式传入输出块时立即发出这些块。

注意,如果生成器函数的签名为A -> Iterator[B],即它需要上一步的输入完成才能发出块(例如,大多数LLM需要整个提示可用才能开始生成),那么它可以被包装在RunnableLambda中。

这里有一个例子来展示RunnableGenerator的基本机制:

 1from typing import Any, AsyncIterator, Iterator
 2
 3from langchain_core.runnables import RunnableGenerator
 4
 5
 6def gen(input: Iterator[Any]) -> Iterator[str]:
 7    for token in ["Have", " a", " nice", " day"]:
 8        yield token
 9
10
11# Async version:
12async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
13    for token in ["Have", " a", " nice", " day"]:
14        yield token
15
16
17async def main():
18    runnable = RunnableGenerator(gen)
19    runnable.invoke(None)  # "Have a nice day"
20    list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
21    runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]
22
23    runnable = RunnableGenerator(agen)
24    await runnable.ainvoke(None)  # "Have a nice day"
25    [p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]
26
27
28import asyncio
29
30if __name__ == "__main__":
31    asyncio.run(main())

RunnableGenerator使得在流式上下文中实现自定义行为变得容易:

 1from langchain_core.prompts import ChatPromptTemplate
 2from langchain_core.runnables import RunnableGenerator, RunnableLambda
 3from langchain_openai import ChatOpenAI
 4from langchain_core.output_parsers import StrOutputParser
 5from typing import Iterator
 6
 7
 8model = ChatOpenAI(model="gpt-4o-mini")
 9chant_chain = (
10    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
11    | model
12    | StrOutputParser()
13)
14
15
16def character_generator(input: Iterator[str]) -> Iterator[str]:
17    for token in input:
18        if "," in token or "." in token:
19            yield "👏" + token
20        else:
21            yield token
22
23
24runnable = chant_chain | character_generator
25assert type(runnable.last) is RunnableGenerator
26print("".join(runnable.stream({"topic": "waste"})))  # Reduce👏, Reuse👏, Recycle👏.
27
28
29# Note that RunnableLambda can be used to delay streaming of one step in a
30# sequence until the previous step is finished:
31def reverse_generator(input: str) -> Iterator[str]:
32    # Yield characters of input in reverse order.
33    for character in input[::-1]:
34        yield character
35
36
37runnable = chant_chain | RunnableLambda(reverse_generator)
38print("".join(runnable.stream({"topic": "waste"})))  # ".elcycer ,esuer ,ecudeR"

参考 #

© 2025 青蛙小白 | 总访问量 | 总访客数