//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/capy
//
10  

10  

11  
#include <boost/capy/ex/thread_pool.hpp>
11  
#include <boost/capy/ex/thread_pool.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
14  
#include <atomic>
14  
#include <atomic>
15  
#include <condition_variable>
15  
#include <condition_variable>
16  
#include <cstdio>
16  
#include <cstdio>
17  
#include <mutex>
17  
#include <mutex>
18  
#include <thread>
18  
#include <thread>
19  
#include <vector>
19  
#include <vector>
20  

20  

21  
/*
    Thread pool implementation using a shared work queue.

    Work items are coroutine handles wrapped in intrusive list nodes, stored
    in a single queue protected by a mutex. Worker threads wait on a
    condition_variable until work is available or stop is requested.

    Threads are started lazily on the first post() via std::call_once, to
    avoid spawning threads for pools that are constructed but never used.
    Each thread is named with a configurable prefix plus its index, for
    visibility in debuggers.

    Shutdown sequence: stop() sets the stop flag and notifies all threads.
    A worker exits only once stop has been requested AND the queue is
    empty, so work that is already queued still runs. The destructor then
    joins the threads and destroys any work that remains queued (for
    example, work posted after the workers have exited) without executing
    it.
*/
37  

37  

38  
namespace boost {
38  
namespace boost {
39  
namespace capy {
39  
namespace capy {
40  

40  

41  
//------------------------------------------------------------------------------
41  
//------------------------------------------------------------------------------
42  

42  

43  
class thread_pool::impl
43  
class thread_pool::impl
44  
{
44  
{
45  
    struct work : detail::intrusive_queue<work>::node
45  
    struct work : detail::intrusive_queue<work>::node
46  
    {
46  
    {
47  
        std::coroutine_handle<> h_;
47  
        std::coroutine_handle<> h_;
48  

48  

49  
        explicit work(std::coroutine_handle<> h) noexcept
49  
        explicit work(std::coroutine_handle<> h) noexcept
50  
            : h_(h)
50  
            : h_(h)
51  
        {
51  
        {
52  
        }
52  
        }
53  

53  

54  
        void run()
54  
        void run()
55  
        {
55  
        {
56  
            auto h = h_;
56  
            auto h = h_;
57  
            delete this;
57  
            delete this;
58  
            h.resume();
58  
            h.resume();
59  
        }
59  
        }
60  

60  

61  
        void destroy()
61  
        void destroy()
62  
        {
62  
        {
63  
            delete this;
63  
            delete this;
64  
        }
64  
        }
65  
    };
65  
    };
66  

66  

67  
    std::mutex mutex_;
67  
    std::mutex mutex_;
68  
    std::condition_variable cv_;
68  
    std::condition_variable cv_;
69  
    detail::intrusive_queue<work> q_;
69  
    detail::intrusive_queue<work> q_;
70  
    std::vector<std::thread> threads_;
70  
    std::vector<std::thread> threads_;
71  
    std::atomic<bool> stop_{false};
71  
    std::atomic<bool> stop_{false};
72  
    std::size_t num_threads_;
72  
    std::size_t num_threads_;
73  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
73  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
74  
    std::once_flag start_flag_;
74  
    std::once_flag start_flag_;
75  

75  

76  
public:
76  
public:
77  
    ~impl()
77  
    ~impl()
78  
    {
78  
    {
79  
        stop();
79  
        stop();
80  
        for(auto& t : threads_)
80  
        for(auto& t : threads_)
81  
            if(t.joinable())
81  
            if(t.joinable())
82  
                t.join();
82  
                t.join();
83  

83  

84  
        while(auto* w = q_.pop())
84  
        while(auto* w = q_.pop())
85  
            w->destroy();
85  
            w->destroy();
86  
    }
86  
    }
87  

87  

88  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
88  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
89  
        : num_threads_(num_threads)
89  
        : num_threads_(num_threads)
90  
    {
90  
    {
91  
        if(num_threads_ == 0)
91  
        if(num_threads_ == 0)
92  
            num_threads_ = std::thread::hardware_concurrency();
92  
            num_threads_ = std::thread::hardware_concurrency();
93  
        if(num_threads_ == 0)
93  
        if(num_threads_ == 0)
94  
            num_threads_ = 1;
94  
            num_threads_ = 1;
95  

95  

96  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
97  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
98  
        thread_name_prefix_[n] = '\0';
98  
        thread_name_prefix_[n] = '\0';
99  
    }
99  
    }
100  

100  

101  
    void
101  
    void
102  
    post(std::coroutine_handle<> h)
102  
    post(std::coroutine_handle<> h)
103  
    {
103  
    {
104  
        ensure_started();
104  
        ensure_started();
105  
        auto* w = new work(h);
105  
        auto* w = new work(h);
106  
        {
106  
        {
107  
            std::lock_guard<std::mutex> lock(mutex_);
107  
            std::lock_guard<std::mutex> lock(mutex_);
108  
            q_.push(w);
108  
            q_.push(w);
109  
        }
109  
        }
110  
        cv_.notify_one();
110  
        cv_.notify_one();
111  
    }
111  
    }
112  

112  

113  
    void
113  
    void
114  
    join() noexcept
114  
    join() noexcept
115  
    {
115  
    {
116  
        stop();
116  
        stop();
117  
        for(auto& t : threads_)
117  
        for(auto& t : threads_)
118  
            if(t.joinable())
118  
            if(t.joinable())
119  
                t.join();
119  
                t.join();
120  
    }
120  
    }
121  

121  

122  
    void
122  
    void
123  
    stop() noexcept
123  
    stop() noexcept
124  
    {
124  
    {
125  
        stop_.store(true, std::memory_order_release);
125  
        stop_.store(true, std::memory_order_release);
126  
        cv_.notify_all();
126  
        cv_.notify_all();
127  
    }
127  
    }
128  

128  

129  
private:
129  
private:
130  
    void
130  
    void
131  
    ensure_started()
131  
    ensure_started()
132  
    {
132  
    {
133  
        std::call_once(start_flag_, [this]{
133  
        std::call_once(start_flag_, [this]{
134  
            threads_.reserve(num_threads_);
134  
            threads_.reserve(num_threads_);
135  
            for(std::size_t i = 0; i < num_threads_; ++i)
135  
            for(std::size_t i = 0; i < num_threads_; ++i)
136  
                threads_.emplace_back([this, i]{ run(i); });
136  
                threads_.emplace_back([this, i]{ run(i); });
137  
        });
137  
        });
138  
    }
138  
    }
139  

139  

140  
    void
140  
    void
141  
    run(std::size_t index)
141  
    run(std::size_t index)
142  
    {
142  
    {
143  
        // Build name; set_current_thread_name truncates to platform limits.
143  
        // Build name; set_current_thread_name truncates to platform limits.
144  
        char name[16];
144  
        char name[16];
145  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
145  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
146  
        set_current_thread_name(name);
146  
        set_current_thread_name(name);
147  

147  

148  
        for(;;)
148  
        for(;;)
149  
        {
149  
        {
150  
            work* w = nullptr;
150  
            work* w = nullptr;
151  
            {
151  
            {
152  
                std::unique_lock<std::mutex> lock(mutex_);
152  
                std::unique_lock<std::mutex> lock(mutex_);
153  
                cv_.wait(lock, [this]{
153  
                cv_.wait(lock, [this]{
154  
                    return !q_.empty() ||
154  
                    return !q_.empty() ||
155  
                        stop_.load(std::memory_order_acquire);
155  
                        stop_.load(std::memory_order_acquire);
156  
                });
156  
                });
157  
                if(stop_.load(std::memory_order_acquire) && q_.empty())
157  
                if(stop_.load(std::memory_order_acquire) && q_.empty())
158  
                    return;
158  
                    return;
159  
                w = q_.pop();
159  
                w = q_.pop();
160  
            }
160  
            }
161  
            if(w)
161  
            if(w)
162  
                w->run();
162  
                w->run();
163  
        }
163  
        }
164  
    }
164  
    }
165  
};
165  
};
166  

166  

167  
//------------------------------------------------------------------------------
167  
//------------------------------------------------------------------------------
168  

168  

169  
thread_pool::
~thread_pool()
{
    // Stop the workers and wait for them to exit first, so that no work
    // item is executing while the rest of the teardown runs.
    impl_->join();
    // shutdown() and destroy() are not defined in this file; presumably
    // they are the execution-context base teardown -- NOTE(review): confirm.
    shutdown();
    destroy();
    // Free the implementation; its destructor destroys (without resuming)
    // any work that is still queued.
    delete impl_;
}
177  

177  

178  
thread_pool::
178  
thread_pool::
179  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
179  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
180  
    : impl_(new impl(num_threads, thread_name_prefix))
180  
    : impl_(new impl(num_threads, thread_name_prefix))
181  
{
181  
{
182  
    this->set_frame_allocator(std::allocator<void>{});
182  
    this->set_frame_allocator(std::allocator<void>{});
183  
}
183  
}
184  

184  

185  
void
185  
void
186  
thread_pool::
186  
thread_pool::
187  
stop() noexcept
187  
stop() noexcept
188  
{
188  
{
189  
    impl_->stop();
189  
    impl_->stop();
190  
}
190  
}
191  

191  

192  
//------------------------------------------------------------------------------
192  
//------------------------------------------------------------------------------
193  

193  

194  
thread_pool::executor_type
thread_pool::
get_executor() const noexcept
{
    // executor_type is constructed from a non-const thread_pool&, while
    // this accessor is const, so constness is cast away here.
    auto& self = const_cast<thread_pool&>(*this);
    return executor_type(self);
}
201  

201  

202  
void
202  
void
203  
thread_pool::executor_type::
203  
thread_pool::executor_type::
204  
post(std::coroutine_handle<> h) const
204  
post(std::coroutine_handle<> h) const
205  
{
205  
{
206  
    pool_->impl_->post(h);
206  
    pool_->impl_->post(h);
207  
}
207  
}
208  

208  

209  
} // capy
209  
} // capy
210  
} // boost
210  
} // boost