Skip to content

Commit 6eb3399

Browse files
committed
fix(ttstream): metrics missing caused by server-side rpcinfo not set correctly
1 parent b3ca38d commit 6eb3399

File tree

3 files changed

+41
-43
lines changed

3 files changed

+41
-43
lines changed

pkg/remote/trans/ttstream/server_handler.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ import (
2525
"runtime/debug"
2626
"sync"
2727

28+
"github.com/bytedance/gopkg/cloud/metainfo"
29+
2830
"github.com/cloudwego/kitex/pkg/endpoint"
2931
"github.com/cloudwego/kitex/pkg/gofunc"
3032
"github.com/cloudwego/kitex/pkg/kerrors"
3133
"github.com/cloudwego/kitex/pkg/klog"
3234
"github.com/cloudwego/kitex/pkg/remote"
35+
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/ktx"
3336
"github.com/cloudwego/kitex/pkg/rpcinfo"
3437
"github.com/cloudwego/kitex/pkg/streaming"
3538
ktransport "github.com/cloudwego/kitex/transport"
@@ -118,7 +121,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
118121
}()
119122
// connection level goroutine
120123
for {
121-
nctx, ss, nerr := t.provider.OnStream(ctx, conn)
124+
nctx, st, nerr := t.provider.OnStream(ctx, conn)
122125
if nerr != nil {
123126
if errors.Is(nerr, io.EOF) {
124127
return nil
@@ -130,7 +133,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
130133
// stream level goroutine
131134
gofunc.GoFunc(nctx, func() {
132135
defer wg.Done()
133-
err := t.OnStream(nctx, conn, ss)
136+
err := t.OnStream(nctx, conn, st)
134137
if err != nil && !errors.Is(err, io.EOF) {
135138
t.OnError(nctx, err, conn)
136139
}
@@ -142,7 +145,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
142145
// - create server stream
143146
// - process server stream
144147
// - close server stream
145-
func (t *svrTransHandler) OnStream(ctx context.Context, conn net.Conn, ss streaming.ServerStream) (err error) {
148+
func (t *svrTransHandler) OnStream(ctx context.Context, conn net.Conn, st *stream) (err error) {
146149
ri := t.opt.InitOrResetRPCInfoFunc(nil, conn.RemoteAddr())
147150
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)
148151
defer func() {
@@ -153,23 +156,35 @@ func (t *svrTransHandler) OnStream(ctx context.Context, conn net.Conn, ss stream
153156
}()
154157

155158
ink := ri.Invocation().(rpcinfo.InvocationSetter)
156-
if si, ok := ss.(StreamInfo); ok {
157-
sinfo := t.opt.SvcSearcher.SearchService(si.Service(), si.Method(), false)
158-
if sinfo == nil {
159-
return remote.NewTransErrorWithMsg(remote.UnknownService, fmt.Sprintf("unknown service %s", si.Service()))
160-
}
161-
minfo := sinfo.MethodInfo(si.Method())
162-
if minfo == nil {
163-
return remote.NewTransErrorWithMsg(remote.UnknownMethod, fmt.Sprintf("unknown method %s", si.Method()))
164-
}
165-
ink.SetServiceName(sinfo.ServiceName)
166-
ink.SetMethodName(si.Method())
167-
ink.SetStreamingMode(minfo.StreamingMode())
168-
if mutableTo := rpcinfo.AsMutableEndpointInfo(ri.To()); mutableTo != nil {
169-
_ = mutableTo.SetMethod(si.Method())
159+
sinfo := t.opt.SvcSearcher.SearchService(st.Service(), st.Method(), false)
160+
if sinfo == nil {
161+
return remote.NewTransErrorWithMsg(remote.UnknownService, fmt.Sprintf("unknown service %s", st.Service()))
162+
}
163+
minfo := sinfo.MethodInfo(st.Method())
164+
if minfo == nil {
165+
return remote.NewTransErrorWithMsg(remote.UnknownMethod, fmt.Sprintf("unknown method %s", st.Method()))
166+
}
167+
ink.SetServiceName(sinfo.ServiceName)
168+
ink.SetMethodName(st.Method())
169+
ink.SetStreamingMode(minfo.StreamingMode())
170+
if mutableTo := rpcinfo.AsMutableEndpointInfo(ri.To()); mutableTo != nil {
171+
_ = mutableTo.SetMethod(st.Method())
172+
}
173+
rpcinfo.AsMutableRPCConfig(ri.Config()).SetTransportProtocol(st.TransportProtocol())
174+
// headerHandler return a new stream level ctx
175+
if t.provider.headerHandler != nil {
176+
ctx, err = t.provider.headerHandler.OnReadStream(ctx, st.meta, st.header)
177+
if err != nil {
178+
return err
170179
}
171-
rpcinfo.AsMutableRPCConfig(ri.Config()).SetTransportProtocol(si.TransportProtocol())
172180
}
181+
// register metainfo into ctx
182+
ctx = metainfo.SetMetaInfoFromMap(ctx, st.header)
183+
ss := newServerStream(st)
184+
185+
// cancel ctx when OnStreamFinish
186+
ctx, cancelFunc := ktx.WithCancel(ctx)
187+
ctx = context.WithValue(ctx, serverStreamCancelCtxKey{}, cancelFunc)
173188

174189
ctx = t.startTracer(ctx, ri)
175190
defer func() {

pkg/remote/trans/ttstream/server_handler_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ func TestOnStream(t *testing.T) {
137137
test.Assert(t, err == nil, err)
138138
err = wbuf.Flush()
139139
test.Assert(t, err == nil, err)
140-
nctx, ss, err := transHdl.provider.OnStream(ctx, mockConn)
140+
nctx, st, err := transHdl.provider.OnStream(ctx, mockConn)
141141
test.Assert(t, err == nil, err)
142-
err = transHdl.OnStream(nctx, mockConn, ss)
142+
err = transHdl.OnStream(nctx, mockConn, st)
143143
test.Assert(t, err == nil, err)
144144
})
145145

@@ -162,9 +162,9 @@ func TestOnStream(t *testing.T) {
162162
test.Assert(t, err == nil, err)
163163
err = wbuf.Flush()
164164
test.Assert(t, err == nil, err)
165-
nctx, ss, err := transHdl.provider.OnStream(ctx, mockConn)
165+
nctx, st, err := transHdl.provider.OnStream(ctx, mockConn)
166166
test.Assert(t, err == nil, err)
167-
err = transHdl.OnStream(nctx, mockConn, ss)
167+
err = transHdl.OnStream(nctx, mockConn, st)
168168
test.Assert(t, errors.Is(err, kerrors.ErrPanic), err)
169169
transHdl.OnError(ctx, err, mockConn)
170170
})
@@ -192,9 +192,9 @@ func TestOnStream(t *testing.T) {
192192
test.Assert(t, err == nil, err)
193193
err = wbuf.Flush()
194194
test.Assert(t, err == nil, err)
195-
nctx, ss, err := transHdl.provider.OnStream(ctx, mockConn)
195+
nctx, st, err := transHdl.provider.OnStream(ctx, mockConn)
196196
test.Assert(t, err == nil, err)
197-
err = transHdl.OnStream(nctx, mockConn, ss)
197+
err = transHdl.OnStream(nctx, mockConn, st)
198198
test.Assert(t, err == nil, err)
199199
})
200200
}

pkg/remote/trans/ttstream/server_provider.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ import (
2222
"net"
2323
"strconv"
2424

25-
"github.com/bytedance/gopkg/cloud/metainfo"
2625
"github.com/cloudwego/gopkg/protocol/thrift"
2726
"github.com/cloudwego/gopkg/protocol/ttheader"
2827
"github.com/cloudwego/netpoll"
2928

3029
"github.com/cloudwego/kitex/pkg/kerrors"
3130
"github.com/cloudwego/kitex/pkg/remote"
32-
"github.com/cloudwego/kitex/pkg/remote/trans/ttstream/ktx"
3331
"github.com/cloudwego/kitex/pkg/streaming"
3432
"github.com/cloudwego/kitex/pkg/utils"
3533
)
@@ -86,7 +84,7 @@ func (s *serverProvider) OnInactive(ctx context.Context, conn net.Conn) (context
8684
return ctx, nil
8785
}
8886

89-
func (s *serverProvider) OnStream(ctx context.Context, conn net.Conn) (context.Context, streaming.ServerStream, error) {
87+
func (s *serverProvider) OnStream(ctx context.Context, conn net.Conn) (context.Context, *stream, error) {
9088
trans, _ := ctx.Value(serverTransCtxKey{}).(*transport)
9189
if trans == nil {
9290
return nil, nil, fmt.Errorf("server transport is nil")
@@ -98,22 +96,7 @@ func (s *serverProvider) OnStream(ctx context.Context, conn net.Conn) (context.C
9896
return nil, nil, err
9997
}
10098
st.setMetaFrameHandler(s.metaHandler)
101-
102-
// headerHandler return a new stream level ctx
103-
if s.headerHandler != nil {
104-
ctx, err = s.headerHandler.OnReadStream(ctx, st.meta, st.header)
105-
if err != nil {
106-
return nil, nil, err
107-
}
108-
}
109-
// register metainfo into ctx
110-
ctx = metainfo.SetMetaInfoFromMap(ctx, st.header)
111-
ss := newServerStream(st)
112-
113-
// cancel ctx when OnStreamFinish
114-
ctx, cancelFunc := ktx.WithCancel(ctx)
115-
ctx = context.WithValue(ctx, serverStreamCancelCtxKey{}, cancelFunc)
116-
return ctx, ss, nil
99+
return ctx, st, nil
117100
}
118101

119102
func (s *serverProvider) OnStreamFinish(ctx context.Context, ss streaming.ServerStream, err error) (context.Context, error) {

0 commit comments

Comments
 (0)