Сцепить несколько двухэтапных загрузок файлов с помощью Rx

Я пытаюсь загрузить несколько файлов из клиента Silverlight непосредственно в Amazon S3. Пользователь выбирает файлы из стандартного диалогового окна открытия файлов, и я хочу связать загрузки, чтобы они происходили последовательно по одному за раз. Это может произойти из нескольких мест в приложении, поэтому я пытался обернуть его в хороший служебный класс, который принимает IEnumerable для выбранных файлов, предоставляет IObservable для файлов по мере их загрузки, чтобы пользовательский интерфейс мог соответствующим образом реагировать на каждый файл. закончен.

Это довольно сложно из-за всех требований безопасности как Silverlight, так и AmazonS3. Я попытаюсь кратко объяснить всю свою среду для контекста, но я воспроизвел проблему с небольшим консольным приложением, код которого я опубликую ниже.

У меня есть сторонняя утилита, которая обрабатывает загрузку на S3 из Silverlight, которая предоставляет стандартные асинхронные методы на основе событий. Я создаю один экземпляр этой утилиты для каждого загруженного файла. Он создает неподписанную строку запроса, которую я затем отправляю на свой сервер для подписи с помощью моего закрытого ключа. Этот запрос на подпись выполняется через прокси-класс службы, который также использует асинхронные методы на основе событий. Получив подписанный запрос, я добавляю его в экземпляр загрузчика и инициирую загрузку.

Я пытался использовать Concat, но в итоге у меня проходит только первый файл. Когда я использую Merge, все файлы завершаются нормально, но параллельно, а не последовательно. Когда я использую Merge(2), все файлы начинают первый шаг, но затем только 2 проходят и завершаются.

Очевидно, я упускаю что-то, связанное с Rx, поскольку он ведет себя не так, как я ожидаю.

namespace RxConcat
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Timers;

    public class SignCompletedEventArgs : EventArgs
    {
        public string SignedRequest { get; set; }
    }

    public class ChainUploader
    {
        public IObservable<string> StartUploading(IEnumerable<string> files)
        {
            return files.Select(
                     file => from signArgs in this.Sign(file + "_request")
                             from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest)
                             select file).Concat();
        }

        private IObservable<System.Reactive.EventPattern<SignCompletedEventArgs>> Sign(string request)
        {
            Console.WriteLine("Signing request '" + request + "'");
            var signer = new Signer();
            var source = Observable.FromEventPattern<SignCompletedEventArgs>(ev => signer.SignCompleted += ev, ev => signer.SignCompleted -= ev);
            signer.SignAsync(request);
            return source;
        }

        private IObservable<System.Reactive.EventPattern<EventArgs>> Upload(string file, string signedRequest)
        {
            Console.WriteLine("Uploading file '" + file + "'");
            var uploader = new Uploader();
            var source = Observable.FromEventPattern<EventArgs>(ev => uploader.UploadCompleted += ev, ev => uploader.UploadCompleted -= ev);
            uploader.UploadAsync(file, signedRequest);
            return source;
        }
    }

    public class Signer
    {
        public event EventHandler<SignCompletedEventArgs> SignCompleted;

        public void SignAsync(string request)
        {
            var timer = new Timer(1000);
            timer.Elapsed += (sender, args) =>
            {
                timer.Stop();
                if (this.SignCompleted == null)
                {
                    return;
                }

                this.SignCompleted(this, new SignCompletedEventArgs { SignedRequest = request + "signed" });
            };
            timer.Start();
        }
    }

    public class Uploader
    {
        public event EventHandler<EventArgs> UploadCompleted;

        public void UploadAsync(string file, string signedRequest)
        {
            var timer = new Timer(1000);
            timer.Elapsed += (sender, args) =>
            {
                timer.Stop();
                if (this.UploadCompleted == null)
                {
                    return;
                }

                this.UploadCompleted(this, new EventArgs());
            };
            timer.Start();
        }
    }

    internal class Program
    {
        private static void Main(string[] args)
        {
            var files = new[] { "foo", "bar", "baz" };
            var uploader = new ChainUploader();
            var token = uploader.StartUploading(files).Subscribe(file =>   Console.WriteLine("Upload completed for '" + file + "'"));
            Console.ReadLine();
        }
    }
}

person GelatinousSlime    schedule 08.06.2012    source источник


Ответы (1)


Базовая наблюдаемая, которая обрабатывает двухэтапную загрузку для каждого файла, никогда не «завершается», что предотвращает запуск следующего в цепочке. Добавьте Limit (1) к этому наблюдаемому перед вызовом Concat (), и он будет работать правильно.

return files.Select(file => (from signArgs in this.Sign(file + "_request")
                             from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest)
                             select file).Take(1)).Concat();
person unknownprotocol    schedule 08.06.2012