How to implement a sorted buffer?


I need to traverse a collection of disjoint folders; each folder is associated with a visited time configured somewhere inside the folder.

I then sort the folders and process the one with the earliest visited time first. Note that processing is generally slower than traversal.

My code targets .NET Framework 4.8.1. My current implementation is as follows:

    public class BySeparateThread
    {
        ConcurrentDictionary<string, DateTime?> _dict = new ConcurrentDictionary<string, DateTime?>();
        private readonly object _lock = new object(); // must be initialized: lock (null) throws ArgumentNullException

        /// <summary>
        /// this will be called by producer thread;
        /// </summary>
        /// <param name="address"></param>
        /// <param name="time"></param>
        public void add(string address,DateTime? time) {
            _dict.TryAdd(address, time);
            
        }

        /// <summary>
        /// called by subscriber thread;
        /// </summary>
        /// <returns></returns>
        public string next() {
            lock (_lock) {
                var r = _dict.FirstOrDefault();

                // empty dictionary: FirstOrDefault returns a default pair whose Key is null
                if (r.Key is null)
                {
                    return null;
                }

                // no visited time: process this folder immediately
                if (r.Value is null)
                {
                    _dict.TryRemove(r.Key, out var _);
                    return r.Key;
                }

                foreach (var item in _dict.Skip(1))
                {
                    if (item.Value is null)
                    {
                        _dict.TryRemove(item.Key, out var _);
                        return item.Key;
                    }
                    if (item.Value < r.Value)
                    {
                        r = item;
                    }
                }

                // remove and return the earliest entry found, not the first one enumerated
                _dict.TryRemove(r.Key, out var _);
                return r.Key;
            }
        }

        /// <summary>
        /// set to false by the producer thread once traversal has finished;
        /// volatile so that the consumer loop observes the change
        /// </summary>
        public volatile bool _notComplete = true;

        /// <summary>
        /// shared configuration for subscribers;
        /// </summary>
        fs.addresses_.disjoint.deV_._bak.Io io; //.io_._CfgX.Create(cancel, git)

        /// <summary>
        /// run this on a thread other than the one calling <see cref="add(string, DateTime?)"/>
        /// </summary>
        /// <param name="sln"></param>
        /// <returns></returns>
        public async Task _asyn_ofAddress(string sln)
        {
            // keep going until the producer is done AND every queued folder has been taken
            while (_notComplete || !_dict.IsEmpty)
            {
                var f = next();
                if (f is null)
                {
                    await Task.Delay(30 * 1000);
                    //await Task.Yield();
                    continue;
                }
                // degree of concurrency is controlled by a semaphore; for instance, at most 4 are processed at a time:
                new dev.srcs.each.sln_.delvable.Bak_srcsInAddresses(io)._startTask_ofAddress(sln);
            }
        }
    }

In the code above, I'm concerned about the while(_notComplete) loop: it polls, so it spends many iterations doing nothing, and whenever the dictionary is empty it sleeps for 30 seconds even if a new folder arrives right away. There should be a better way to remove the loop by having the collection signal when it becomes non-empty, for example when an item is added.
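A minimal sketch of that idea, using only BCL types available on .NET Framework 4.8.1 (SortedSet, SemaphoreSlim). The class name SortedAsyncBuffer and its members are hypothetical, not an existing API. The semaphore counts buffered items, so a consumer awaits it instead of polling; CompleteAdding releases one extra token that consumers pass along, so every waiter returns null once the buffer is drained. Assumptions: folders with no visited time sort first (mirroring how next() above returns them immediately), ties are broken by ordinal path comparison, and a duplicate (path, time) pair is rejected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sorted buffer: earliest visited time first, consumers wait without polling.
public sealed class SortedAsyncBuffer
{
    private readonly SortedSet<(DateTime? Time, string Path)> _set =
        new SortedSet<(DateTime? Time, string Path)>(
            Comparer<(DateTime? Time, string Path)>.Create((a, b) =>
            {
                // Nullable.Compare orders null before any value, so folders without a time come first
                int c = Nullable.Compare(a.Time, b.Time);
                return c != 0 ? c : string.CompareOrdinal(a.Path, b.Path);
            }));

    private readonly object _lock = new object();
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0); // one token per buffered item
    private bool _completed;

    // Called by the traversal (producer) thread. Returns false for a duplicate (time, path) pair.
    public bool TryAdd(string path, DateTime? time)
    {
        lock (_lock)
        {
            if (_completed) throw new InvalidOperationException("Adding has been completed.");
            if (!_set.Add((time, path))) return false;
        }
        _available.Release(); // wakes one waiting consumer, if any
        return true;
    }

    // Called by the producer once traversal has finished.
    public void CompleteAdding()
    {
        lock (_lock)
        {
            if (_completed) return;
            _completed = true;
        }
        _available.Release(); // extra token so a waiting consumer can observe completion
    }

    // Returns the path with the earliest time, waiting while the buffer is empty.
    // Returns null once adding is complete and every item has been taken.
    public async Task<string> TakeAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        await _available.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_lock)
        {
            if (_set.Count > 0)
            {
                var earliest = _set.Min;
                _set.Remove(earliest);
                return earliest.Path;
            }
        }
        // The buffer is empty, so this was the completion token: pass it on to the next waiter.
        _available.Release();
        return null;
    }
}
```

With this, the consumer loop could become `while ((f = await buffer.TakeAsync()) != null) { ... }`, which replaces both the _notComplete flag and the 30-second delay; the producer calls CompleteAdding() when traversal ends. The design choice here is to keep the sorting in one lock-protected SortedSet and use the semaphore purely as a wake-up signal.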

There is probably a better implementation based on a mature framework. I have been considering the following ones, but each time I got stuck on implementation details:

  1. BlockingCollection: I don't know how to keep the collection sorted as items are added while both the producer and the consumer are running.
  2. Channel: after reading its examples, I still could not come up with a design that fits my needs.
  3. Pipeline: I haven't fully understood it.
  4. Rx: I tried implementing an observable and an observer. Rx only gives me a high-level framework; when I get into the details, I end up with what I'm currently doing, and I begin to wonder whether I need Rx here at all.
  5. Dataflow: should I implement my own BufferBlock or ActionBlock? It seems the built-in BufferBlock cannot be customized to sort items before releasing them to the next block.

The question "Sorting buffered Observables" seems similar to my problem, but it ends with a solution similar to my current one, which I am not satisfied with for the reasons stated above.

Could someone give me some sample code? Please make it as concrete as possible. As you can see, I have researched several general approaches; what stops me is the details, which the documentation often glosses over.

Answer by NilNul

I have just found a solution that is better than my current one. I believe there are better ones still, so please do post your answers if you find some; this is just what I could hack together with what I know so far.

I found "Prioritized queues in Task Parallel Library" and wrote something similar for my case:

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;

    namespace nilnul.dev.srcs.every.slns._bak
    {
        /// <summary>
        /// Thread-safe collection that always yields the entry with the earliest time first.
        /// Wrap it in a BlockingCollection to get blocking Take / GetConsumingEnumerable.
        /// </summary>
        public class BySortedSet : IProducerConsumerCollection<(string, DateTime)>
        {
            private class _Comparer : IComparer<(string, DateTime)>
            {
                // order by time; break ties by path so distinct folders with equal times are both kept
                public int Compare((string, DateTime) first, (string, DateTime) second)
                {
                    var returnValue = first.Item2.CompareTo(second.Item2);
                    if (returnValue == 0)
                        returnValue = string.CompareOrdinal(first.Item1, second.Item1);
                    return returnValue;
                }

                public static readonly _Comparer Singleton = new _Comparer();
            }

            private readonly SortedSet<(string, DateTime)> _dict =
                new SortedSet<(string, DateTime)>(_Comparer.Singleton);

            private readonly object _lock = new object();

            public int Count
            {
                get
                {
                    lock (_lock)
                    {
                        return _dict.Count;
                    }
                }
            }

            public object SyncRoot => _lock;

            public bool IsSynchronized => true;

            IEnumerator IEnumerable.GetEnumerator()
            {
                return GetEnumerator();
            }

            public void CopyTo((string, DateTime)[] array, int index)
            {
                lock (_lock)
                {
                    _dict.CopyTo(array, index);
                }
            }

            public void CopyTo(Array array, int index)
            {
                lock (_lock)
                {
                    foreach (var item in _dict)
                    {
                        array.SetValue(item, index++);
                    }
                }
            }

            // returns false for a duplicate; BlockingCollection.Add then throws InvalidOperationException
            public bool TryAdd((string, DateTime) item)
            {
                lock (_lock)
                {
                    return _dict.Add(item);
                }
            }

            // removes and returns the entry with the earliest time
            public bool TryTake(out (string, DateTime) item)
            {
                lock (_lock)
                {
                    // check Count instead of comparing Min with default: Min is default for an empty set
                    if (_dict.Count == 0)
                    {
                        item = default;
                        return false;
                    }
                    item = _dict.Min;
                    return _dict.Remove(item);
                }
            }

            public (string, DateTime)[] ToArray()
            {
                lock (_lock)
                {
                    return _dict.ToArray();
                }
            }

            // enumerates a snapshot, so the lock is not held while the caller iterates
            public IEnumerator<(string, DateTime)> GetEnumerator()
            {
                return ((IEnumerable<(string, DateTime)>)ToArray()).GetEnumerator();
            }

            public BlockingCollection<(string, DateTime)> asBlockingCollection()
            {
                return new BlockingCollection<(string, DateTime)>(this);
            }
        }
    }

Then I can use it like this:

        public static void ExampleUse(CancellationToken cancellationToken) {
            var s = new BySortedSet().asBlockingCollection();

            // traversal (producer) thread:
            s.Add(("", DateTime.MinValue));

            //...
            s.CompleteAdding();

            // processing (consumer) thread: GetConsumingEnumerable blocks while the collection
            // is empty and ends once CompleteAdding has been called and the collection is drained
            foreach (var item in s.GetConsumingEnumerable(cancellationToken))
            {
                // process the item;
                // todo: the degree of parallelism is controlled by the consumer; is there a better way, e.g. in Dataflow or Rx?
            }
        }
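For the todo about the degree of parallelism, one BCL-only option is to run several consumers over the same BlockingCollection. The sketch below assumes a hypothetical helper, ParallelConsumers.RunAsync (not a framework API), that starts maxParallel workers, each draining the collection with GetConsumingEnumerable. A worker takes a new item only when it is free, so at most maxParallel items are processed at once, and with a sorted underlying collection such as BySortedSet each worker takes the earliest item still waiting. Items therefore start in earliest-first order, but they may finish out of order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelConsumers
{
    // Hypothetical helper: maxParallel consumers drain the same BlockingCollection.
    public static Task RunAsync<T>(
        BlockingCollection<T> source,
        int maxParallel,
        Action<T> process,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (process == null) throw new ArgumentNullException(nameof(process));
        if (maxParallel < 1) throw new ArgumentOutOfRangeException(nameof(maxParallel));

        Task[] workers = Enumerable.Range(0, maxParallel)
            .Select(_ => Task.Factory.StartNew(
                () =>
                {
                    // Blocks while the collection is empty; the loop ends once CompleteAdding()
                    // has been called and the collection is drained (or on cancellation).
                    foreach (T item in source.GetConsumingEnumerable(cancellationToken))
                    {
                        process(item);
                    }
                },
                cancellationToken,
                // each worker blocks its thread, so hint the scheduler to use dedicated threads
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default))
            .ToArray();

        // Completes when every worker has finished; faults if process throws.
        return Task.WhenAll(workers);
    }
}
```

Usage, under the same assumptions: `ParallelConsumers.RunAsync(new BySortedSet().asBlockingCollection(), 4, item => { /* process item */ })`, with the traversal thread calling Add and then CompleteAdding(). This keeps the concurrency limit in one place instead of in a semaphore inside the processing code.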

Thanks!