Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/ProcessingPolicies.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ struct ProcessingPolicies {
enum TerminationPolicy termination;
enum TerminationPolicy error;
enum EarlyForwardPolicy earlyForward;
/// When termination policy is QUIT, optionally wait this many seconds before
/// actually quitting (0 means quit immediately). Set via --completion-policy
/// with a duration string, e.g. --completion-policy 10s.
int terminationTimeout = 0;
};

/// The mode in which the driver is running. Should be MASTER when running locally,
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,9 @@ void DataProcessingDevice::InitTask()

deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
if (deviceContext.exitTransitionTimeout == 0 && deviceContext.processingPolicies.terminationTimeout > 0) {
deviceContext.exitTransitionTimeout = deviceContext.processingPolicies.terminationTimeout;
}
deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));

for (auto& channel : GetChannels()) {
Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
} else if (policies.termination == TerminationPolicy::QUIT && policies.terminationTimeout > 0) {
// --completion-policy was given as a duration: wait even though already idle
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and already idle, waiting %d seconds before quitting as per --completion-policy.", deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
return TransitionHandlingState::Requested;
} else if (policies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
Expand Down
30 changes: 27 additions & 3 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2921,8 +2921,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
("completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.termination)->default_value(TerminationPolicy::QUIT), // //
"what to do when processing is finished: quit, wait") // //
("completion-policy,c", bpo::value<std::string>()->default_value("quit"), //
"what to do when processing is finished: quit, wait, or a duration (e.g. 10s, 2m) to quit after waiting that long") // //
("error-policy", bpo::value<TerminationPolicy>(&processingPolicies.error)->default_value(TerminationPolicy::QUIT), // //
"what to do when a device has an error: quit, wait") // //
("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
Expand Down Expand Up @@ -3186,7 +3186,31 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
driverInfo.argc = argc;
driverInfo.argv = argv;
driverInfo.noSHMCleanup = varmap["no-cleanup"].as<bool>();
driverInfo.processingPolicies.termination = varmap["completion-policy"].as<TerminationPolicy>();
{
auto completionPolicyStr = varmap["completion-policy"].as<std::string>();
if (completionPolicyStr == "quit") {
driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
} else if (completionPolicyStr == "wait") {
driverInfo.processingPolicies.termination = TerminationPolicy::WAIT;
} else {
// Try to parse as a duration, e.g. "10s" or "2m"
int value = 0;
char unit = 's';
int matched = sscanf(completionPolicyStr.c_str(), "%d%c", &value, &unit);
if (matched >= 1 && value > 0) {
int seconds = value;
if (matched == 2 && unit == 'm') {
seconds = value * 60;
} else if (matched == 2 && unit != 's') {
throw std::runtime_error(fmt::format("Invalid --completion-policy value '{}': use 'quit', 'wait', or a duration like '10s' or '2m'", completionPolicyStr));
}
driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
driverInfo.processingPolicies.terminationTimeout = seconds;
} else {
throw std::runtime_error(fmt::format("Invalid --completion-policy value '{}': use 'quit', 'wait', or a duration like '10s' or '2m'", completionPolicyStr));
}
}
}
driverInfo.processingPolicies.earlyForward = varmap["early-forward-policy"].as<EarlyForwardPolicy>();
driverInfo.mode = varmap["driver-mode"].as<DriverMode>();

Expand Down