Skip to content

Commit b65f30d

Browse files
authored
feat(langgraph): deferred nodes (#1210)
2 parents a6f56a1 + 68c6514 commit b65f30d

15 files changed

+731
-78
lines changed

libs/langgraph/src/channels/any_value.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@ export class AnyValue<Value> extends BaseChannel<Value, Value, Value> {
5252
}
5353
return this.value[0];
5454
}
55+
56+
isAvailable(): boolean {
57+
return this.value.length !== 0;
58+
}
5559
}

libs/langgraph/src/channels/base.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,42 @@ export abstract class BaseChannel<
7070

7171
/**
7272
* Mark the current value of the channel as consumed. By default, no-op.
73-
* This is called by Pregel before the start of the next step, for all
74-
* channels that triggered a node. If the channel was updated, return true.
73+
* A channel can use this method to modify its state, preventing the value
74+
* from being consumed again.
75+
*
76+
* Returns True if the channel was updated, False otherwise.
7577
*/
7678
consume(): boolean {
7779
return false;
7880
}
81+
82+
/**
83+
* Notify the channel that the Pregel run is finishing. By default, no-op.
84+
* A channel can use this method to modify its state, preventing finish.
85+
*
86+
* Returns True if the channel was updated, False otherwise.
87+
*/
88+
finish(): boolean {
89+
return false;
90+
}
91+
92+
/**
93+
* Return True if the channel is available (not empty), False otherwise.
94+
* Subclasses should override this method to provide a more efficient
95+
* implementation than calling get() and catching EmptyChannelError.
96+
*/
97+
isAvailable(): boolean {
98+
try {
99+
this.get();
100+
return true;
101+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
102+
} catch (error: any) {
103+
if (error.name === EmptyChannelError.unminifiable_name) {
104+
return false;
105+
}
106+
throw error;
107+
}
108+
}
79109
}
80110

81111
export function emptyChannels<Cc extends Record<string, BaseChannel>>(

libs/langgraph/src/channels/binop.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,8 @@ export class BinaryOperatorAggregate<
7373
}
7474
return this.value;
7575
}
76+
77+
isAvailable(): boolean {
78+
return this.value !== undefined;
79+
}
7680
}

libs/langgraph/src/channels/dynamic_barrier_value.ts

Lines changed: 119 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,9 @@ export class DynamicBarrierValue<Value> extends BaseChannel<
6666
"Assertion Error: Received unexpected WaitForNames instance."
6767
);
6868
}
69-
if (this.names.has(value)) {
70-
if (!this.seen.has(value)) {
71-
this.seen.add(value);
72-
updated = true;
73-
}
74-
} else {
75-
throw new InvalidUpdateError(
76-
`Value ${value} not in ${[...this.names]}`
77-
);
69+
if (this.names.has(value) && !this.seen.has(value)) {
70+
this.seen.add(value);
71+
updated = true;
7872
}
7973
}
8074
return updated;
@@ -103,4 +97,120 @@ export class DynamicBarrierValue<Value> extends BaseChannel<
10397
checkpoint(): [Value[] | undefined, Value[]] {
10498
return [this.names ? [...this.names] : undefined, [...this.seen]];
10599
}
100+
101+
isAvailable(): boolean {
102+
return !!this.names && areSetsEqual(this.names, this.seen);
103+
}
104+
}
105+
106+
/**
107+
* A channel that switches between two states with an additional finished flag
108+
*
109+
* - in the "priming" state it can't be read from.
110+
* - if it receives a WaitForNames update, it switches to the "waiting" state.
111+
* - in the "waiting" state it collects named values until all are received.
112+
* - once all named values are received, and the finished flag is set, it can be read once, and it switches
113+
* back to the "priming" state.
114+
* @internal
115+
*/
116+
export class DynamicBarrierValueAfterFinish<Value> extends BaseChannel<
117+
void,
118+
Value | WaitForNames<Value>,
119+
[Value[] | undefined, Value[], boolean]
120+
> {
121+
lc_graph_name = "DynamicBarrierValueAfterFinish";
122+
123+
names?: Set<Value>; // Names of nodes that we want to wait for.
124+
125+
seen: Set<Value>;
126+
127+
finished: boolean;
128+
129+
constructor() {
130+
super();
131+
this.names = undefined;
132+
this.seen = new Set<Value>();
133+
this.finished = false;
134+
}
135+
136+
fromCheckpoint(checkpoint?: [Value[] | undefined, Value[], boolean]) {
137+
const empty = new DynamicBarrierValueAfterFinish<Value>();
138+
if (typeof checkpoint !== "undefined") {
139+
const [names, seen, finished] = checkpoint;
140+
empty.names = names ? new Set(names) : undefined;
141+
empty.seen = new Set(seen);
142+
empty.finished = finished;
143+
}
144+
return empty as this;
145+
}
146+
147+
update(values: (Value | WaitForNames<Value>)[]): boolean {
148+
const waitForNames = values.filter(isWaitForNames);
149+
if (waitForNames.length > 0) {
150+
if (waitForNames.length > 1) {
151+
throw new InvalidUpdateError(
152+
"Received multiple WaitForNames updates in the same step."
153+
);
154+
}
155+
this.names = new Set(waitForNames[0].__names);
156+
return true;
157+
} else if (this.names !== undefined) {
158+
let updated = false;
159+
for (const value of values) {
160+
if (isWaitForNames(value)) {
161+
throw new Error(
162+
"Assertion Error: Received unexpected WaitForNames instance."
163+
);
164+
}
165+
if (this.names.has(value) && !this.seen.has(value)) {
166+
this.seen.add(value);
167+
updated = true;
168+
}
169+
}
170+
return updated;
171+
}
172+
return false;
173+
}
174+
175+
consume(): boolean {
176+
if (
177+
this.finished &&
178+
this.seen &&
179+
this.names &&
180+
areSetsEqual(this.seen, this.names)
181+
) {
182+
this.seen = new Set<Value>();
183+
this.names = undefined;
184+
this.finished = false;
185+
return true;
186+
}
187+
return false;
188+
}
189+
190+
finish(): boolean {
191+
if (!this.finished && this.names && areSetsEqual(this.names, this.seen)) {
192+
this.finished = true;
193+
return true;
194+
}
195+
return false;
196+
}
197+
198+
get(): void {
199+
if (!this.finished || !this.names || !areSetsEqual(this.names, this.seen)) {
200+
throw new EmptyChannelError();
201+
}
202+
return undefined;
203+
}
204+
205+
checkpoint(): [Value[] | undefined, Value[], boolean] {
206+
return [
207+
this.names ? [...this.names] : undefined,
208+
[...this.seen],
209+
this.finished,
210+
];
211+
}
212+
213+
isAvailable(): boolean {
214+
return this.finished && !!this.names && areSetsEqual(this.names, this.seen);
215+
}
106216
}

