Commit bc07cf9

Merge branch 'main' into cassandra-key-value-storage-impl

2 parents: 7670636 + b1e83ad

13 files changed: +355 -202 lines

golem-common/src/model/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -2108,7 +2108,7 @@ impl WorkerEvent {
             }),
             WorkerEvent::StdErr { timestamp, bytes } => Some(OplogEntry::Log {
                 timestamp: *timestamp,
-                level: oplog::LogLevel::Stdout,
+                level: oplog::LogLevel::Stderr,
                 context: String::new(),
                 message: String::from_utf8_lossy(bytes).to_string(),
             }),
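The hunk above fixes a copy-paste slip: stderr events were being written to the oplog with LogLevel::Stdout. A regression test for the mapping could look like the sketch below; the conversion method name (as_oplog_entry) and the Timestamp constructor are assumptions, only the StdErr-to-Stderr mapping comes from the diff.

    #[test]
    fn stderr_event_becomes_stderr_log_entry() {
        // `as_oplog_entry` and `Timestamp::now()` are hypothetical names;
        // the repository's real API may differ.
        let event = WorkerEvent::StdErr {
            timestamp: Timestamp::now(),
            bytes: b"something went wrong\n".to_vec(),
        };
        match event.as_oplog_entry() {
            Some(OplogEntry::Log { level, message, .. }) => {
                assert_eq!(level, oplog::LogLevel::Stderr);
                assert_eq!(message, "something went wrong\n");
            }
            other => panic!("expected a Log entry, got {other:?}"),
        }
    }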

golem-test-framework/src/dsl/mod.rs

Lines changed: 122 additions & 113 deletions
@@ -82,15 +82,15 @@ pub trait TestDsl {
     async fn get_worker_metadata(
         &self,
         worker_id: &WorkerId,
-    ) -> crate::Result<Option<WorkerMetadata>>;
+    ) -> crate::Result<Option<(WorkerMetadata, Option<String>)>>;
     async fn get_workers_metadata(
         &self,
         component_id: &ComponentId,
         filter: Option<WorkerFilter>,
         cursor: ScanCursor,
         count: u64,
         precise: bool,
-    ) -> crate::Result<(Option<ScanCursor>, Vec<WorkerMetadata>)>;
+    ) -> crate::Result<(Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>)>;
     async fn delete_worker(&self, worker_id: &WorkerId) -> crate::Result<()>;

     async fn invoke(
@@ -283,7 +283,7 @@ impl<T: TestDependencies + Send + Sync> TestDsl for T {
     async fn get_worker_metadata(
         &self,
         worker_id: &WorkerId,
-    ) -> crate::Result<Option<WorkerMetadata>> {
+    ) -> crate::Result<Option<(WorkerMetadata, Option<String>)>> {
         let worker_id: golem_api_grpc::proto::golem::worker::WorkerId = worker_id.clone().into();
         let response = self
             .worker_service()
@@ -319,7 +319,7 @@ impl<T: TestDependencies + Send + Sync> TestDsl for T {
         cursor: ScanCursor,
         count: u64,
         precise: bool,
-    ) -> crate::Result<(Option<ScanCursor>, Vec<WorkerMetadata>)> {
+    ) -> crate::Result<(Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>)> {
         let component_id: golem_api_grpc::proto::golem::component::ComponentId =
             component_id.clone().into();
         let response = self

(In the next hunk GitHub marks the entire body of to_worker_metadata as removed and re-added, because wrapping it in a tuple adds one level of indentation to every line. It is reconstructed once below, with only the semantically changed lines marked.)

@@ -945,117 +945,120 @@ pub fn worker_error_message(error: &Error) -> String {

 pub fn to_worker_metadata(
     metadata: &golem_api_grpc::proto::golem::worker::WorkerMetadata,
-) -> WorkerMetadata {
-    WorkerMetadata {
+) -> (WorkerMetadata, Option<String>) {
+    (
+        WorkerMetadata {
             worker_id: metadata
                 .worker_id
                 .clone()
                 .expect("no worker_id")
                 .clone()
                 .try_into()
                 .expect("invalid worker_id"),
             args: metadata.args.clone(),
             env: metadata
                 .env
                 .iter()
                 .map(|(k, v)| (k.clone(), v.clone()))
                 .collect::<Vec<_>>(),
             account_id: metadata
                 .account_id
                 .clone()
                 .expect("no account_id")
                 .clone()
                 .into(),
             created_at: metadata
                 .created_at
                 .as_ref()
                 .expect("no created_at")
                 .clone()
                 .into(),
             last_known_status: WorkerStatusRecord {
                 oplog_idx: OplogIndex::default(),
                 status: metadata.status.try_into().expect("invalid status"),
                 overridden_retry_config: None, // not passed through gRPC
                 deleted_regions: DeletedRegions::new(),
                 pending_invocations: vec![],
                 pending_updates: metadata
                     .updates
                     .iter()
                     .filter_map(|u| match &u.update {
                         Some(Update::Pending(_)) => Some(TimestampedUpdateDescription {
                             timestamp: u
                                 .timestamp
                                 .as_ref()
                                 .expect("no timestamp on update record")
                                 .clone()
                                 .into(),
                             oplog_index: OplogIndex::from_u64(0),
                             description: UpdateDescription::Automatic {
                                 target_version: u.target_version,
                             },
                         }),
                         _ => None,
                     })
                     .collect(),
                 failed_updates: metadata
                     .updates
                     .iter()
                     .filter_map(|u| match &u.update {
                         Some(Update::Failed(failed_update)) => Some(FailedUpdateRecord {
                             timestamp: u
                                 .timestamp
                                 .as_ref()
                                 .expect("no timestamp on update record")
                                 .clone()
                                 .into(),
                             target_version: u.target_version,
                             details: failed_update.details.clone(),
                         }),
                         _ => None,
                     })
                     .collect(),
                 successful_updates: metadata
                     .updates
                     .iter()
                     .filter_map(|u| match &u.update {
                         Some(Update::Successful(_)) => Some(SuccessfulUpdateRecord {
                             timestamp: u
                                 .timestamp
                                 .as_ref()
                                 .expect("no timestamp on update record")
                                 .clone()
                                 .into(),
                             target_version: u.target_version,
                         }),
                         _ => None,
                     })
                     .collect(),
                 invocation_results: HashMap::new(),
                 current_idempotency_key: None,
                 component_version: metadata.component_version,
                 component_size: metadata.component_size,
                 total_linear_memory_size: metadata.total_linear_memory_size,
                 owned_resources: metadata
                     .owned_resources
                     .iter()
                     .map(|(k, v)| {
                         (
                             WorkerResourceId(*k),
                             WorkerResourceDescription {
                                 created_at: v
                                     .created_at
                                     .as_ref()
                                     .expect("no timestamp on resource metadata")
                                     .clone()
                                     .into(),
                                 indexed_resource_key: v.indexed.clone().map(|i| i.into()),
                             },
                         )
                     })
                     .collect(),
             },
-        parent: None,
-    }
+            parent: None,
+        },
+        metadata.last_error.clone(),
+    )
 }
@@ -1121,15 +1124,18 @@ pub trait TestDslUnsafe {
         args: Vec<String>,
         env: HashMap<String, String>,
     ) -> Result<WorkerId, Error>;
-    async fn get_worker_metadata(&self, worker_id: &WorkerId) -> Option<WorkerMetadata>;
+    async fn get_worker_metadata(
+        &self,
+        worker_id: &WorkerId,
+    ) -> Option<(WorkerMetadata, Option<String>)>;
     async fn get_workers_metadata(
         &self,
         component_id: &ComponentId,
         filter: Option<WorkerFilter>,
         cursor: ScanCursor,
         count: u64,
         precise: bool,
-    ) -> (Option<ScanCursor>, Vec<WorkerMetadata>);
+    ) -> (Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>);
     async fn delete_worker(&self, worker_id: &WorkerId) -> ();

     async fn invoke(
@@ -1246,7 +1252,10 @@ impl<T: TestDsl + Sync> TestDslUnsafe for T {
             .expect("Failed to start worker")
     }

-    async fn get_worker_metadata(&self, worker_id: &WorkerId) -> Option<WorkerMetadata> {
+    async fn get_worker_metadata(
+        &self,
+        worker_id: &WorkerId,
+    ) -> Option<(WorkerMetadata, Option<String>)> {
         <T as TestDsl>::get_worker_metadata(self, worker_id)
             .await
             .expect("Failed to get worker metadata")
@@ -1259,7 +1268,7 @@ impl<T: TestDsl + Sync> TestDslUnsafe for T {
         cursor: ScanCursor,
         count: u64,
         precise: bool,
-    ) -> (Option<ScanCursor>, Vec<WorkerMetadata>) {
+    ) -> (Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>) {
         <T as TestDsl>::get_workers_metadata(self, component_id, filter, cursor, count, precise)
             .await
             .expect("Failed to get workers metadata")
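Every metadata accessor in the DSL now pairs the WorkerMetadata with an Option<String> carrying the worker's last error, taken from the gRPC message's last_error field. Callers destructure the tuple; a minimal usage sketch, assuming an executor implementing TestDslUnsafe and a worker_id are in scope:

    let (metadata, last_error) = executor
        .get_worker_metadata(&worker_id)
        .await
        .expect("worker not found");
    println!("status: {:?}", metadata.last_known_status.status);
    if let Some(error) = last_error {
        println!("last error:\n{error}");
    }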

golem-worker-executor-base/src/durable_host/mod.rs

Lines changed: 13 additions & 4 deletions
@@ -1394,11 +1394,13 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
         None
     } else {
         let mut first_error = None;
+        let mut last_error_index = idx;
         let result = loop {
             let oplog_entry = this.oplog_service().read(owned_worker_id, idx, 1).await;
             match oplog_entry.first_key_value() {
                 Some((_, OplogEntry::Error { error, .. })) => {
                     retry_count += 1;
+                    last_error_index = idx;
                     if first_error.is_none() {
                         first_error = Some(error.clone());
                     }
@@ -1409,7 +1411,8 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
                     break Some(LastError {
                         error: first_error.unwrap(),
                         retry_count,
-                        stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
+                        stderr: recover_stderr_logs(this, owned_worker_id, last_error_index)
+                            .await,
                     });
                 }
             }
@@ -1424,7 +1427,12 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
                     break Some(LastError {
                         error,
                         retry_count,
-                        stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
+                        stderr: recover_stderr_logs(
+                            this,
+                            owned_worker_id,
+                            last_error_index,
+                        )
+                        .await,
                     })
                 }
                 None => break None,
@@ -1436,7 +1444,8 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
                     break Some(LastError {
                         error,
                         retry_count,
-                        stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
+                        stderr: recover_stderr_logs(this, owned_worker_id, last_error_index)
+                            .await,
                     })
                 }
                 None => break None,
@@ -1488,7 +1497,7 @@ pub(crate) async fn recover_stderr_logs<T: HasOplogService + HasConfig>(
         }
     }
     stderr_entries.reverse();
-    stderr_entries.join("\n")
+    stderr_entries.join("")
 }

 /// Indicates which step of the http request handling is responsible for closing an open
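Two fixes here. First, the scan loop keeps advancing idx after it has seen the newest Error entry, so recovering stderr from idx could start at the wrong oplog position; last_error_index pins recovery to the entry where the last error was actually observed. Second, the recovered stderr fragments are raw stderr output and already end in their own newlines, so joining them with "\n" double-spaced the log. A small illustration of the join change (plain Vec<String>, nothing Golem-specific):

    let entries = vec!["first line\n".to_string(), "second line\n".to_string()];
    // join("") preserves the original spacing of the captured stderr...
    assert_eq!(entries.join(""), "first line\nsecond line\n");
    // ...while join("\n") inserted a blank line between every fragment.
    assert_eq!(entries.join("\n"), "first line\n\nsecond line\n");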

golem-worker-executor-base/src/grpc.rs

Lines changed: 9 additions & 7 deletions
@@ -820,13 +820,15 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
         )
         .await?;

-        let result: Vec<golem::worker::WorkerMetadata> = workers
-            .into_iter()
-            .map(|worker| {
-                let status = worker.last_known_status.clone();
-                Self::create_proto_metadata(worker, status, None)
-            })
-            .collect();
+        let mut result = Vec::new();
+
+        for worker in workers {
+            let status = worker.last_known_status.clone();
+            let last_error_and_retry_count =
+                Ctx::get_last_error_and_retry_count(self, &worker.owned_worker_id()).await;
+            let metadata = Self::create_proto_metadata(worker, status, last_error_and_retry_count);
+            result.push(metadata);
+        }

         Ok((
             new_cursor.map(|cursor| Cursor {
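Listing workers now looks up each worker's last error, and since Ctx::get_last_error_and_retry_count is async it cannot be awaited inside a plain map closure, hence the explicit for loop. The awaits run sequentially; if that ever mattered for large pages, a concurrent variant along these lines is conceivable (a sketch only, reusing the types above; futures::future::join_all is the only new dependency):

    // Fetch last errors for all workers concurrently instead of one by one.
    let result: Vec<golem::worker::WorkerMetadata> =
        futures::future::join_all(workers.into_iter().map(|worker| async move {
            let status = worker.last_known_status.clone();
            let last_error =
                Ctx::get_last_error_and_retry_count(self, &worker.owned_worker_id()).await;
            Self::create_proto_metadata(worker, status, last_error)
        }))
        .await;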
