Skip to content

Commit 0feb5c2

Browse files
prateekm authored and jagadish-northguard committed
SAMZA-1229; Disk space monitor should only count data in use by the container
Author: Prateek Maheshwari <[email protected]>
Reviewers: Jagadish <[email protected]>
Closes apache#134 from prateekm/disk-space-monitor
1 parent c91da78 commit 0feb5c2

File tree

2 files changed

+29
-26
lines changed

2 files changed

+29
-26
lines changed

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -413,7 +413,6 @@ object SamzaContainer extends Logging {
413413
info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
414414

415415
val storeWatchPaths = new util.HashSet[Path]()
416-
storeWatchPaths.add(defaultStoreBaseDir.toPath)
417416

418417
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
419418
debug("Setting up task instance: %s" format taskModel)
@@ -455,8 +454,6 @@ object SamzaContainer extends Logging {
455454
loggedStorageBaseDir = defaultStoreBaseDir
456455
}
457456

458-
storeWatchPaths.add(loggedStorageBaseDir.toPath)
459-
460457
info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
461458

462459
val taskStores = storageEngineFactories
@@ -467,25 +464,30 @@ object SamzaContainer extends Logging {
467464
} else {
468465
null
469466
}
467+
470468
val keySerde = config.getStorageKeySerde(storeName) match {
471469
case Some(keySerde) => serdes.getOrElse(keySerde,
472470
throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde))
473471
case _ => null
474472
}
473+
475474
val msgSerde = config.getStorageMsgSerde(storeName) match {
476475
case Some(msgSerde) => serdes.getOrElse(msgSerde,
477476
throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde))
478477
case _ => null
479478
}
480-
val storeBaseDir = if(changeLogSystemStreamPartition != null) {
479+
480+
val storeDir = if (changeLogSystemStreamPartition != null) {
481481
TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
482-
}
483-
else {
482+
} else {
484483
TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
485484
}
485+
486+
storeWatchPaths.add(storeDir.toPath)
487+
486488
val storageEngine = storageEngineFactory.getStorageEngine(
487489
storeName,
488-
storeBaseDir,
490+
storeDir,
489491
keySerde,
490492
msgSerde,
491493
collector,

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala

Lines changed: 20 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -93,24 +93,24 @@ class TaskStorageManager(
9393
debug("Cleaning base directories for stores.")
9494

9595
taskStores.keys.foreach(storeName => {
96-
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
97-
info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)
96+
val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
97+
info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString)
9898

99-
if(storagePartitionDir.exists()) {
100-
info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
101-
Util.rm(storagePartitionDir)
99+
if(storePartitionDir.exists()) {
100+
info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
101+
Util.rm(storePartitionDir)
102102
}
103103

104-
val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
105-
info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString)
104+
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
105+
info("Got logged storage partition directory as %s" format loggedStorePartitionDir.toPath.toString)
106106

107107
// Delete the logged store if it is not valid.
108-
if (!isLoggedStoreValid(storeName, loggedStoreDir)) {
109-
info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString)
110-
Util.rm(loggedStoreDir)
108+
if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
109+
info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
110+
Util.rm(loggedStorePartitionDir)
111111
} else {
112-
val offset = readOffsetFile(loggedStoreDir)
113-
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir))
112+
val offset = readOffsetFile(loggedStorePartitionDir)
113+
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
114114
fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
115115
}
116116
})
@@ -182,13 +182,13 @@ class TaskStorageManager(
182182
taskStores.foreach {
183183
case (storeName, storageEngine) =>
184184
if (storageEngine.getStoreProperties.isLoggedStore) {
185-
val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
186-
info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName))
187-
if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs()
185+
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
186+
info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName))
187+
if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs()
188188
} else {
189-
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
190-
info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName))
191-
storagePartitionDir.mkdirs()
189+
val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
190+
info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName))
191+
storePartitionDir.mkdirs()
192192
}
193193
}
194194
}
@@ -322,7 +322,8 @@ class TaskStorageManager(
322322
}
323323
debug("Got offset %s for store %s" format(newestOffset, storeName))
324324

325-
val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName)
325+
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
326+
val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
326327
if (newestOffset != null) {
327328
debug("Storing offset for store in OFFSET file ")
328329
Util.writeDataToFile(offsetFile, newestOffset)

0 commit comments

Comments
 (0)