Skip to content

Commit e33f770

Browse files
authored
[BUG] OBO between log service and compaction. (#4276)
Compaction assumes that enumeration position t_i means t_i was the last record seen and therefore next reader should read from t_i + 1. Log service was built and tested for t_i meaning t_i was the first record to return. Note: I changed the go code, but only by moving a +1 out a layer. The inner version was inconsistent with convention, so I updated it.
1 parent 902a9f7 commit e33f770

File tree

7 files changed

+18
-6
lines changed

7 files changed

+18
-6
lines changed

go/pkg/log/repository/log.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,18 @@ func (r *LogRepository) GetLastCompactedOffsetForCollection(ctx context.Context,
169169
return
170170
}
171171

172+
// GetBoundsForCollection returns the offset of the last record compacted and the offset of the last
173+
// record inserted. Thus, the range of uncompacted records is the interval (start, limit], which is
174+
// kind of backwards from how it is elsewhere, so pay attention to comments indicating the bias of
175+
// the offset.
172176
func (r *LogRepository) GetBoundsForCollection(ctx context.Context, collectionId string) (start, limit int64, err error) {
173177
bounds, err := r.queries.GetBoundsForCollection(ctx, collectionId)
174178
if err != nil {
175179
trace_log.Error("Error in getting minimum and maximum offset for collection", zap.Error(err), zap.String("collectionId", collectionId))
176180
return
177181
}
178182
start = bounds.RecordCompactionOffsetPosition
179-
limit = bounds.RecordEnumerationOffsetPosition + 1
183+
limit = bounds.RecordEnumerationOffsetPosition
180184
err = nil
181185
return
182186
}

go/pkg/log/server/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ func (s *logServer) ScoutLogs(ctx context.Context, req *logservicepb.ScoutLogsRe
5555
if err != nil {
5656
return
5757
}
58+
// +1 to convert from the (] bound to a [) bound.
5859
res = &logservicepb.ScoutLogsResponse {
59-
LimitOffset: int64(limit),
60+
FirstUninsertedRecordOffset: int64(limit + 1),
6061
}
6162
return
6263
}

idl/chromadb/proto/logservice.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ message ScoutLogsRequest {
1919
}
2020

2121
message ScoutLogsResponse {
22-
int64 limit_offset = 1;
22+
// This field was once used for an ambiguous last_record_offset alternative.
23+
reserved 1;
24+
// The next record to insert will have this offset.
25+
int64 first_uninserted_record_offset = 2;
2326
}
2427

2528
message PullLogsRequest {

rust/log-service/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,9 @@ impl LogService for LogServer {
671671
}
672672
};
673673
let limit_offset = limit_position.offset() as i64;
674-
Ok(Response::new(ScoutLogsResponse { limit_offset }))
674+
Ok(Response::new(ScoutLogsResponse {
675+
first_uninserted_record_offset: limit_offset,
676+
}))
675677
}
676678
.instrument(span)
677679
.await

rust/log/src/grpc_log.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ impl GrpcLog {
197197
&mut self.client
198198
}
199199

200+
// ScoutLogs returns the offset of the next record to be inserted into the log.
200201
pub(super) async fn scout_logs(
201202
&mut self,
202203
collection_id: CollectionUuid,
@@ -216,7 +217,7 @@ impl GrpcLog {
216217
}
217218
};
218219
let scout = response.into_inner();
219-
Ok(scout.limit_offset as u64)
220+
Ok(scout.first_uninserted_record_offset as u64)
220221
}
221222

222223
pub(super) async fn read(

rust/log/src/log.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl Log {
5050
}
5151
}
5252

53+
// ScoutLogs returns the offset of the next record to be inserted into the log.
5354
#[tracing::instrument(skip(self))]
5455
pub async fn scout_logs(
5556
&mut self,

rust/wal3/src/manifest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ impl Manifest {
644644
},
645645
(Some(f), None) => f,
646646
(None, Some(s)) => s,
647-
(None, None) => LogPosition::default(),
647+
(None, None) => LogPosition::from_offset(1),
648648
}
649649
}
650650

0 commit comments

Comments
 (0)