【问题标题】:Async wait for file to be created异步等待文件创建
【发布时间】:2019-05-14 10:49:21
【问题描述】:

对于要由外部应用程序创建的文件,await 的最简洁方法是什么?

    async Task doSomethingWithFile(string filepath)
    {
        // 1. await for path exists
        // 2. Do something with file
    }

【问题讨论】:

    标签: c# .net async-await


    【解决方案1】:

    所以第一个关键点是,您可以使用FileSystemWatcher 在特定路径发生文件系统事件更改时收到通知。例如,如果您希望在特定位置创建文件时收到通知,您可以找到。

    接下来,我们可以创建一个方法,使用TaskCompletionSource 在文件系统观察器触发相关事件时触发任务完成。

    public static Task WhenFileCreated(string path)
    {
        if (File.Exists(path))
            return Task.FromResult(true);
    
        var tcs = new TaskCompletionSource<bool>();
        FileSystemWatcher watcher = new FileSystemWatcher(Path.GetDirectoryName(path));
    
        FileSystemEventHandler createdHandler = null;
        RenamedEventHandler renamedHandler = null;
        createdHandler = (s, e) =>
        {
            if (e.Name == Path.GetFileName(path))
            {
                tcs.TrySetResult(true);
                watcher.Created -= createdHandler;
                watcher.Dispose();
            }
        };
    
        renamedHandler = (s, e) =>
        {
            if (e.Name == Path.GetFileName(path))
            {
                tcs.TrySetResult(true);
                watcher.Renamed -= renamedHandler;
                watcher.Dispose();
            }
        };
    
        watcher.Created += createdHandler;
        watcher.Renamed += renamedHandler;
    
        watcher.EnableRaisingEvents = true;
    
        return tcs.Task;
    }
    

    请注意,这首先检查文件是否存在,以便在适用时允许它立即退出。它还使用创建的和重命名的处理程序,因为任何一个选项都可能允许文件在将来的某个时候存在。 FileSystemWatcher 也只监视目录,因此获取指定路径的目录然后在事件处理程序中检查每个受影响文件的文件名非常重要。

    另请注意,代码完成后会删除事件处理程序。

    这让我们可以写:

    public static async Task Foo()
    {
        await WhenFileCreated(@"C:\Temp\test.txt");
        Console.WriteLine("It's aliiiiiive!!!");
    }
    

    【讨论】:

    • 如果文件名被编辑了怎么办?另外,他想等待一个特定的文件名。
    • 这个有很多问题:fsw 永远不会被释放,如果文件被删除然后再次创建,它会出错,他传递的是文件名而不是目录路径...... - 1.
    • 感谢您修复它。只是小问题:仍未解决。
    • 这不是有竞争条件吗?如果文件是在File.Exists() 检查和EnableRaisingEvents = true 之间创建的,那么我认为Task 永远不会完成。
    • 请注意,FileSystemWatcher 将在与您来自的线程不同的线程上引发事件,因此您可能会丢失上下文。有关详细信息,请参阅roufa.com/articles/async-await-through-the-looking-glass
    【解决方案2】:

    使用自定义 ReactiveExtension 运算符的完整解决方案:WaitIf。 这需要通过 NuGet 获得 Genesis.RetryWithBackoff

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    
    
    public class TestWatcher
    {
    
        public static void Test()
        {
    
            FileSystemWatcher Watcher = new FileSystemWatcher("C:\\test")
            {
                EnableRaisingEvents = true,
            };
    
            var Created = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(h => Watcher.Created += h, h => Watcher.Created -= h)
                .Select(e => e.EventArgs.FullPath);
            var CreatedAndNotLocked = Created.WaitIf(IsFileLocked,100, attempt =>TimeSpan.FromMilliseconds(100), Scheduler.Default);
            var FirstCreatedAndNotLocked = CreatedAndNotLocked.Take(1)
                .Finally(Watcher.Dispose);
            var task = FirstCreatedAndNotLocked.GetAwaiter().ToTask();
            task.Wait();
            Console.WriteLine(task.Result);
    
        }
    
        public bool IsFileLocked(string filePath)
        {
            var ret = false;
            try
            {
                using (File.Open(filePath, FileMode.Open)) { }
            }
            catch (IOException e)
            {
                var errorCode = Marshal.GetHRForException(e) & ((1 << 16) - 1);
                ret = errorCode == 32 || errorCode == 33;
            }
            return ret;
        }
    }
    
    
    
    public static class ObservableExtensions
    {
    
    
        public class NotReadyException : Exception
        {
            public NotReadyException (string message) : base(message)
            {
            }
        }
    
        public static IObservable<T> WaitIf<T>(
          this IObservable<T> @this,
          Func<T, bool> predicate,
          int? retryCount = null,
          Func<int, TimeSpan> strategy = null,
          Func<Exception, bool> retryOnError = null,
          IScheduler scheduler = null)
        {
            scheduler = scheduler ?? DefaultScheduler.Instance;
            return @this.SelectMany(f =>
            Observable.Defer(() =>
               Observable.FromAsync<bool>(() => Task.Run<bool>(() => predicate.Invoke(f)),scheduler)
               .SelectMany(b => b ? Observable.Throw<T>(new NotReadyException(f + " not ready")) :
                               Observable.Return(f)
            ).RetryWithBackoff(retryCount, strategy, retryOnError, scheduler)));
        }
    }
    

    【讨论】:

      【解决方案3】:

      这是Servysolution 的功能更丰富的版本。它允许监视特定的文件系统状态和事件,以涵盖不同的场景。它也可以通过超时和CancellationToken 取消。

      [Flags]
      public enum WatchFileType
      {
          Created = 1,
          Deleted = 2,
          Changed = 4,
          Renamed = 8,
          Exists = 16,
          ExistsNotEmpty = 32,
          NotExists = 64,
      }
      
      public static Task<WatchFileType> WatchFile(string filePath,
          WatchFileType watchTypes,
          int timeout = Timeout.Infinite,
          CancellationToken cancellationToken = default)
      {
          var tcs = new TaskCompletionSource<WatchFileType>();
          var fileName = Path.GetFileName(filePath);
          var folderPath = Path.GetDirectoryName(filePath);
          var fsw = new FileSystemWatcher(folderPath);
          fsw.Filter = fileName;
      
          if (watchTypes.HasFlag(WatchFileType.Created)) fsw.Created += Handler;
          if (watchTypes.HasFlag(WatchFileType.Deleted)) fsw.Deleted += Handler;
          if (watchTypes.HasFlag(WatchFileType.Changed)) fsw.Changed += Handler;
          if (watchTypes.HasFlag(WatchFileType.Renamed)) fsw.Renamed += Handler;
      
          void Handler(object sender, FileSystemEventArgs e)
          {
              WatchFileType result;
              switch (e.ChangeType)
              {
                  case WatcherChangeTypes.Created: result = WatchFileType.Created; break;
                  case WatcherChangeTypes.Deleted: result = WatchFileType.Deleted; break;
                  case WatcherChangeTypes.Changed: result = WatchFileType.Changed; break;
                  case WatcherChangeTypes.Renamed: result = WatchFileType.Renamed; break;
                  default: throw new NotImplementedException(e.ChangeType.ToString());
              }
              fsw.Dispose();
              tcs.TrySetResult(result);
          }
      
          fsw.Error += (object sender, ErrorEventArgs e) =>
          {
              fsw.Dispose();
              tcs.TrySetException(e.GetException());
          };
      
          CancellationTokenRegistration cancellationTokenReg = default;
      
          fsw.Disposed += (object sender, EventArgs e) =>
          {
              cancellationTokenReg.Dispose();
          };
      
          fsw.EnableRaisingEvents = true;
      
          var fileInfo = new FileInfo(filePath);
          if (watchTypes.HasFlag(WatchFileType.Exists) && fileInfo.Exists)
          {
              fsw.Dispose();
              tcs.TrySetResult(WatchFileType.Exists);
          }
          if (watchTypes.HasFlag(WatchFileType.ExistsNotEmpty)
              && fileInfo.Exists && fileInfo.Length > 0)
          {
              fsw.Dispose();
              tcs.TrySetResult(WatchFileType.ExistsNotEmpty);
          }
          if (watchTypes.HasFlag(WatchFileType.NotExists) && !fileInfo.Exists)
          {
              fsw.Dispose();
              tcs.TrySetResult(WatchFileType.NotExists);
          }
      
          if (cancellationToken.CanBeCanceled)
          {
              cancellationTokenReg = cancellationToken.Register(() =>
              {
                  fsw.Dispose();
                  tcs.TrySetCanceled(cancellationToken);
              });
          }
      
          if (tcs.Task.IsCompleted || timeout == Timeout.Infinite)
          {
              return tcs.Task;
          }
      
          // Handle timeout
          var cts = new CancellationTokenSource();
          var delayTask = Task.Delay(timeout, cts.Token);
          return Task.WhenAny(tcs.Task, delayTask).ContinueWith(_ =>
          {
              cts.Cancel();
              if (tcs.Task.IsCompleted) return tcs.Task;
              fsw.Dispose();
              return Task.FromCanceled<WatchFileType>(cts.Token);
          }, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
      }
      

      使用示例:

      var result = await WatchFile(@"..\..\_Test.txt",
          WatchFileType.Exists | WatchFileType.Created, 5000);
      

      在本例中,结果通常为WatchFileType.ExistsWatchFileType.Created。在文件不存在且 5000 毫秒内未创建文件的异常情况下,将抛出 TaskCanceledException

      场景
      WatchFileType.Exists | WatchFileType.Created:用于一次性创建的文件。
      WatchFileType.ExistsNotEmpty | WatchFileType.Changed:用于首先创建为空然后填充数据的文件。
      WatchFileType.NotExists | WatchFileType.Deleted:用于即将删除的文件。

      【讨论】:

        【解决方案4】:

        这就是我的做法:

        await Task.Run(() => {while(!File.Exists(@"yourpath.extension")){} return;});
        //do all the processing
        

        你也可以把它打包成一个方法:

        public static Task WaitForFileAsync(string path)
        {
            if (File.Exists(path)) return Task.FromResult<object>(null);
            var tcs = new TaskCompletionSource<object>();
            FileSystemWatcher watcher = new FileSystemWatcher(Path.GetDirectoryName(path));
            watcher.Created += (s, e) => 
            { 
                if (e.FullPath.Equals(path))
                { 
                    tcs.TrySetResult(null);
                    if (watcher != null)
                    {
                        watcher.EnableRaisingEvents = false;
                        watcher.Dispose();
                    }
                } 
            };
            watcher.Renamed += (s, e) =>
            {
                if (e.FullPath.Equals(path))
                {
                    tcs.TrySetResult(null);
                    if (watcher != null)
                    {
                        watcher.EnableRaisingEvents = false;
                        watcher.Dispose();
                    }
                }
            };
            watcher.EnableRaisingEvents = true;
            return tcs.Task;
        }
        

        然后就这样使用它:

        await WaitForFileAsync("yourpath.extension");
        //do all the processing
        

        【讨论】:

        • @StephenCleary 好吧,那里没有 FileExistsAsync 吗? FileSystemWatcher 可能会给您错误的路径名,并且并不总是涵盖重命名或文件是否存在。这可能是最好的方法。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2023-03-12
        • 1970-01-01
        • 1970-01-01
        • 2016-07-07
        • 2016-03-25
        • 2017-10-09
        相关资源
        最近更新 更多