Skip to content

grpc: Fix cardinality violations in non-server streaming RPCs #8278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// received no msg and status ok for non-server streaming rpcs.
if !cs.desc.ServerStreams {
return status.Errorf(codes.Internal, "client streaming cardinality violation")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's be more specific about what happened here so that a user could understand what's happening when they see the error.

"received no response message from non-streaming RPC"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a change here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
return io.EOF // indicates successful end of stream.
}

Expand Down
84 changes: 81 additions & 3 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3588,9 +3588,6 @@ func testClientStreamingError(t *testing.T, e env) {
// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server doesn't send a message before returning status OK.
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
// after this is fixed.
t.Skip()
ss := &stubserver.StubServer{
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
// Returning status OK without sending a response message.This is a
Expand Down Expand Up @@ -3739,6 +3736,87 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
}
}

// Tests that a client receives a cardinality violation error for unary
// RPCs if the server doesn't send a message before returning status OK.
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatal(err)
}
defer lis.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err)
}
defer cc.Close()

go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("lis.Accept() = %v", err)
return
}
defer conn.Close()
framer := http2.NewFramer(conn, conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically this test shouldn't need low level framer access. You should be able to implement it with a grpc.Server that has its EmptyCall handler configured as a streaming handling, and with an implementation that just returns nil immediately without sending a message.

I do think that would be preferable, because the framing stuff is tedious and error prone, and it makes the test much more complex. If you're having too much trouble with that approach, let me know and I can try to help, or if it somehow turns out to be a lot more work than I was expecting, feel free to push back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I attempted to resolve this by returning nil from EmptyCall. However, this approach resulted in an empty, successful response rather than a failure, which wasn't the desired outcome.

The scenario that sends no response cannot be replicated within grpc-go itself, as the framework requires a response to be sent.

This test aims to verify behavior in a cross-language context, where a unary client might interact with a unary server implemented in a different language, potentially allowing the server to send no response.

Please let me know if that is not the case or I overlook something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @dfawley is saying that we can register a streaming handler for a unary method on the server. The client will call EmptyCall, but the server will call a streaming handler.

	srv := grpc.NewServer()
	serviceDesc := grpc.ServiceDesc{
		ServiceName: "grpc.testing.TestService",
		HandlerType: (*any)(nil),
		Methods:     []grpc.MethodDesc{},
		Streams: []grpc.StreamDesc{
			{
				StreamName: "EmptyCall",
				Handler: func(any, grpc.ServerStream) error {
					return nil
				},
				ClientStreams: true,
			},
		},
		Metadata: "grpc/testing/test.proto",
	}
	srv.RegisterService(&serviceDesc, &testServer{})
	go srv.Serve(lis)
	defer srv.Stop()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. I had made the changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option (maybe easier, but no need to change now) is to use the unknown service handler, and don't register anything on the server: https://pkg.go.dev/google.golang.org/grpc#UnknownServiceHandler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnknownServiceHandler uses bidirectional streaming[desc.clientstreams:true, desc.serverstreams:true].
Will it work with the changes I made to addresses cardinality violations?

if !cs.desc.ServerStreams && !cs.recvFirstMsg {
  return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to make the server think it's a bidi stream, but the client thinks it's not. The client's setting is what the code snippet you are showing is checking. The client has no knowledge of how the server is treating the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Pranjali-2501 did you try to use UnknownServiceHandler? I think it should reduce the amount of test code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the changes.


if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil {
t.Errorf("Error reading client preface: %v", err)
return
}
if err := framer.WriteSettings(); err != nil {
t.Errorf("Error writing server settings: %v", err)
return
}
if err := framer.WriteSettingsAck(); err != nil {
t.Errorf("Error writing settings ack: %v", err)
return
}

for ctx.Err() == nil {
frame, err := framer.ReadFrame()
if err != nil {
t.Errorf("Error reading frame: %v", err)
return
}

switch frame := frame.(type) {
case *http2.HeadersFrame:
if frame.Header().StreamID != 1 {
t.Errorf("Expected stream ID 1, got %d", frame.Header().StreamID)
return
}

var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
_ = enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
_ = enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
_ = enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"})

if err := framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1,
BlockFragment: buf.Bytes(),
EndHeaders: true,
EndStream: true,
}); err != nil {
t.Errorf("Error while writing headers: %v", err)
return
}
time.Sleep(50 * time.Millisecond)
default:
t.Logf("Server received frame: %v", frame)
}
}
}()

client := testgrpc.NewTestServiceClient(cc)
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
}

func (s) TestExceedMaxStreamsLimit(t *testing.T) {
for _, e := range listTestEnv() {
testExceedMaxStreamsLimit(t, e)
Expand Down