Skip to content

Commit 7808f8a

Browse files
committed
added test
1 parent 11da0dc commit 7808f8a

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,8 @@ type clientStream struct {
542542

543543
sentLast bool // sent an end stream
544544

545+
recvFirstMsg bool // received first msg
546+
545547
methodConfig *MethodConfig
546548

547549
ctx context.Context // the application's context, wrapped by stats/tracing
@@ -1135,14 +1137,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11351137
return statusErr
11361138
}
11371139
// received no msg and status ok for non-server streaming rpcs.
1138-
if !cs.desc.ServerStreams {
1140+
if !cs.desc.ServerStreams && !cs.recvFirstMsg {
11391141
return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
11401142
}
11411143
return io.EOF // indicates successful end of stream.
11421144
}
11431145

11441146
return toRPCErr(err)
11451147
}
1148+
if !cs.desc.ServerStreams {
1149+
cs.recvFirstMsg = true
1150+
}
11461151
if a.trInfo != nil {
11471152
a.mu.Lock()
11481153
if a.trInfo.tr != nil {

test/end2end_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3736,6 +3736,38 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
37363736
}
37373737
}
37383738

3739+
// Tests for a successful RPC, client will continue to receive io.EOF for successive calls to CloseAndRecv().
3740+
func (s) TestClientStreaming_ClientCallCloseAndRecvAfterCloseAndRecv(t *testing.T) {
3741+
ss := stubserver.StubServer{
3742+
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
3743+
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
3744+
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
3745+
}
3746+
return nil
3747+
},
3748+
}
3749+
if err := ss.Start(nil); err != nil {
3750+
t.Fatal("Error starting server:", err)
3751+
}
3752+
defer ss.Stop()
3753+
3754+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3755+
defer cancel()
3756+
stream, err := ss.Client.StreamingInputCall(ctx)
3757+
if err != nil {
3758+
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
3759+
}
3760+
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
3761+
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
3762+
}
3763+
if _, err := stream.CloseAndRecv(); err != nil {
3764+
t.Fatalf("stream.CloseAndRecv() = %v , want <nil>", err)
3765+
}
3766+
if _, err := stream.CloseAndRecv(); err != io.EOF {
3767+
t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, io.EOF)
3768+
}
3769+
}
3770+
37393771
// Tests that a client receives a cardinality violation error for unary
37403772
// RPCs if the server doesn't send a message before returning status OK.
37413773
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {

0 commit comments

Comments
 (0)