11#include < chrono>
2+ #include < condition_variable>
3+ #include < mutex>
24#include < thread>
35#include " napi.h"
46
@@ -17,6 +19,9 @@ static struct ThreadSafeFunctionInfo {
1719 bool startSecondary;
1820 FunctionReference jsFinalizeCallback;
1921 uint32_t maxQueueSize;
22+ bool closeCalledFromJs;
23+ std::mutex protect;
24+ std::condition_variable signal;
2025} tsfnInfo;
2126
2227static void TSFNCallJS (Env env,
@@ -42,7 +47,7 @@ static int ints[ARRAY_LENGTH];
4247
4348static void SecondaryThread () {
4449 if (tsfn.Release () != napi_ok) {
45- Error::Fatal (" SecondaryThread " , " ThreadSafeFunction.Release() failed" );
50+ Error::Fatal (" TypedSecondaryThread " , " ThreadSafeFunction.Release() failed" );
4651 }
4752}
4853
@@ -52,7 +57,8 @@ static void DataSourceThread() {
5257
5358 if (info->startSecondary ) {
5459 if (tsfn.Acquire () != napi_ok) {
55- Error::Fatal (" DataSourceThread" , " ThreadSafeFunction.Acquire() failed" );
60+ Error::Fatal (" TypedDataSourceThread" ,
61+ " ThreadSafeFunction.Acquire() failed" );
5662 }
5763
5864 threads[1 ] = std::thread (SecondaryThread);
@@ -75,13 +81,13 @@ static void DataSourceThread() {
7581 break ;
7682 }
7783
78- if (info->maxQueueSize == 0 ) {
79- // Let's make this thread really busy for 200 ms to give the main thread a
80- // chance to abort .
81- auto start = std::chrono::high_resolution_clock::now ( );
82- constexpr auto MS_200 = std::chrono::milliseconds ( 200 );
83- for (; std::chrono::high_resolution_clock::now () - start < MS_200;)
84- ;
84+ if (info->abort && info-> type != ThreadSafeFunctionInfo::NON_BLOCKING ) {
85+ // Let's make this thread really busy to give the main thread a chance to
86+ // abort / close .
87+ std::unique_lock<std::mutex> lk (info-> protect );
88+ while (!info-> closeCalledFromJs ) {
89+ info-> signal . wait (lk);
90+ }
8591 }
8692
8793 switch (status) {
@@ -98,20 +104,22 @@ static void DataSourceThread() {
98104 break ;
99105
100106 default :
101- Error::Fatal (" DataSourceThread" , " ThreadSafeFunction.*Call() failed" );
107+ Error::Fatal (" TypedDataSourceThread" ,
108+ " ThreadSafeFunction.*Call() failed" );
102109 }
103110 }
104111
105112 if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) {
106- Error::Fatal (" DataSourceThread " , " Queue was never full" );
113+ Error::Fatal (" TypedDataSourceThread " , " Queue was never full" );
107114 }
108115
109116 if (info->abort && !queueWasClosing) {
110- Error::Fatal (" DataSourceThread " , " Queue was never closing" );
117+ Error::Fatal (" TypedDataSourceThread " , " Queue was never closing" );
111118 }
112119
113120 if (!queueWasClosing && tsfn.Release () != napi_ok) {
114- Error::Fatal (" DataSourceThread" , " ThreadSafeFunction.Release() failed" );
121+ Error::Fatal (" TypedDataSourceThread" ,
122+ " ThreadSafeFunction.Release() failed" );
115123 }
116124}
117125
@@ -123,6 +131,11 @@ static Value StopThread(const CallbackInfo& info) {
123131 } else {
124132 tsfn.Release ();
125133 }
134+ {
135+ std::lock_guard<std::mutex> _ (tsfnInfo.protect );
136+ tsfnInfo.closeCalledFromJs = true ;
137+ tsfnInfo.signal .notify_one ();
138+ }
126139 return Value ();
127140}
128141
@@ -145,6 +158,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
145158 tsfnInfo.abort = info[1 ].As <Boolean>();
146159 tsfnInfo.startSecondary = info[2 ].As <Boolean>();
147160 tsfnInfo.maxQueueSize = info[3 ].As <Number>().Uint32Value ();
161+ tsfnInfo.closeCalledFromJs = false ;
148162
149163 tsfn = TSFN::New (info.Env (),
150164 info[0 ].As <Function>(),
@@ -163,7 +177,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
163177
164178static Value Release (const CallbackInfo& /* info */ ) {
165179 if (tsfn.Release () != napi_ok) {
166- Error::Fatal (" Release" , " ThreadSafeFunction .Release() failed" );
180+ Error::Fatal (" Release" , " TypedThreadSafeFunction .Release() failed" );
167181 }
168182 return Value ();
169183}
0 commit comments