libs/langgraph/src/channels/ephemeral_value.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ export class EphemeralValue<Value> extends BaseChannel<Value, Value, Value> {
5757
}
5858
return this.value[0];
5959
}
60+
61+
isAvailable(): boolean {
62+
return this.value.length !== 0;
63+
}
6064
}

libs/langgraph/src/channels/last_value.ts

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ export class LastValue<Value> extends BaseChannel<Value, Value, Value> {
3131
if (values.length !== 1) {
3232
throw new InvalidUpdateError(
3333
"LastValue can only receive one value per step.",
34-
{
35-
lc_error_code: "INVALID_CONCURRENT_GRAPH_UPDATE",
36-
}
34+
{ lc_error_code: "INVALID_CONCURRENT_GRAPH_UPDATE" }
3735
);
3836
}
3937

@@ -55,4 +53,80 @@ export class LastValue<Value> extends BaseChannel<Value, Value, Value> {
5553
}
5654
return this.value[0];
5755
}
56+
57+
isAvailable(): boolean {
58+
return this.value.length !== 0;
59+
}
60+
}
61+
62+
/**
63+
* Stores the last value received, but only made available after finish().
64+
* Once made available, clears the value.
65+
* @internal
66+
*/
67+
export class LastValueAfterFinish<Value> extends BaseChannel<
68+
Value,
69+
Value,
70+
[Value, boolean]
71+
> {
72+
lc_graph_name = "LastValueAfterFinish";
73+
74+
// value is an array so we don't misinterpret an update to undefined as no write
75+
value: [Value] | [] = [];
76+
77+
finished: boolean = false;
78+
79+
fromCheckpoint(checkpoint?: [Value, boolean]) {
80+
const empty = new LastValueAfterFinish<Value>();
81+
if (typeof checkpoint !== "undefined") {
82+
const [value, finished] = checkpoint;
83+
empty.value = [value];
84+
empty.finished = finished;
85+
}
86+
return empty as this;
87+
}
88+
89+
update(values: Value[]): boolean {
90+
if (values.length === 0) {
91+
return false;
92+
}
93+
94+
this.finished = false;
95+
// eslint-disable-next-line prefer-destructuring
96+
this.value = [values[values.length - 1]];
97+
return true;
98+
}
99+
100+
get(): Value {
101+
if (this.value.length === 0 || !this.finished) {
102+
throw new EmptyChannelError();
103+
}
104+
return this.value[0];
105+
}
106+
107+
checkpoint(): [Value, boolean] | undefined {
108+
if (this.value.length === 0) return undefined;
109+
return [this.value[0], this.finished];
110+
}
111+
112+
consume(): boolean {
113+
if (this.finished) {
114+
this.finished = false;
115+
this.value = [];
116+
return true;
117+
}
118+
return false;
119+
}
120+
121+
finish(): boolean {
122+
if (!this.finished && this.value.length > 0) {
123+
this.finished = true;
124+
return true;
125+
}
126+
return false;
127+
}
128+
129+
isAvailable(): boolean {
130+
return this.value.length !== 0 && this.finished;
131+
}
58132
}

0 commit comments

Comments
 (0)