【问题标题】:How to calculate mean of distributed data?如何计算分布式数据的平均值?
【发布时间】:2017-02-23 23:57:10
【问题描述】:

我如何在分布式计算中计算大向量(系列)的算术平均值,其中我将数据分区到多个节点上。我不想使用 map reduce 范式。除了在每个节点上简单计算单个总和然后将结果带到主节点并除以向量(系列)的大小之外,是否有任何分布式算法可以有效地计算平均值。

【问题讨论】:

  • 这个问题为什么是-1?
  • 为什么你不喜欢“琐碎”的方法?它有什么问题?
  • 我想知道有没有更好的分布式计算方法。
  • 没有。 “琐碎”的方法是最佳的。
  • 琐碎方法的问题在于,如果您有大量数据,实质​​上要使所有内容相互依赖,则可能需要很长时间来计算数据,到那个时候这些信息非常过时,因此是错误的,除非您锁定整个数据集。使用分布式平均共识(相同的方法适用于 Mean 的替代算法),您可以在不锁定数据的情况下实时更好地猜测 Mean 的当前值。 -> 请参阅下面的链接。但是你可以用谷歌搜索很多关于它的论文。

标签: distributed-computing mean algebra


【解决方案1】:

分布式平均共识是一种替代方案。

使用 master 进行 map-reduce 的简单方法的问题在于,如果您有大量数据,本质上要使所有内容相互依赖,则计算数据可能需要很长时间,通过什么时候信息非常过时,因此是错误的,除非您锁定整个数据集 - 对于大量分布式数据集是不切实际的。使用分布式平均共识(相同的方法适用于 Mean 的替代算法),您可以在不锁定数据的情况下实时更好地猜测 Mean 的当前值。 这是一篇关于它的论文的链接,但它的数学很重: http://web.stanford.edu/~boyd/papers/pdf/lms_consensus.pdf 你可以用谷歌搜索很多关于它的论文。

一般概念是这样的:假设在每个节点上都有一个套接字侦听器。您评估本地总和和平均值,然后将其发布到其他节点。每个节点都侦听其他节点,并在有意义的时间尺度上接收它们的总和和平均值。然后,您可以通过 (sumForAllNodes(storedAverage[node] *storedCount[node]) / (sumForAllNodes(storedCount[node]))) 评估对总平均值的良好猜测。如果您有一个非常大的数据集,您可以只听新的值,因为它们存储在节点中,并修改本地计数和平均值,然后发布它们。

即使这花费的时间太长,您也可以对每个节点中的随机数据子集进行平均。

这里有一些 c# 代码,可以让您有一个想法(使用 feck 在更多版本的 windows 上运行,而不是仅 windows-10 的 microsoft websockets 实现)。在两个节点上运行它,一个带有

<appSettings>
    <add key="thisNodeName" value="UK" />
</appSettings>

在 app.config 中,并在另一个中使用“EU-North”。这是一些示例代码。这两个实例交换意味着使用 websockets。您只需要添加数据库的后端枚举即可。

using Fleck;

namespace WebSocketServer
{
    class Program
    {
        static List<IWebSocketConnection> _allSockets;
        static Dictionary<string,decimal> _allMeans;
        static Dictionary<string,decimal> _allCounts;
        private static decimal _localMean;
        private static decimal _localCount;
        private static decimal _localAggregate_count;
        private static decimal _localAggregate_average;

