16
16
*/
17
17
package com .alipay .sofa .rpc .interceptor ;
18
18
19
+ import com .alipay .common .tracer .core .context .trace .SofaTraceContext ;
20
+ import com .alipay .common .tracer .core .holder .SofaTraceContextHolder ;
21
+ import com .alipay .common .tracer .core .span .SofaTracerSpan ;
19
22
import com .alipay .sofa .rpc .config .ConsumerConfig ;
23
+ import com .alipay .sofa .rpc .context .RpcInternalContext ;
20
24
import com .alipay .sofa .rpc .context .RpcInvokeContext ;
21
25
import com .alipay .sofa .rpc .context .RpcRunningState ;
22
26
import com .alipay .sofa .rpc .core .request .SofaRequest ;
27
+ import com .alipay .sofa .rpc .core .response .SofaResponse ;
28
+ import com .alipay .sofa .rpc .event .ClientAsyncReceiveEvent ;
29
+ import com .alipay .sofa .rpc .event .EventBus ;
23
30
import com .alipay .sofa .rpc .server .triple .TripleContants ;
24
31
import com .alipay .sofa .rpc .tracer .sofatracer .TripleTracerAdapter ;
32
+ import com .alipay .sofa .rpc .utils .TripleExceptionUtils ;
25
33
import io .grpc .CallOptions ;
26
34
import io .grpc .Channel ;
27
35
import io .grpc .ClientCall ;
@@ -59,12 +67,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
59
67
60
68
@ Override
61
69
public void start (Listener <RespT > responseListener , Metadata requestHeader ) {
62
-
70
+ RpcInternalContext internalContext = RpcInternalContext . getContext ();
63
71
RpcInvokeContext context = RpcInvokeContext .getContext ();
64
72
SofaRequest sofaRequest = (SofaRequest ) context .get (TripleContants .SOFA_REQUEST_KEY );
65
73
66
- ConsumerConfig consumerConfig = (ConsumerConfig ) context .get (TripleContants .SOFA_CONSUMER_CONFIG_KEY );
67
- TripleTracerAdapter .beforeSend (sofaRequest , consumerConfig , requestHeader );
74
+ ConsumerConfig <?> consumerConfig = (ConsumerConfig <?>) context
75
+ .get (TripleContants .SOFA_CONSUMER_CONFIG_KEY );
76
+ TripleTracerAdapter .beforeSend (sofaRequest , consumerConfig , requestHeader , method );
77
+ SofaTraceContext sofaTraceContext = SofaTraceContextHolder .getSofaTraceContext ();
78
+ SofaTracerSpan clientSpan = sofaTraceContext .getCurrentSpan ();
68
79
if (RpcRunningState .isDebugMode ()) {
69
80
LOGGER .info ("[2]prepare to send from client:{}" , requestHeader );
70
81
}
@@ -80,18 +91,48 @@ public void onHeaders(Metadata responseHeader) {
80
91
81
92
@ Override
82
93
public void onMessage (RespT message ) {
83
- if (RpcRunningState .isDebugMode ()) {
84
- LOGGER .info ("[4]response message received from server:{}" , message );
94
+ // onMessage -> onNext()
95
+ try {
96
+ if (sofaRequest .isAsync ()) {
97
+ RpcInvokeContext .setContext (context );
98
+ sofaTraceContext .push (clientSpan );
99
+ }
100
+ if (RpcRunningState .isDebugMode ()) {
101
+ LOGGER .info ("[4]response message received from server:{}" , message );
102
+ }
103
+ super .onMessage (message );
104
+ } finally {
105
+ if (sofaRequest .isAsync ()) {
106
+ sofaTraceContext .clear ();
107
+ RpcInvokeContext .removeContext ();
108
+ }
85
109
}
86
- super .onMessage (message );
87
110
}
88
111
89
112
@ Override
90
113
public void onClose (Status status , Metadata trailers ) {
91
- if (RpcRunningState .isDebugMode ()) {
92
- LOGGER .info ("[5]response close received from server:{},trailers:{}" , status , trailers );
114
+ // onClose -> onComplete() or onError()
115
+ try {
116
+ if (sofaRequest .isAsync ()) {
117
+ RpcInvokeContext .setContext (context );
118
+ sofaTraceContext .push (clientSpan );
119
+ }
120
+ if (RpcRunningState .isDebugMode ()) {
121
+ LOGGER .info ("[5]response close received from server:{},trailers:{}" , status , trailers );
122
+ }
123
+ super .onClose (status , trailers );
124
+ } finally {
125
+ if (sofaRequest .isAsync ()) {
126
+ Throwable throwable = TripleExceptionUtils .getThrowableFromStatus (status );
127
+ RpcInternalContext .setContext (internalContext );
128
+ if (EventBus .isEnable (ClientAsyncReceiveEvent .class )) {
129
+ EventBus .post (new ClientAsyncReceiveEvent (consumerConfig , null ,
130
+ sofaRequest , new SofaResponse (), throwable ));
131
+ }
132
+ RpcInvokeContext .removeContext ();
133
+ RpcInternalContext .removeAllContext ();
134
+ }
93
135
}
94
- super .onClose (status , trailers );
95
136
}
96
137
97
138
@ Override
@@ -104,6 +145,15 @@ public void onReady() {
104
145
}, requestHeader );
105
146
}
106
147
148
+ @ Override
149
+ public void sendMessage (ReqT message ) {
150
+ try {
151
+ super .sendMessage (message );
152
+ } catch (Throwable t ) {
153
+ LOGGER .error ("Client invoke grpc sendMessage meet error:" , t );
154
+ throw t ;
155
+ }
156
+ }
107
157
};
108
158
}
109
159
}
0 commit comments