diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 4915958..2283b40 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -87,6 +87,13 @@ set(CORE_SOURCES # Date and time ${CMAKE_CURRENT_SOURCE_DIR}/datetime/CTime.cpp + + # Concurrency + ${CMAKE_CURRENT_SOURCE_DIR}/concurrency/ThreadManagement.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/concurrency/SharingData.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/concurrency/RaceCondition.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/concurrency/ConditionVariable.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/concurrency/FuturePromise.cpp ) # Export CORE_SOURCES to the parent CMakeLists.txt diff --git a/src/core/concurrency/ConditionVariable.cpp b/src/core/concurrency/ConditionVariable.cpp new file mode 100644 index 0000000..6888f12 --- /dev/null +++ b/src/core/concurrency/ConditionVariable.cpp @@ -0,0 +1,62 @@ +#include +#include + +#include +#include +#include + +#include "ExampleRegistry.h" + +namespace { + +std::mutex mutex; +std::condition_variable cv; +std::string data; +bool ready = false; +bool finish = false; + +void worker_thread() { + std::unique_lock g_mutex(mutex); + + std::cout << "worker_thread started. Waiting for data \n"; + cv.wait(g_mutex, []() { return ready; }); + std::cout << "worker_thread proccessing data \n"; + data += " after processing"; + finish = true; + cv.notify_one(); +} + +void run() { + std::thread w_thread(worker_thread); + + // send data + std::cout << "main_thread signals data ready for processing\n"; + { + std::unique_lock g_mutex(mutex); + data = "dummy data"; + ready = true; + } + cv.notify_one(); + + // wait for worker + { + std::unique_lock g_mutex(mutex); + cv.wait(g_mutex, []() { return finish; }); + } + std::cout << "main_thread data: " << data << '\n'; + w_thread.join(); +} +} // namespace + +class ConditionVariable : public IExample { + + std::string group() const override { return "core/concurrency"; } + std::string name() const override { return "ConditionVariable"; } + std::string description() const override { + return "The examples for condition variable"; + } + + void execute() override { run(); } +}; + +REGISTER_EXAMPLE(ConditionVariable, "core/concurrency", "ConditionVariable"); \ No newline at end of file diff --git a/src/core/concurrency/FuturePromise.cpp b/src/core/concurrency/FuturePromise.cpp new file mode 100644 index 0000000..d5e856f --- /dev/null +++ b/src/core/concurrency/FuturePromise.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include + +#include "ExampleRegistry.h" + +namespace Async { +int async_worker() { + std::cout << "[worker] started, needs 2000 ms\n"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::cout << "[worker] finished\n"; + return 1000; +} + +void run() { + std::cout << "\n=== std::promise/std::future example ===\n"; + auto start = std::chrono::steady_clock::now(); + + // launch heavy_work asynchronously and get a future + std::future futur = + std::async(std::launch::async, []() { return async_worker(); }); + + std::cout << "[main] async launched\n"; + for (int i = 1; i <= 4; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + auto now = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast(now - start) + .count(); + + std::cout << "[main] doing other work... " << elapsed << " ms\n"; + } + + // get result + int result = futur.get(); + + auto end = std::chrono::steady_clock::now(); + auto total = + std::chrono::duration_cast(end - start) + .count(); + + std::cout << "[main] result = " << result << '\n'; + std::cout << "[main] total time = " << total << " ms\n"; +} + +} // namespace Async + +namespace Simple { +void promise_worker(std::promise* prom) { + std::cout << "[worker] started, needs 2000 ms\n"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::cout << "[worker] finished\n"; + prom->set_value(100); +} + +void run() { + std::cout << "\n=== std::async example ===\n"; + auto start = std::chrono::steady_clock::now(); + + // create a promise and future + std::promise promis; + std::future futur = promis.get_future(); + + // start heavy work async + std::thread thread(promise_worker, &promis); + + std::cout << "[main] async launched\n"; + for (int i = 1; i <= 4; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + auto now = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast(now - start) + .count(); + + std::cout << "[main] doing other work... " << elapsed << " ms\n"; + } + + // get result + int result = futur.get(); + + auto end = std::chrono::steady_clock::now(); + auto total = + std::chrono::duration_cast(end - start) + .count(); + + std::cout << "[main] result = " << result << '\n'; + std::cout << "[main] total time = " << total << " ms\n"; + + thread.join(); +} + +} // namespace Simple + +class FuturePromise : public IExample { + + std::string group() const override { return "core/concurrency"; } + std::string name() const override { return "FuturePromise"; } + std::string description() const override { + return "The examples for "; + } + + void execute() override { + Async::run(); + + Simple::run(); + } +}; + +REGISTER_EXAMPLE(FuturePromise, "core/concurrency", "FuturePromise"); \ No newline at end of file diff --git a/src/core/concurrency/README.md b/src/core/concurrency/README.md new file mode 100644 index 0000000..0df32ac --- /dev/null +++ b/src/core/concurrency/README.md @@ -0,0 +1,228 @@ +## 1. Concurrency +- Concurrency `refers to the ability` `to proccess` `multiple tasks` `at the same time.` +`On a single core`, it uses `context-switching`, `on multi-core systems`, it can run in parallel. +- It's `used to improve` the `program performance` and `response time`. +- In C/C++, we can archive concurrency by using `threads`. + +- `` is a C++ header that provide `a collection of types and functions` to work with time. + +## 2. Thread +- Threads are `the basic unit of` multitasking. +- There are many errors and risks associated with concurrency, including: + - `Deadlocks`: `refers to the situation where` two or more threads are blocked, `waiting for each other indefinitely`. + - `Race condition`: `refers to the situation where` two or more threads access `shared data` concurrently, leading to the `undefined behavior`. + - `Starvation`: `refer to the situation where` a thread `is unable to gain` regular access to the shared resources. +=> We can avoid these problems by `proper synchronization` between the threads. +- Use threads if we need to run long-lived and complex tasks. + +### 2.2. Thead Synchronization +- The synchronization can be done by using the following components: + - `Mutex/Lock`: they are used to protect the shared resouces, ensure that only one thread can access `the critical sections` at a time. + - `Semaphore`: + - `Futures and Promises`: , are used for the asynchronous task execution. + - `Condition variable`: + +### 2.3. Thread Management +- `thread`: an `OS thread` `managed by` the kernel. +- Each `thread` has it own `call stack`, but all `threads` share the heap. +- `thread object`: refers to a C++ instance `that associated with` an `active thread` of execution in hardware level. +- `std::thread(callable)`: request the kernel OS to create a thread. +- `std::this_thread`: refer to the current thread +- `join`: blocks the current thread until the thread that identified by *this (a.ka. object thread) finished its execution. Note that if the exception +is throw before `join`, `std::terminate` might be called, and will kill the entire program process, not an invidual thread. +- `detach`: separates the `thread of the execution` from the `thread object`, allowing the execution to continue running. +- `yield`: give priority to other threads, pause its execution +- Use `return` to kill a thread. + + +### 2.4. Sharing Data +- `Global/Static Variable`: can be accessed by all threads. +- `Pass By Reference`: we need to explicitly wrap the args in `std::ref` to pass by reference and is the only way to properly get data out of a thread +- `thread_local` to create a static variable per thread. + +### 2.5. Atomic +- An atomic type is a type that implements atomic operations. It's used to guarantee no race conditions will occur. +- e.g. +`std::atomic` - `std::atomic_bool` +- e.g. +```cpp +#include +#include +#include +#include + +std::atomic_int acnt; +int cnt; + +void f() +{ + for (auto n{10000}; n; --n) + { + ++acnt; + ++cnt; + // Note: for this example, relaxed memory order is sufficient, + // e.g. acnt.fetch_add(1, std::memory_order_relaxed); + } +} + +int main() +{ + { + std::vector pool; + for (int n = 0; n < 10; ++n) + pool.emplace_back(f); + } + + std::cout << "The atomic counter is " << acnt << '\n' + << "The non-atomic counter is " << cnt << '\n'; +} +``` + +### 2.6. Mutex/Locks +- Mutexs are mutual exclusion objects, are owned by the thread that takes it. +- e.g. +```cpp +#include + +// Create your mutex here +std::mutex my_mutex; + +// +thread_function() +{ + my_mutex.lock(); // Acquire lock + // Do some non-thread safe stuff... + my_mutex.unlock(); // Release lock +} +``` + +- There are serveral types of mutex, including: + - `mutex` + - `timed_mutex` + - `recursive_mutex` + - `recursive_timed_mutex` + - `shared_timed_mutex` + +- `Lock Guard Type` is a wrapper mutex that provides a convinient RAII-style mechanism. +- They are several `lock guard types`, including: + - `std::lock_guard`, `std::scoped_lock` + - `std::unique_lock`, `shared_lock` + +### 2.7. Condition Variable for event handling - +- `std::condition_variable` is a synchronization primitive used with a `std::mutex` to block one or more threads until another thread both modifies a `shared variable` (the condition) and `notifies` the `std::condition_variable`. +- `wait` will releases the lock and blocks the thread until the condition is fullfilled. +- e.g. +```cpp +#include +#include +#include +#include +#include + +std::mutex m; +std::condition_variable cv; +std::string data; +bool ready = false; +bool processed = false; + +void worker_thread() +{ + // 1. wait until main() sends data + std::unique_lock lk(m); + cv.wait(lk, []{ return ready; }); + + // after the wait, we own the lock + std::cout << "Worker thread is processing data\n"; + data += " after processing"; + + // send data back to main() + processed = true; + std::cout << "Worker thread signals data processing completed\n"; + + // manual unlocking is done before notifying, to avoid waking up + // the waiting thread only to block again (see notify_one for details) + lk.unlock(); + cv.notify_one(); +} + +int main() +{ + std::thread worker(worker_thread); + + data = "Example data"; + // send data to the worker thread + { + std::lock_guard lk(m); + ready = true; + std::cout << "main() signals data ready for processing\n"; + } + cv.notify_one(); + + // wait for the worker + { + std::unique_lock lk(m); + cv.wait(lk, []{ return processed; }); + } + std::cout << "Back in main(), data = " << data << '\n'; + + worker.join(); +} + +Output: +main() signals data ready for processing +Worker thread is processing data +Worker thread signals data processing completed +Back in main(), data = Example data after processing +``` + +## 3. Task +- A `task` is a unit of asynchronous work that can be scheduled for execution. +- It is a higher-level abstraction than a thread. +- `It's generally considered` faster to work with tasks `as opposed to` threads, because the runtime can reuse threads and manage scheduling more efficiently. +- We use `task` if we want fairly simple code and don't care for managing threads, and are running shorts tasks. + +### 3.1. Promises and Futures +- `std::future` is a class template that stores a value that will be assigned in the future, and provide a way to access that value. + - It will also block if its value is accessed before the value is assigned. + - `Futures` are the objects that are returned by async operations (`std::async, std::promise, std::packaged_task`) +- `std::shared_future`: works the same way as `std::future`, except it is copytable, so multiple threads are allowed to wait for the same shared state. +- `std::promise` provides a way to store a value or an exception that will be retrieved asynchronously via a `std::future`. + - It is commonly used to pass results between threads. + - It creates a `std::future` using `get_future()`. + - The `std::promise` and the `std::future` share a common shared state. + - The `std::promise` sets the value of the shared state using `set_value()`. + - The associated `std::future` retrieves the value using `get()`. + + +### 3.2. Async +- `Async` is a function template allows we to spawn threads to do work async, then collect the results from them via the `future` mechanism. +- A call to `std::async` returns a `std::future` object. +- The future can later be used to retrieve the result of the asynchronous computation. + ```cpp + auto future = std::async(some_function, arg_1, arg_2); + ``` +- `std::async` may run the function in a `new thread` or `defer execution` until the result is requested. + - `std::launch::async`: launch in a separated thread + - `std::launch::deferred`: only be called on `get()` + - default: defer to system + ```cpp + auto future = std::async(std::launch::async,some_function, arg_1, arg_2); + ``` +- e.g. +```cpp +// Pass in function pointer +auto future = std::async(std::launch::async, some_function, arg_1, arg_2); + +// Pass in function reference +auto future = std::async(std::launch::async, &some_function, arg_1, arg_2); + +// Pass in function object +struct SomeFunctionObject +{ + void operator() (int arg_1){} +}; +auto future = std::async(std::launch::async, SomeFunctionObject(), arg_1); + +// Lambda function +auto future = std::async(std::launch::async, [](){}); +``` \ No newline at end of file diff --git a/src/core/concurrency/RaceCondition.cpp b/src/core/concurrency/RaceCondition.cpp new file mode 100644 index 0000000..c0ed3c8 --- /dev/null +++ b/src/core/concurrency/RaceCondition.cpp @@ -0,0 +1,110 @@ +#include +#include +#include +#include +#include +#include "ExampleRegistry.h" + +namespace Problem { +int glo_var = 0; + +void f1() { + for (size_t i = 0; i < 100000; i++) { + glo_var += 1; + } +} + +void f2() { + for (size_t i = 0; i < 100000; i++) { + glo_var += 1; + } +} + +void run() { + std::cout << "\n---Problem---\n"; + std::cout << "glo_var: " << glo_var << '\n'; + std::thread t1(f1); + std::thread t2(f2); + t1.join(); + t2.join(); + // expect: 200 ? + std::cout << "glo_var: " << glo_var << '\n'; +} +} // namespace Problem + +namespace Mutex { +int glo_var = 0; +std::mutex g_mutex; + +void f1() { + for (size_t i = 0; i < 100000; i++) { + // anti pattern + g_mutex.lock(); + glo_var += 1; + g_mutex.unlock(); + } +} + +void f2() { + for (size_t i = 0; i < 100000; i++) { + // use RAII + std::lock_guard lock(g_mutex); + glo_var += 1; + } +} + +void run() { + std::cout << "\n---Mutex---\n"; + std::cout << "glo_var: " << glo_var << '\n'; + std::thread t1(f1); + std::thread t2(f2); + t1.join(); + t2.join(); + // expect: 200 ? + std::cout << "glo_var: " << glo_var << '\n'; +} +} // namespace Mutex + +namespace Atomic { +std::atomic glo_var = 0; + +void f1() { + for (size_t i = 0; i < 100000; i++) { + glo_var += 1; + } +} + +void f2() { + for (size_t i = 0; i < 100000; i++) { + glo_var += 1; + } +} + +void run() { + std::cout << "\n---Atomic---\n"; + std::cout << "glo_var: " << glo_var << '\n'; + std::thread t1(f1); + std::thread t2(f2); + t1.join(); + t2.join(); + // expect: 200 ? + std::cout << "glo_var: " << glo_var << '\n'; +} +} // namespace Atomic + +class RaceCondition : public IExample { + + std::string group() const override { return "core/concurrency"; } + std::string name() const override { return "RaceCondition"; } + std::string description() const override { + return "The examples for race condition"; + } + + void execute() override { + Problem::run(); + Atomic::run(); + Mutex::run(); + } +}; + +REGISTER_EXAMPLE(RaceCondition, "core/concurrency", "RaceCondition"); \ No newline at end of file diff --git a/src/core/concurrency/SharingData.cpp b/src/core/concurrency/SharingData.cpp new file mode 100644 index 0000000..6026b30 --- /dev/null +++ b/src/core/concurrency/SharingData.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include "ExampleRegistry.h" + +namespace { +int glo_var = 99; + +void f1(int arg) { + std::cout << "arg: " << arg << '\n'; + glo_var = 100; +} + +void f2(int& arg) { + // thread_local int thread_loc_var = 1; + std::cout << "arg: " << arg << '\n'; + arg = 100; +} + +void run() { + int loc_var = 10; + std::cout << "loc_var: " << loc_var << '\n'; + std::cout << "glo_var: " << glo_var << '\n'; + + std::thread t1(f1, loc_var); + t1.join(); + std::cout << "loc_var: " << loc_var << '\n'; + std::cout << "glo_var: " << glo_var << '\n'; + + std::thread t2(f2, std::ref(loc_var)); + t2.join(); + std::cout << "loc_var: " << loc_var << '\n'; +} +} // namespace + +class SharingData : public IExample { + + std::string group() const { return "core/concurrency"; } + std::string name() const { return "SharingData"; } + std::string description() const { + return "The examples for sharing data"; + } + + void execute() { run(); } +}; + +REGISTER_EXAMPLE(SharingData, "core/concurrency", "SharingData"); \ No newline at end of file diff --git a/src/core/concurrency/ThreadManagement.cpp b/src/core/concurrency/ThreadManagement.cpp new file mode 100644 index 0000000..f5df7d4 --- /dev/null +++ b/src/core/concurrency/ThreadManagement.cpp @@ -0,0 +1,143 @@ +#include // chrono::millisecond() +#include +#include +#include +#include "ExampleRegistry.h" + +namespace { +void checkJoinable(std::thread& thread) { + if (thread.joinable()) { + std::cout << "Thread Object is joinable.\n"; + } else { + std::cout << "Thread Object is not joinable.\n"; + } +} +} // namespace + +namespace ExceptionBeforeJoin { +class ThreadGuard { + public: + explicit ThreadGuard(std::thread& thread); + ~ThreadGuard(); + + // no copy & move + ThreadGuard operator=(const ThreadGuard& other) = delete; + ThreadGuard(const ThreadGuard& other) = delete; + ThreadGuard& operator=(ThreadGuard&& other) noexcept = delete; + ThreadGuard(ThreadGuard&& other) noexcept = delete; + + private: + std::thread& thread_; +}; + +ThreadGuard::ThreadGuard(std::thread& thread) : thread_{thread} {} + +ThreadGuard::~ThreadGuard() { + if (thread_.joinable()) { + thread_.join(); + } +} + +void callable() { + std::cout << "\ncallable started\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + std::cout << "\ncallable finished\n"; +} + +void exceptionThrow() { + throw std::runtime_error("callable runtime exception"); +} + +void run() { + std::cout << "\n---ExceptionBeforeJoin Ex---\n"; + std::thread thread(callable); + + // try { + // exceptionThrow(); + // thread.join(); + // } catch (...) { + // thread.join(); + // } + + ThreadGuard tg(thread); + try { + exceptionThrow(); + } catch (...) {} + std::cout << "\nrun finished\n"; +} + +} // namespace ExceptionBeforeJoin + +namespace Detach { +void foo() { + std::cout << "\nfoo started\n"; + std::this_thread::sleep_for(std::chrono::microseconds(1000)); + std::cout << "\nfoo finished\n"; +} + +void bar() { + std::cout << "\nbar started\n"; + std::this_thread::sleep_for(std::chrono::microseconds(2000)); + std::cout << "\nbar finished\n"; +} + +void run() { + std::cout << "\n---Detach Ex---\n"; + std::cout << "\nrun started\n"; + std::thread foo_thread(foo); + std::thread bar_thread(bar); + + bar_thread.detach(); + + foo_thread.join(); + std::cout << "\nrun finished\n"; +} +} // namespace Detach + +namespace Join { +void callable() { + for (size_t i = 0; i < 10; ++i) { + std::cout << "callable " << i << '\n'; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); // sleep for 5ms + } + std::cout << "\ncallable finished\n"; +} + +void run() { + unsigned int threadsNum = std::thread::hardware_concurrency(); + std::cout << "The number of hardware thread contexts: " << threadsNum << '\n'; + + std::cout << "\n---Join Ex---\n"; + // create a thread object + std::thread user_thread(callable); + + for (size_t i = 0; i < 10; ++i) { + std::cout << "run " << i << '\n'; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); // sleep for 5ms + } + + checkJoinable(user_thread); + + // sync point, block the execution of the caller until the thread executation(callable) finished + user_thread.join(); + + checkJoinable(user_thread); + + std::cout << "\nrun finished\n"; +} +} // namespace Join + +class ThreadManagement : public IExample { + + std::string group() const { return "core/concurrency"; } + std::string name() const { return "ThreadManagement"; } + std::string description() const { return "The examples for "; } + + void execute() { + Join::run(); + Detach::run(); + ExceptionBeforeJoin::run(); + } +}; + +REGISTER_EXAMPLE(ThreadManagement, "core/concurrency", "ThreadManagement"); \ No newline at end of file diff --git a/src/core/concurrency/future_promis.png b/src/core/concurrency/future_promis.png new file mode 100644 index 0000000..73da3f7 Binary files /dev/null and b/src/core/concurrency/future_promis.png differ diff --git a/src/core/concurrency/future_promis_flow.png b/src/core/concurrency/future_promis_flow.png new file mode 100644 index 0000000..1942fba Binary files /dev/null and b/src/core/concurrency/future_promis_flow.png differ diff --git a/src/core/exception/README.md b/src/core/exception/README.md index d36c2c9..7cc4852 100644 --- a/src/core/exception/README.md +++ b/src/core/exception/README.md @@ -9,5 +9,5 @@ - Low/ middle layers: catch and rethrow an exception if they do not have enough context to handle. This way, the exceptions will propagate up the call stack. - Highest layers: let an unhandled exception terminate a program. (`exit(-1)`) -- Resource Acquisition Is Initialization (RAII) +- Resource Acquisition Is Initialization (RAII) (resource lifetime = object lifetime) diff --git a/src/socket/CMakeLists.txt b/src/socket/CMakeLists.txt index be4f185..d807f84 100644 --- a/src/socket/CMakeLists.txt +++ b/src/socket/CMakeLists.txt @@ -5,6 +5,7 @@ set(SOCKET_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/simple_tcp/SimpleTCPServer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/simple_tcp/TCPClient.cpp ${CMAKE_CURRENT_SOURCE_DIR}/simple_tcp/SimpleTCPClient.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/multiple_client/MultiTCPServer.cpp ) set(SOCKET_SOURCES ${SOCKET_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/src/socket/multiple_client/MultiTCPServer.cpp b/src/socket/multiple_client/MultiTCPServer.cpp new file mode 100644 index 0000000..cdefcbc --- /dev/null +++ b/src/socket/multiple_client/MultiTCPServer.cpp @@ -0,0 +1,57 @@ +#include "MultiTCPServer.h" + +#include // sockaddr_in, inet_ntoa +#include // socket, bind, listen, accept +#include // close(server) + +#include +#include + +void MultiTCPServer::acceptLoop() { + while (isRunning()) { + sockaddr_in client_addr{}; + socklen_t client_add_len = sizeof(client_addr); + + int client_fd = + accept(getServerFD(), reinterpret_cast(&client_addr), + &client_add_len); + + if (client_fd < 0) { + if (!isRunning()) + break; + std::cerr << "[MultiTCPServer] accept failed\n"; + continue; + } + + std::cout << "[MultiTCPServer] client " << inet_ntoa(client_addr.sin_addr) + << " connected\n"; + + client_threads_.emplace_back([this, client_fd]() { + try { + handleClient(client_fd); + } catch (const std::exception& e) { + std::cerr << "[MultiTCPServer] client error: " << e.what() << "\n"; + close(client_fd); + } + }); + } + + for (auto& t : client_threads_) { + if (t.joinable()) { + t.join(); + } + } +} + +void MultiTCPServer::run() { + setPort(8080); + + try { + start(); + } catch (const std::exception& e) { + std::cerr << e.what() << '\n'; + stop(); + } +} + +REGISTER_EXAMPLE(MultiTCPServer, "socket/tcp", "MultiTCPServer"); \ No newline at end of file diff --git a/src/socket/multiple_client/MultiTCPServer.h b/src/socket/multiple_client/MultiTCPServer.h new file mode 100644 index 0000000..3be1ad4 --- /dev/null +++ b/src/socket/multiple_client/MultiTCPServer.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include +#include "ExampleRegistry.h" +#include "../simple_tcp/TCPServer.h" + +class MultiTCPServer : public TCPServer, public IExample { + public: + std::string group() const override { return "socket/tcp"; } + + std::string name() const override { return "SimpleTCPServer"; } + + std::string description() const override { + return "Simple TCP server listening on port 8080.\nRun `telnet localhost " + "8080` to connect."; + } + + void execute() override { run(); } + + protected: + void acceptLoop() override; + void run(); + std::vector client_threads_; +}; diff --git a/src/socket/simple_tcp/TCPServer.cpp b/src/socket/simple_tcp/TCPServer.cpp index 1aa5e0b..6ba5efe 100644 --- a/src/socket/simple_tcp/TCPServer.cpp +++ b/src/socket/simple_tcp/TCPServer.cpp @@ -130,7 +130,7 @@ void TCPServer::handleClient(int client_fd) { buffer[bytes] = '\0'; // print out - std::cout << "client: " << buffer; + std::cout << "client " << client_fd << ": " << buffer; sendAll(client_fd, buffer, bytes); @@ -174,4 +174,16 @@ void TCPServer::setPort(uint16_t port) { if (!running_) { port_ = port; } +} + +bool TCPServer::isRunning() const { + return running_; +} + +int TCPServer::getServerFD() const { + return server_fd_; +} + +void TCPServer::setServerFD(int fd) { + server_fd_ = fd; } \ No newline at end of file diff --git a/src/socket/simple_tcp/TCPServer.h b/src/socket/simple_tcp/TCPServer.h index fff399f..2f8c107 100644 --- a/src/socket/simple_tcp/TCPServer.h +++ b/src/socket/simple_tcp/TCPServer.h @@ -6,7 +6,7 @@ class TCPServer { public: explicit TCPServer(uint16_t port = 80); - ~TCPServer(); + virtual ~TCPServer(); void start() noexcept(false); void stop(); @@ -30,11 +30,9 @@ class TCPServer { */ void listenSocket(); - /** - * @brief Loop waiting for client connections - */ - void acceptLoop(); + static void sendAll(int fd, const char* data, size_t len); + protected: /** * @brief Handle communication with a single client * @@ -43,10 +41,17 @@ class TCPServer { */ void handleClient(int client_fd); - static void sendAll(int fd, const char* data, size_t len); + /** + * @brief Loop waiting for client connections + */ + virtual void acceptLoop(); + + bool isRunning() const; + int getServerFD() const; + void setServerFD(int fd); private: uint16_t port_; // TCP ports are in 0 - 65535 int server_fd_{-1}; // socket descriptor - bool running_{false}; + volatile bool running_{false}; }; \ No newline at end of file