-
Notifications
You must be signed in to change notification settings - Fork 4.5k
grpc: Fix cardinality violations in non-server streaming RPCs #8278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
f06ff56
1546427
ca4860a
c3ca09b
bbd8366
1a23bb7
f37ea78
9436092
11da0dc
7808f8a
ad4ce90
a21fcc4
8107444
6deac72
643998d
f85707b
4e5075b
b2ab207
85ab6ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1134,6 +1134,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { | |
if statusErr := a.transportStream.Status().Err(); statusErr != nil { | ||
return statusErr | ||
} | ||
// received no msg and status ok for non-server streaming rpcs. | ||
if !cs.desc.ServerStreams { | ||
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return status.Errorf(codes.Internal, "client streaming cardinality violation") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's be more specific about what happened here so that a user could understand what's happening when they see the error.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a change here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Pranjali-2501 bump. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
return io.EOF // indicates successful end of stream. | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3588,9 +3588,6 @@ func testClientStreamingError(t *testing.T, e env) { | |
// Tests that a client receives a cardinality violation error for client-streaming | ||
// RPCs if the server doesn't send a message before returning status OK. | ||
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { | ||
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` | ||
// after this is fixed. | ||
t.Skip() | ||
ss := &stubserver.StubServer{ | ||
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { | ||
// Returning status OK without sending a response message.This is a | ||
|
@@ -3739,6 +3736,87 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { | |
} | ||
} | ||
|
||
// Tests that a client receives a cardinality violation error for unary | ||
// RPCs if the server doesn't send a message before returning status OK. | ||
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { | ||
lis, err := testutils.LocalTCPListener() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer lis.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
if err != nil { | ||
t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
defer cc.Close() | ||
|
||
go func() { | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
t.Errorf("lis.Accept() = %v", err) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
defer conn.Close() | ||
framer := http2.NewFramer(conn, conn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Realistically this test shouldn't need low level framer access. You should be able to implement it with a I do think that would be preferable, because the framing stuff is tedious and error prone, and it makes the test much more complex. If you're having too much trouble with that approach, let me know and I can try to help, or if it somehow turns out to be a lot more work than I was expecting, feel free to push back. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initially, I attempted to resolve this by returning nil from EmptyCall. However, this approach resulted in an empty, successful response rather than a failure, which wasn't the desired outcome. The scenario that sends no response cannot be replicated within grpc-go itself, as the framework requires a response to be sent. This test aims to verify behavior in a cross-language context, where a unary client might interact with a unary server implemented in a different language, potentially allowing the server to send no response. Please let me know if that is not the case or I overlook something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think @dfawley is saying that we can register a streaming handler for a unary method on the server. The client will call srv := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
ServiceName: "grpc.testing.TestService",
HandlerType: (*any)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "EmptyCall",
Handler: func(any, grpc.ServerStream) error {
return nil
},
ClientStreams: true,
},
},
Metadata: "grpc/testing/test.proto",
}
srv.RegisterService(&serviceDesc, &testServer{})
go srv.Serve(lis)
defer srv.Stop() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for clarifying. I had made the changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another option (maybe easier, but no need to change now) is to use the unknown service handler, and don't register anything on the server: https://pkg.go.dev/google.golang.org/grpc#UnknownServiceHandler There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is to make the server think it's a bidi stream, but the client thinks it's not. The client's setting is what the code snippet you are showing is checking. The client has no knowledge of how the server is treating the stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Pranjali-2501 did you try to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made the changes. |
||
|
||
if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { | ||
t.Errorf("Error reading client preface: %v", err) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
if err := framer.WriteSettings(); err != nil { | ||
t.Errorf("Error writing server settings: %v", err) | ||
return | ||
} | ||
if err := framer.WriteSettingsAck(); err != nil { | ||
t.Errorf("Error writing settings ack: %v", err) | ||
return | ||
} | ||
|
||
for ctx.Err() == nil { | ||
frame, err := framer.ReadFrame() | ||
if err != nil { | ||
t.Errorf("Error reading frame: %v", err) | ||
return | ||
} | ||
|
||
switch frame := frame.(type) { | ||
case *http2.HeadersFrame: | ||
if frame.Header().StreamID != 1 { | ||
t.Errorf("Expected stream ID 1, got %d", frame.Header().StreamID) | ||
return | ||
} | ||
|
||
var buf bytes.Buffer | ||
enc := hpack.NewEncoder(&buf) | ||
_ = enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | ||
_ = enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
_ = enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if err := framer.WriteHeaders(http2.HeadersFrameParam{ | ||
StreamID: 1, | ||
BlockFragment: buf.Bytes(), | ||
EndHeaders: true, | ||
EndStream: true, | ||
}); err != nil { | ||
t.Errorf("Error while writing headers: %v", err) | ||
return | ||
} | ||
time.Sleep(50 * time.Millisecond) | ||
default: | ||
t.Logf("Server received frame: %v", frame) | ||
} | ||
} | ||
}() | ||
|
||
client := testgrpc.NewTestServiceClient(cc) | ||
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { | ||
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||
} | ||
} | ||
|
||
func (s) TestExceedMaxStreamsLimit(t *testing.T) { | ||
for _, e := range listTestEnv() { | ||
testExceedMaxStreamsLimit(t, e) | ||
|
Uh oh!
There was an error while loading. Please reload this page.