From f1348235afb8118e5b02310d84d25b61b691a0a3 Mon Sep 17 00:00:00 2001 From: ronag Date: Sat, 22 Oct 2011 19:11:39 +0000 Subject: [PATCH] --- concrt_extras/agents_extras.h | 3014 +++++++++++++++++++ concrt_extras/concrt_extras.h | 34 + concrt_extras/concurrent_unordered_map.h | 1003 ++++++ concrt_extras/concurrent_unordered_set.h | 918 ++++++ concrt_extras/connect.h | 277 ++ concrt_extras/internal_concurrent_hash.h | 1486 +++++++++ concrt_extras/internal_split_ordered_list.h | 777 +++++ concrt_extras/semaphore.h | 70 + 8 files changed, 7579 insertions(+) create mode 100644 concrt_extras/agents_extras.h create mode 100644 concrt_extras/concrt_extras.h create mode 100644 concrt_extras/concurrent_unordered_map.h create mode 100644 concrt_extras/concurrent_unordered_set.h create mode 100644 concrt_extras/connect.h create mode 100644 concrt_extras/internal_concurrent_hash.h create mode 100644 concrt_extras/internal_split_ordered_list.h create mode 100644 concrt_extras/semaphore.h diff --git a/concrt_extras/agents_extras.h b/concrt_extras/agents_extras.h new file mode 100644 index 000000000..f49b45a1e --- /dev/null +++ b/concrt_extras/agents_extras.h @@ -0,0 +1,3014 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// File: agents_extras.h +// +// Implementation of various useful message blocks +// +//-------------------------------------------------------------------------- + +#pragma once + +#include + +// bounded_buffer uses a map +#include +#include + +namespace Concurrency +{ + /// + /// Simple queue class for storing messages. + /// + /// + /// The payload type of messages stored in this queue. + /// + template + class MessageQueue + { + public: + typedef message<_Type> _Message; + + /// + /// Constructs an initially empty queue. + /// + MessageQueue() + { + } + + /// + /// Removes and deletes any messages remaining in the queue. + /// + ~MessageQueue() + { + _Message * _Msg = dequeue(); + while (_Msg != NULL) + { + delete _Msg; + _Msg = dequeue(); + } + } + + /// + /// Add an item to the queue. + /// + /// + /// Message to add. + /// + void enqueue(_Message *_Msg) + { + _M_queue.push(_Msg); + } + + /// + /// Dequeue an item from the head of queue. + /// + /// + /// Returns a pointer to the message found at the head of the queue. + /// + _Message * dequeue() + { + _Message * _Msg = NULL; + + if (!_M_queue.empty()) + { + _Msg = _M_queue.front(); + _M_queue.pop(); + } + + return _Msg; + } + + /// + /// Return the item at the head of the queue, without dequeuing + /// + /// + /// Returns a pointer to the message found at the head of the queue. + /// + _Message * peek() const + { + _Message * _Msg = NULL; + + if (!_M_queue.empty()) + { + _Msg = _M_queue.front(); + } + + return _Msg; + } + + /// + /// Returns the number of items currently in the queue. + /// + /// + /// Size of the queue. + /// + size_t count() const + { + return _M_queue.size(); + } + + /// + /// Checks to see if specified msg id is at the head of the queue. + /// + /// + /// Message id to check for. + /// + /// + /// True if a message with specified id is at the head, false otherwise. + /// + bool is_head(const runtime_object_identity _MsgId) const + { + _Message * _Msg = peek(); + if(_Msg != NULL) + { + return _Msg->msg_id() == _MsgId; + } + return false; + } + + private: + + std::queue<_Message *> _M_queue; + }; + + /// + /// Simple queue implementation that takes into account priority + /// using the comparison operator <. 
+ /// + /// + /// The payload type of messages stored in this queue. + /// + template + class PriorityQueue + { + public: + /// + /// Constructs an initially empty queue. + /// + PriorityQueue() : _M_pHead(NULL), _M_count(0) {} + + /// + /// Removes and deletes any messages remaining in the queue. + /// + ~PriorityQueue() + { + message<_Type> * _Msg = dequeue(); + while (_Msg != NULL) + { + delete _Msg; + _Msg = dequeue(); + } + } + + /// + /// Add an item to the queue, comparisons using the 'payload' field + /// will determine the location in the queue. + /// + /// + /// Message to add. + /// + /// + /// True if this new message can be inserted at the head. + /// + void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true) + { + MessageNode *_Element = new MessageNode(); + _Element->_M_pMsg = _Msg; + + // Find location to insert. + MessageNode *pCurrent = _M_pHead; + MessageNode *pPrev = NULL; + if(!fInsertAtHead && pCurrent != NULL) + { + pPrev = pCurrent; + pCurrent = pCurrent->_M_pNext; + } + while(pCurrent != NULL) + { + if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload) + { + break; + } + pPrev = pCurrent; + pCurrent = pCurrent->_M_pNext; + } + + // Insert at head. + if(pPrev == NULL) + { + _M_pHead = _Element; + } + else + { + pPrev->_M_pNext = _Element; + } + + // Last item in queue. + if(pCurrent == NULL) + { + _Element->_M_pNext = NULL; + } + else + { + _Element->_M_pNext = pCurrent; + } + + ++_M_count; + } + + /// + /// Dequeue an item from the head of queue. + /// + /// + /// Returns a pointer to the message found at the head of the queue. + /// + message<_Type> * dequeue() + { + if (_M_pHead == NULL) + { + return NULL; + } + + MessageNode *_OldHead = _M_pHead; + message<_Type> * _Result = _OldHead->_M_pMsg; + + _M_pHead = _OldHead->_M_pNext; + + delete _OldHead; + + if(--_M_count == 0) + { + _M_pHead = NULL; + } + return _Result; + } + + /// + /// Return the item at the head of the queue, without dequeuing + /// + /// + /// Returns a pointer to the message found at the head of the queue. + /// + message<_Type> * peek() const + { + if(_M_count != 0) + { + return _M_pHead->_M_pMsg; + } + return NULL; + } + + /// + /// Returns the number of items currently in the queue. + /// + /// + /// Size of the queue. + /// + size_t count() const + { + return _M_count; + } + + /// + /// Checks to see if specified msg id is at the head of the queue. + /// + /// + /// Message id to check for. + /// + /// + /// True if a message with specified id is at the head, false otherwise. + /// + bool is_head(const runtime_object_identity _MsgId) const + { + if(_M_count != 0) + { + return _M_pHead->_M_pMsg->msg_id() == _MsgId; + } + return false; + } + + private: + + // Used to store individual message nodes. + struct MessageNode + { + MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {} + message<_Type> * _M_pMsg; + MessageNode * _M_pNext; + }; + + // A pointer to the head of the queue. + MessageNode * _M_pHead; + + // The number of elements presently stored in the queue. + size_t _M_count; + }; + + /// + /// priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine + /// order when offering to targets. Besides this it acts exactly like an unbounded_buffer. + /// + /// + /// The payload type of messages stored and propagated by the buffer. 
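+    // ----------------------------------------------------------------------
+    // Illustrative aside (inserted commentary, not part of the original
+    // header): the PriorityQueue above keeps its message<_Type>* nodes ordered
+    // by 'payload' using operator<, so dequeue() always hands back the
+    // smallest payload first. The fInsertAtHead = false form is what
+    // priority_buffer uses below to avoid inserting a new message ahead of a
+    // reserved head message.
+    //
+    //     PriorityQueue<int> _Q;
+    //     _Q.enqueue(new message<int>(7));
+    //     _Q.enqueue(new message<int>(3));
+    //     _Q.enqueue(new message<int>(5));
+    //     message<int> * _Msg = _Q.dequeue();   // payload == 3
+    //     delete _Msg;                          // dequeue() transfers ownership
+    // ----------------------------------------------------------------------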
+ /// + template + class priority_buffer : public propagator_block>, multi_link_registry>> + { + public: + + /// + /// Creates an priority_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + priority_buffer() + { + initialize_source_and_target(); + } + + /// + /// Creates an priority_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a filter function. + /// + priority_buffer(filter_method const& _Filter) + { + initialize_source_and_target(); + register_filter(_Filter); + } + + /// + /// Creates an priority_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + priority_buffer(Scheduler& _PScheduler) + { + initialize_source_and_target(&_PScheduler); + } + + /// + /// Creates an priority_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + /// + /// A reference to a filter function. + /// + priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter) + { + initialize_source_and_target(&_PScheduler); + register_filter(_Filter); + } + + /// + /// Creates an priority_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + priority_buffer(ScheduleGroup& _PScheduleGroup) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + } + + /// + /// Creates an priority_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + /// + /// A reference to a filter function. + /// + priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Cleans up any resources that may have been created by the priority_buffer. + /// + ~priority_buffer() + { + // Remove all links + remove_network_links(); + } + + /// + /// Add an item to the priority_buffer + /// + /// + /// A reference to the item to add. + /// + /// + /// A boolean indicating whether the data was accepted. + /// + bool enqueue(_Type const& _Item) + { + return Concurrency::send<_Type>(this, _Item); + } + + /// + /// Remove an item from the priority_buffer + /// + /// + /// The message payload. + /// + _Type dequeue() + { + return receive<_Type>(this); + } + + protected: + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. Doing so could + /// result in a deadlock with the Consume call. 
(in the case of the priority_buffer, + /// this lock is the m_internalLock) + /// + virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + _Result = missed; + } + + return _Result; + } + + /// + /// Synchronously sends a message to this block. When this function completes the message will + /// already have propagated into the block. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + sync_send(_PMessage); + } + else + { + return missed; + } + + return accepted; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Type> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Type> * _Msg = NULL; + + if (_M_messageBuffer.is_head(_MsgId)) + { + _Msg = _M_messageBuffer.dequeue(); + } + + return _Msg; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A Boolean indicating whether the reservation worked or not. + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer.is_head(_MsgId); + } + + /// + /// Consumes a message that was reserved previously. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve'. + /// + virtual message<_Type> * consume_message(runtime_object_identity _MsgId) + { + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. + /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. + if (!_M_messageBuffer.is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer.count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. 
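+        // ------------------------------------------------------------------
+        // Illustrative usage sketch (inserted commentary, not part of the
+        // original header). enqueue() forwards to Concurrency::send and
+        // dequeue() to Concurrency::receive, while the internal PriorityQueue
+        // orders pending messages by payload. Assuming the sends have been
+        // processed before the receive, the lowest payload comes out first:
+        //
+        //     priority_buffer<int> _Buf;
+        //     _Buf.enqueue(20);
+        //     _Buf.enqueue(10);
+        //     _Buf.enqueue(30);
+        //     int _First = _Buf.dequeue();   // 10
+        // ------------------------------------------------------------------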
+ /// + virtual void link_target_notification(ITarget<_Type> * _PTarget) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + message<_Type> * _Msg = _M_messageBuffer.peek(); + + if (_Msg != NULL) + { + // Propagate the head message to the new target + message_status _Status = _PTarget->propagate(_Msg, this); + + if (_Status == accepted) + { + // The target accepted the message, restart propagation. + propagate_to_any_targets(NULL); + } + + // If the status is anything other than accepted, then leave + // the message queue blocked. + } + } + + /// + /// Takes the message and propagates it to all the targets of this priority_buffer. + /// + /// + /// A pointer to a new message. + /// + virtual void propagate_to_any_targets(message<_Type> * _PMessage) + { + // Enqueue pMessage to the internal unbounded buffer queue if it is non-NULL. + // _PMessage can be NULL if this LWT was the result of a Repropagate call + // out of a Consume or Release (where no new message is queued up, but + // everything remaining in the priority_buffer needs to be propagated out) + if (_PMessage != NULL) + { + message<_Type> *pPrevHead = _M_messageBuffer.peek(); + + // If a reservation is held make sure to not insert this new + // message before it. + if(_M_pReservedFor != NULL) + { + _M_messageBuffer.enqueue(_PMessage, false); + } + else + { + _M_messageBuffer.enqueue(_PMessage); + } + + // If the head message didn't change, we can safely assume that + // the head message is blocked and waiting on Consume(), Release() or a new + // link_target() + if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id())) + { + return; + } + } + + // Attempt to propagate messages to all the targets + _Propagate_priority_order(); + } + + private: + + /// + /// Attempts to propagate out any messages currently in the block. + /// + void _Propagate_priority_order() + { + message<_Target_type> * _Msg = _M_messageBuffer.peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Always start from the first target that linked. + for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) + { + ITarget<_Target_type> * _PTarget = *_Iter; + _Status = _PTarget->propagate(_Msg, this); + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others. + if (_M_pReservedFor != NULL) + { + break; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. + if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _M_messageBuffer.peek(); + } + } + + /// + /// Priority Queue used to store messages. + /// + PriorityQueue<_Type> _M_messageBuffer; + + // + // Hide assignment operator and copy constructor. + // + priority_buffer const &operator =(priority_buffer const&); // no assignment operator + priority_buffer(priority_buffer const &); // no copy constructor + }; + + + /// + /// A bounded_buffer implementation. Once the capacity is reached it will save the offered message + /// id and postpone. 
Once below capacity again the bounded_buffer will try to reserve and consume + /// any of the postponed messages. Preference is given to previously offered messages before new ones. + /// + /// NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. + /// Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer + /// implementation uses a critical_section, several interlocked operations, and additional calls to async_send. + /// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem. + /// + /// + /// The payload type of messages stored and propagated by the buffer. + /// + template + class bounded_buffer : public propagator_block>, multi_link_registry>> + { + public: + /// + /// Creates an bounded_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + bounded_buffer(const size_t capacity) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(); + } + + /// + /// Creates an bounded_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(); + register_filter(_Filter); + } + + /// + /// Creates an bounded_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + bounded_buffer(const size_t capacity, Scheduler& _PScheduler) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(&_PScheduler); + } + + /// + /// Creates an bounded_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(&_PScheduler); + register_filter(_Filter); + } + + /// + /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + } + + /// + /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Cleans up any resources that may have been used by the bounded_buffer. + /// + ~bounded_buffer() + { + // Remove all links + remove_network_links(); + } + + /// + /// Add an item to the bounded_buffer. + /// + /// + /// A reference to the item to add. + /// + /// + /// A boolean indicating whether the data was accepted. 
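+        // ------------------------------------------------------------------
+        // Illustrative usage sketch (inserted commentary, not part of the
+        // original header). The buffer accepts at most 'capacity' messages;
+        // anything offered beyond that is postponed and pulled in later via
+        // reserve/consume once a dequeue() frees a slot, so producers are
+        // effectively throttled to the buffer's capacity:
+        //
+        //     bounded_buffer<int> _Buf(2);    // holds at most two messages
+        //     _Buf.enqueue(1);
+        //     _Buf.enqueue(2);                // buffer is now full
+        //     // a further enqueue() from another thread would not complete
+        //     // until a slot frees up...
+        //     int _First = _Buf.dequeue();    // 1 -- frees a slot, so any
+        //                                     // postponed message is consumed next
+        // ------------------------------------------------------------------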
+ /// + bool enqueue(_Type const& _Item) + { + return Concurrency::send<_Type>(this, _Item); + } + + /// + /// Remove an item from the bounded_buffer. + /// + /// + /// The message payload. + /// + _Type dequeue() + { + return receive<_Type>(this); + } + + protected: + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. Doing so could + /// result in a deadlock with the Consume call. (in the case of the bounded_buffer, + /// this lock is the m_internalLock) + /// + virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + + // Check current capacity. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + // Postpone the message, buffer is full. + _InterlockedDecrement(&_M_currentSize); + _Result = postponed; + + // Save off the message id from this source to later try + // and reserve/consume when more space is free. + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id(); + } + + async_send(NULL); + } + else + { + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + // Didn't get a message so need to decrement. + _InterlockedDecrement(&_M_currentSize); + _Result = missed; + async_send(NULL); + } + } + + return _Result; + } + + /// + /// Synchronously sends a message to this block. When this function completes the message will + /// already have propagated into the block. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + + // Check current capacity. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + // Postpone the message, buffer is full. + _InterlockedDecrement(&_M_currentSize); + _Result = postponed; + + // Save off the message id from this source to later try + // and reserve/consume when more space is free. + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id(); + } + + async_send(NULL); + } + else + { + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + // Didn't get a message so need to decrement. 
+ _InterlockedDecrement(&_M_currentSize); + _Result = missed; + async_send(NULL); + } + } + + return _Result; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Type> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Type> * _Msg = NULL; + + if (_M_messageBuffer.is_head(_MsgId)) + { + _Msg = _M_messageBuffer.dequeue(); + + // Give preference to any previously postponed messages + // before decrementing current size. + if(!try_consume_msg()) + { + _InterlockedDecrement(&_M_currentSize); + } + } + + return _Msg; + } + + /// + /// Try to reserve and consume a message from list of saved message ids. + /// + /// + /// True if a message was sucessfully consumed, false otherwise. + /// + bool try_consume_msg() + { + runtime_object_identity _ReservedId = -1; + ISource<_Type> * _PSource = NULL; + + // Walk through source links seeing if any saved ids exist. + bool _ConsumedMsg = true; + while(_ConsumedMsg) + { + source_iterator _Iter = _M_connectedSources.begin(); + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + for (; *_Iter != NULL; ++_Iter) + { + _PSource = *_Iter; + std::map *, runtime_object_identity>::iterator _MapIter; + if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end()) + { + _ReservedId = _MapIter->second; + _M_savedSourceMsgIds.erase(_MapIter); + break; + } + } + } + + // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster. + if(_ReservedId != -1) + { + if(_PSource->reserve(_ReservedId, this)) + { + message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this); + async_send(_ConsumedMsg); + return true; + } + // Reserve failed go or link was removed, + // go back and try and find a different msg id. + else + { + continue; + } + } + + // If this point is reached the map of source ids was empty. + break; + } + + return false; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A Boolean indicating whether the reservation worked or not. + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer.is_head(_MsgId); + } + + /// + /// Consumes a message that was reserved previously. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve'. + /// + virtual message<_Type> * consume_message(runtime_object_identity _MsgId) + { + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. + /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. 
+ if (!_M_messageBuffer.is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer.count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. + /// + virtual void link_target_notification(ITarget<_Type> * _PTarget) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + message<_Type> * _Msg = _M_messageBuffer.peek(); + + if (_Msg != NULL) + { + // Propagate the head message to the new target + message_status _Status = _PTarget->propagate(_Msg, this); + + if (_Status == accepted) + { + // The target accepted the message, restart propagation. + propagate_to_any_targets(NULL); + } + + // If the status is anything other than accepted, then leave + // the message queue blocked. + } + } + + /// + /// Takes the message and propagates it to all the targets of this bounded_buffer. + /// This is called from async_send. + /// + /// + /// A pointer to a new message. + /// + virtual void propagate_to_any_targets(message<_Type> * _PMessage) + { + // Enqueue pMessage to the internal message buffer if it is non-NULL. + // pMessage can be NULL if this LWT was the result of a Repropagate call + // out of a Consume or Release (where no new message is queued up, but + // everything remaining in the bounded buffer needs to be propagated out) + if (_PMessage != NULL) + { + _M_messageBuffer.enqueue(_PMessage); + + // If the incoming pMessage is not the head message, we can safely assume that + // the head message is blocked and waiting on Consume(), Release() or a new + // link_target() and cannot be propagated out. + if (_M_messageBuffer.is_head(_PMessage->msg_id())) + { + _Propagate_priority_order(); + } + } + else + { + // While current size is less than capacity try to consume + // any previously offered ids. + bool _ConsumedMsg = true; + while(_ConsumedMsg) + { + // Assume a message will be found to successfully consume in the + // saved ids, if not this will be decremented afterwards. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + break; + } + + _ConsumedMsg = try_consume_msg(); + } + + // Decrement the current size, we broke out of the previous loop + // because we reached capacity or there were no more messages to consume. + _InterlockedDecrement(&_M_currentSize); + } + } + + private: + + /// + /// Attempts to propagate out any messages currently in the block. + /// + void _Propagate_priority_order() + { + message<_Target_type> * _Msg = _M_messageBuffer.peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Always start from the first target that linked. + for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) + { + ITarget<_Target_type> * _PTarget = *_Iter; + _Status = _PTarget->propagate(_Msg, this); + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others. 
+ if (_M_pReservedFor != NULL) + { + break; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. + if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _M_messageBuffer.peek(); + } + } + + /// + /// Message buffer used to store messages. + /// + MessageQueue<_Type> _M_messageBuffer; + + /// + /// Maximum number of messages bounded_buffer can hold. + /// + const size_t _M_capacity; + + /// + /// Current number of messages in bounded_buffer. + /// + volatile long _M_currentSize; + + /// + /// Lock used to guard saved message ids map. + /// + critical_section _M_savedIdsLock; + + /// + /// Map of source links to saved message ids. + /// + std::map *, runtime_object_identity> _M_savedSourceMsgIds; + + // + // Hide assignment operator and copy constructor + // + bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator + bounded_buffer(bounded_buffer const &); // no copy constructor + }; + + /// + /// A simple alternator, offers messages in order to each target + /// one at a time. If a consume occurs a message won't be offered to that target again + /// until all others are given a chance. This causes messages to be distributed more + /// evenly among targets. + /// + /// + /// The payload type of messages stored and propagated by the buffer. + /// + template + class alternator : public propagator_block>, multi_link_registry>> + { + public: + /// + /// Create an alternator within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + alternator() + : _M_indexNextTarget(0) + { + initialize_source_and_target(); + } + + /// + /// Creates an alternator within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a filter function. + /// + alternator(filter_method const& _Filter) + : _M_indexNextTarget(0) + { + initialize_source_and_target(); + register_filter(_Filter); + } + + /// + /// Creates an alternator within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + alternator(Scheduler& _PScheduler) + : _M_indexNextTarget(0) + { + initialize_source_and_target(&_PScheduler); + } + + /// + /// Creates an alternator within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + /// + /// A reference to a filter function. + /// + alternator(Scheduler& _PScheduler, filter_method const& _Filter) + : _M_indexNextTarget(0) + { + initialize_source_and_target(&_PScheduler); + register_filter(_Filter); + } + + /// + /// Creates an alternator within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + alternator(ScheduleGroup& _PScheduleGroup) + : _M_indexNextTarget(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + } + + /// + /// Creates an alternator within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + /// + /// A reference to a filter function. 
+ /// + alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) + : _M_indexNextTarget(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Cleans up any resources that may have been created by the alternator. + /// + ~alternator() + { + // Remove all links + remove_network_links(); + } + + protected: + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// A pointer to the message + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. Doing so could + /// result in a deadlock with the Consume call. (in the case of the alternator, + /// this lock is the m_internalLock) + /// + virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + _Result = missed; + } + + return _Result; + } + + /// + /// Synchronously sends a message to this block. When this function completes the message will + /// already have propagated into the block. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + sync_send(_PMessage); + } + else + { + return missed; + } + + return accepted; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Type> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Type> * _Msg = NULL; + + if (_M_messageBuffer.is_head(_MsgId)) + { + _Msg = _M_messageBuffer.dequeue(); + } + + return _Msg; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A Boolean indicating whether the reservation worked or not. + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer.is_head(_MsgId); + } + + /// + /// Consumes a message that was reserved previously. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve'. 
+ /// + virtual message<_Type> * consume_message(runtime_object_identity _MsgId) + { + // Update so we don't offer to this target again until + // all others have a chance. + target_iterator _CurrentIter = _M_connectedTargets.begin(); + for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i) + { + if(*_CurrentIter == _M_pReservedFor) + { + _M_indexNextTarget = i + 1; + break; + } + } + + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. + /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. + if (!_M_messageBuffer.is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released. + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer.count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. + /// + virtual void link_target_notification(ITarget<_Type> * _PTarget) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + message<_Type> * _Msg = _M_messageBuffer.peek(); + + if (_Msg != NULL) + { + // Propagate the head message to the new target + message_status _Status = _PTarget->propagate(_Msg, this); + + if (_Status == accepted) + { + // The target accepted the message, restart propagation. + propagate_to_any_targets(NULL); + } + + // If the status is anything other than accepted, then leave + // the message queue blocked. + } + } + + /// + /// Takes the message and propagates it to all the targets of this alternator. + /// This is called from async_send. + /// + /// + /// A pointer to a new message. + /// + virtual void propagate_to_any_targets(message<_Type> * _PMessage) + { + // Enqueue pMessage to the internal buffer queue if it is non-NULL. + // pMessage can be NULL if this LWT was the result of a Repropagate call + // out of a Consume or Release (where no new message is queued up, but + // everything remaining in the unbounded buffer needs to be propagated out) + if (_PMessage != NULL) + { + _M_messageBuffer.enqueue(_PMessage); + + // If the incoming pMessage is not the head message, we can safely assume that + // the head message is blocked and waiting on Consume(), Release() or a new + // link_target() + if (!_M_messageBuffer.is_head(_PMessage->msg_id())) + { + return; + } + } + + // Attempt to propagate messages to targets in order last left off. + _Propagate_alternating_order(); + } + + /// + /// Offers messages to targets in alternating order to help distribute messages + /// evenly among targets. + /// + void _Propagate_alternating_order() + { + message<_Target_type> * _Msg = _M_messageBuffer.peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + // + // Try to start where left off before, if the link has been removed + // or this is the first time then start at the beginning. 
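+            // [Added clarification, inserted commentary] _M_indexNextTarget is
+            // advanced both while offering (below) and in consume_message()
+            // above, so the target that took the previous message is skipped
+            // on the next pass and consecutive messages are spread round-robin
+            // across the links. For example, with two linked unbounded_buffer
+            // targets _A and _B (illustrative names):
+            //
+            //     alternator<int> _Alt;
+            //     _Alt.link_target(&_A);
+            //     _Alt.link_target(&_B);
+            //     send(&_Alt, 1);    // offered to _A first
+            //     send(&_Alt, 2);    // the next offer starts at _B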
+ // + target_iterator _CurrentIter = _M_connectedTargets.begin(); + const target_iterator _FirstLinkIter(_CurrentIter); + for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {} + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Loop offering message until end of links is reached. + target_iterator _StartedIter(_CurrentIter); + for(;*_CurrentIter != NULL; ++_CurrentIter) + { + _Status = (*_CurrentIter)->propagate(_Msg, this); + ++_M_indexNextTarget; + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + ++_CurrentIter; + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others + if (_M_pReservedFor != NULL) + { + return; + } + } + + // Message ownership changed go to next messages. + if (_Status == accepted) + { + continue; + } + + // Try starting from the beginning until the first link offering was started at. + _M_indexNextTarget = 0; + for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter) + { + // I have offered the same message to all links now so stop. + if(*_CurrentIter == *_StartedIter) + { + break; + } + + _Status = (*_CurrentIter)->propagate(_Msg, this); + ++_M_indexNextTarget; + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + ++_CurrentIter; + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others + if (_M_pReservedFor != NULL) + { + return; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. + if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _M_messageBuffer.peek(); + } + } + + private: + + /// + /// Message queue used to store messages. + /// + MessageQueue<_Type> _M_messageBuffer; + + /// + /// Index of next target to call propagate on. Used to alternate and load + /// balance message offering. + /// + size_t _M_indexNextTarget; + + // + // Hide assignment operator and copy constructor. + // + alternator const &operator =(alternator const&); // no assignment operator + alternator(alternator const &); // no copy constructor + }; + + #include + + // + // Sample block that combines join and transform. + // + template + class join_transform : public propagator_block>, multi_link_registry>> + { + public: + + typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method; + + /// + /// Create an join block within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The number of inputs this join will be allowed + /// + join_transform(size_t _NumInputs, _Transform_method const& _Func) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs); + } + + /// + /// Create an join block within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. 
+ /// + /// + /// The number of inputs this join will be allowed + /// + /// + /// A filter method placed on this join + /// + join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs); + register_filter(_Filter); + } + + /// + /// Create an join block within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The scheduler onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs this join will be allowed + /// + join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, &_PScheduler); + } + + /// + /// Create an join block within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The scheduler onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs this join will be allowed + /// + /// + /// A filter method placed on this join + /// + join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, &_PScheduler); + register_filter(_Filter); + } + + /// + /// Create an join block within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// The ScheduleGroup onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs this join will be allowed + /// + join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, NULL, &_PScheduleGroup); + } + + /// + /// Create an join block within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// The ScheduleGroup onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs this join will be allowed + /// + /// + /// A filter method placed on this join + /// + join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), + _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Destroys a join + /// + ~join_transform() + { + // Remove all links that are targets of this join + remove_network_links(); + + delete [] _M_savedIdBuffer; + } + + protected: + // + // propagator_block protected function implementations + // + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// The message being propagated + /// + /// + /// The source doing the propagation + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. 
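+        // ------------------------------------------------------------------
+        // Illustrative usage sketch (inserted commentary, not part of the
+        // original header; it assumes the stripped template parameter list is
+        // join_transform<_Input, _Output, _Jtype>, as the member code
+        // suggests). A greedy join accepts each input message as it arrives,
+        // while a non-greedy join postpones them and reserves/consumes the
+        // whole set at once; either way, once every input slot holds a
+        // message the transform functor runs and its result is propagated:
+        //
+        //     join_transform<int, int, greedy> _Sum(3,
+        //         [](std::vector<int> const& _In) -> int {
+        //             int _Total = 0;
+        //             for (size_t i = 0; i < _In.size(); ++i) _Total += _In[i];
+        //             return _Total;
+        //         });
+        //     unbounded_buffer<int> _A, _B, _C, _Out;
+        //     _A.link_target(&_Sum); _B.link_target(&_Sum); _C.link_target(&_Sum);
+        //     _Sum.link_target(&_Out);
+        //     send(&_A, 1); send(&_B, 2); send(&_C, 3);
+        //     int _Result = receive(&_Out);   // 6
+        // ------------------------------------------------------------------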
Doing so could + /// result in a deadlock with the Consume call. + /// + message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) + { + message_status _Ret_val = accepted; + + // + // Find the slot index of this source + // + size_t _Slot = 0; + bool _Found = false; + for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter) + { + if (*_Iter == _PSource) + { + _Found = true; + break; + } + + _Slot++; + } + + if (!_Found) + { + // If this source was not found in the array, this is not a connected source + // decline the message + return declined; + } + + _ASSERTE(_Slot < _M_messageArray.size()); + + bool fIsGreedy = (_Jtype == greedy); + + if (fIsGreedy) + { + // + // Greedy type joins immediately accept the message. + // + { + critical_section::scoped_lock lockHolder(_M_propagationLock); + if (_M_messageArray[_Slot] != NULL) + { + _M_savedMessageIdArray[_Slot] = _PMessage->msg_id(); + _Ret_val = postponed; + } + } + + if (_Ret_val != postponed) + { + _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this); + + if (_M_messageArray[_Slot] != NULL) + { + if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) + { + // If messages have arrived on all links, start a propagation + // of the current message + async_send(NULL); + } + } + else + { + _Ret_val = missed; + } + } + } + else + { + // + // Non-greedy type joins save the message ids until they have all arrived + // + + if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1) + { + // Decrement the message remaining count if this thread is switching + // the saved id from -1 to a valid value. + if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) + { + async_send(NULL); + } + } + + // Always return postponed. This message will be consumed + // in the LWT + _Ret_val = postponed; + } + + return _Ret_val; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Output> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Output> * _Msg = NULL; + + if (_M_messageBuffer.is_head(_MsgId)) + { + _Msg = _M_messageBuffer.dequeue(); + } + + return _Msg; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A bool indicating whether the reservation worked or not + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer.is_head(_MsgId); + } + + /// + /// Consumes a message previously offered by the source and reserved by the target, + /// transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve' + /// + virtual message<_Output> * consume_message(runtime_object_identity _MsgId) + { + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. 
+ /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. + if (!_M_messageBuffer.is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer.count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. + /// + virtual void link_target_notification(ITarget<_Output> *) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + _Propagate_priority_order(_M_messageBuffer); + } + + /// + /// Takes the message and propagates it to all the target of this join. + /// This is called from async_send. + /// + /// + /// The message being propagated + /// + void propagate_to_any_targets(message<_Output> *) + { + message<_Output> * _Msg = NULL; + // Create a new message from the input sources + // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from + // a consume or release from the target. In that case we don't want to create a new message. + if (_M_messagesRemaining == 0) + { + // A greedy join can immediately create the message, a non-greedy + // join must try and consume all the messages it has postponed + _Msg = _Create_new_message(); + } + + if (_Msg == NULL) + { + // Create message failed. This happens in non_greedy joins when the + // reserve/consumption of a postponed message failed. + _Propagate_priority_order(_M_messageBuffer); + return; + } + + bool fIsGreedy = (_Jtype == greedy); + + // For a greedy join, reset the number of messages remaining + // Check to see if multiple messages have been passed in on any of the links, + // and postponed. If so, try and reserve/consume them now + if (fIsGreedy) + { + // Look at the saved ids and reserve/consume any that have passed in while + // this join was waiting to complete + _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size()); + + for (size_t i = 0; i < _M_messageArray.size(); i++) + { + for(;;) + { + runtime_object_identity _Saved_id; + // Grab the current saved id value. This value could be changing from based on any + // calls of source->propagate(this). If the message id is different than what is snapped + // here, that means, the reserve below must fail. This is because reserve is trying + // to get the same source lock the propagate(this) call must be holding. 
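+                        // [Added clarification] The pattern here is: snapshot the saved id
+                        // while holding _M_propagationLock, then call reserve/consume on the
+                        // source only after the lock is released. If the reserve fails (e.g.
+                        // the source has moved on to a different message or the link was
+                        // removed), the for(;;) loop simply re-reads the slot and retries
+                        // with whatever id is saved now, giving up on the slot only once the
+                        // saved id reads back as -1.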
+ { + critical_section::scoped_lock lockHolder(_M_propagationLock); + + _ASSERTE(_M_messageArray[i] != NULL); + + _Saved_id = _M_savedMessageIdArray[i]; + + if (_Saved_id == -1) + { + _M_messageArray[i] = NULL; + break; + } + else + { + _M_savedMessageIdArray[i] = -1; + } + } + + if (_Saved_id != -1) + { + source_iterator _Iter = _M_connectedSources.begin(); + + ISource<_Input> * _PSource = _Iter[i]; + if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this)) + { + _M_messageArray[i] = _PSource->consume(_Saved_id, this); + _InterlockedDecrementSizeT(&_M_messagesRemaining); + break; + } + } + } + } + + // If messages have all been received, async_send again, this will start the + // LWT up to create a new message + if (_M_messagesRemaining == 0) + { + async_send(NULL); + } + } + + // Add the new message to the outbound queue + _M_messageBuffer.enqueue(_Msg); + + if (!_M_messageBuffer.is_head(_Msg->msg_id())) + { + // another message is at the head of the outbound message queue and blocked + // simply return + return; + } + + _Propagate_priority_order(_M_messageBuffer); + } + + private: + + // + // Private Methods + // + + /// + /// Propagate messages in priority order + /// + /// + /// Reference to a message queue with messages to be propagated + /// + void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer) + { + message<_Output> * _Msg = _MessageBuffer.peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Always start from the first target that linked + for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) + { + ITarget<_Output> * _PTarget = *_Iter; + _Status = _PTarget->propagate(_Msg, this); + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others + if (_M_pReservedFor != NULL) + { + break; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. + if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _MessageBuffer.peek(); + } + } + + /// + /// Create a new message from the data output + /// + /// + /// The created message (NULL if creation failed) + /// + message<_Output> * __cdecl _Create_new_message() + { + bool fIsNonGreedy = (_Jtype == non_greedy); + + // If this is a non-greedy join, check each source and try to consume their message + if (fIsNonGreedy) + { + + // The iterator _Iter below will ensure that it is safe to touch + // non-NULL source pointers. Take a snapshot. + std::vector *> _Sources; + source_iterator _Iter = _M_connectedSources.begin(); + + while (*_Iter != NULL) + { + ISource<_Input> * _PSource = *_Iter; + + if (_PSource == NULL) + { + break; + } + + _Sources.push_back(_PSource); + ++_Iter; + } + + if (_Sources.size() != _M_messageArray.size()) + { + // Some of the sources were unlinked. The join is broken + return NULL; + } + + // First, try and reserve all the messages. If a reservation fails, + // then release any reservations that had been made. + for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++) + { + // Snap the current saved id into a buffer. 
This value can be changing behind the scenes from + // other source->propagate(msg, this) calls, but if so, that just means the reserve below will + // fail. + _InterlockedIncrementSizeT(&_M_messagesRemaining); + _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1); + + _ASSERTE(_M_savedIdBuffer[i] != -1); + + if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this)) + { + // A reservation failed, release all reservations made up until + // this block, and wait for another message to arrive on this link + for (size_t j = 0; j < i; j++) + { + _Sources[j]->release(_M_savedIdBuffer[j], this); + if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1) + { + if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) + { + async_send(NULL); + } + } + } + + // Return NULL to indicate that the create failed + return NULL; + } + } + + // Since everything has been reserved, consume all the messages. + // This is guaranteed to return true. + for (size_t i = 0; i < _M_messageArray.size(); i++) + { + _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this); + _M_savedIdBuffer[i] = -1; + } + } + + if (!fIsNonGreedy) + { + // Reinitialize how many messages are being waited for. + // This is safe because all messages have been received, thus no new async_sends for + // greedy joins can be called. + _M_messagesRemaining = _M_messageArray.size(); + } + + std::vector<_Input> _OutputVector; + for (size_t i = 0; i < _M_messageArray.size(); i++) + { + _ASSERTE(_M_messageArray[i] != NULL); + _OutputVector.push_back(_M_messageArray[i]->payload); + + delete _M_messageArray[i]; + if (fIsNonGreedy) + { + _M_messageArray[i] = NULL; + } + } + + _Output _Out = _M_pFunc(_OutputVector); + + return (new message<_Output>(_Out)); + } + + /// + /// Initialize the join block + /// + /// + /// The number of inputs + /// + void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL) + { + initialize_source_and_target(_PScheduler, _PScheduleGroup); + + _M_connectedSources.set_bound(_NumInputs); + _M_messagesRemaining = _NumInputs; + + bool fIsNonGreedy = (_Jtype == non_greedy); + + if (fIsNonGreedy) + { + // Non greedy joins need a buffer to snap off saved message ids to. + _M_savedIdBuffer = new runtime_object_identity[_NumInputs]; + memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs); + } + else + { + _M_savedIdBuffer = NULL; + } + } + + // The current number of messages remaining + volatile size_t _M_messagesRemaining; + + // An array containing the accepted messages of this join. + std::vector*> _M_messageArray; + + // An array containing the msg ids of messages propagated to the array + // For greedy joins, this contains a log of other messages passed to this + // join after the first has been accepted + // For non-greedy joins, this contains the message id of any message + // passed to it. + std::vector _M_savedMessageIdArray; + + // The transformer method called by this block + _Transform_method _M_pFunc; + + // Buffer for snapping saved ids in non-greedy joins + runtime_object_identity * _M_savedIdBuffer; + + // A lock for modifying the buffer or the connected blocks + ::Concurrency::critical_section _M_propagationLock; + + // Queue to hold output messages + MessageQueue<_Output> _M_messageBuffer; + }; + + // + // Message block that invokes a transform method when it receives message on any of the input links. 
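+ //
+ // A minimal usage sketch for the join_transform block defined above (a hypothetical
+ // example, not part of this header; it assumes the standard ConcRT agents primitives
+ // unbounded_buffer, send and receive from <agents.h> — the block itself only needs
+ // ISource/ITarget links):
+ //
+ //     unbounded_buffer<int> _Lhs, _Rhs;
+ //
+ //     // Transform fires once a message has arrived on every input.
+ //     join_transform<int, int, greedy> _Sum(2,
+ //         [](std::vector<int> const& _Values) -> int
+ //         {
+ //             return _Values[0] + _Values[1];
+ //         });
+ //
+ //     _Lhs.link_target(&_Sum);
+ //     _Rhs.link_target(&_Sum);
+ //
+ //     send(&_Lhs, 2);
+ //     send(&_Rhs, 3);
+ //     int _Result = receive(&_Sum);   // 5, once both inputs have delivered a message
+ //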
+ // A typical example is recal engine for a cell in an Excel spreadsheet. + // (Remember that a normal join block is triggered only when it receives messages on all its input links). + // + template + class recalculate : public propagator_block>, multi_link_registry>> + { + public: + typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method; + + /// + /// Create an recalculate block within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The number of inputs + /// + recalculate(size_t _NumInputs, _Recalculate_method const& _Func) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs); + } + + /// + /// Create an recalculate block within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The number of inputs + /// + /// + /// A filter method placed on this join + /// + recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs); + register_filter(_Filter); + } + + /// + /// Create an recalculate block within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The scheduler onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs + /// + recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, &_PScheduler); + } + + /// + /// Create an recalculate block within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// The scheduler onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs + /// + /// + /// A filter method placed on this join + /// + recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, &_PScheduler); + register_filter(_Filter); + } + + /// + /// Create an recalculate block within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// The ScheduleGroup onto which the task's message propagation will be scheduled. + /// + /// + /// The number of inputs + /// + recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, NULL, &_PScheduleGroup); + } + + /// + /// Create an recalculate block within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// The ScheduleGroup onto which the task's message propagation will be scheduled. 
+ /// + /// + /// The number of inputs + /// + /// + /// A filter method placed on this join + /// + recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter) + : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1), + _M_pFunc(_Func) + { + _Initialize(_NumInputs, NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Destroys a join + /// + ~recalculate() + { + // Remove all links that are targets of this join + remove_network_links(); + + delete [] _M_savedIdBuffer; + } + + protected: + // + // propagator_block protected function implementations + // + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// The message being propagated + /// + /// + /// The source doing the propagation + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. Doing so could + /// result in a deadlock with the Consume call. + /// + message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) + { + // + // Find the slot index of this source + // + size_t _Slot = 0; + bool _Found = false; + for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter) + { + if (*_Iter == _PSource) + { + _Found = true; + break; + } + + _Slot++; + } + + if (!_Found) + { + // If this source was not found in the array, this is not a connected source + // decline the message + return declined; + } + + // + // Save the message id + // + if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1) + { + // If it is not seen by Create_message attempt a recalculate + async_send(NULL); + } + + // Always return postponed. This message will be consumed + // in the LWT + return postponed; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Output> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Output> * _Msg = NULL; + + if (_M_messageBuffer.is_head(_MsgId)) + { + _Msg = _M_messageBuffer.dequeue(); + } + + return _Msg; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A bool indicating whether the reservation worked or not + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer.is_head(_MsgId); + } + + /// + /// Consumes a message previously offered by the source and reserved by the target, + /// transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. 
+ /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve' + /// + virtual message<_Output> * consume_message(runtime_object_identity _MsgId) + { + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. + /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. + if (!_M_messageBuffer.is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer.count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. + /// + virtual void link_target_notification(ITarget<_Output> *) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + _Propagate_priority_order(_M_messageBuffer); + } + + /// + /// Takes the message and propagates it to all the target of this join. + /// This is called from async_send. + /// + /// + /// The message being propagated + /// + void propagate_to_any_targets(message<_Output> *) + { + // Attempt to create a new message + message<_Output> * _Msg = _Create_new_message(); + + if (_Msg != NULL) + { + // Add the new message to the outbound queue + _M_messageBuffer.enqueue(_Msg); + + if (!_M_messageBuffer.is_head(_Msg->msg_id())) + { + // another message is at the head of the outbound message queue and blocked + // simply return + return; + } + } + + _Propagate_priority_order(_M_messageBuffer); + } + + private: + + // + // Private Methods + // + + /// + /// Propagate messages in priority order + /// + /// + /// Reference to a message queue with messages to be propagated + /// + void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer) + { + message<_Output> * _Msg = _MessageBuffer.peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Always start from the first target that linked + for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) + { + ITarget<_Output> * _PTarget = *_Iter; + _Status = _PTarget->propagate(_Msg, this); + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others + if (_M_pReservedFor != NULL) + { + break; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. 
+ if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _MessageBuffer.peek(); + } + } + + /// + /// Create a new message from the data output + /// + /// + /// The created message (NULL if creation failed) + /// + message<_Output> * __cdecl _Create_new_message() + { + // If this is a non-greedy join, check each source and try to consume their message + size_t _NumInputs = _M_savedMessageIdArray.size(); + + // The iterator _Iter below will ensure that it is safe to touch + // non-NULL source pointers. Take a snapshot. + std::vector *> _Sources; + source_iterator _Iter = _M_connectedSources.begin(); + + while (*_Iter != NULL) + { + ISource<_Input> * _PSource = *_Iter; + + if (_PSource == NULL) + { + break; + } + + _Sources.push_back(_PSource); + ++_Iter; + } + + if (_Sources.size() != _NumInputs) + { + // Some of the sources were unlinked. The join is broken + return NULL; + } + + // First, try and reserve all the messages. If a reservation fails, + // then release any reservations that had been made. + for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++) + { + // Swap the id to -1 indicating that we have used that value for a recalculate + _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1); + + // If the id is -1, either we have never received a message on that link or the previous message is stored + // in the message array. If it is the former we abort. + // If the id is not -1, we attempt to reserve the message. On failure we abort. + if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL)) + || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this))) + { + // Abort. Release all reservations made up until this block, + // and wait for another message to arrive. + for (size_t j = 0; j < i; j++) + { + if (_M_savedIdBuffer[j] != -1) + { + _Sources[j]->release(_M_savedIdBuffer[j], this); + _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1); + } + } + + // Return NULL to indicate that the create failed + return NULL; + } + } + + // Since everything has been reserved, consume all the messages. + // This is guaranteed to return true. + size_t _NewMessages = 0; + for (size_t i = 0; i < _NumInputs; i++) + { + if (_M_savedIdBuffer[i] != -1) + { + // Delete previous message since we have a new one + if (_M_messageArray[i] != NULL) + { + delete _M_messageArray[i]; + } + _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this); + _M_savedIdBuffer[i] = -1; + _NewMessages++; + } + } + + if (_NewMessages == 0) + { + // There is no need to recal if we did not consume a new message + return NULL; + } + + std::vector<_Input> _OutputVector; + for (size_t i = 0; i < _NumInputs; i++) + { + _ASSERTE(_M_messageArray[i] != NULL); + _OutputVector.push_back(_M_messageArray[i]->payload); + } + + _Output _Out = _M_pFunc(_OutputVector); + return (new message<_Output>(_Out)); + } + + /// + /// Initialize the recalculate block + /// + /// + /// The number of inputs + /// + void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL) + { + initialize_source_and_target(_PScheduler, _PScheduleGroup); + + _M_connectedSources.set_bound(_NumInputs); + + // Non greedy joins need a buffer to snap off saved message ids to. 
+ _M_savedIdBuffer = new runtime_object_identity[_NumInputs]; + memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs); + } + + // An array containing the accepted messages of this join. + std::vector*> _M_messageArray; + + // An array containing the msg ids of messages propagated to the array + // For non-greedy joins, this contains the message id of any message + // passed to it. + std::vector _M_savedMessageIdArray; + + // Buffer for snapping saved ids in non-greedy joins + runtime_object_identity * _M_savedIdBuffer; + + // The transformer method called by this block + _Recalculate_method _M_pFunc; + + // Queue to hold output messages + MessageQueue<_Output> _M_messageBuffer; + }; + + // + // Container class to hold a join_transform block and keep unbounded buffers in front of each input. + // + template + class buffered_join + { + typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method; + + public: + buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs) + { + m_buffers = new unbounded_buffer<_Input>*[_NumInputs]; + m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func); + + for(int i = 0; i < _NumInputs; i++) + { + m_buffers[i] = new unbounded_buffer<_Input>; + m_buffers[i]->link_target(m_join); + } + } + + ~buffered_join() + { + for(int i = 0; i < m_numInputs; i++) + delete m_buffers[i]; + delete [] m_buffers; + delete m_join; + } + + // Add input takes _PSource and connects it to the next input on this block + void add_input(ISource<_Input> * _PSource) + { + _PSource->link_target(m_buffers[m_currentInput]); + m_currentInput++; + } + + // link_target links this container block to _PTarget + void link_target(ITarget<_Output> * _PTarget) + { + m_join->link_target(_PTarget); + } + private: + + int m_currentInput; + int m_numInputs; + unbounded_buffer<_Input> ** m_buffers; + join_transform<_Input,_Output,greedy> * m_join; + }; +} // namespace Concurrency diff --git a/concrt_extras/concrt_extras.h b/concrt_extras/concrt_extras.h new file mode 100644 index 000000000..c0a78188d --- /dev/null +++ b/concrt_extras/concrt_extras.h @@ -0,0 +1,34 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// File: concrt_extras.h +// +// Implementation of ConcRT helpers +// +//-------------------------------------------------------------------------- + +#pragma once + +#include +#include + +namespace Concurrency +{ + /// + /// An RAII style wrapper around Concurrency::Context::Oversubscribe, + /// useful for annotating known blocking calls + /// + class scoped_oversubcription_token + { + public: + scoped_oversubcription_token() + { + Concurrency::Context::CurrentContext()->Oversubscribe(true); + } + ~scoped_oversubcription_token() + { + Concurrency::Context::CurrentContext()->Oversubscribe(false); + } + }; +} \ No newline at end of file diff --git a/concrt_extras/concurrent_unordered_map.h b/concrt_extras/concurrent_unordered_map.h new file mode 100644 index 000000000..bde00a1b9 --- /dev/null +++ b/concrt_extras/concurrent_unordered_map.h @@ -0,0 +1,1003 @@ +/*** +* ==++== +* +* Copyright (c) Microsoft Corporation. All rights reserved. 
+* +* ==--== +* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +* +* concurrent_unordered_map.h +* +* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +****/ +#pragma once + +#include +#include "internal_concurrent_hash.h" + +#if !(defined(_M_AMD64) || defined(_M_IX86)) + #error ERROR: Concurrency Runtime is supported only on X64 and X86 architectures. +#endif + +#if defined(_M_CEE) + #error ERROR: Concurrency Runtime is not supported when compiling /clr. +#endif + +#pragma pack(push,_CRT_PACKING) + +namespace Concurrency +{ +namespace details +{ +// Template class for hash map traits +template +class _Concurrent_unordered_map_traits : public std::_Container_base +{ +public: + typedef std::pair<_Key_type, _Element_type> _Value_type; + typedef std::pair value_type; + typedef _Key_type key_type; + typedef _Key_comparator key_compare; + + typedef typename _Allocator_type::template rebind::other allocator_type; + + enum + { + _M_allow_multimapping = _Allow_multimapping + }; + + _Concurrent_unordered_map_traits() : _M_comparator() + { + } + + _Concurrent_unordered_map_traits(const key_compare& _Traits) : _M_comparator(_Traits) + { + } + + class value_compare : public std::binary_function + { + friend class _Concurrent_unordered_map_traits<_Key_type, _Element_type, _Key_comparator, _Allocator_type, _Allow_multimapping>; + + public: + bool operator()(const value_type& _Left, const value_type& _Right) const + { + return (_M_comparator(_Left.first, _Right.first)); + } + + value_compare(const key_compare& _Traits) : _M_comparator(_Traits) + { + } + + protected: + key_compare _M_comparator; // the comparator predicate for keys + }; + + template + static const _Type1& _Key_function(const std::pair<_Type1, _Type2>& _Value) + { + return (_Value.first); + } + key_compare _M_comparator; // the comparator predicate for keys +}; +} // namespace details; + +/// +/// The concurrent_unordered_map class is an concurrency-safe container that controls a varying-length sequence of +/// elements of type std::pair. The sequence is represented in a way that enables +/// concurrency-safe append, element access, iterator access and iterator traversal operations. +/// +/// +/// The key type. +/// +/// +/// The mapped type. +/// +/// +/// The hash function object type. This argument is optional and the default value is +/// tr1::hash<>. +/// +/// +/// The equality comparison function object type. This argument is optional and the default value is +/// equal_to<>. +/// +/// +/// The type that represents the stored allocator object that encapsulates details about the allocation and +/// deallocation of memory for the concurrent vector. This argument is optional and the default value is +/// allocator<, >. +/// +/// +/// For detailed information on the concurrent_unordered_map class, see . 
+/// +/// +/**/ +template , typename _Key_equality = std::equal_to<_Key_type>, typename _Allocator_type = std::allocator > > +class concurrent_unordered_map : public details::_Concurrent_hash< details::_Concurrent_unordered_map_traits<_Key_type, _Element_type, details::_Hash_compare<_Key_type, _Hasher, _Key_equality>, _Allocator_type, false> > +{ +public: + // Base type definitions + typedef concurrent_unordered_map<_Key_type, _Element_type, _Hasher, _Key_equality, _Allocator_type> _Mytype; + typedef details::_Hash_compare<_Key_type, _Hasher, _Key_equality> _Mytraits; + typedef details::_Concurrent_hash< details::_Concurrent_unordered_map_traits<_Key_type, _Element_type, _Mytraits, _Allocator_type, false> > _Mybase; + + // Type definitions + typedef _Key_type key_type; + typedef typename _Mybase::value_type value_type; + typedef _Element_type mapped_type; + typedef _Hasher hasher; + typedef _Key_equality key_equal; + typedef _Mytraits key_compare; + + typedef typename _Mybase::allocator_type allocator_type; + typedef typename _Mybase::pointer pointer; + typedef typename _Mybase::const_pointer const_pointer; + typedef typename _Mybase::reference reference; + typedef typename _Mybase::const_reference const_reference; + + typedef typename _Mybase::size_type size_type; + typedef typename _Mybase::difference_type difference_type; + + typedef typename _Mybase::iterator iterator; + typedef typename _Mybase::const_iterator const_iterator; + typedef typename _Mybase::iterator local_iterator; + typedef typename _Mybase::const_iterator const_local_iterator; + + /// + /// Constructs a concurrent unordered map. + /// + /// + /// The initial number of buckets for this unordered map. + /// + /// + /// The hash function for this unordered map. + /// + /// + /// The equality comparison function for this unordered map. + /// + /// + /// The allocator for this unordered map. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . + /// + /**/ + explicit concurrent_unordered_map(size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), const key_equal& _Key_equality = key_equal(), + const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(_Hasher, _Key_equality), _Allocator) + { + this->rehash(_Number_of_buckets); + } + + /// + /// Constructs a concurrent unordered map. + /// + /// + /// The allocator for this unordered map. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . 
+ /// + /**/ + concurrent_unordered_map(const allocator_type& _Allocator) : _Mybase(8, key_compare(), _Allocator) + { + } + + /// + /// Constructs a concurrent unordered map. + /// + /// + /// The type of the input iterator. + /// + /// + /// Position of the first element in the range of elements to be copied. + /// + /// + /// Position of the first element beyond the range of elements to be copied. + /// + /// + /// The initial number of buckets for this unordered map. + /// + /// + /// The hash function for this unordered map. + /// + /// + /// The equality comparison function for this unordered map. + /// + /// + /// The allocator for this unordered map. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . + /// + /**/ + template + concurrent_unordered_map(_Iterator _Begin, _Iterator _End, size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), + const key_equal& _Key_equality = key_equal(), const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(), allocator_type()) + { + this->rehash(_Number_of_buckets); + for (; _Begin != _End; ++_Begin) + { + _Mybase::insert(*_Begin); + } + } + + /// + /// Constructs a concurrent unordered map. + /// + /// + /// The source concurrent_unordered_map object to copy or move elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . + /// + /**/ + concurrent_unordered_map(const concurrent_unordered_map& _Umap) : _Mybase(_Umap) + { + } + + /// + /// Constructs a concurrent unordered map. + /// + /// + /// The source concurrent_unordered_map object to copy or move elements from. + /// + /// + /// The allocator for this unordered map. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . + /// + /**/ + concurrent_unordered_map(const concurrent_unordered_map& _Umap, const allocator_type& _Allocator) : _Mybase(_Umap, _Allocator) + { + } + + /// + /// Constructs a concurrent unordered map. 
+ /// + /// + /// The source concurrent_unordered_map object to copy or move elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered map. + /// The first constructor specifies an empty initial map and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered map. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered map . + /// The last constructor specifies a move of the concurrent unordered map . + /// + /**/ + concurrent_unordered_map(concurrent_unordered_map&& _Umap) : _Mybase(std::move(_Umap)) + { + } + + /// + /// Assigns the contents of another concurrent_unordered_map object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_map object. + /// + /// + /// A reference to this concurrent_unordered_map object. + /// + /// + /// After erasing any existing elements a concurrent vector, operator= either copies or moves the contents of into + /// the concurrent vector. + /// + /**/ + concurrent_unordered_map& operator=(const concurrent_unordered_map& _Umap) + { + _Mybase::operator=(_Umap); + return (*this); + } + + /// + /// Assigns the contents of another concurrent_unordered_map object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_map object. + /// + /// + /// A reference to this concurrent_unordered_map object. + /// + /// + /// After erasing any existing elements in a concurrent vector, operator= either copies or moves the contents of into + /// the concurrent vector. + /// + /**/ + concurrent_unordered_map& operator=(concurrent_unordered_map&& _Umap) + { + _Mybase::operator=(std::move(_Umap)); + return (*this); + } + + /// + /// Erases elements from the concurrent_unordered_map. This method is not concurrency-safe. + /// + /// + /// The iterator position to erase from. + /// + /// + /// The first function erases an element from the map given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_map object. + /// + /**/ + iterator unsafe_erase(const_iterator _Where) + { + return _Mybase::unsafe_erase(_Where); + } + + /// + /// Erases elements from the concurrent_unordered_map. This method is not concurrency-safe. + /// + /// + /// The key to erase. + /// + /// + /// The first function erases an element from the map given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The count of elements erased from this concurrent_unordered_map object. + /// + /**/ + size_type unsafe_erase(const key_type& _Keyval) + { + return _Mybase::unsafe_erase(_Keyval); + } + + /// + /// Erases elements from the concurrent_unordered_map. This method is not concurrency-safe. + /// + /// + /// Position of the first element in the range of elements to be erased. + /// + /// + /// Position of the first element beyond the range of elements to be erased. + /// + /// + /// The first function erases an element from the map given an iterator position. 
+ /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_map object. + /// + /**/ + iterator unsafe_erase(const_iterator _Begin, const_iterator _End) + { + return _Mybase::unsafe_erase(_Begin, _End); + } + + /// + /// Swaps the contents of two concurrent_unordered_map objects. + /// This method is not concurrency-safe. + /// + /// + /// The concurrent_unordered_map object to swap with. + /// + /**/ + void swap(concurrent_unordered_map& _Umap) + { + _Mybase::swap(_Umap); + } + + // Observers + /// + /// The hash function object. + /// + /**/ + hasher hash_function() const + { + return _M_comparator._M_hash_object; + } + + /// + /// The equality comparison function object. + /// + /**/ + key_equal key_eq() const + { + return _M_comparator._M_key_compare_object; + } + + /// + /// Provides access to the element at the given key in the concurrent unordered map. This method is concurrency-safe. + /// + /// + /// The key of the element to be retrieved. + /// + /// + /// A element mapped to by the key. + /// + /**/ + mapped_type& operator[](const key_type& _Keyval) + { + iterator _Where = find(_Keyval); + + if (_Where == end()) + { + _Where = insert(std::pair(std::move(_Keyval), mapped_type())).first; + } + + return ((*_Where).second); + } + + /// + /// Provides access to the element at the given key in the concurrent unordered map. This method is concurrency-safe. + /// + /// + /// The key of the element to be retrieved. + /// + /// + /// A element mapped to by the key. + /// + /**/ + mapped_type& at(const key_type& _Keyval) + { + iterator _Where = find(_Keyval); + + if (_Where == end()) + { + throw std::out_of_range("invalid concurrent_unordered_map key"); + } + + return ((*_Where).second); + } + + /// + /// Provides read access to the element at the given key in the concurrent unordered map. This method is concurrency-safe. + /// + /// + /// The key of the element to be retrieved. + /// + /// + /// A element mapped to by the key. + /// + /**/ + const mapped_type& at(const key_type& _Keyval) const + { + const_iterator _Where = find(_Keyval); + + if (_Where == end()) + { + throw std::out_of_range("invalid concurrent_unordered_map key"); + } + + return ((*_Where).second); + } +}; + +/// +/// The concurrent_unordered_multimap class is an concurrency-safe container that controls a varying-length sequence of +/// elements of type std::pair. The sequence is represented in a way that enables +/// concurrency-safe append, element access, iterator access and iterator traversal operations. +/// +/// +/// The key type. +/// +/// +/// The mapped type. +/// +/// +/// The hash function object type. This argument is optional and the default value is +/// tr1::hash<>. +/// +/// +/// The equality comparison function object type. This argument is optional and the default value is +/// equal_to<>. +/// +/// +/// The type that represents the stored allocator object that encapsulates details about the allocation and +/// deallocation of memory for the concurrent vector. This argument is optional and the default value is +/// allocator<, >. +/// +/// +/// For detailed information on the concurrent_unordered_multimap class, see . 
+/// +/// +/**/ +template , typename _Key_equality = std::equal_to<_Key_type>, typename _Allocator_type = std::allocator > > +class concurrent_unordered_multimap : public details::_Concurrent_hash< details::_Concurrent_unordered_map_traits<_Key_type, _Element_type, details::_Hash_compare<_Key_type, _Hasher, _Key_equality>, _Allocator_type, true> > +{ +public: + // Base type definitions + typedef concurrent_unordered_multimap<_Key_type, _Element_type, _Hasher, _Key_equality, _Allocator_type> _Mytype; + typedef details::_Hash_compare<_Key_type, _Hasher, _Key_equality> _Mytraits; + typedef details::_Concurrent_hash< details::_Concurrent_unordered_map_traits<_Key_type, _Element_type, _Mytraits, _Allocator_type, true> > _Mybase; + + // Type definitions + typedef _Key_type key_type; + typedef typename _Mybase::value_type value_type; + typedef _Element_type mapped_type; + typedef _Hasher hasher; + typedef _Key_equality key_equal; + typedef _Mytraits key_compare; + + typedef typename _Mybase::allocator_type allocator_type; + typedef typename _Mybase::pointer pointer; + typedef typename _Mybase::const_pointer const_pointer; + typedef typename _Mybase::reference reference; + typedef typename _Mybase::const_reference const_reference; + + typedef typename _Mybase::size_type size_type; + typedef typename _Mybase::difference_type difference_type; + + typedef typename _Mybase::iterator iterator; + typedef typename _Mybase::const_iterator const_iterator; + typedef typename _Mybase::iterator local_iterator; + typedef typename _Mybase::const_iterator const_local_iterator; + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The initial number of buckets for this unordered multimap. + /// + /// + /// The hash function for this unordered multimap. + /// + /// + /// The equality comparison function for this unordered multimap. + /// + /// + /// The allocator for this unordered multimap. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . + /// The last constructor specifies a move of the concurrent unordered multimap . + /// + /**/ + explicit concurrent_unordered_multimap(size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), const key_equal& _Key_equality = key_equal(), + const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(_Hasher, _Key_equality), _Allocator) + { + this->rehash(_Number_of_buckets); + } + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The allocator for this unordered multimap. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . 
+ /// The last constructor specifies a move of the concurrent unordered multimap . + /// + /**/ + concurrent_unordered_multimap(const allocator_type& _Allocator) : _Mybase(8, key_compare(), _Allocator) + { + } + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The type of the input iterator. + /// + /// + /// Position of the first element in the range of elements to be copied. + /// + /// + /// Position of the first element beyond the range of elements to be copied. + /// + /// + /// The initial number of buckets for this unordered multimap. + /// + /// + /// The hash function for this unordered multimap. + /// + /// + /// The equality comparison function for this unordered multimap. + /// + /// + /// The allocator for this unordered multimap. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . + /// The last constructor specifies a move of the concurrent unordered multimap . + /// + /**/ + template + concurrent_unordered_multimap(_Iterator _Begin, _Iterator _End, size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), + const key_equal& _Key_equality = key_equal(), const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(), allocator_type()) + { + this->rehash(_Number_of_buckets); + for (; _Begin != _End; ++_Begin) + { + _Mybase::insert(*_Begin); + } + } + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The source concurrent_unordered_multimap object to copy elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . + /// The last constructor specifies a move of the concurrent unordered multimap . + /// + /**/ + concurrent_unordered_multimap(const concurrent_unordered_multimap& _Umap) : _Mybase(_Umap) + { + } + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The source concurrent_unordered_multimap object to copy elements from. + /// + /// + /// The allocator for this unordered multimap. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . + /// The last constructor specifies a move of the concurrent unordered multimap . 
+ /// + /**/ + concurrent_unordered_multimap(const concurrent_unordered_multimap& _Umap, const allocator_type& _Allocator) : _Mybase(_Umap, _Allocator) + { + } + + /// + /// Constructs a concurrent unordered multimap. + /// + /// + /// The source concurrent_unordered_multimap object to copy elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered multimap. + /// The first constructor specifies an empty initial multimap and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multimap. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multimap . + /// The last constructor specifies a move of the concurrent unordered multimap . + /// + /**/ + concurrent_unordered_multimap(concurrent_unordered_multimap&& _Umap) : _Mybase(std::move(_Umap)) + { + } + + /// + /// Assigns the contents of another concurrent_unordered_multimap object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_multimap object. + /// + /// + /// A reference to this concurrent_unordered_multimap object. + /// + /// + /// After erasing any existing elements in a concurrent unordered multimap, operator= either copies or moves the contents of + /// into the concurrent unordered multimap. + /// + /**/ + concurrent_unordered_multimap& operator=(const concurrent_unordered_multimap& _Umap) + { + _Mybase::operator=(_Umap); + return (*this); + } + + /// + /// Assigns the contents of another concurrent_unordered_multimap object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_multimap object. + /// + /// + /// A reference to this concurrent_unordered_multimap object. + /// + /// + /// After erasing any existing elements in a concurrent unordered multimap, operator= either copies or moves the contents of + /// into the concurrent unordered multimap. + /// + /**/ + concurrent_unordered_multimap& operator=(concurrent_unordered_multimap&& _Umap) + { + _Mybase::operator=(std::move(_Umap)); + return (*this); + } + + // Modifiers + /// + /// Inserts a value into the concurrent_unordered_multimap object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multimap object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// An iterator pointing to the insertion location. + /// + /**/ + iterator insert(const value_type& _Value) + { + return (_Mybase::insert(_Value)).first; + } + + /// + /// Inserts a value into the concurrent_unordered_multimap object. 
+ /// + /// + /// The starting location to search for an insertion point into the concurrent_unordered_multimap object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multimap object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// The iterator for the concurrent_unordered_multimap object. + /// + iterator insert(const_iterator _Where, const value_type& _Value) + { + return _Mybase::insert(_Where, _Value); + } + + /// + /// Inserts values into the concurrent_unordered_multimap object. + /// + /// + /// The iterator type used for insertion. + /// + /// + /// The starting location in an itertor of elements to insert into the concurrent_unordered_multimap object. + /// + /// + /// The ending location in an itertor of elements to insert into the concurrent_unordered_multimap object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + template + void insert(_Iterator _First, _Iterator _Last) + { + _Mybase::insert(_First, _Last); + } + + /// + /// Inserts a value into the concurrent_unordered_multimap object. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The value to be inserted into the concurrent_unordered_multimap object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// An iterator pointing to the insertion location. 
+ /// + /**/ + template + iterator insert(_Valty&& _Value) + { + return (_Mybase::insert(std::forward<_Valty>(_Value))).first; + } + + /// + /// Inserts a value into the concurrent_unordered_multimap object. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The starting location to search for an insertion point into the concurrent_unordered_multimap object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multimap object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// The iterator for the concurrent_unordered_multimap object. + /// + template + typename std::tr1::enable_if::type>::value, iterator>::type + insert(const_iterator _Where, _Valty&& _Value) + { + return _Mybase::insert(_Where, std::forward<_Valty>(_Value)); + } + + /// + /// Erases elements from the concurrent_unordered_multimap. This method is not concurrency-safe. + /// + /// + /// The iterator position to erase from. + /// + /// + /// The first function erases an element from the map given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_multimap object. + /// + /**/ + iterator unsafe_erase(const_iterator _Where) + { + return _Mybase::unsafe_erase(_Where); + } + + /// + /// Erases elements from the concurrent_unordered_multimap. This method is not concurrency-safe. + /// + /// + /// The key to erase. + /// + /// + /// The first function erases an element from the map given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The count of elements erased from this concurrent_unordered_multimap object. + /// + /**/ + size_type unsafe_erase(const key_type& _Keyval) + { + return _Mybase::unsafe_erase(_Keyval); + } + + /// + /// Erases elements from the concurrent_unordered_multimap. This method is not concurrency-safe. + /// + /// + /// Position of the first element in the range of elements to be erased. + /// + /// + /// Position of the first element beyond the range of elements to be erased. + /// + /// + /// The first function erases an element from the map given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_multimap object. + /// + /**/ + iterator unsafe_erase(const_iterator _First, const_iterator _Last) + { + return _Mybase::unsafe_erase(_First, _Last); + } + + /// + /// Swaps the contents of two concurrent_unordered_multimap objects. + /// This method is not concurrency-safe. 
+ /// + /// + /// The concurrent_unordered_multimap object to swap with. + /// + /**/ + void swap(concurrent_unordered_multimap& _Umap) + { + _Mybase::swap(_Umap); + } + + // Observers + /// + /// The hash function object. + /// + /**/ + hasher hash_function() const + { + return _M_comparator._M_hash_object; + } + + /// + /// The equality comparison function object. + /// + /**/ + key_equal key_eq() const + { + return _M_comparator._M_key_compare_object; + } +}; +} // namespace Concurrency + +#pragma pack(pop) diff --git a/concrt_extras/concurrent_unordered_set.h b/concrt_extras/concurrent_unordered_set.h new file mode 100644 index 000000000..51a90535c --- /dev/null +++ b/concrt_extras/concurrent_unordered_set.h @@ -0,0 +1,918 @@ +/*** +* ==++== +* +* Copyright (c) Microsoft Corporation. All rights reserved. +* +* ==--== +* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +* +* concurrent_unordered_set.h +* +* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +****/ +#pragma once + +#include +#include "internal_concurrent_hash.h" + +#if !(defined(_M_AMD64) || defined(_M_IX86)) + #error ERROR: Concurrency Runtime is supported only on X64 and X86 architectures. +#endif + +#if defined(_M_CEE) + #error ERROR: Concurrency Runtime is not supported when compiling /clr. +#endif + +#pragma pack(push,_CRT_PACKING) + +namespace Concurrency +{ +namespace samples +{ +namespace details +{ +// Template class for hash set traits +template +class _Concurrent_unordered_set_traits : public std::_Container_base +{ +public: + typedef _Key_type _Value_type; + typedef _Key_type value_type; + typedef _Key_type key_type; + typedef _Key_comparator key_compare; + + typedef typename _Allocator_type::template rebind::other allocator_type; + + enum + { + _M_allow_multimapping = _Allow_multimapping + }; + + _Concurrent_unordered_set_traits() : _M_comparator() + { + } + + _Concurrent_unordered_set_traits(const _Key_comparator& _Traits) : _M_comparator(_Traits) + { + } + + typedef key_compare value_compare; + + static const _Key_type& _Key_function(const value_type& _Value) + { + return _Value; + } + + _Key_comparator _M_comparator; // the comparator predicate for keys +}; +} // namespace details; + +/// +/// The concurrent_unordered_set class is an concurrency-safe container that controls a varying-length sequence of +/// elements of type _Key_type The sequence is represented in a way that enables concurrency-safe append, element access, +/// iterator access and iterator traversal operations. +/// +/// +/// The key type. +/// +/// +/// The hash function object type. This argument is optional and the default value is +/// tr1::hash<>. +/// +/// +/// The equality comparison function object type. This argument is optional and the default value is +/// equal_to<>. +/// +/// +/// The type that represents the stored allocator object that encapsulates details about the allocation and +/// deallocation of memory for the concurrent vector. This argument is optional and the default value is +/// allocator<, >. +/// +/// +/// For detailed information on the concurrent_unordered_set class, see . 
+/// +/// +/**/ +template , typename _Key_equality = std::equal_to<_Key_type>, typename _Allocator_type = std::allocator<_Key_type> > +class concurrent_unordered_set : public details::_Concurrent_hash< details::_Concurrent_unordered_set_traits<_Key_type, details::_Hash_compare<_Key_type, _Hasher, _Key_equality>, _Allocator_type, false> > +{ +public: + // Base type definitions + typedef concurrent_unordered_set<_Key_type, _Hasher, _Key_equality, _Allocator_type> _Mytype; + typedef details::_Hash_compare<_Key_type, _Hasher, _Key_equality> _Mytraits; + typedef details::_Concurrent_hash< details::_Concurrent_unordered_set_traits<_Key_type, _Mytraits, _Allocator_type, false> > _Mybase; + + // Type definitions + typedef _Key_type key_type; + typedef typename _Mybase::value_type value_type; + typedef _Key_type mapped_type; + typedef _Hasher hasher; + typedef _Key_equality key_equal; + typedef _Mytraits key_compare; + + typedef typename _Mybase::allocator_type allocator_type; + typedef typename _Mybase::pointer pointer; + typedef typename _Mybase::const_pointer const_pointer; + typedef typename _Mybase::reference reference; + typedef typename _Mybase::const_reference const_reference; + + typedef typename _Mybase::size_type size_type; + typedef typename _Mybase::difference_type difference_type; + + typedef typename _Mybase::iterator iterator; + typedef typename _Mybase::const_iterator const_iterator; + typedef typename _Mybase::iterator local_iterator; + typedef typename _Mybase::const_iterator const_local_iterator; + + /// + /// Constructs a concurrent unordered set. + /// + /// + /// The initial number of buckets for this unordered set. + /// + /// + /// The hash function for this unordered set. + /// + /// + /// The equality comparison function for this unordered set. + /// + /// + /// The allocator for this unordered set. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . + /// + /**/ + explicit concurrent_unordered_set(size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), const key_equal& _Key_equality = key_equal(), + const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(_Hasher, _Key_equality), _Allocator) + { + this->rehash(_Number_of_buckets); + } + + /// + /// Constructs a concurrent unordered set. + /// + /// + /// The allocator for this unordered set. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . 
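+    /// For illustration only (hypothetical identifiers), the iterator-range form can be used as:
+    ///     int _Values[] = { 1, 2, 3 };
+    ///     concurrent_unordered_set<int> _Set(_Values, _Values + 3);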
+ /// + /**/ + concurrent_unordered_set(const allocator_type& _Allocator) : _Mybase(8, key_compare(), _Allocator) + { + } + + /// + /// Constructs a concurrent unordered set. + /// + /// + /// The type of the input iterator. + /// + /// + /// Position of the first element in the range of elements to be copied. + /// + /// + /// Position of the first element beyond the range of elements to be copied. + /// + /// + /// The initial number of buckets for this unordered set. + /// + /// + /// The hash function for this unordered set. + /// + /// + /// The equality comparison function for this unordered set. + /// + /// + /// The allocator for this unordered set. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . + /// + /**/ + template + concurrent_unordered_set(_Iterator _First, _Iterator _Last, size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), + const key_equal& _Key_equality = key_equal(), const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(), allocator_type()) + { + this->rehash(_Number_of_buckets); + for (; _First != _Last; ++_First) + { + _Mybase::insert(*_First); + } + } + + /// + /// Constructs a concurrent unordered set. + /// + /// + /// The source concurrent_unordered_set object to copy or move elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . + /// + /**/ + concurrent_unordered_set(const concurrent_unordered_set& _Uset) : _Mybase(_Uset) + { + } + + /// + /// Constructs a concurrent unordered set. + /// + /// + /// The source concurrent_unordered_map object to copy or move elements from. + /// + /// + /// The allocator for this unordered set. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . + /// + /**/ + concurrent_unordered_set(const concurrent_unordered_set& _Uset, const allocator_type& _Allocator) : _Mybase(_Uset, _Allocator) + { + } + + /// + /// Constructs a concurrent unordered set. 
+ /// + /// + /// The source concurrent_unordered_set object to copy or move elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered set. + /// The first constructor specifies an empty initial set and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered set. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered set . + /// The last constructor specifies a move of the concurrent unordered set . + /// + /**/ + concurrent_unordered_set(concurrent_unordered_set&& _Uset) : _Mybase(std::move(_Uset)) + { + } + + /// + /// Assigns the contents of another concurrent_unordered_set object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_set object. + /// + /// + /// A reference to this concurrent_unordered_set object. + /// + /// + /// After erasing any existing elements in a concurrent unordered set, operator= either copies or moves the contents of + /// into the concurrent unordered set. + /// + /**/ + concurrent_unordered_set& operator=(const concurrent_unordered_set& _Uset) + { + _Mybase::operator=(_Uset); + return (*this); + } + + /// + /// Assigns the contents of another concurrent_unordered_set object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_set object. + /// + /// + /// A reference to this concurrent_unordered_set object. + /// + /// + /// After erasing any existing elements in a concurrent unordered set, operator= either copies or moves the contents of + /// into the concurrent unordered set. + /// + /**/ + concurrent_unordered_set& operator=(concurrent_unordered_set&& _Uset) + { + _Mybase::operator=(std::move(_Uset)); + return (*this); + } + + /// + /// Erases elements from the concurrent_unordered_set. This method is not concurrency-safe. + /// + /// + /// The iterator position to erase from. + /// + /// + /// The first function erases an element from the set given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_set object. + /// + /**/ + iterator unsafe_erase(const_iterator _Where) + { + return _Mybase::unsafe_erase(_Where); + } + + /// + /// Erases elements from the concurrent_unordered_set. This method is not concurrency-safe. + /// + /// + /// The key to erase. + /// + /// + /// The first function erases an element from the set given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The count of elements erased from this concurrent_unordered_set object. + /// + /**/ + size_type unsafe_erase(const key_type& _Keyval) + { + return _Mybase::unsafe_erase(_Keyval); + } + + /// + /// Erases elements from the concurrent_unordered_set. This method is not concurrency-safe. + /// + /// + /// Position of the first element in the range of elements to be erased. + /// + /// + /// Position of the first element beyond the range of elements to be erased. + /// + /// + /// The first function erases an element from the set given an iterator position. 
+ /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_set object. + /// + /**/ + iterator unsafe_erase(const_iterator _First, const_iterator _Last) + { + return _Mybase::unsafe_erase(_First, _Last); + } + + /// + /// Swaps the contents of two concurrent_unordered_set objects. + /// This method is not concurrency-safe. + /// + /// + /// The concurrent_unordered_set object to swap with. + /// + /**/ + void swap(concurrent_unordered_set& _Uset) + { + _Mybase::swap(_Uset); + } + + // Observers + /// + /// The hash function object. + /// + /**/ + hasher hash_function() const + { + return _M_comparator._M_hash_object; + } + + /// + /// The equality comparison function object. + /// + /**/ + key_equal key_eq() const + { + return _M_comparator._M_key_compare_object; + } +}; + +/// +/// The concurrent_unordered_multiset class is an concurrency-safe container that controls a varying-length sequence of +/// elements of type _Key_type The sequence is represented in a way that enables concurrency-safe append, element access, +/// iterator access and iterator traversal operations. +/// +/// +/// The key type. +/// +/// +/// The hash function object type. This argument is optional and the default value is +/// tr1::hash<>. +/// +/// +/// The equality comparison function object type. This argument is optional and the default value is +/// equal_to<>. +/// +/// +/// The type that represents the stored allocator object that encapsulates details about the allocation and +/// deallocation of memory for the concurrent vector. This argument is optional and the default value is +/// allocator<, >. +/// +/// +/// For detailed information on the concurrent_unordered_multiset class, see . +/// +/// +/**/ +template , typename _Key_equality = std::equal_to<_Key_type>, typename _Allocator_type = std::allocator<_Key_type> > +class concurrent_unordered_multiset : public details::_Concurrent_hash< details::_Concurrent_unordered_set_traits<_Key_type, details::_Hash_compare<_Key_type, _Hasher, _Key_equality>, _Allocator_type, true> > +{ +public: + // Base type definitions + typedef concurrent_unordered_multiset<_Key_type, _Hasher, _Key_equality, _Allocator_type> _Mytype; + typedef details::_Hash_compare<_Key_type, _Hasher, _Key_equality> _Mytraits; + typedef details::_Concurrent_hash< details::_Concurrent_unordered_set_traits<_Key_type, _Mytraits, _Allocator_type, true> > _Mybase; + + // Type definitions + typedef _Key_type key_type; + typedef typename _Mybase::value_type value_type; + typedef _Key_type mapped_type; + typedef _Hasher hasher; + typedef _Key_equality key_equal; + typedef _Mytraits key_compare; + + typedef typename _Mybase::allocator_type allocator_type; + typedef typename _Mybase::pointer pointer; + typedef typename _Mybase::const_pointer const_pointer; + typedef typename _Mybase::reference reference; + typedef typename _Mybase::const_reference const_reference; + + typedef typename _Mybase::size_type size_type; + typedef typename _Mybase::difference_type difference_type; + + typedef typename _Mybase::iterator iterator; + typedef typename _Mybase::const_iterator const_iterator; + typedef typename _Mybase::iterator local_iterator; + typedef typename _Mybase::const_iterator const_local_iterator; + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The initial number of buckets for this unordered multiset. 
+ /// + /// + /// The hash function for this unordered multiset. + /// + /// + /// The equality comparison function for this unordered multiset. + /// + /// + /// The allocator for this unordered multiset. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . + /// + /**/ + explicit concurrent_unordered_multiset(size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), const key_equal& _Key_equality = key_equal(), + const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(_Hasher, _Key_equality), _Allocator) + { + this->rehash(_Number_of_buckets); + } + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The allocator for this unordered multiset. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . + /// + /**/ + concurrent_unordered_multiset(const allocator_type& _Allocator) : _Mybase(8, key_compare(), _Allocator) + { + } + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The type of the input iterator. + /// + /// + /// Position of the first element in the range of elements to be copied. + /// + /// + /// Position of the first element beyond the range of elements to be copied. + /// + /// + /// The initial number of buckets for this unordered multiset. + /// + /// + /// The hash function for this unordered multiset. + /// + /// + /// The equality comparison function for this unordered multiset. + /// + /// + /// The allocator for this unordered multiset. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . 
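+    /// Because this container permits equivalent keys, duplicate values in the supplied iterator
+    /// range are all retained; for example (illustrative only), constructing a
+    /// concurrent_unordered_multiset<int> from the values {1, 1, 2} yields count(1) == 2.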
+ /// + /**/ + template + concurrent_unordered_multiset(_Iterator _First, _Iterator _Last, size_type _Number_of_buckets = 8, const hasher& _Hasher = hasher(), + const key_equal& _Key_equality = key_equal(), const allocator_type& _Allocator = allocator_type()) + : _Mybase(_Number_of_buckets, key_compare(), allocator_type()) + { + this->rehash(_Number_of_buckets); + for (; _First != _Last; ++_First) + { + _Mybase::insert(*_First); + } + } + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The source concurrent_unordered_multiset object to copy elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . + /// + /**/ + concurrent_unordered_multiset(const concurrent_unordered_multiset& _Uset) : _Mybase(_Uset) + { + } + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The source concurrent_unordered_multiset object to copy elements from. + /// + /// + /// The allocator for this unordered multiset. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . + /// + /**/ + concurrent_unordered_multiset(const concurrent_unordered_multiset& _Uset, const allocator_type& _Allocator) : _Mybase(_Uset, _Allocator) + { + } + + /// + /// Constructs a concurrent unordered multiset. + /// + /// + /// The source concurrent_unordered_multiset object to move elements from. + /// + /// + /// All constructors store an allocator object and initialize the unordered multiset. + /// The first constructor specifies an empty initial multiset and explicitly specifies the number of buckets, + /// hash function, equality function and allocator type to be used. + /// The second constructor specifies an allocator for the unordered multiset. + /// The third constructor specifies values supplied by the iterator range [, ). + /// The fourth and fifth constructors specify a copy of the concurrent unordered multiset . + /// The last constructor specifies a move of the concurrent unordered multiset . + /// + /**/ + concurrent_unordered_multiset(concurrent_unordered_multiset&& _Uset) : _Mybase(std::move(_Uset)) + { + } + + /// + /// Assigns the contents of another concurrent_unordered_multiset object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_multiset object. + /// + /// + /// A reference to this concurrent_unordered_multiset object. 
+ /// + /// + /// After erasing any existing elements in a concurrent unordered multiset, operator= either copies or moves the contents of + /// into the concurrent unordered multiset. + /// + /**/ + concurrent_unordered_multiset& operator=(const concurrent_unordered_multiset& _Uset) + { + _Mybase::operator=(_Uset); + return (*this); + } + + /// + /// Assigns the contents of another concurrent_unordered_multiset object to this one. This method is not concurrency-safe. + /// + /// + /// The source concurrent_unordered_multiset object. + /// + /// + /// A reference to this concurrent_unordered_multiset object. + /// + /// + /// After erasing any existing elements in a concurrent unordered multiset, operator= either copies or moves the contents of + /// into the concurrent unordered multiset. + /// + /**/ + concurrent_unordered_multiset& operator=(concurrent_unordered_multiset&& _Uset) + { + _Mybase::operator=(std::move(_Uset)); + return (*this); + } + + // Modifiers + /// + /// Inserts a value into the concurrent_unordered_multiset object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multiset object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// An iterator pointing to the insertion location. + /// + /**/ + iterator insert(const value_type& _Value) + { + return (_Mybase::insert(_Value)).first; + } + + /// + /// Inserts a value into the concurrent_unordered_multiset object. + /// + /// + /// The starting location to search for an insertion point into the concurrent_unordered_multiset object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multiset object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// The iterator for the concurrent_unordered_multiset object. + /// + iterator insert(const_iterator _Where, const value_type& _Value) + { + return _Mybase::insert(_Where, _Value); + } + + /// + /// Inserts values into the concurrent_unordered_multiset object. + /// + /// + /// The iterator type used for insertion. + /// + /// + /// The starting location in an itertor of elements to insert into the concurrent_unordered_multiset object. 
+ /// + /// + /// The ending location in an itertor of elements to insert into the concurrent_unordered_multiset object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + template + void insert(_Iterator _First, _Iterator _Last) + { + _Mybase::insert(_First, _Last); + } + + /// + /// Inserts a value into the concurrent_unordered_multiset object. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The value to be inserted into the concurrent_unordered_multiset object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// An iterator pointing to the insertion location. + /// + /**/ + template + iterator insert(_Valty&& _Value) + { + return (_Mybase::insert(std::forward<_Valty>(_Value))).first; + } + + /// + /// Inserts a value into the concurrent_unordered_multiset object. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The starting location to search for an insertion point into the concurrent_unordered_multiset object. + /// + /// + /// The value to be inserted into the concurrent_unordered_multiset object. + /// + /// + /// The first function determines whether an element X exists in the sequence whose key has equivalent ordering to + /// that of _Value. If not, it creates such an element X and initializes it with _Value. The function then determines the + /// iterator where that designates X. If an insertion occurred, the function returns std::pair(where, true). Otherwise, + /// it returns std::pair(where, false). + /// The second function uses the const_iterator _Where as a starting location to search for an insertion point + /// The third function inserts the sequence of element values, from the range [_First, _Last). + /// The last two functions behave the same as the first two, except that is used to construct the inserted value. + /// + /// + /// The iterator for the concurrent_unordered_multiset object. 
+ /// + template + typename std::tr1::enable_if::type>::value, iterator>::type + insert(const_iterator _Where, _Valty&& _Value) + { + return _Mybase::insert(_Where, std::forward<_Valty>(_Value)); + } + + /// + /// Erases elements from the concurrent_unordered_multiset. This method is not concurrency-safe. + /// + /// + /// The iterator position to erase from. + /// + /// + /// The first function erases an element from the multiset given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_multiset object. + /// + /**/ + iterator unsafe_erase(const_iterator _Where) + { + return _Mybase::unsafe_erase(_Where); + } + + /// + /// Erases elements from the concurrent_unordered_multiset. This method is not concurrency-safe. + /// + /// + /// The key to erase. + /// + /// + /// The first function erases an element from the multiset given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The count of elements erased from this concurrent_unordered_multiset object. + /// + /**/ + size_type unsafe_erase(const key_type& _Keyval) + { + return _Mybase::unsafe_erase(_Keyval); + } + + /// + /// Erases elements from the concurrent_unordered_multiset. This method is not concurrency-safe. + /// + /// + /// Position of the first element in the range of elements to be erased. + /// + /// + /// Position of the first element beyond the range of elements to be erased. + /// + /// + /// The first function erases an element from the multiset given an iterator position. + /// The second function erases an element matching a key + /// The third function erases elements given an iterator begin and end position + /// + /// + /// The iterator for this concurrent_unordered_multiset object. + /// + /**/ + iterator unsafe_erase(const_iterator _First, const_iterator _Last) + { + return _Mybase::unsafe_erase(_First, _Last); + } + + /// + /// Swaps the contents of two concurrent_unordered_multiset objects. + /// This method is not concurrency-safe. + /// + /// + /// The concurrent_unordered_multiset object to swap with. + /// + /**/ + void swap(concurrent_unordered_multiset& _Uset) + { + _Mybase::swap(_Uset); + } + + // Observers + /// + /// The hash function object. + /// + /**/ + hasher hash_function() const + { + return _M_comparator._M_hash_object; + } + + /// + /// The equality comparison function object. + /// + /**/ + key_equal key_eq() const + { + return _M_comparator._M_key_compare_object; + } +}; +} // namespace samples +} // namespace Concurrency + +#pragma pack(pop) diff --git a/concrt_extras/connect.h b/concrt_extras/connect.h new file mode 100644 index 000000000..48a35d861 --- /dev/null +++ b/concrt_extras/connect.h @@ -0,0 +1,277 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. 
+// +// File: connect.h +// +// connect / disconnect helper functions +// +//-------------------------------------------------------------------------- +#pragma once +#include +#include + +namespace Concurrency +{ + namespace details + { + //details for the connect type traits + + //creates a has_member_foo class which can be used to check + //for a named member variable + #define DEFINE_HAS_MEMBER(NAME) \ + template class has_member_ ## NAME { \ + private: \ + template static std::true_type helper(decltype(&U::NAME)); \ + template static std::false_type helper(...); \ + typedef decltype(helper(nullptr)) helper_t; \ + public: \ + static const bool value = helper_t::value; \ + }; + + //define input_buffer & output_buffer classes + DEFINE_HAS_MEMBER(input_buffer) + DEFINE_HAS_MEMBER(output_buffer) + + //there must be an existing type trait for this + template + struct _is_true : std::false_type + { + }; + + template<> + struct _is_true : std::true_type + { + }; + + //_as_ref normalizes an instance of a class to a reference parameter. + //this works with pointers, references and values, but probably not with + //pointer to pointer and definitely not with shared pointers + template + class _as_ref + { + _as_ref & operator=(const _as_ref & ); + + public: + typedef T type; + //_as_ref(T& t): ref(t){} + _as_ref(T& t): ref(t){} + _as_ref(T* t): ref(*t){} + typename std::add_const::type>::type& ref; + }; + + //_as_ref normalizes an instance of a class to a reference parameter. + template + class as_ref : public _as_ref::type> + { + as_ref & operator=( const as_ref & ); + public: + as_ref(T& t):_as_ref(t){} + }; + + }// end details namespace + + using namespace details; + //type traits, determines if a class acts as a compound message target + template + struct is_compound_target + : _is_true::type>::value> + { + }; + + //type traits, determines if a class acts as a compound message source + template + struct is_compound_source + : _is_true::type>::value> + { + }; + + //connects two message blocks or compound message blocks forward declaration + template + inline void connect(source_type& source, target_type& target); + + //disconnects two message blocks or compound message blocks forward declaration + template + inline void disconnect(source_type& source, target_type& target); + + namespace details + { + //connects an ISource to an ITarget + template + inline void connect_to_target_impl(source_block& source, target_block& target) + { + source.link_target(&target); + } + //connect a simple source to a simple target + template + inline void connect_impl(source_type& source, std::false_type /*not_compound_source*/, + target_type& target, std::false_type /*not_compound_target*/) + { + connect_to_target_impl(source,target); + } + + //connect a simple source to a compound target + template + inline void connect_impl(source_type& source, std::false_type /*not_compound_source*/, + target_type& target, std::true_type /*is_compound_target*/) + { + samples::connect(source, target.input_buffer); + } + + //connect a compound source to a simple target + template + inline void connect_impl(source_type& source, std::true_type /*not_compound_source*/, + target_type& target, std::false_type /*is_compound_target*/) + { + samples::connect(source.output_buffer, target); + } + + //connect a compound source to a compound target + template + inline void connect_impl(source_type& source, std::true_type /*not_compound_source*/, + target_type& target, std::true_type /*is_compound_target*/) + { + 
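+            // A compound block is detected by the has_member_input_buffer / has_member_output_buffer
+            // traits defined above, i.e. it exposes its pipeline endpoints as 'input_buffer' and
+            // 'output_buffer' members. Joining two compound blocks therefore reduces to linking
+            // the source's output_buffer to the target's input_buffer: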
samples::connect(source.output_buffer, target.input_buffer); + } + + //connect_impl function that works with 'as_ref' types, this function + //relies on overloading and is_compound_source/target type traits to resolve + //to simple message blocks + template + inline void connect_impl(const source_type& source, const target_type& target) + { + connect_impl(source.ref, is_compound_source(), + target.ref, is_compound_target()); + } + + template + inline void disconnect_from_target_impl(source_block& source, target_block& target) + { + source.unlink_target(&target); + } + //disconnect a source from a target, neither of which is a compound source or target + template + inline void disconnect_impl(source_type& source, std::false_type /*not_compound_source*/, + target_type& target, std::false_type /*not_compound_target*/) + { + disconnect_from_target_impl(source,target); + } + //disconnect a simple source to a compound target + template + inline void disconnect_impl(source_type& source, std::false_type /*not_compound_source*/, + target_type& target, std::true_type /*is_compound_target*/) + { + samples::disconnect(source, target.input_buffer); + } + //disconnect a compound source from a simple target + template + inline void disconnect_impl(source_type& source, std::true_type /*not_compound_source*/, + target_type& target, std::false_type /*is_compound_target*/) + { + samples::disconnect(source.output_buffer, target); + } + + //disconnect a compound source from a compound target + template + inline void disconnect_impl(source_type& source, std::true_type /*not_compound_source*/, + target_type& target, std::true_type /*is_compound_target*/) + { + samples::disconnect(source.output_buffer, target.input_buffer); + } + + //disconnect impl has pointers removed already these are as_ref types + template + inline void disconnect_impl(const source_type& source, const target_type& target) + { + //first pass remove pointers + disconnect_impl(source.ref, is_compound_source(), + target.ref, is_compound_target()); + } + template + inline void disconnect_all_impl(source_type& source, std::false_type/*not_compound_source*/) + { + source.unlink_targets(); + } + template + inline void disconnect_all_impl(source_type& source, std::true_type /*is_compound_source*/) + { + samples::disconnect(source.output_buffer); + } + template + inline void disconnect_all_impl(source_type& source) + { + details::disconnect_all_impl(source.ref, is_compound_source()); + } + + }// end details namespace + + //connects two message blocks or compound message blocks + template + inline void connect(source_type& source, target_type& target) + { + details::connect_impl(as_ref(source), as_ref(target)); + } + + //disconnects two message blocks or compound message blocks + template + inline void disconnect(source_type& source, target_type& target) + { + details::disconnect_impl(as_ref(source), as_ref(target)); + } + + //connects two message blocks or compound message blocks, source is shared_ptr + template + inline void connect(std::shared_ptr& source_ptr, target_type& target) + { + details::connect_impl(as_ref(*source_ptr.get()), as_ref(target)); + } + + //connects two message blocks or compound message blocks both shared ptrs + template + inline void connect(std::shared_ptr& source_ptr, std::shared_ptr& target_ptr) + { + details::connect_impl(as_ref(*source_ptr.get()), as_ref(*target_ptr.get())); + } + + //connects two message blocks or compound message blocks target is shared_ptr + template + inline void connect(source_type& source, 
std::shared_ptr& target_ptr) + { + details::connect_impl(as_ref(source), as_ref(*target_ptr.get())); + } + + //connects two message blocks or compound message blocks, source is shared_ptr + template + inline void disconnect(std::shared_ptr& source_ptr, target_type& target) + { + details::disconnect_impl(as_ref(*source_ptr.get()), as_ref(target)); + } + + //connects two message blocks or compound message blocks both shared ptrs + template + inline void disconnect(std::shared_ptr& source_ptr, std::shared_ptr& target_ptr) + { + details::disconnect_impl(as_ref(*source_ptr.get()), as_ref(*target_ptr.get())); + } + + //connects two message blocks or compound message blocks target is shared_ptr + template + inline void disconnect(source_type& source, std::shared_ptr& target_ptr) + { + details::disconnect_impl(as_ref(source), as_ref(*target_ptr.get())); + } + + //disconnects all connected targets + template + inline void disconnect(source_type& source) + { + details::disconnect_all_impl(as_ref(source)); + } + + //disconnects a message block that is a shared_ptr + template + inline void disconnect(std::shared_ptr& source_ptr) + { + details::disconnect_all_impl(as_ref(*source_ptr.get())); + } +} \ No newline at end of file diff --git a/concrt_extras/internal_concurrent_hash.h b/concrt_extras/internal_concurrent_hash.h new file mode 100644 index 000000000..0e58dd260 --- /dev/null +++ b/concrt_extras/internal_concurrent_hash.h @@ -0,0 +1,1486 @@ +/*** +* ==++== +* +* Copyright (c) Microsoft Corporation. All rights reserved. +* +* ==--== +* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +* +* internal_concurrent_hash.h +* +* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +****/ +#pragma once + +#include +#include "internal_split_ordered_list.h" +#include + +namespace Concurrency +{ +namespace details +{ +// Template class for hash compare +template +class _Hash_compare +{ +public: + typedef _Hasher hasher; + + _Hash_compare() + { + } + + _Hash_compare(hasher _Hasharg) : _M_hash_object(_Hasharg) + { + } + + _Hash_compare(hasher _Hasharg, _Key_equality _Keyeqarg) : _M_hash_object(_Hasharg), _M_key_compare_object(_Keyeqarg) + { + } + + size_t operator()(const _Key_type& _Keyval) const + { + return ((size_t)_M_hash_object(_Keyval)); + } + + bool operator()(const _Key_type& _Keyval1, const _Key_type& _Keyval2) const + { + return (!_M_key_compare_object(_Keyval1, _Keyval2)); + } + + hasher _M_hash_object; // The hash object + _Key_equality _M_key_compare_object; // The equality comparator object +}; + +// An efficient implementation of the _Reverse function utilizes a 2^8 or 2^16 lookup table holding the +// bit-reversed values of [0..2^8 - 1] or [0..2^16 - 1] respectively. Those values can also be computed +// on the fly at a slightly higher cost. 
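+// Illustrative sketch (not part of the original sample; the function name is hypothetical):
+// the table-free alternative mentioned above can be written with the 64-bit multiply/mask
+// trick that also appears, commented out, inside _Reverse_byte below.
+inline unsigned char _Reverse_byte_on_the_fly(unsigned char _Original_byte)
+{
+    // Spread copies of the byte with one multiply, select the bit positions that land in
+    // reversed order with a mask, then gather them with a second multiply and shift down.
+    return (unsigned char) ((((_Original_byte * 0x80200802ULL) & 0x0884422110ULL) * 0x0101010101ULL) >> 32);
+}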
+const unsigned char _Byte_reverse_table[] = +{ + 0x00, 0x80, 0x40, 0xC0, 0x20, 0xA0, 0x60, 0xE0, 0x10, 0x90, 0x50, 0xD0, 0x30, 0xB0, 0x70, 0xF0, + 0x08, 0x88, 0x48, 0xC8, 0x28, 0xA8, 0x68, 0xE8, 0x18, 0x98, 0x58, 0xD8, 0x38, 0xB8, 0x78, 0xF8, + 0x04, 0x84, 0x44, 0xC4, 0x24, 0xA4, 0x64, 0xE4, 0x14, 0x94, 0x54, 0xD4, 0x34, 0xB4, 0x74, 0xF4, + 0x0C, 0x8C, 0x4C, 0xCC, 0x2C, 0xAC, 0x6C, 0xEC, 0x1C, 0x9C, 0x5C, 0xDC, 0x3C, 0xBC, 0x7C, 0xFC, + 0x02, 0x82, 0x42, 0xC2, 0x22, 0xA2, 0x62, 0xE2, 0x12, 0x92, 0x52, 0xD2, 0x32, 0xB2, 0x72, 0xF2, + 0x0A, 0x8A, 0x4A, 0xCA, 0x2A, 0xAA, 0x6A, 0xEA, 0x1A, 0x9A, 0x5A, 0xDA, 0x3A, 0xBA, 0x7A, 0xFA, + 0x06, 0x86, 0x46, 0xC6, 0x26, 0xA6, 0x66, 0xE6, 0x16, 0x96, 0x56, 0xD6, 0x36, 0xB6, 0x76, 0xF6, + 0x0E, 0x8E, 0x4E, 0xCE, 0x2E, 0xAE, 0x6E, 0xEE, 0x1E, 0x9E, 0x5E, 0xDE, 0x3E, 0xBE, 0x7E, 0xFE, + 0x01, 0x81, 0x41, 0xC1, 0x21, 0xA1, 0x61, 0xE1, 0x11, 0x91, 0x51, 0xD1, 0x31, 0xB1, 0x71, 0xF1, + 0x09, 0x89, 0x49, 0xC9, 0x29, 0xA9, 0x69, 0xE9, 0x19, 0x99, 0x59, 0xD9, 0x39, 0xB9, 0x79, 0xF9, + 0x05, 0x85, 0x45, 0xC5, 0x25, 0xA5, 0x65, 0xE5, 0x15, 0x95, 0x55, 0xD5, 0x35, 0xB5, 0x75, 0xF5, + 0x0D, 0x8D, 0x4D, 0xCD, 0x2D, 0xAD, 0x6D, 0xED, 0x1D, 0x9D, 0x5D, 0xDD, 0x3D, 0xBD, 0x7D, 0xFD, + 0x03, 0x83, 0x43, 0xC3, 0x23, 0xA3, 0x63, 0xE3, 0x13, 0x93, 0x53, 0xD3, 0x33, 0xB3, 0x73, 0xF3, + 0x0B, 0x8B, 0x4B, 0xCB, 0x2B, 0xAB, 0x6B, 0xEB, 0x1B, 0x9B, 0x5B, 0xDB, 0x3B, 0xBB, 0x7B, 0xFB, + 0x07, 0x87, 0x47, 0xC7, 0x27, 0xA7, 0x67, 0xE7, 0x17, 0x97, 0x57, 0xD7, 0x37, 0xB7, 0x77, 0xF7, + 0x0F, 0x8F, 0x4F, 0xCF, 0x2F, 0xAF, 0x6F, 0xEF, 0x1F, 0x9F, 0x5F, 0xDF, 0x3F, 0xBF, 0x7F, 0xFF +}; + +// Given a byte, reverses it +inline unsigned char _Reverse_byte(unsigned char _Original_byte) +{ + // return ((_Original_byte * 0x80200802ULL) & 0x0884422110ULL) * 0x0101010101ULL >> 32; + return _Byte_reverse_table[_Original_byte]; +} + +// Finds the most significant bit in a size_t +inline unsigned char _Get_msb(size_t _Mask) +{ + unsigned long _Index = 0; + +#if defined(_M_IX86) + _BitScanReverse(&_Index, _Mask); +#else + _BitScanReverse64(&_Index, _Mask); +#endif + + return (unsigned char) _Index; +} + +#pragma warning(push) +#pragma warning(disable: 4127) // warning 4127 -- while (true) has a constant expression in it + +template +class _Concurrent_hash : public _Traits +{ +public: + // Type definitions + typedef _Concurrent_hash<_Traits> _Mytype; + typedef typename _Traits::_Value_type _Value_type; + typedef typename _Traits::value_type value_type; + typedef typename _Traits::key_type key_type; + typedef typename _Traits::key_compare key_compare; + typedef typename _Traits::value_compare value_compare; + + typedef typename _Traits::allocator_type allocator_type; + typedef typename allocator_type::pointer pointer; + typedef typename allocator_type::const_pointer const_pointer; + typedef typename allocator_type::reference reference; + typedef typename allocator_type::const_reference const_reference; + + typedef typename allocator_type::size_type size_type; + typedef typename allocator_type::difference_type difference_type; + + typedef typename _Split_ordered_list _Mylist; + typedef typename _Mylist::_Nodeptr _Nodeptr; + + typedef typename std::tr1::conditional::value, typename _Mylist::const_iterator, typename _Mylist::iterator>::type iterator; + typedef typename _Mylist::const_iterator const_iterator; + typedef iterator local_iterator; + typedef const_iterator const_local_iterator; + typedef std::pair _Pairib; + typedef std::pair _Pairii; + typedef std::pair _Paircc; + + // Iterators that walk 
the entire split-order list, including dummy nodes + typedef typename _Mylist::_Full_iterator _Full_iterator; + typedef typename _Mylist::_Full_const_iterator _Full_const_iterator; + + static const size_type _Initial_bucket_number = 8; // Initial number of buckets + static const size_type _Initial_bucket_load = 4; // Initial maximum number of elements per bucket + static size_type const _Pointers_per_table = sizeof(size_type) * 8; // One bucket segment per bit + + // Constructors/Destructors + _Concurrent_hash(size_type _Number_of_buckets = _Initial_bucket_number, const key_compare& _Parg = key_compare(), const allocator_type& _Allocator = allocator_type()) + : _Traits(_Parg), _M_number_of_buckets(_Number_of_buckets), _M_split_ordered_list(_Allocator), _M_allocator(_Allocator), _M_maximum_bucket_size((float) _Initial_bucket_load) + { + _Init(); + } + + _Concurrent_hash(const _Concurrent_hash& _Right, const allocator_type& _Allocator) : _Traits(_Right._M_comparator), _M_split_ordered_list(_Allocator), _M_allocator(_Allocator) + { + _Copy(_Right); + } + + _Concurrent_hash(const _Concurrent_hash& _Right) : _Traits(_Right._M_comparator), _M_split_ordered_list(_Right.get_allocator()), _M_allocator(_Right.get_allocator()) + { + _Init(); + _Copy(_Right); + } + + _Concurrent_hash(_Concurrent_hash&& _Right) : _Traits(_Right._M_comparator), _M_split_ordered_list(_Right.get_allocator()), _M_allocator(_Right.get_allocator()), + _M_number_of_buckets(_Initial_bucket_number), _M_maximum_bucket_size((float) _Initial_bucket_load) + { + _Init(); + swap(_Right); + } + + _Concurrent_hash& operator=(const _Concurrent_hash& _Right) + { + if (this != &_Right) + { + _Copy(_Right); + } + + return (*this); + } + + _Concurrent_hash& operator=(_Concurrent_hash&& _Right) + { + if (this != &_Right) + { + clear(); + swap(_Right); + } + + return (*this); + } + + ~_Concurrent_hash() + { + // Delete all node segments + for (size_type _Index = 0; _Index < _Pointers_per_table; _Index++) + { + if (_M_buckets[_Index] != NULL) + { + size_type _Seg_size = _Segment_size(_Index); + for (size_type _Index2 = 0; _Index2 < _Seg_size; _Index2++) + { + _M_allocator.destroy(&_M_buckets[_Index][_Index2]); + } + _M_allocator.deallocate(_M_buckets[_Index], _Seg_size); + } + } + } + + static size_type __cdecl _Segment_index_of( size_type _Index ) + { + return size_type( _Get_msb( _Index|1 ) ); + } + + static size_type _Segment_base( size_type _K ) + { + return (size_type(1)<<_K & ~size_type(1)); + } + + static size_type _Segment_size( size_type _K ) + { + return _K ? size_type(1)<<_K : 2; + } + + /// + /// Returns the allocator used for this concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// The allocator used for this concurrent container. + /// + /**/ + allocator_type get_allocator() const + { + return _M_split_ordered_list.get_allocator(); + } + + // Size and capacity function + /// + /// Checks whether the number of elements in this concurrent container is non-zero. + /// + /// + /// This function is concurrency safe. + /// With concurrent inserts, whether or not the concurrent container is empty may change + /// immediately after calling this function, before the return value is even read. + /// + /// + /// True, if this concurrent container object is empty, false, otherwise. + /// + /**/ + bool empty() const + { + return _M_split_ordered_list.empty(); + } + + /// + /// Returns the number of elements in this concurrent container. + /// + /// + /// This function is concurrency safe. 
+ /// With concurrent inserts, the number of elements in the concurrent container may change + /// immediately after calling this function, before the return value is even read. + /// + /// + /// The number of items in the container. + /// + /**/ + size_type size() const + { + return _M_split_ordered_list.size(); + } + + /// + /// Returns the maximum size of the concurrent container, determined by the allocator. + /// + /// + /// This function is concurrency safe. + /// This upper bound value may actually be higher than what the container can actually hold. + /// + /// + /// The maximum number of elements that can be put into this concurrent container. + /// + /**/ + size_type max_size() const + { + return _M_split_ordered_list.max_size(); + } + + // Iterators + /// + /// Returns an iterator pointing to the first element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// An iterator to the first element in the concurrent container. + /// + /**/ + iterator begin() + { + return _M_split_ordered_list.begin(); + } + + /// + /// Returns a const_iterator pointing to the first element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A const_iterator to the first element in the concurrent container. + /// + /**/ + const_iterator begin() const + { + return _M_split_ordered_list.begin(); + } + + /// + /// Returns an iterator pointing to the location succeeding the last element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// An iterator to the location succeeding the last element in the concurrent container. + /// + /**/ + iterator end() + { + return _M_split_ordered_list.end(); + } + + /// + /// Returns a const_iterator pointing to the location succeeding the last element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A const_iterator to the location succeeding the last element in the concurrent container. + /// + /**/ + const_iterator end() const + { + return _M_split_ordered_list.end(); + } + + /// + /// Returns a const_iterator pointing to the first element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A const_iterator to the first element in the concurrent container. + /// + /**/ + const_iterator cbegin() const + { + return _M_split_ordered_list.cbegin(); + } + + /// + /// Returns a const_iterator pointing to the location succeeding the last element in the concurrent container. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A const_iterator to the location succeeding the last element in the concurrent container. + /// + /**/ + const_iterator cend() const + { + return _M_split_ordered_list.cend(); + } + + // Modifiers + /// + /// Inserts a value into the concurrent container. + /// + /// + /// The value to insert. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A pair where the first object is an iterator into the container at the insertion point and the + /// second object is a bool indicating whether the value was inserted (true) or not (false). + /// + /**/ + _Pairib insert(const value_type& _Value) + { + return _Insert(_Value); + } + + /// + /// Inserts a value into the concurrent container. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The value to insert. + /// + /// + /// This function is concurrency safe. 
+ /// + /// + /// A pair where the first object is an iterator into the container at the insertion point and the + /// second object is a bool indicating whether the value was inserted (true) or not (false). + /// + /**/ + template + _Pairib insert(_Valty&& _Value) + { + return _Insert(std::forward<_Valty>(_Value)); + } + + /// + /// Inserts a value into the concurrent container. + /// + /// + /// The value to insert. + /// + /// + /// This function is concurrency safe. + /// The current implementation ignores the first const_iterator argument. It exists for + /// similarity with the unordered_map, and hints to a location to start the search + /// for an insertion point. + /// + /// + /// An iterator pointing to the insertion point of the object. If the key already exists in the container + /// and the container does not support multiple keys, an iterator pointing to the duplicate key location in + /// the map is returned. + /// + /**/ + iterator insert(const_iterator, const value_type& _Value) + { + // Ignore hint + return insert(_Value).first; + } + + /// + /// Inserts a value into the concurrent container. + /// + /// + /// The type of the value inserted into the map. + /// + /// + /// The value to insert. + /// + /// + /// This function is concurrency safe. + /// The current implementation ignores the first const_iterator argument. It exists for + /// similarity with the unordered_map, and hints to a location to start the search + /// for an insertion point. + /// + /// + /// An iterator pointing to the insertion point of the object. If the key already exists in the container + /// and the container does not support multiple keys, an iterator pointing to the duplicate key location in + /// the map is returned. + /// + /**/ + template + typename std::tr1::enable_if::type>::value, iterator>::type + insert(const_iterator, _Valty&& _Value) + { + // Ignore hint + return insert(std::forward<_Valty>(_Value)).first; + } + + /// + /// Inserts a set of values into the concurrent container from an iterator. + /// + /// + /// The iterator type used for insertion. + /// + /// + /// The input iterator pointing to the beginning location. + /// + /// + /// The input iterator pointing to the end location. + /// + /// + /// This function is concurrency safe. + /// + /**/ + template + void insert(_Iterator _First, _Iterator _Last) + { + for (_Iterator _I = _First; _I != _Last; _I++) + { + insert(*_I); + } + } + + /// + /// Erases an element from the concurrent container. + /// + /// + /// A const_iterator pointing to the element to erase. + /// + /// + /// This function is not concurrency safe. + /// + /// + /// An iterator pointing to the location immediately following the deleted object in the container. + /// + /**/ + iterator unsafe_erase(const_iterator _Where) + { + return _Erase(_Where); + } + + /// + /// Erases multiple elements from the concurrent container. + /// + /// + /// A const_iterator pointing to the first element to erase. + /// + /// + /// A const_iterator pointing to the location after the last element to erase,. + /// + /// + /// This function is not concurrency safe. + /// + /// + /// An iterator pointing to the location immediately following the deleted object(s) in the container. + /// + /**/ + iterator unsafe_erase(const_iterator _First, const_iterator _Last) + { + while (_First != _Last) + { + unsafe_erase(_First++); + } + + return _M_split_ordered_list._Get_iterator(_First); + } + + /// + /// Erases an element from the concurrent container. 
+ /// + /// + /// The value to erase from the concurrent container. + /// + /// + /// This function is not concurrency safe. + /// + /// + /// A count of the number of keys removed from the concurrent_unordered_map. + /// + /**/ + size_type unsafe_erase(const key_type& _Keyval) + { + _Pairii _Where = equal_range(_Keyval); + size_type _Count = _Distance(_Where.first, _Where.second); + unsafe_erase(_Where.first, _Where.second); + return _Count; + } + + /// + /// Swaps the contents of two concurrent containers. + /// + /// + /// The container to swap elements from. + /// + /// + /// This function is not concurrency safe. + /// + /// + /// Throws an invalid_argument exception if the swap is being + /// performed on unequal allocators. + /// + /**/ + void swap(_Concurrent_hash& _Right) + { + if (this != &_Right) + { + std::_Swap_adl(_M_comparator, _Right._M_comparator); + _M_split_ordered_list.swap(_Right._M_split_ordered_list); + _Swap_buckets(_Right); + std::swap(_M_number_of_buckets, _Right._M_number_of_buckets); + std::swap(_M_maximum_bucket_size, _Right._M_maximum_bucket_size); + } + } + + // Observers + /// + /// Clears all the objects in the concurrent container. + /// + /// + /// This function is not concurrency safe. + /// + /**/ + void clear() + { + // Clear list + _M_split_ordered_list.clear(); + + // Clear buckets + for (size_type _Index = 0; _Index < _Pointers_per_table; _Index++) + { + if (_M_buckets[_Index] != NULL) + { + size_type _Seg_size = _Segment_size(_Index); + for (size_type _Index2 = 0; _Index2 < _Seg_size; _Index2++) + { + _M_allocator.destroy(&_M_buckets[_Index][_Index2]); + } + _M_allocator.deallocate(_M_buckets[_Index], _Seg_size); + } + } + + // memset all the buckets to zero and initialize the dummy node 0 + _Init(); + } + + // Lookup + /// + /// Searches the concurrent container for a specific key. + /// + /// + /// The key to search for. + /// + /// + /// This function is concurrency safe. + /// + /// + /// An iterator pointing to the location of the searched for key, or points to end() if not found. + /// + /**/ + iterator find(const key_type& _Keyval) + { + return _Find(_Keyval); + } + + /// + /// Searches the concurrent container for a specific key. + /// + /// + /// The key to search for. + /// + /// + /// This function is concurrency safe. + /// + /// + /// A const_iterator pointing to the location of the searched for key. + /// + /**/ + const_iterator find(const key_type& _Keyval) const + { + return _Find(_Keyval); + } + + /// + /// Counts the number of times a specific key appears in the container. + /// + /// + /// The key to count. + /// + /// + /// This function is concurrency safe. + /// + /// + /// The number of times the key appears in the container. + /// + /**/ + size_type count(const key_type& _Keyval) const + { + size_type _Count = 0; + const_iterator _It = _Find(_Keyval); + for (;_It != end() && !_M_comparator(_Key_function(*_It), _Keyval); _It++) + { + _Count++; + } + return _Count; + } + + /// + /// Finds the iterators pointing to the being and end locations a specific key appears in the container. + /// + /// + /// The key to search for. + /// + /// + /// This function is concurrency safe. + /// It is possible that concurrent inserts may cause the additional keys to be inserted after the + /// begin iterator and before the end iterator. + /// + /// + /// A pair where the first element is an iterator to the beginning and the second element + /// is an iterator to the end of the range.. 
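+    /// For a container that allows equivalent keys, the number of matching elements can be computed
+    /// as the distance between the two iterators (barring concurrent inserts), which is how
+    /// unsafe_erase(const key_type&) above determines its return count.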
+ /// + /**/ + _Pairii equal_range(const key_type& _Keyval) + { + return _Equal_range(_Keyval); + } + + /// + /// Finds the const_iterators pointing to the being and end locations a specific key appears in the container. + /// + /// + /// The key to search for. + /// + /// + /// This function is concurrency safe. + /// It is possible that concurrent inserts may cause the additional keys to be inserted after the + /// begin iterator and before the end iterator. + /// + /// + /// A pair where the first element is a const_iterator to the beginning and the second element + /// is a const_iterator to the end of the range. + /// + /**/ + _Paircc equal_range(const key_type& _Keyval) const + { + return _Equal_range(_Keyval); + } + + // Bucket interface - for debugging + /// + /// Returns the current number of buckets in this container. + /// + /// + /// The current number of buckets in this container. + /// + /**/ + size_type unsafe_bucket_count() const + { + return _M_number_of_buckets; + } + + /// + /// Returns the maximum number of buckets in this container. + /// + /// + /// The maximum number of buckets in this container. + /// + /**/ + size_type unsafe_max_bucket_count() const + { + return _Segment_size(_Pointers_per_table-1); + } + + /// + /// Returns the number of items in a specific bucket of this container. + /// + /// + /// The bucket to search for. + /// + /// + /// The current number of buckets in this container. + /// + /**/ + size_type unsafe_bucket_size(size_type _Bucket) + { + size_type _Count = 0; + + if (!_Is_initialized(_Bucket)) + { + return _Count; + } + + _Full_iterator _Iterator = _Get_bucket(_Bucket); + _Iterator++; + + for (; _Iterator != _M_split_ordered_list._End() && !_Iterator._Mynode()->_Is_dummy(); _Iterator++) + { + _Count++; + } + + return _Count; + } + + /// + /// Returns the bucket index that a specific key maps to in this container. + /// + /// + /// The element key being searched for. + /// + /// + /// The bucket index for the key in this container. + /// + /**/ + size_type unsafe_bucket(const key_type& _Keyval) const + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + return _Bucket; + } + + // If the bucket is initialized, return a first non-dummy element in it + + /// + /// Returns an iterator to the first element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// An iterator pointing to the beginning of the bucket. + /// + /**/ + local_iterator unsafe_begin(size_type _Bucket) + { + // It is possible the bucket being searched for has not yet been initialized + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + _Full_iterator _Iterator = _Get_bucket(_Bucket); + return _M_split_ordered_list._Get_first_real_iterator(_Iterator); + } + + // If the bucket is initialized, return a first non-dummy element in it + + /// + /// Returns a const_iterator to the first element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// A const_iterator pointing to the beginning of the bucket. 
+ /// + /**/ + const_local_iterator unsafe_begin(size_type _Bucket) const + { + // It is possible the bucket being searched for has not yet been initialized + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + + _Full_const_iterator _Iterator = _Get_bucket(_Bucket); + return _M_split_ordered_list._Get_first_real_iterator(_Iterator); + } + + // Returns the iterator after the last non-dummy element in the bucket + + /// + /// Returns an iterator to the last element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// An iterator pointing to the end of the bucket. + /// + /**/ + local_iterator unsafe_end(size_type _Bucket) + { + // If we've increased the number of buckets, there's a chance the intermediate dummy + // node marking the end of this bucket has not yet been lazily initialized. + // Inserting from from _M_number_of_buckets/2 to _M_number_of_buckets will recursively + // initialize all the dummy nodes in the map. + for(size_type _Bucket_index = _M_number_of_buckets >> 1; _Bucket_index < _M_number_of_buckets; _Bucket_index++) + { + if (!_Is_initialized(_Bucket_index)) + { + _Initialize_bucket(_Bucket_index); + } + } + + _Full_iterator _Iterator = _Get_bucket(_Bucket); + + // Find the end of the bucket, denoted by the dummy element + do + { + _Iterator++; + } + while(_Iterator != _M_split_ordered_list._End() && !_Iterator._Mynode()->_Is_dummy()); + + // Return the first real element past the end of the bucket + return _M_split_ordered_list._Get_first_real_iterator(_Iterator); + } + + // Returns the iterator after the last non-dummy element in the bucket + + /// + /// Returns a const_iterator to the last element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// A const_iterator pointing to the end of the bucket. + /// + /**/ + const_local_iterator unsafe_end(size_type _Bucket) const + { + // If we've increased the number of buckets, there's a chance the intermediate dummy + // node marking the end of this bucket has not yet been lazily initialized. + // Inserting from from _M_number_of_buckets/2 to _M_number_of_buckets will recursively + // initialize all the dummy nodes in the map. + for(size_type _Bucket_index = _M_number_of_buckets >> 1; _Bucket_index < _M_number_of_buckets; _Bucket_index++) + { + if (!_Is_initialized(_Bucket_index)) + { + _Initialize_bucket(_Bucket_index); + } + } + + _Full_const_iterator _Iterator = _Get_bucket(_Bucket); + + // Find the end of the bucket, denoted by the dummy element + do + { + _Iterator++; + } + while(_Iterator != _M_split_ordered_list._End() && !_Iterator._Mynode()->_Is_dummy()); + + // Return the first real element past the end of the bucket + return _M_split_ordered_list._Get_first_real_iterator(_Iterator); + } + + /// + /// Returns a const_iterator to the first element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// A const_iterator pointing to the beginning of the bucket. + /// + /**/ + const_local_iterator unsafe_cbegin(size_type _Bucket) const + { + return ((const _Mytype *) this)->begin(); + } + + /// + /// Returns a const_iterator to the last element in this container for a specific bucket. + /// + /// + /// The bucket index. + /// + /// + /// A const_iterator pointing to the end of the bucket. 
+ /// + /**/ + const_local_iterator unsafe_cend(size_type _Bucket) const + { + return ((const _Mytype *) this)->end(); + } + + // Hash policy + /// + /// Computes and returns the current load factor of the container. + /// The load factor is the number of elements in the container divided by the number of buckets. + /// + /// + /// The load factor for the container + /// + /**/ + float load_factor() const + { + return (float) size() / (float) unsafe_bucket_count(); + } + + /// + /// Returns the maximum load factor of the container. The maximum load factor is the + /// largest number of elements than can be in any bucket before the container grows its + /// internal table. + /// + /// + /// The maximum load factor for the container + /// + /**/ + float max_load_factor() const + { + return _M_maximum_bucket_size; + } + + /// + /// Sets the maximum load factor of the container to a specific value. + /// + /// + /// The desired load factor for this container. + /// + /// + /// Throws an out_of_range exception if the load factor is invalid (NaN or negative) + /// + /**/ + void max_load_factor(float _Newmax) + { + // The _Newmax != _Newmax is a check for NaN, because NaN is != to itself + if (_Newmax != _Newmax || _Newmax < 0) + { + throw std::out_of_range("invalid hash load factor"); + } + + _M_maximum_bucket_size = _Newmax; + } + + // This function is a noop, because the underlying split-ordered list + // is already sorted, so an increase in the bucket number will be + // reflected next time this bucket is touched. + + /// + /// Sets the the number of buckets for the container to a specific value. + /// + /// + /// The desired number of buckets. + /// + /// + /// The number of buckets must be a power of 2. If not a power of 2, it will be rounded up to + /// the next largest power of 2. + /// Throws an out_of_range exception if the number of buckets + /// is invalid (0 or greater than the maximum number of buckets) + /// + /**/ + void rehash(size_type _Buckets) + { + size_type _Current_buckets = _M_number_of_buckets; + + if (_Current_buckets > _Buckets) + { + return; + } + else if (_Buckets <= 0 || _Buckets > unsafe_max_bucket_count()) + { + throw std::out_of_range("invalid number of buckets"); + } + // Round up the number of buckets to the next largest power of 2 + _M_number_of_buckets = ((size_type) 1) << _Get_msb(_Buckets*2-1); + } + +private: + + // Initialize the hash and keep the first bucket open + void _Init() + { + // Allocate an array of segment pointers + memset(_M_buckets, 0, _Pointers_per_table * sizeof(void *)); + + // Insert the first element in the split-ordered list + _Full_iterator _Dummy_node = _M_split_ordered_list._Begin(); + _Set_bucket(0, _Dummy_node); + } + + void _Copy(const _Mytype& _Right) + { + clear(); + + _M_maximum_bucket_size = _Right._M_maximum_bucket_size; + _M_number_of_buckets = _Right._M_number_of_buckets; + + try + { + insert(_Right.begin(), _Right.end()); + _M_comparator = _Right._M_comparator; + } + catch(...) 
+ { + _M_split_ordered_list.clear(); + throw; + } + } + + void _Swap_buckets(_Concurrent_hash& _Right) + { + if (_M_allocator == _Right._M_allocator) + { + // Swap all node segments + for (size_type _Index = 0; _Index < _Pointers_per_table; _Index++) + { + _Full_iterator * _Iterator_pointer = _M_buckets[_Index]; + _M_buckets[_Index] = _Right._M_buckets[_Index]; + _Right._M_buckets[_Index] = _Iterator_pointer; + } + } + else + { + throw std::invalid_argument("swap is invalid on non-equal allocators"); + } + } + + // Hash APIs + size_type _Distance(const_iterator _First, const_iterator _Last) const + { + size_type _Num = 0; + + for (const_iterator _Iterator = _First; _Iterator != _Last; _Iterator++) + { + _Num++; + } + + return _Num; + } + + // Insert an element in the hash given its value + template + _Pairib _Insert(_ValTy&& _Value) + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Key_function(_Value)); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If bucket is empty, initialize it first + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + long _New_count; + _Order_key = _Split_order_regular_key(_Order_key); + _Full_iterator _Iterator = _Get_bucket(_Bucket); + _Full_iterator _Last = _M_split_ordered_list._End(); + _Full_iterator _Where = _Iterator; + _Nodeptr _New_node = _M_split_ordered_list._Buynode(_Order_key, std::forward<_ValTy>(_Value)); + + _ASSERT_EXPR(_Where != _Last, L"Invalid head node"); + + // First node is a dummy node + _Where++; + + for (;;) + { + if (_Where == _Last || _Mylist::_Get_key(_Where) > _Order_key) + { + // Try to insert it in the right place + _Pairib _Result = _M_split_ordered_list._Insert(_Iterator, _Where, _New_node, &_New_count); + + if (_Result.second) + { + // Insertion succeeded, adjust the table size, if needed + _Adjust_table_size(_New_count, _M_number_of_buckets); + return _Result; + } + else + { + // Insertion failed: either the same node was inserted by another thread, or + // another element was inserted at exactly the same place as this node. + // Proceed with the search from the previous location where order key was + // known to be larger (note: this is legal only because there is no safe + // concurrent erase operation supported). + _Where = _Iterator; + _Where++; + continue; + } + } + else if (!_M_allow_multimapping && _Mylist::_Get_key(_Where) == _Order_key && + _M_comparator(_Key_function(*_Where), _Key_function(_New_node->_M_element)) == 0) + { + // If the insert failed (element already there), then delete the new one + _M_split_ordered_list._Erase(_New_node); + + // Element already in the list, return it + return _Pairib(_M_split_ordered_list._Get_iterator(_Where), false); + } + + // Move the iterator forward + _Iterator = _Where; + _Where++; + } + } + // Find the element in the split-ordered list + iterator _Find(const key_type& _Keyval) + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If _Bucket is empty, initialize it first + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + _Order_key = _Split_order_regular_key(_Order_key); + _Full_iterator _Last = _M_split_ordered_list._End(); + + for (_Full_iterator _Iterator = _Get_bucket(_Bucket); _Iterator != _Last; _Iterator++) + { + if (_Mylist::_Get_key(_Iterator) > _Order_key) + { + // If the order key is smaller than the current order key, the element + // is not in the hash. 
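+                // (Elements are kept in ascending split-order, i.e. sorted by their bit-reversed
+                // hash keys, so once the walk passes the slot where this key would sort, the
+                // search can stop early.)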
+ return end(); + } + else if (_Mylist::_Get_key(_Iterator) == _Order_key) + { + // The fact that order keys match does not mean that the element is found. + // Key function comparison has to be performed to check whether this is the + // right element. If not, keep searching while order key is the same. + if (!_M_comparator(_Key_function(*_Iterator), _Keyval)) + { + return _M_split_ordered_list._Get_iterator(_Iterator); + } + } + } + + return end(); + } + + // Find the element in the split-ordered list + const_iterator _Find(const key_type& _Keyval) const + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If _Bucket has not been initialized, keep searching up for a parent bucket + // that has been initialized. Worst case is the entire map will be read. + while (!_Is_initialized(_Bucket)) + { + _Bucket = _Get_parent(_Bucket); + } + + _Order_key = _Split_order_regular_key(_Order_key); + _Full_const_iterator _Last = _M_split_ordered_list._End(); + + for (_Full_const_iterator _Iterator = _Get_bucket(_Bucket); _Iterator != _Last; _Iterator++) + { + if (_Mylist::_Get_key(_Iterator) > _Order_key) + { + // If the order key is smaller than the current order key, the element + // is not in the hash. + return end(); + } + else if (_Mylist::_Get_key(_Iterator) == _Order_key) + { + // The fact that order keys match does not mean that the element is found. + // Key function comparison has to be performed to check whether this is the + // right element. If not, keep searching while order key is the same. + if (!_M_comparator(_Key_function(*_Iterator), _Keyval)) + { + return _M_split_ordered_list._Get_iterator(_Iterator); + } + } + } + + return end(); + } + + // Erase an element from the list. This is not a concurrency safe function. + iterator _Erase(const_iterator _Iterator) + { + key_type _Keyval = _Key_function(*_Iterator); + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If bucket is empty, initialize it first + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + _Order_key = _Split_order_regular_key(_Order_key); + + _Full_iterator _Previous = _Get_bucket(_Bucket); + _Full_iterator _Last = _M_split_ordered_list._End(); + _Full_iterator _Where = _Previous; + + _ASSERT_EXPR(_Where != _Last, L"Invalid head node"); + + // First node is a dummy node + _Where++; + + for (;;) + { + if (_Where == _Last) + { + return end(); + } + else if (_M_split_ordered_list._Get_iterator(_Where) == _Iterator) + { + return _M_split_ordered_list._Erase(_Previous, _Iterator); + } + + // Move the iterator forward + _Previous = _Where; + _Where++; + } + } + + // Return the [begin, end) pair of iterators with the same key values. + // This operation makes sense only if mapping is many-to-one. 
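+    // A minimal sketch of how the public equal_range wrapping this helper is expected to be
+    // used; the multimap-style container below is hypothetical and shown only for illustration:
+    //
+    //     concurrent_unordered_multimap<int, char> _Map;   // hypothetical instantiation
+    //     _Map.insert(std::make_pair(7, 'a'));
+    //     _Map.insert(std::make_pair(7, 'b'));
+    //     auto _Range = _Map.equal_range(7);
+    //     // _Range.first/_Range.second bracket both entries with key 7; the range can grow
+    //     // if other threads insert the same key concurrently.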
+ _Pairii _Equal_range(const key_type& _Keyval) + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If _Bucket is empty, initialize it first + if (!_Is_initialized(_Bucket)) + { + _Initialize_bucket(_Bucket); + } + + _Order_key = _Split_order_regular_key(_Order_key); + _Full_iterator _Last = _M_split_ordered_list._End(); + + for (_Full_iterator _Iterator = _Get_bucket(_Bucket); _Iterator != _Last; _Iterator++) + { + if (_Mylist::_Get_key(_Iterator) > _Order_key) + { + // There is no element with the given key + return _Pairii(end(), end()); + } + else if (_Mylist::_Get_key(_Iterator) == _Order_key && !_M_comparator(_Key_function(*_Iterator), _Keyval)) + { + iterator _Begin = _M_split_ordered_list._Get_iterator(_Iterator); + iterator _End= _Begin; + + for (;_End != end() && !_M_comparator(_Key_function(*_End), _Keyval); _End++) + { + } + + return _Pairii(_Begin, _End); + } + } + + return _Pairii(end(), end()); + } + + // Return the [begin, end) pair of const iterators with the same key values. + // This operation makes sense only if mapping is many-to-one. + _Paircc _Equal_range(const key_type& _Keyval) const + { + _Split_order_key _Order_key = (_Split_order_key) _M_comparator(_Keyval); + size_type _Bucket = _Order_key % _M_number_of_buckets; + + // If _Bucket has not been initialized, keep searching up for a parent bucket + // that has been initialized. Worst case is the entire map will be read. + while (!_Is_initialized(_Bucket)) + { + _Bucket = _Get_parent(_Bucket); + } + + _Order_key = _Split_order_regular_key(_Order_key); + _Full_const_iterator _Last = _M_split_ordered_list._End(); + + for (_Full_const_iterator _Iterator = _Get_bucket(_Bucket); _Iterator != _Last; _Iterator++) + { + if (_Mylist::_Get_key(_Iterator) > _Order_key) + { + // There is no element with the given key + return _Paircc(end(), end()); + } + else if (_Mylist::_Get_key(_Iterator) == _Order_key && !_M_comparator(_Key_function(*_Iterator), _Keyval)) + { + const_iterator _Begin = _M_split_ordered_list._Get_iterator(_Iterator); + const_iterator _End = _Begin; + + for (; _End != end() && !_M_comparator(_Key_function(*_End), _Keyval); _End++) + { + } + + return _Paircc(_Begin, _End); + } + } + + return _Paircc(end(), end()); + } + + // Bucket APIs + void _Initialize_bucket(size_type _Bucket) + { + // Bucket 0 has no parent. Initialize it and return. 
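+        // Any other bucket finds its parent by clearing its most significant set bit (for
+        // example, bucket 6 == 110b has parent 2 == 010b) and initializes that parent first,
+        // recursively, so the chain of dummy nodes leading to this bucket always exists.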
+ if (_Bucket == 0) + { + _Init(); + return; + } + + size_type _Parent_bucket = _Get_parent(_Bucket); + + // All _Parent_bucket buckets have to be initialized before this bucket is + if (!_Is_initialized(_Parent_bucket)) + { + _Initialize_bucket(_Parent_bucket); + } + + _Full_iterator _Parent = _Get_bucket(_Parent_bucket); + + // Create a dummy first node in this bucket + _Full_iterator _Dummy_node = _M_split_ordered_list._Insert_dummy(_Parent, _Split_order_dummy_key(_Bucket)); + _Set_bucket(_Bucket, _Dummy_node); + } + + void _Adjust_table_size(size_type _Total_elements, size_type _Current_size) + { + // Grow the table by a factor of 2 if possible and needed + if (((float) _Total_elements / (float) _Current_size) > _M_maximum_bucket_size) + { + // Double the size of the hash only if size has not changed inbetween loads + _InterlockedCompareExchangeSizeT(&_M_number_of_buckets, 2 * _Current_size, _Current_size); + } + } + + size_type _Get_parent(size_type _Bucket) const + { + // Unsets bucket's most significant turned-on bit + unsigned char _Msb = _Get_msb(_Bucket); + return _Bucket & ~(1 << _Msb); + } + + + // Dynamic sized array (segments) + + _Full_iterator _Get_bucket(size_type _Bucket) const + { + size_type _Segment = _Segment_index_of(_Bucket); + _Bucket -= _Segment_base(_Segment); + return _M_buckets[_Segment][_Bucket]; + } + + void _Set_bucket(size_type _Bucket, _Full_iterator _Dummy_head) + { + size_type _Segment = _Segment_index_of(_Bucket); + _Bucket -= _Segment_base(_Segment); + + if (_M_buckets[_Segment] == NULL) + { + size_type _Seg_size = _Segment_size(_Segment); + _Full_iterator * _New_segment = _M_allocator.allocate(_Seg_size); + _Uninitialized_default_fill_n(_New_segment, _Seg_size, (_Full_iterator *) 0, _M_allocator); + if (_InterlockedCompareExchangePointer((void * volatile *) &_M_buckets[_Segment], _New_segment, NULL) != NULL) + { + _M_allocator.deallocate(_New_segment, _Seg_size); + } + } + _M_buckets[_Segment][_Bucket] = _Dummy_head; + } + + bool _Is_initialized(size_type _Bucket) const + { + size_type _Segment = _Segment_index_of(_Bucket); + _Bucket -= _Segment_base(_Segment); + + if (_M_buckets[_Segment] == NULL) + { + return false; + } + + _Full_iterator _Iterator = _M_buckets[_Segment][_Bucket]; + return (_Iterator._Mynode() != NULL); + } + + // Utilities for keys + + _Split_order_key _Reverse(_Map_key _Order_key) const + { + _Split_order_key _Reversed_order_key; + + unsigned char * _Original = (unsigned char *) &_Order_key; + unsigned char * _Reversed = (unsigned char *) &_Reversed_order_key; + + int _Size = sizeof(_Map_key); + for (int _Index = 0; _Index < _Size; _Index++) + { + _Reversed[_Size - _Index - 1] = _Reverse_byte(_Original[_Index]); + } + + return _Reversed_order_key; + } + + // A regular order key has its original hash value reversed and the last bit set + _Split_order_key _Split_order_regular_key(_Map_key _Order_key) const + { + return _Reverse(_Order_key) | 0x1; + } + + // A dummy order key has its original hash value reversed and the last bit unset + _Split_order_key _Split_order_dummy_key(_Map_key _Order_key) const + { + return _Reverse(_Order_key) & ~(0x1); + } + + // Shared variables + _Full_iterator * _M_buckets[_Pointers_per_table]; // The segment table + _Mylist _M_split_ordered_list; // List where all the elements are kept + typename allocator_type::template rebind<_Full_iterator>::other _M_allocator; // Allocator object for segments + size_type _M_number_of_buckets; // Current table size + float _M_maximum_bucket_size; // Maximum size of 
the bucket +}; + +#pragma warning(pop) // warning 4127 -- while (true) has a constant expression in it + +} // namespace details; +} // namespace Concurrency diff --git a/concrt_extras/internal_split_ordered_list.h b/concrt_extras/internal_split_ordered_list.h new file mode 100644 index 000000000..f087ea303 --- /dev/null +++ b/concrt_extras/internal_split_ordered_list.h @@ -0,0 +1,777 @@ +/*** +* ==++== +* +* Copyright (c) Microsoft Corporation. All rights reserved. +* +* ==--== +* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +* +* internal_split_ordered_list.h +* +* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +****/ +#pragma once + +// Needed for forward iterators +#include +#include + +namespace Concurrency +{ +namespace details +{ +// Split-order list iterators, needed to skip dummy elements +template +class _Solist_const_iterator : public std::_Flist_const_iterator<_Mylist> +{ +public: + typedef _Solist_const_iterator<_Mylist> _Myiter; + typedef std::_Flist_const_iterator<_Mylist> _Mybase; + typedef std::forward_iterator_tag iterator_category; + + typedef typename _Mylist::_Nodeptr _Nodeptr; + typedef typename _Mylist::value_type value_type; + typedef typename _Mylist::difference_type difference_type; + typedef typename _Mylist::const_pointer pointer; + typedef typename _Mylist::const_reference reference; + + _Solist_const_iterator() + { + } + + _Solist_const_iterator(_Nodeptr _Pnode, const _Mylist * _Plist) : _Mybase(_Pnode, _Plist) + { + } + + typedef _Solist_const_iterator<_Mylist> _Unchecked_type; + + _Myiter& _Rechecked(_Unchecked_type _Right) + { + _Ptr = _Right._Ptr; + return (*this); + } + + _Unchecked_type _Unchecked() const + { + return (_Unchecked_type(_Ptr, (_Mylist *)_Getcont())); + } + + reference operator*() const + { + return ((reference)**(_Mybase *)this); + } + + pointer operator->() const + { + return (&**this); + } + + _Myiter& operator++() + { + do + { + ++(*(_Mybase *)this); + } + while (_Mynode() != NULL && _Mynode()->_Is_dummy()); + + return (*this); + } + + _Myiter operator++(int) + { + _Myiter _Tmp = *this; + do + { + ++*this; + } + while (_Mynode() != NULL && _Mynode()->_Is_dummy()); + + return (_Tmp); + } +}; + +template inline +typename _Solist_const_iterator<_Mylist>::_Unchecked_type _Unchecked(_Solist_const_iterator<_Mylist> _Iterator) +{ + return (_Iterator._Unchecked()); +} + +template inline +_Solist_const_iterator<_Mylist>& _Rechecked(_Solist_const_iterator<_Mylist>& _Iterator, + typename _Solist_const_iterator<_Mylist>::_Unchecked_type _Right) +{ + return (_Iterator._Rechecked(_Right)); +} + +template +class _Solist_iterator : public _Solist_const_iterator<_Mylist> +{ +public: + typedef _Solist_iterator<_Mylist> _Myiter; + typedef _Solist_const_iterator<_Mylist> _Mybase; + typedef std::forward_iterator_tag iterator_category; + + typedef typename _Mylist::_Nodeptr _Nodeptr; + typedef typename _Mylist::value_type value_type; + typedef typename _Mylist::difference_type difference_type; + typedef typename _Mylist::pointer pointer; + typedef typename _Mylist::reference reference; + + _Solist_iterator() + { + } + + _Solist_iterator(_Nodeptr _Pnode, const _Mylist *_Plist) : _Mybase(_Pnode, _Plist) + { + } + + typedef _Solist_iterator<_Mylist> _Unchecked_type; + + _Myiter& _Rechecked(_Unchecked_type _Right) + { + _Ptr = _Right._Ptr; + return (*this); + } + + _Unchecked_type _Unchecked() const + { + return (_Unchecked_type(_Ptr, (_Mylist *)_Getcont())); + } + + reference 
operator*() const + { + return ((reference)**(_Mybase *)this); + } + + pointer operator->() const + { + return (&**this); + } + + _Myiter& operator++() + { + do + { + ++(*(_Mybase *)this); + } + while (_Mynode() != NULL && _Mynode()->_Is_dummy()); + + return (*this); + } + + _Myiter operator++(int) + { + _Myiter _Tmp = *this; + do + { + ++*this; + } + while (_Mynode() != NULL && _Mynode()->_Is_dummy()); + + return (_Tmp); + } +}; + +template inline +typename _Solist_iterator<_Mylist>::_Unchecked_type _Unchecked(_Solist_iterator<_Mylist> _Iterator) +{ + return (_Iterator._Unchecked()); +} + +template inline +_Solist_iterator<_Mylist>& _Rechecked(_Solist_iterator<_Mylist>& _Iterator, + typename _Solist_iterator<_Mylist>::_Unchecked_type _Right) +{ + return (_Iterator._Rechecked(_Right)); +} + +// Forward type and class definitions +typedef size_t _Map_key; +typedef _Map_key _Split_order_key; + +template +class _Split_order_list_node : public std::_Container_base +{ +public: + typedef typename _Allocator_type::template rebind<_Element_type>::other _Allocator_type; + typedef typename _Allocator_type::size_type size_type; + typedef typename _Element_type value_type; + + struct _Node; + typedef _Node * _Nodeptr; + typedef _Nodeptr& _Nodepref; + + // Node that holds the element in a split-ordered list + struct _Node + { + // Initialize the node with the given order key + void _Init(_Split_order_key _Order_key) + { + _M_order_key = _Order_key; + _M_next = NULL; + } + + // Return the order key (needed for hashing) + _Split_order_key _Get_order_key() const + { + return _M_order_key; + } + + // Inserts the new element in the list in an atomic fashion + _Nodeptr _Atomic_set_next(_Nodeptr _New_node, _Nodeptr _Current_node) + { + // Try to change the next pointer on the current element to a new element, only if it still points to the cached next + _Nodeptr _Exchange_node = (_Nodeptr) _InterlockedCompareExchangePointer((void * volatile *) &_M_next, _New_node, _Current_node); + + if (_Exchange_node == _Current_node) + { + // Operation succeeded, return the new node + return _New_node; + } + else + { + // Operation failed, return the "interfering" node + return _Exchange_node; + } + } + + // Checks if this element in the list is a dummy, order enforcing node. Dummy nodes are used by buckets + // in the hash table to quickly index into the right subsection of the split-ordered list. 
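+        // (A regular element stores its bit-reversed hash with the low bit set, while a dummy
+        // bucket marker stores it with the low bit clear, so testing bit 0 distinguishes them.)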
+ bool _Is_dummy() const + { + return (_M_order_key & 0x1) == 0; + } + + _Nodeptr _M_next; // Next element in the list + value_type _M_element; // Element storage + _Split_order_key _M_order_key; // Order key for this element + }; + +#if _ITERATOR_DEBUG_LEVEL == 0 /*IFSTRIP=IGN*/ + _Split_order_list_node(_Allocator_type _Allocator) : _M_node_allocator(_Allocator), _M_value_allocator(_Allocator) + { + } +#else /* _ITERATOR_DEBUG_LEVEL == 0 */ + _Split_order_list_node(_Allocator_type _Allocator) : _M_node_allocator(_Allocator), _M_value_allocator(_Allocator) + { + typename _Allocator_type::template rebind::other _Alproxy(_M_node_allocator); + _Myproxy = _Alproxy.allocate(1); + _Cons_val(_Alproxy, _Myproxy, std::_Container_proxy()); + _Myproxy->_Mycont = this; + } + + ~_Split_order_list_node() + { + typename _Allocator_type::template rebind::other _Alproxy(_M_node_allocator); + _Orphan_all(); + _Dest_val(_Alproxy, _Myproxy); + _Alproxy.deallocate(_Myproxy, 1); + _Myproxy = 0; + } +#endif /* _ITERATOR_DEBUG_LEVEL == 0 */ + + _Nodeptr _Myhead; // pointer to head node + typename _Allocator_type::template rebind<_Node>::other _M_node_allocator; // allocator object for nodes + _Allocator_type _M_value_allocator; // allocator object for element values +}; + +template +class _Split_order_list_value : public _Split_order_list_node<_Element_type, _Allocator_type> +{ +public: + typedef _Split_order_list_node<_Element_type, _Allocator_type> _Mybase; + typedef typename _Mybase::_Nodeptr _Nodeptr; + typedef typename _Mybase::_Nodepref _Nodepref; + typedef typename _Allocator_type::template rebind<_Element_type>::other _Allocator_type; + + typedef typename _Allocator_type::size_type size_type; + typedef typename _Allocator_type::difference_type difference_type; + typedef typename _Allocator_type::pointer pointer; + typedef typename _Allocator_type::const_pointer const_pointer; + typedef typename _Allocator_type::reference reference; + typedef typename _Allocator_type::const_reference const_reference; + typedef typename _Allocator_type::value_type value_type; + + _Split_order_list_value(_Allocator_type _Allocator = _Allocator_type()) : _Mybase(_Allocator) + { + // Immediately allocate a dummy node with order key of 0. This node + // will always be the head of the list. + _Myhead = _Buynode(0); + } + + ~_Split_order_list_value() + { + } + + // Allocate a new node with the given order key and value + template + _Nodeptr _Buynode(_Split_order_key _Order_key, _ValTy&& _Value) + { + _Nodeptr _Pnode = _M_node_allocator.allocate(1); + + try + { + _M_value_allocator.construct(std::addressof(_Myval(_Pnode)), std::forward<_ValTy>(_Value)); + _Pnode->_Init(_Order_key); + } + catch(...) 
+ { + _M_node_allocator.deallocate(_Pnode, 1); + throw; + } + + return (_Pnode); + } + + // Allocate a new node with the given order key; used to allocate dummy nodes + _Nodeptr _Buynode(_Split_order_key _Order_key) + { + _Nodeptr _Pnode = _M_node_allocator.allocate(1); + _Pnode->_Init(_Order_key); + + return (_Pnode); + } + + // Get the next node + static _Nodepref _Nextnode(_Nodeptr _Pnode) + { + return ((_Nodepref)(*_Pnode)._M_next); + } + + // Get the stored value + static reference _Myval(_Nodeptr _Pnode) + { + return ((reference)(*_Pnode)._M_element); + } +}; + +// Forward list in which elements are sorted in a split-order +template > +class _Split_ordered_list : _Split_order_list_value<_Element_type, _Element_allocator_type> +{ +public: + typedef _Split_ordered_list<_Element_type, _Element_allocator_type> _Mytype; + typedef _Split_order_list_value<_Element_type, _Element_allocator_type> _Mybase; + typedef typename _Mybase::_Allocator_type _Allocator_type; + typedef typename _Mybase::_Nodeptr _Nodeptr; + + typedef _Allocator_type allocator_type; + typedef typename _Allocator_type::size_type size_type; + typedef typename _Allocator_type::difference_type difference_type; + typedef typename _Allocator_type::pointer pointer; + typedef typename _Allocator_type::const_pointer const_pointer; + typedef typename _Allocator_type::reference reference; + typedef typename _Allocator_type::const_reference const_reference; + typedef typename _Allocator_type::value_type value_type; + + typedef _Solist_const_iterator<_Mybase> const_iterator; + typedef _Solist_iterator<_Mybase> iterator; + typedef std::_Flist_const_iterator<_Mybase> _Full_const_iterator; + typedef std::_Flist_iterator<_Mybase> _Full_iterator; + typedef std::pair _Pairib; + using _Split_order_list_value<_Element_type, _Element_allocator_type>::_Buynode; + _Split_ordered_list(_Allocator_type _Allocator = allocator_type()) : _Mybase(_Allocator), _M_element_count(0) + { + } + + ~_Split_ordered_list() + { + // Clear the list + clear(); + + // Remove the head element which is not cleared by clear() + _Nodeptr _Pnode = _Myhead; + _Myhead = NULL; + + _ASSERT_EXPR(_Pnode != NULL && _Nextnode(_Pnode) == NULL, L"Invalid head list node"); + + _Erase(_Pnode); + } + + // Common forward list functions + + allocator_type get_allocator() const + { + return (_M_value_allocator); + } + + void clear() + { +#if _ITERATOR_DEBUG_LEVEL == 2 /*IFSTRIP=IGN*/ + _Orphan_ptr(*this, 0); +#endif /* _ITERATOR_DEBUG_LEVEL == 2 */ + + _Nodeptr _Pnext; + _Nodeptr _Pnode = _Myhead; + + _ASSERT_EXPR(_Myhead != NULL, L"Invalid head list node"); + _Pnext = _Nextnode(_Pnode); + _Pnode->_M_next = NULL; + _Pnode = _Pnext; + + while (_Pnode != NULL) + { + _Pnext = _Nextnode(_Pnode); + _Erase(_Pnode); + _Pnode = _Pnext; + } + + _M_element_count = 0; + } + + // Returns a first non-dummy element in the SOL + iterator begin() + { + _Full_iterator _Iterator = _Begin(); + return _Get_first_real_iterator(_Iterator); + } + + // Returns a first non-dummy element in the SOL + const_iterator begin() const + { + _Full_const_iterator _Iterator = _Begin(); + return _Get_first_real_iterator(_Iterator); + } + + iterator end() + { + return (iterator(0, this)); + } + + const_iterator end() const + { + return (const_iterator(0, this)); + } + + const_iterator cbegin() const + { + return (((const _Mytype *)this)->begin()); + } + + const_iterator cend() const + { + return (((const _Mytype *)this)->end()); + } + + // Checks if the number of elements (non-dummy) is 0 + bool empty() const + { + return 
(_M_element_count == 0); + } + + // Returns the number of non-dummy elements in the list + size_type size() const + { + return _M_element_count; + } + + // Returns the maximum size of the list, determined by the allocator + size_type max_size() const + { + return _M_value_allocator.max_size(); + } + + // Swaps 'this' list with the passed in one + void swap(_Mytype& _Right) + { + if (this == &_Right) + { + // Nothing to do + return; + } + + if (_M_value_allocator == _Right._M_value_allocator) + { + _Swap_all(_Right); + std::swap(_Myhead, _Right._Myhead); + std::swap(_M_element_count, _Right._M_element_count); + } + else + { + _Mytype _Temp_list; + _Temp_list._Move_all(_Right); + _Right._Move_all(*this); + _Move_all(_Temp_list); + } + } + + // Split-order list functions + + // Returns a first element in the SOL, which is always a dummy + _Full_iterator _Begin() + { + return _Full_iterator(_Myhead, this); + } + + // Returns a first element in the SOL, which is always a dummy + _Full_const_iterator _Begin() const + { + return _Full_const_iterator(_Myhead, this); + } + + _Full_iterator _End() + { + return _Full_iterator(0, this); + } + + _Full_const_iterator _End() const + { + return _Full_const_iterator(0, this); + } + + static _Split_order_key _Get_key(const _Full_const_iterator& _Iterator) + { + return _Iterator._Mynode()->_Get_order_key(); + } + + // Returns a public iterator version of the internal iterator. Public iterator must not + // be a dummy private iterator. + iterator _Get_iterator(_Full_iterator _Iterator) + { + _ASSERT_EXPR(_Iterator._Mynode() != NULL && !_Iterator._Mynode()->_Is_dummy(), L"Invalid user node (dummy)"); + return iterator(_Iterator._Mynode(), this); + } + + // Returns a public iterator version of the internal iterator. Public iterator must not + // be a dummy private iterator. + const_iterator _Get_iterator(_Full_const_iterator _Iterator) const + { + _ASSERT_EXPR(_Iterator._Mynode() != NULL && !_Iterator._Mynode()->_Is_dummy(), L"Invalid user node (dummy)"); + return const_iterator(_Iterator._Mynode(), this); + } + + // Returns a non-const version of the _Full_iterator + _Full_iterator _Get_iterator(_Full_const_iterator _Iterator) + { + return _Full_iterator(_Iterator._Mynode(), this); + } + + // Returns a non-const version of the iterator + iterator _Get_iterator(const_iterator _Iterator) + { + return iterator(_Iterator._Mynode(), this); + } + + // Returns a public iterator version of a first non-dummy internal iterator at or after + // the passed in internal iterator. + iterator _Get_first_real_iterator(_Full_iterator _Iterator) + { + // Skip all dummy, internal only iterators + while (_Iterator != _End() && _Iterator._Mynode()->_Is_dummy()) + { + _Iterator++; + } + + return iterator(_Iterator._Mynode(), this); + } + + // Returns a public iterator version of a first non-dummy internal iterator at or after + // the passed in internal iterator. + const_iterator _Get_first_real_iterator(_Full_const_iterator _Iterator) const + { + // Skip all dummy, internal only iterators + while (_Iterator != _End() && _Iterator._Mynode()->_Is_dummy()) + { + _Iterator++; + } + + return const_iterator(_Iterator._Mynode(), this); + } + + // Erase an element using the allocator + void _Erase(_Nodeptr _Delete_node) + { + if (!_Delete_node->_Is_dummy()) + { + // Dummy nodes have nothing constructed, thus should not be destroyed. + _M_node_allocator.destroy(_Delete_node); + } + _M_node_allocator.deallocate(_Delete_node, 1); + } + + // Try to insert a new element in the list. 
If insert fails, return the node that + // was inserted instead. + _Nodeptr _Insert(_Nodeptr _Previous, _Nodeptr _New_node, _Nodeptr _Current_node) + { + _New_node->_M_next = _Current_node; + return _Previous->_Atomic_set_next(_New_node, _Current_node); + } + + // Insert a new element between passed in iterators + _Pairib _Insert(_Full_iterator _Iterator, _Full_iterator _Next, _Nodeptr _List_node, long * _New_count) + { + _Nodeptr _Inserted_node = _Insert(_Iterator._Mynode(), _List_node, _Next._Mynode()); + + if (_Inserted_node == _List_node) + { + // If the insert succeeded, check that the order is correct and increment the element count + _Check_range(); + *_New_count = _InterlockedIncrement(&_M_element_count); + return _Pairib(iterator(_List_node, this), true); + } + else + { + return _Pairib(end(), false); + } + } + + // Insert a new dummy element, starting search at a parent dummy element + _Full_iterator _Insert_dummy(_Full_iterator _Iterator, _Split_order_key _Order_key) + { + _Full_iterator _Last = _End(); + _Full_iterator _Where = _Iterator; + + _ASSERT_EXPR(_Where != _Last, L"Invalid head node"); + + _Where++; + + // Create a dummy element up front, even though it may be discarded (due to concurrent insertion) + _Nodeptr _Dummy_node = _Buynode(_Order_key); + + for (;;) + { + _ASSERT_EXPR(_Iterator != _Last, L"Invalid head list node"); + + // If the head iterator is at the end of the list, or past the point where this dummy + // node needs to be inserted, then try to insert it. + if (_Where == _Last || _Get_key(_Where) > _Order_key) + { + _ASSERT_EXPR(_Get_key(_Iterator) < _Order_key, L"Invalid node order in the list"); + + // Try to insert it in the right place + _Nodeptr _Inserted_node = _Insert(_Iterator._Mynode(), _Dummy_node, _Where._Mynode()); + + if (_Inserted_node == _Dummy_node) + { + // Insertion succeeded, check the list for order violations + _Check_range(); + return _Full_iterator(_Dummy_node, this); + } + else + { + // Insertion failed: either dummy node was inserted by another thread, or + // a real element was inserted at exactly the same place as dummy node. + // Proceed with the search from the previous location where order key was + // known to be larger (note: this is legal only because there is no safe + // concurrent erase operation supported). + _Where = _Iterator; + _Where++; + continue; + } + } + else if (_Get_key(_Where) == _Order_key) + { + // Another dummy node with the same value found, discard the new one. 
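+                // (The speculatively allocated dummy lost the race; free it and return the
+                // iterator to the dummy that is already linked into the list.)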
+ _Erase(_Dummy_node); + return _Where; + } + + // Move the iterator forward + _Iterator = _Where; + _Where++; + } + + } + + // This erase function can handle both real and dummy nodes + void _Erase(_Full_iterator _Previous, _Full_const_iterator& _Where) + { +#if _ITERATOR_DEBUG_LEVEL == 2 /*IFSTRIP=IGN*/ + if (_Where._Getcont() != this || _Where._Ptr == _Myhead) + { + std::_DEBUG_ERROR("list erase iterator outside range"); + } + _Nodeptr _Pnode = (_Where++)._Mynode(); + _Orphan_ptr(*this, _Pnode); +#else /* _ITERATOR_DEBUG_LEVEL == 2 */ + _Nodeptr _Pnode = (_Where++)._Mynode(); +#endif /* _ITERATOR_DEBUG_LEVEL == 2 */ + + _Nodeptr _Prevnode = _Previous._Mynode(); + _ASSERT_EXPR(_Prevnode->_M_next == _Pnode, L"Erase must take consecutive iterators"); + _Prevnode->_M_next = _Pnode->_M_next; + + _Erase(_Pnode); + } + + // Erase the element (previous node needs to be passed because this is a forward only list) + iterator _Erase(_Full_iterator _Previous, const_iterator _Where) + { + _Full_const_iterator _Iterator = _Where; + _Erase(_Previous, _Iterator); + _M_element_count--; + + return _Get_iterator(_Get_first_real_iterator(_Iterator)); + } + + // Move all elements from the passed in split-ordered list to this one + void _Move_all(_Mytype& _Source_list) + { + _Full_const_iterator _First = _Source_list._Begin(); + _Full_const_iterator _Last = _Source_list._End(); + + if (_First == _Last) + { + return; + } + + _Nodeptr _Previous_node = _Myhead; + _Full_const_iterator _Begin_iterator = _First++; + + // Move all elements one by one, including dummy ones + for (_Full_const_iterator _Iterator = _First; _Iterator != _Last;) + { + _Nodeptr _Node = _Iterator._Mynode(); + + _Nodeptr _Dummy_node = _Node->_Is_dummy() ? _Buynode(_Node->_Get_order_key()) : _Buynode(_Node->_Get_order_key(), _Myval(_Node)); + _Previous_node = _Insert(_Previous_node, _Dummy_node, NULL); + _ASSERT_EXPR(_Previous_node != NULL, L"Insertion must succeed"); + _Full_const_iterator _Where = _Iterator++; + _Source_list._Erase(_Get_iterator(_Begin_iterator), _Where); + } + } + +private: + + // Check the list for order violations + void _Check_range() + { +#if defined (_DEBUG) + for (_Full_iterator _Iterator = _Begin(); _Iterator != _End(); _Iterator++) + { + _Full_iterator _Next_iterator = _Iterator; + _Next_iterator++; + + _ASSERT_EXPR(_Next_iterator == end() || _Next_iterator._Mynode()->_Get_order_key() >= _Iterator._Mynode()->_Get_order_key(), L"!!! List order inconsistency !!!"); + } +#endif + } + +#if _ITERATOR_DEBUG_LEVEL == 2 /*IFSTRIP=IGN*/ + void _Orphan_ptr(_Mytype& _Cont, _Nodeptr _Ptr) const + { + std::_Lockit _Lock(_LOCK_DEBUG); + const_iterator **_Pnext = (const_iterator **)_Cont._Getpfirst(); + if (_Pnext != 0) + { + while (*_Pnext != 0) + { + if ((*_Pnext)->_Ptr == (_Nodeptr)&_Myhead || _Ptr != 0 && (*_Pnext)->_Ptr != _Ptr) + { + _Pnext = (const_iterator **)(*_Pnext)->_Getpnext(); + } + else + { + (*_Pnext)->_Clrcont(); + *_Pnext = *(const_iterator **)(*_Pnext)->_Getpnext(); + } + } + } + } +#endif /* _ITERATOR_DEBUG_LEVEL == 2 */ + + volatile long _M_element_count; // Total item count, not counting dummy nodes +}; + +} // namespace details; +} // namespace Concurrency diff --git a/concrt_extras/semaphore.h b/concrt_extras/semaphore.h new file mode 100644 index 000000000..572cad351 --- /dev/null +++ b/concrt_extras/semaphore.h @@ -0,0 +1,70 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. 
+//
+// File: semaphore.h
+//
+// Implementation of a cooperative semaphore built on ConcRT
+//
+//--------------------------------------------------------------------------
+
+#pragma once
+
+#include <concrt.h>
+#include <concurrent_queue.h>
+
+namespace Concurrency
+{
+    class semaphore
+    {
+    public:
+        explicit semaphore(LONG capacity)
+            : _semaphore_count(capacity)
+        {
+        }
+
+        // Acquires access to the semaphore.
+        void acquire()
+        {
+            // The capacity of the semaphore is exceeded when the semaphore count
+            // falls below zero. When this happens, add the current context to the
+            // back of the wait queue and block the current context.
+            if (InterlockedDecrement(&_semaphore_count) < 0)
+            {
+                _waiting_contexts.push(Concurrency::Context::CurrentContext());
+                Concurrency::Context::Block();
+            }
+        }
+
+        // Releases access to the semaphore.
+        void release()
+        {
+            // If the semaphore count is negative, unblock the first waiting context.
+            if (InterlockedIncrement(&_semaphore_count) <= 0)
+            {
+                // A call to acquire might have decremented the counter, but has not
+                // yet finished adding the context to the queue.
+                // Create a spin loop that waits for the context to become available.
+                Concurrency::Context * waiting = NULL;
+                while (!_waiting_contexts.try_pop(waiting))
+                {
+                    Concurrency::wait(0);
+                }
+
+                // Unblock the context.
+                waiting->Unblock();
+            }
+        }
+
+    private:
+        // The semaphore count.
+        LONG _semaphore_count;
+
+        // A concurrency-safe queue of contexts that must wait to
+        // acquire the semaphore.
+        Concurrency::concurrent_queue<Concurrency::Context *> _waiting_contexts;
+
+        semaphore const &operator =(semaphore const&);  // no assignment operator
+        semaphore(semaphore const &);                   // no copy constructor
+    };
+}
\ No newline at end of file
-- 
2.39.2