Skip to content

Commit 6827adc

Browse files
kexiongliu123kl943
andauthored
[2/2][Integrate Transports with Tracing Interceptor]Re-Design tracing instrumentation in yarpc-go (#2362)
Co-authored-by: Kexiong Liu <[email protected]>
1 parent d2c6ad1 commit 6827adc

22 files changed

+1517
-111
lines changed

internal/interceptor/interceptortest/outbound.go

+645
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/interceptor/outbound.go

+150-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,48 @@
2121
package interceptor
2222

2323
import (
24-
"go.uber.org/yarpc/api/middleware"
24+
"context"
25+
26+
"go.uber.org/yarpc/api/transport"
2527
)
2628

2729
type (
30+
// UnaryOutboundChain defines the interface for a chain of unary outbound requests.
31+
// It provides methods to invoke the next outbound in the chain with the given context
32+
// and request, and to retrieve the outbound component of the chain.
33+
//
34+
// Next: Executes the next outbound request in the chain with the provided context and request,
35+
// returning the response and any error encountered during the process.
36+
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
37+
UnaryOutboundChain interface {
38+
Next(ctx context.Context, request *transport.Request) (*transport.Response, error)
39+
Outbound() transport.Outbound
40+
}
41+
42+
// OnewayOutboundChain defines the interface for a chain of one-way outbound requests.
43+
// It provides methods to invoke the next outbound in the chain with the given context
44+
// and request, and to retrieve the outbound component of the chain.
45+
//
46+
// Next: Executes the next one-way outbound request in the chain with the provided context and request,
47+
// returning an acknowledgment and any error encountered during the process.
48+
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
49+
OnewayOutboundChain interface {
50+
Next(ctx context.Context, request *transport.Request) (transport.Ack, error)
51+
Outbound() transport.Outbound
52+
}
53+
54+
// StreamOutboundChain defines the interface for a chain of streaming outbound requests.
55+
// It provides methods to invoke the next outbound in the chain with the given context
56+
// and request, and to retrieve the outbound component of the chain.
57+
//
58+
// Next: Executes the next streaming outbound request in the chain with the provided context and request,
59+
// returning a client stream and any error encountered during the process.
60+
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
61+
StreamOutboundChain interface {
62+
Next(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error)
63+
Outbound() transport.Outbound
64+
}
65+
2866
// UnaryOutbound defines transport interceptor for `UnaryOutbound`s.
2967
//
3068
// UnaryOutbound interceptor MAY do zero or more of the following: change the
@@ -36,7 +74,9 @@ type (
3674
//
3775
// UnaryOutbound interceptor is re-used across requests and MAY be called
3876
// multiple times on the same request.
39-
UnaryOutbound = middleware.UnaryOutbound
77+
UnaryOutbound interface {
78+
Call(ctx context.Context, request *transport.Request, out UnaryOutboundChain) (*transport.Response, error)
79+
}
4080

4181
// OnewayOutbound defines transport interceptor for `OnewayOutbound`s.
4282
//
@@ -49,7 +89,9 @@ type (
4989
//
5090
// OnewayOutbound interceptor is re-used across requests and MAY be called
5191
// multiple times on the same request.
52-
OnewayOutbound = middleware.OnewayOutbound
92+
OnewayOutbound interface {
93+
CallOneway(ctx context.Context, request *transport.Request, out OnewayOutboundChain) (transport.Ack, error)
94+
}
5395

5496
// StreamOutbound defines transport interceptor for `StreamOutbound`s.
5597
//
@@ -62,5 +104,109 @@ type (
62104
//
63105
// StreamOutbound interceptors is re-used across requests and MAY be called
64106
// multiple times on the same request.
65-
StreamOutbound = middleware.StreamOutbound
107+
StreamOutbound interface {
108+
CallStream(ctx context.Context, req *transport.StreamRequest, out StreamOutboundChain) (*transport.ClientStream, error)
109+
}
66110
)
111+
112+
// DirectUnaryOutbound is a transport that knows how to send unary requests for procedure
113+
// calls.
114+
type DirectUnaryOutbound interface {
115+
transport.Outbound
116+
117+
// DirectCall is called without interceptor.
118+
DirectCall(ctx context.Context, request *transport.Request) (*transport.Response, error)
119+
}
120+
121+
// DirectOnewayOutbound defines a transport outbound for oneway requests
122+
// that does not involve any interceptors.
123+
type DirectOnewayOutbound interface {
124+
transport.Outbound
125+
126+
// DirectCallOneway is called without interceptor.
127+
DirectCallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error)
128+
}
129+
130+
// DirectStreamOutbound defines a transport outbound for streaming requests
131+
// that does not involve any interceptors.
132+
type DirectStreamOutbound interface {
133+
transport.Outbound
134+
135+
// DirectCallStream is called without interceptor.
136+
DirectCallStream(ctx context.Context, req *transport.StreamRequest) (*transport.ClientStream, error)
137+
}
138+
139+
type nopUnaryOutbound struct{}
140+
141+
func (nopUnaryOutbound) Call(ctx context.Context, request *transport.Request, out UnaryOutboundChain) (*transport.Response, error) {
142+
return out.Next(ctx, request)
143+
}
144+
145+
// NopUnaryOutbound is a unary outbound middleware that does not do
146+
// anything special. It simply calls the underlying UnaryOutbound.
147+
var NopUnaryOutbound UnaryOutbound = nopUnaryOutbound{}
148+
149+
type nopOnewayOutbound struct{}
150+
151+
func (nopOnewayOutbound) CallOneway(ctx context.Context, request *transport.Request, out OnewayOutboundChain) (transport.Ack, error) {
152+
return out.Next(ctx, request)
153+
}
154+
155+
// NopOnewayOutbound is an oneway outbound middleware that does not do
156+
// anything special. It simply calls the underlying OnewayOutbound.
157+
var NopOnewayOutbound OnewayOutbound = nopOnewayOutbound{}
158+
159+
type nopStreamOutbound struct{}
160+
161+
func (nopStreamOutbound) CallStream(ctx context.Context, requestMeta *transport.StreamRequest, out StreamOutboundChain) (*transport.ClientStream, error) {
162+
return out.Next(ctx, requestMeta)
163+
}
164+
165+
// NopStreamOutbound is a stream outbound middleware that does not do
166+
// anything special. It simply calls the underlying StreamOutbound.
167+
var NopStreamOutbound StreamOutbound = nopStreamOutbound{}
168+
169+
// ApplyUnaryOutbound applies the given UnaryOutbound interceptor to the given DirectUnaryOutbound transport.
170+
func ApplyUnaryOutbound(uo UnaryOutboundChain, i UnaryOutbound) transport.UnaryOutbound {
171+
return unaryOutboundWithInterceptor{uo: uo, i: i}
172+
}
173+
174+
// ApplyOnewayOutbound applies the given OnewayOutbound interceptor to the given DirectOnewayOutbound transport.
175+
func ApplyOnewayOutbound(oo OnewayOutboundChain, i OnewayOutbound) transport.OnewayOutbound {
176+
return onewayOutboundWithInterceptor{oo: oo, i: i}
177+
}
178+
179+
// ApplyStreamOutbound applies the given StreamOutbound interceptor to the given DirectStreamOutbound transport.
180+
func ApplyStreamOutbound(so StreamOutboundChain, i StreamOutbound) transport.StreamOutbound {
181+
return streamOutboundWithInterceptor{so: so, i: i}
182+
}
183+
184+
type unaryOutboundWithInterceptor struct {
185+
transport.Outbound
186+
uo UnaryOutboundChain
187+
i UnaryOutbound
188+
}
189+
190+
func (uoc unaryOutboundWithInterceptor) Call(ctx context.Context, request *transport.Request) (*transport.Response, error) {
191+
return uoc.i.Call(ctx, request, uoc.uo)
192+
}
193+
194+
type onewayOutboundWithInterceptor struct {
195+
transport.Outbound
196+
oo OnewayOutboundChain
197+
i OnewayOutbound
198+
}
199+
200+
func (ooc onewayOutboundWithInterceptor) CallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error) {
201+
return ooc.i.CallOneway(ctx, request, ooc.oo)
202+
}
203+
204+
type streamOutboundWithInterceptor struct {
205+
transport.Outbound
206+
so StreamOutboundChain
207+
i StreamOutbound
208+
}
209+
210+
func (soc streamOutboundWithInterceptor) CallStream(ctx context.Context, requestMeta *transport.StreamRequest) (*transport.ClientStream, error) {
211+
return soc.i.CallStream(ctx, requestMeta, soc.so)
212+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package outboundinterceptor
22+
23+
import (
24+
"context"
25+
26+
"go.uber.org/yarpc/api/transport"
27+
"go.uber.org/yarpc/internal/interceptor"
28+
)
29+
30+
// NewUnaryChain combines a series of `UnaryInbound`s into a single `InboundMiddleware`.
31+
func NewUnaryChain(out interceptor.DirectUnaryOutbound, list []interceptor.UnaryOutbound) interceptor.UnaryOutboundChain {
32+
return unaryChainExec{
33+
Chain: list,
34+
Final: out,
35+
}
36+
}
37+
38+
func (x unaryChainExec) Next(ctx context.Context, request *transport.Request) (*transport.Response, error) {
39+
if len(x.Chain) == 0 {
40+
return x.Final.DirectCall(ctx, request)
41+
}
42+
next := x.Chain[0]
43+
x.Chain = x.Chain[1:]
44+
return next.Call(ctx, request, x)
45+
}
46+
47+
func (x unaryChainExec) Outbound() transport.Outbound {
48+
return x.Final
49+
}
50+
51+
// unaryChainExec adapts a series of `UnaryOutbound`s into a `UnaryOutbound`. It
52+
// is scoped to a single call of a UnaryOutbound and is not thread-safe.
53+
type unaryChainExec struct {
54+
Chain []interceptor.UnaryOutbound
55+
Final interceptor.DirectUnaryOutbound
56+
}
57+
58+
// NewOnewayChain combines a series of `OnewayInbound`s into a single `InboundMiddleware`.
59+
func NewOnewayChain(out interceptor.DirectOnewayOutbound, list []interceptor.OnewayOutbound) interceptor.OnewayOutboundChain {
60+
return onewayChainExec{
61+
Chain: list,
62+
Final: out,
63+
}
64+
}
65+
66+
func (x onewayChainExec) Next(ctx context.Context, request *transport.Request) (transport.Ack, error) {
67+
if len(x.Chain) == 0 {
68+
return x.Final.DirectCallOneway(ctx, request)
69+
}
70+
next := x.Chain[0]
71+
x.Chain = x.Chain[1:]
72+
return next.CallOneway(ctx, request, x)
73+
}
74+
75+
func (x onewayChainExec) Outbound() transport.Outbound {
76+
return x.Final
77+
}
78+
79+
// onewayChainExec adapts a series of `OnewayOutbound`s into a `OnewayOutbound`. It
80+
// is scoped to a single call of a OnewayOutbound and is not thread-safe.
81+
type onewayChainExec struct {
82+
Chain []interceptor.OnewayOutbound
83+
Final interceptor.DirectOnewayOutbound
84+
}
85+
86+
func (x onewayChainExec) DirectCallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error) {
87+
if len(x.Chain) == 0 {
88+
return x.Final.DirectCallOneway(ctx, request)
89+
}
90+
next := x.Chain[0]
91+
x.Chain = x.Chain[1:]
92+
return next.CallOneway(ctx, request, x)
93+
}
94+
95+
// NewStreamChain combines a series of `OnewayInbound`s into a single `InboundMiddleware`.
96+
func NewStreamChain(out interceptor.DirectStreamOutbound, list []interceptor.StreamOutbound) interceptor.StreamOutboundChain {
97+
return streamChainExec{
98+
Chain: list,
99+
Final: out,
100+
}
101+
}
102+
103+
func (x streamChainExec) Next(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error) {
104+
if len(x.Chain) == 0 {
105+
return x.Final.DirectCallStream(ctx, request)
106+
}
107+
next := x.Chain[0]
108+
x.Chain = x.Chain[1:]
109+
return next.CallStream(ctx, request, x)
110+
}
111+
112+
func (x streamChainExec) Outbound() transport.Outbound {
113+
return x.Final
114+
}
115+
116+
// streamChainExec adapts a series of `StreamOutbound`s into a `StreamOutbound`. It
117+
// is scoped to a single call of a StreamOutbound and is not thread-safe.
118+
type streamChainExec struct {
119+
Chain []interceptor.StreamOutbound
120+
Final interceptor.DirectStreamOutbound
121+
}
122+
123+
func (x streamChainExec) DirectCallStream(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error) {
124+
if len(x.Chain) == 0 {
125+
return x.Final.DirectCallStream(ctx, request)
126+
}
127+
next := x.Chain[0]
128+
x.Chain = x.Chain[1:]
129+
return next.CallStream(ctx, request, x)
130+
}

0 commit comments

Comments
 (0)