Files
core_future/tjp/core/future/Futures.cpp
2026-03-06 09:28:09 -05:00

315 lines
5.8 KiB
C++
Executable File

// License: Modified MIT (NON-AI)
// See the LICENSE file in the root directory for license information.
// Copyright 2026 Timothy Prepscius
#include <tjp/core/header_only/compile.h>
#ifdef TJP_CORE_HEADER_ONLY
#pragma once
#endif
#include "Futures.hpp"
#include <tjp/core/threads/Lock.hpp>
#include <tjp/core/containers/List.hpp>
#include <tjp/core/containers/Vector.hpp>
#include <tjp/core/containers/Optional.hpp>
#include <tjp/core/threads/Atomic.h>
#include <tjp/core/algorithm/vector_erase_if_value.hpp>
#include <tjp/core/assert/debug_assert.h>
#include <tjp/core/containers/Map.hpp>
#include <tjp/core/containers/List.hpp>
#include <tjp/core/debug/Stack.h>
#include <tjp/core/string/String.h>
#include <tjp/core/algorithm/map_erase.hpp>
#include <tjp/core/time/Time.h>
#include <tjp/core/log/Log.h>
#include <tjp/core/log/LogOf.h>
//#define DEBUG_FUTURE_LEAKS_BUT_PROBLEMS_WITH_THIS
#ifdef DEBUG_FUTURE_LEAKS_BUT_PROBLEMS_WITH_THIS
#include <execinfo.h>
#include <sstream>
#endif
namespace tjp {
namespace core {
#ifdef DEBUG_FUTURE_LEAKS_BUT_PROBLEMS_WITH_THIS
static Atomic<size_t> totalOutstandingFutures = 0;
struct StackFrames {
void* trace[64];
int size;
} ;
String toString(StackFrames &f)
{
char** messages = NULL;
std::ostringstream out;
messages = backtrace_symbols( f.trace, f.size );
for( int i = 2; i < f.size; ++i ) {
out << "\t" << messages[i] << std::endl;
}
free (messages);
return out.str();
}
struct Leak {
time::Time then;
StackFrames stack;
WeakPtr<FuturePersist> future;
} ;
Map<void *, Leak> outstandingFutures;
time::Time lastLeaker = 0;
static void debug_log();
static void debug_futureCreated(const StrongPtr<FuturePersist> &future)
{
totalOutstandingFutures++;
auto &leak = outstandingFutures[ptr_of(future)];
leak.then = time::now();
leak.stack.size = backtrace( leak.stack.trace, 16 );
leak.future = weak(future);
sLogRelease("core::futures", logVar(totalOutstandingFutures));
debug_log();
}
static void debug_futureFinished(void *fp)
{
totalOutstandingFutures--;
auto erased_leak = map_erase(outstandingFutures, fp);
debug_assert(erased_leak);
sLogDebug("core::futures", logVar(totalOutstandingFutures));
debug_log();
}
void debug_log()
{
auto now = time::now();
if (now - lastLeaker > 5.0)
{
lastLeaker = now;
for (auto &[p, leak]: outstandingFutures)
{
auto diff = now - leak.then;
if (diff > 10.0)
{
sLogRelease("core::futures::leak", logVar(leak.future.use_count()) << logVar(diff) << logVar(p) << logVar(toString(leak.stack)));
}
}
}
}
#else
#define debug_futureCreated(x)
#define debug_futureFinished(x)
#endif
struct Futures::Internal {
mutable Mutex mutex;
List<StrongPtr<FuturePersist>> futures;
virtual void add(StrongPtr<Futures::Internal> &self, const StrongPtr<FuturePersist> &future)
{
StrongPtr<FuturePersist> r;
{
auto lock = lock_of(mutex);
r = futures.emplace_back(future);
debug_futureCreated(future);
}
r->inject([weak=weak(self)](auto *fp) {
if (auto self = strong(weak))
self->onFuture(fp);
});
}
void onFuture_noLock(void *fp)
{
auto erased = vector_erase_if_value(
futures,
[fp](auto &v) {
return ptr_of(v) == fp;
}
);
debug_assert(erased);
debug_futureFinished(fp);
}
virtual void onFuture(void *fp)
{
auto lock = lock_of(mutex);
onFuture_noLock(fp);
}
virtual void clear ()
{
auto l = lock_of(mutex);
futures.clear();
}
bool empty() const
{
auto l = lock_of(mutex);
return futures.empty();
}
} ;
struct FuturesEvent_Internal : Futures::Internal {
using Super = Futures::Internal;
Event event;
void onFuture(void *fp) override
{
Super::onFuture(fp);
event.notify_all();
}
void clear () override
{
auto l = lock_of(mutex);
futures.clear();
event.notify_all();
}
void wait()
{
while (!empty())
{
Event::Mutex m;
auto l = lock_of(m);
event.wait(l);
}
}
} ;
struct FuturesFuture_Internal : Futures::Internal
{
using Super = Futures::Internal;
bool shouldSetFinished = false;
Promise<void> finished;
void onFuture(void *fp) override
{
Optional<Promise<void>> finished_;
{
auto lock = lock_of(mutex);
Super::onFuture_noLock(fp);
if (shouldSetFinished && futures.empty())
{
// TODO: promises should have a move constructor
finished_.emplace(std::move(finished));
finished = {};
}
}
if (finished_)
finished_->set_value();
}
Future<void> get_future(StrongPtr<Futures::Internal> &self)
{
auto l = lock_of(mutex);
if (futures.empty())
return future_of_value();
shouldSetFinished = true;
return finished.get_future()
.then([self](auto &&f) {
return f.get();
});
}
} ;
// ----------
struct Futures::NoConstructInternal {};
TJP_CORE_HEADER_ONLY_INLINE
Futures::Futures (NoConstructInternal)
{
}
TJP_CORE_HEADER_ONLY_INLINE
Futures::Futures ()
{
internal = strong<Internal>();
}
TJP_CORE_HEADER_ONLY_INLINE
Futures::~Futures ()
{
}
TJP_CORE_HEADER_ONLY_INLINE
void Futures::add(const StrongPtr<FuturePersist> &future)
{
internal->add(internal, future);
}
TJP_CORE_HEADER_ONLY_INLINE
bool Futures::empty() const
{
return internal->empty();
}
TJP_CORE_HEADER_ONLY_INLINE
void Futures::clear()
{
internal->clear();
}
// ------------------
TJP_CORE_HEADER_ONLY_INLINE
FuturesEvent::FuturesEvent() :
Super(Super::NoConstructInternal{})
{
internal = strong<FuturesEvent_Internal>();
}
TJP_CORE_HEADER_ONLY_INLINE
void FuturesEvent::wait ()
{
strong_ptr_cast<FuturesEvent_Internal>(internal)->wait();
}
// ------------------
TJP_CORE_HEADER_ONLY_INLINE
FuturesFuture::FuturesFuture() :
Super(Super::NoConstructInternal{})
{
internal = strong<FuturesFuture_Internal>();
}
TJP_CORE_HEADER_ONLY_INLINE
Future<void> FuturesFuture::get_future()
{
return strong_ptr_cast<FuturesFuture_Internal>(internal)->get_future(internal);
}
} // namespace
} // namespace