        static void Main(string[] args)
        {
            _allSockets = new List<IWebSocketConnection>();
            _allMeans = new Dictionary<string, decimal>();
            _allCounts = new Dictionary<string, decimal>();

            var serverAddresses = new Dictionary<string,string>();
            //serverAddresses.Add("USA-WestCoast", "ws://127.0.0.1:58951");
            //serverAddresses.Add("USA-EastCoast", "ws://127.0.0.1:58952");
            serverAddresses.Add("UK", "ws://127.0.0.1:58953");
            serverAddresses.Add("EU-North", "ws://127.0.0.1:58954");
            //serverAddresses.Add("EU-South", "ws://127.0.0.1:58955");
            foreach (var serverAddress in serverAddresses)
            {
                _allMeans.Add(serverAddress.Key, 0m);
                _allCounts.Add(serverAddress.Key, 0m);
            }

            var thisNodeName = ConfigurationSettings.AppSettings["thisNodeName"];   //for example "UK"
            var serverSocketAddress = serverAddresses.First(x=>x.Key==thisNodeName);
            serverAddresses.Remove(thisNodeName);

            var websocketServer = new Fleck.WebSocketServer(serverSocketAddress.Value);

            websocketServer.Start(socket =>
            {
                socket.OnOpen = () =>
                {
                    Console.WriteLine("Open!");
                    _allSockets.Add(socket);
                };
                socket.OnClose = () =>
                {
                    Console.WriteLine("Close!");
                    _allSockets.Remove(socket);
                };
                socket.OnMessage = message =>
                {
                    Console.WriteLine(message + " received");

                    var parameters = message.Split('~');
                    var remoteHost = parameters[0];
                    var remoteMean = decimal.Parse(parameters[1]);
                    var remoteCount = decimal.Parse(parameters[2]);
                    _allMeans[remoteHost] = remoteMean;
                    _allCounts[remoteHost] = remoteCount;


                };
            });
            while (true)
            {
                //evaluate my local average and count
                Random rand = new Random(DateTime.Now.Millisecond);
                _localMean = 234.00m + (rand.Next(0, 100) - 50)/10.0m;
                _localCount = 222m + rand.Next(0, 100);

                //evaluate my local aggregate average using means and counts sent from all other nodes
                //could publish aggregate averages to other nodes, if you wanted to monitor disagreement between nodes
                var total_mean_times_count = 0m;
                var total_count = 0m;
                foreach (var server in serverAddresses)
                {
                    total_mean_times_count += _allCounts[server.Key]*_allMeans[server.Key];
                    total_count += _allCounts[server.Key];
                }
                //add on local mean and count which were removed from the server list earlier, so won't be processed
                total_mean_times_count += (_localMean * _localCount);
                total_count = total_count + _localCount;

                _localAggregate_average = (total_mean_times_count/total_count);
                _localAggregate_count = total_count;

                Console.WriteLine("local aggregate average = {0}", _localAggregate_average);

                System.Threading.Thread.Sleep(10000);
                foreach (var serverAddress in serverAddresses)
                {
                    using (var wscli = new ClientWebSocket())
                    {
                        var tokSrc = new CancellationTokenSource();
                        using (var task = wscli.ConnectAsync(new Uri(serverAddress.Value), tokSrc.Token))
                        {
                            task.Wait();
                        }

                        using (var task = wscli.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(thisNodeName+"~"+_localMean + "~"+_localCount)),
                            WebSocketMessageType.Text,
                            false,
                            tokSrc.Token
                            ))
                        {
                            task.Wait();
                        }
                    }

                }
            }
        }



    }
}

不要忘记通过在给定时间同步来添加静态锁或单独的活动。 (为简单起见未显示)

【讨论】:

    【解决方案2】:

    您可以使用两种简单的方法。

    正如您正确指出的那样,一个是计算每个节点的总和,然后将总和除以数据总量:

    avg = (sum1+sum2+sum3)/(cnt1+cnt2+cnt3)
    

    另一种可能性是计算每个节点上的平均值,然后使用加权平均值:

    avg = (avg1*cnt1 + avg2*cnt2 + avg3*cnt3) / (cnt1+cnt2+cnt3)
        = avg1*cnt1/(cnt1+cnt2+cnt3) + avg2*cnt2/(cnt1+cnt2+cnt3) + avg3*cnt3/(cnt1+cnt2+cnt3)
    

    我认为这些微不足道的方法没有任何问题,我想知道您为什么要使用不同的方法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-15
      • 2018-04-05
      • 2013-11-05
      • 1970-01-01
      • 2021-11-21
      • 2021-07-05
      • 2017-09-24
      • 1970-01-01
      相关资源
      最近更新 更多