diff --git a/Framework/Core/include/Framework/ProcessingPolicies.h b/Framework/Core/include/Framework/ProcessingPolicies.h
index 9ab0b0e5e323a..3041b48cb9cb9 100644
--- a/Framework/Core/include/Framework/ProcessingPolicies.h
+++ b/Framework/Core/include/Framework/ProcessingPolicies.h
@@ -32,6 +32,10 @@ struct ProcessingPolicies {
   enum TerminationPolicy termination;
   enum TerminationPolicy error;
   enum EarlyForwardPolicy earlyForward;
+  /// When termination policy is QUIT, optionally wait this many seconds before
+  /// actually quitting (0 means quit immediately). Set via --completion-policy
+  /// with a duration string, e.g. --completion-policy 10s.
+  int terminationTimeout = 0;
 };
 
 /// The mode in which the driver is running. Should be MASTER when running locally,
diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx
index b062f2bf68a75..cb7b38b0205ac 100644
--- a/Framework/Core/src/DataProcessingDevice.cxx
+++ b/Framework/Core/src/DataProcessingDevice.cxx
@@ -901,6 +901,11 @@ void DataProcessingDevice::InitTask()
 
   deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue("expected-region-callbacks"));
   deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue("exit-transition-timeout"));
+  // When exit-transition-timeout is 0, fall back to the wait carried by the
+  // termination policy (non-zero only if one was requested).
+  if (deviceContext.exitTransitionTimeout == 0 && deviceContext.processingPolicies.terminationTimeout > 0) {
+    deviceContext.exitTransitionTimeout = deviceContext.processingPolicies.terminationTimeout;
+  }
   deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue("data-processing-timeout"));
 
   for (auto& channel : GetChannels()) {
diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx
index b8399a4c591e7..d187a4b2ff6be 100644
--- a/Framework/Core/src/DataProcessingHelpers.cxx
+++ b/Framework/Core/src/DataProcessingHelpers.cxx
@@ -219,6 +219,15 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
     O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
   } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
     O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
+  } else if (policies.termination == TerminationPolicy::QUIT && policies.terminationTimeout > 0) {
+    // --completion-policy was given as a duration: although we are already idle,
+    // arm the grace period timer and report the transition as Requested instead
+    // of quitting right away. The wait and the message both use the policy's own
+    // duration, which may differ from an explicitly set exit transition timeout.
+    uv_update_time(state.loop);
+    O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and already idle, waiting %d seconds before quitting as per --completion-policy.", policies.terminationTimeout);
+    uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, policies.terminationTimeout * 1000, 0);
+    return TransitionHandlingState::Requested;
   } else if (policies.termination == TerminationPolicy::QUIT) {
     O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
   } else {
diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx
index 70f3c8940ef26..0143881700497 100644
--- a/Framework/Core/src/runDataProcessing.cxx
+++ b/Framework/Core/src/runDataProcessing.cxx
@@ -2921,8 +2921,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
     ("resources", bpo::value()->default_value(""), "resources allocated for the workflow") // //
     ("start-port,p", bpo::value()->default_value(22000), "start port to allocate") // //
     ("port-range,pr", bpo::value()->default_value(1000), "ports in range") // //
-    ("completion-policy,c", bpo::value(&processingPolicies.termination)->default_value(TerminationPolicy::QUIT), // //
-     "what to do when processing is finished: quit, wait") // //
+    ("completion-policy,c", bpo::value<std::string>()->default_value("quit"), //
+     "what to do when processing is finished: quit, wait, or a duration (e.g. 10s, 2m) to quit after waiting that long") // //
     ("error-policy", bpo::value(&processingPolicies.error)->default_value(TerminationPolicy::QUIT), // //
      "what to do when a device has an error: quit, wait") // //
     ("min-failure-level", bpo::value(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
@@ -3186,6 +3186,43 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
   driverInfo.argc = argc;
   driverInfo.argv = argv;
   driverInfo.noSHMCleanup = varmap["no-cleanup"].as();
-  driverInfo.processingPolicies.termination = varmap["completion-policy"].as();
+  {
+    // --completion-policy accepts "quit", "wait", or a duration "<N>[s|m]".
+    // A duration selects the QUIT policy plus a wait of that many seconds.
+    auto const completionPolicyStr = varmap["completion-policy"].as<std::string>();
+    if (completionPolicyStr == "quit") {
+      driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
+    } else if (completionPolicyStr == "wait") {
+      driverInfo.processingPolicies.termination = TerminationPolicy::WAIT;
+    } else {
+      // Strict duration parse: one or more decimal digits, optionally followed
+      // by exactly one unit character ('s' for seconds, the default, or 'm' for
+      // minutes). Signs, whitespace and trailing characters (e.g. "10sec") are
+      // rejected rather than silently ignored.
+      // The cap guarantees that seconds * 1000 (the milliseconds handed to the
+      // timer) cannot overflow an int.
+      constexpr long long maxSeconds = 2147483647LL / 1000;
+      auto const unitPos = completionPolicyStr.find_first_not_of("0123456789");
+      auto const digits = completionPolicyStr.substr(0, unitPos);
+      auto const unit = unitPos == std::string::npos ? std::string{"s"} : completionPolicyStr.substr(unitPos);
+      bool valid = !digits.empty() && (unit == "s" || unit == "m");
+      long long seconds = 0;
+      for (char const digit : digits) {
+        seconds = seconds * 10 + (digit - '0');
+        if (seconds > maxSeconds) {
+          valid = false;
+          break;
+        }
+      }
+      if (valid && unit == "m") {
+        seconds *= 60;
+      }
+      if (!valid || seconds <= 0 || seconds > maxSeconds) {
+        throw std::runtime_error(fmt::format("Invalid --completion-policy value '{}': use 'quit', 'wait', or a duration like '10s' or '2m'", completionPolicyStr));
+      }
+      driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
+      driverInfo.processingPolicies.terminationTimeout = static_cast<int>(seconds);
+    }
+  }
   driverInfo.processingPolicies.earlyForward = varmap["early-forward-policy"].as();
   driverInfo.mode = varmap["driver-mode"].as();