• R/O
  • SSH
  • HTTPS

Commit

Frequently used words (click to add to your profile)

javaandroidc++linuxc#objective-ccocoa誰得qtrubybathyscaphegamephpguicwindows翻訳pythonomegattwitterframeworkbtronarduinovb.net計画中(planning stage)directxpreviewertestゲームエンジンdom

FreeTrainの進化系を目指す


Commit MetaInfo

Revision65 (tree)
Zeit2016-11-07 00:56:08
Autorc477

Log Message

ReactiveExtensionsテスト&練習デモ

Ändern Zusammenfassung

Diff

--- trunk/TestLauncher/Program.cs (revision 64)
+++ trunk/TestLauncher/Program.cs (revision 65)
@@ -22,6 +22,7 @@
2222 Hashtable h = new Hashtable();
2323 Directories.Initialize(h);
2424 window = new TestLauncher();
25+ Trace.UseGlobalLock = false;
2526 Debug.Listeners.Add(window.TraceListener);
2627 Application.Run(window);
2728 Debug.Listeners.Remove(window.TraceListener);
--- trunk/TestLauncher/TestEntryListView.cs (revision 64)
+++ trunk/TestLauncher/TestEntryListView.cs (revision 65)
@@ -8,6 +8,9 @@
88 using nft.framework;
99 using System.Collections;
1010 using System.Drawing;
11+using System.Reactive.Linq;
12+using System.Threading;
13+using System.Threading.Tasks;
1114
1215 namespace nft.test
1316 {
@@ -195,25 +198,42 @@
195198 UpdateStatusLabel();
196199 }
197200
198- public void Run(ListViewItem item) {
201+ public async void Run(ListViewItem item) {
199202 Debug.WriteLine(StartLogText(item));
200203 TestInfo info = (TestInfo)item.Tag;
201- info.Run();
204+ if (info.TestAttribute.ShouldRunOnUIThread)
205+ {
206+ info.Run();
207+ AfterRun(item);
208+ return;
209+ }
210+ var o = Observable.Start(async () =>
211+ {
212+ await (Task)info.Run();
213+ });
214+ await o.ObserveOn(SynchronizationContext.Current).Finally(() => AfterRun(item));
215+ }
216+ #endregion
217+
218+ private void AfterRun(ListViewItem item)
219+ {
220+ TestInfo info = (TestInfo)item.Tag;
202221 item.SubItems[stateColumn].Text = GetStatusText(info);
203- Debug.WriteLineIf(info.TestState != TestState.Success,"### ERROR ###");
222+ Debug.WriteLineIf(info.TestState != TestState.Success, "### ERROR ###");
204223 item.SubItems[resultColumn].Text = "" + info.LastResult;
205- if (info.LastResult is Exception) {
224+ if (info.LastResult is Exception)
225+ {
206226 Exception ex = (Exception)info.LastResult;
207- Debug.WriteLine(ex.GetType().Name +":"+ ex.Message);
227+ Debug.WriteLine(ex.GetType().Name + ":" + ex.Message);
208228 Debug.WriteLine(ex.StackTrace);
209229 }
210230 Debug.WriteLine(EndLogText(item));
211231 Debug.WriteLine("");
212- if (!ignoreStatusUpdate) {
232+ if (!ignoreStatusUpdate)
233+ {
213234 UpdateStatusLabel();
214235 }
215236 }
216- #endregion
217237
218238 #region static utils
219239 static protected string GetStatusText(TestInfo info) {
--- trunk/TestLauncher/TestLauncher.cs (revision 64)
+++ trunk/TestLauncher/TestLauncher.cs (revision 65)
@@ -115,14 +115,34 @@
115115 }
116116
117117 public override void Write(string message) {
118- tbOutput.Invoke(new Action(() => tbOutput.AppendText(message)));
118+ if (tbOutput.InvokeRequired)
119+ {
120+ tbOutput.BeginInvoke(new Action(() => tbOutput.AppendText(message)));
121+ } else
122+ {
123+ tbOutput.AppendText(message);
124+ tbOutput.AppendText(System.Environment.NewLine);
125+ }
119126 }
120127
121128 public override void WriteLine(string message) {
122- tbOutput.Invoke(new Action(() => {
129+ if (tbOutput.InvokeRequired)
130+ {
131+ tbOutput.BeginInvoke(new Action(() =>
132+ {
133+ tbOutput.AppendText(message);
134+ tbOutput.AppendText(System.Environment.NewLine);
135+ }));
136+ } else
137+ {
123138 tbOutput.AppendText(message);
124139 tbOutput.AppendText(System.Environment.NewLine);
125- }));
140+ }
126141 }
142+
143+ public override bool IsThreadSafe
144+ {
145+ get { return true; }
146+ }
127147 }
128148 }
--- trunk/TestLauncher/test/RxPractice/HeavyTaskModel.cs (nonexistent)
+++ trunk/TestLauncher/test/RxPractice/HeavyTaskModel.cs (revision 65)
@@ -0,0 +1,84 @@
1+using System;
2+using System.Collections.Generic;
3+using System.Diagnostics;
4+using System.Linq;
5+using System.Reactive;
6+using System.Reactive.Linq;
7+using System.Reactive.Subjects;
8+using System.Text;
9+using System.Threading;
10+using System.Threading.Tasks;
11+
12+namespace nft.test.test.RxPractice
13+{
14+ public class HeavyTaskModel
15+ {
16+ const int MAX_PROCEED = 1000;
17+ private int proceed = 0;
18+ private bool debugLog = true;
19+ private SubjectBase<int> notifyProgress;
20+ private string name = "";
21+
22+ public HeavyTaskModel(string name, SubjectBase<int> subject = null)
23+ {
24+ this.notifyProgress = subject != null ? subject : new Subject<int>();
25+ this.name = name;
26+ }
27+ public HeavyTaskModel(SubjectBase<int> subject = null) : this("", subject)
28+ {
29+ }
30+
31+ // 重い処理のイメージ
32+ public void Process()
33+ {
34+ string prefix = name.Length > 0 ? string.Format("[{0}] ", name) : "#";
35+ for (int i = 0; i < 10; i++)
36+ {
37+ if (debugLog) Debug.WriteLine("{0}{1}", prefix, i);
38+ notifyProgress.OnNext(i);
39+ Thread.Sleep(350);
40+ }
41+ notifyProgress.OnCompleted();
42+ }
43+
44+ // 重い処理のイメージ
45+ public void Process2()
46+ {
47+ string prefix = name.Length > 0 ? string.Format("[{0}] ", name) : "";
48+ Debug.Assert(proceed == 0, prefix + "The process ALREADY Started!");
49+ Random rand = new Random();
50+ int vRange = MAX_PROCEED / 100;
51+ int aRange = MAX_PROCEED / 150;
52+ int velocity = rand.Next(2, vRange) + rand.Next(2, vRange);
53+ int accel = rand.Next(aRange) - rand.Next(aRange);
54+ int count = 0;
55+ if(debugLog) Debug.WriteLine("{0}Demo Process: proceed={1}, v={2}, a={3}", prefix, proceed, velocity, accel);
56+ while (proceed < MAX_PROCEED)
57+ {
58+ notifyProgress.OnNext(proceed);
59+ Thread.Sleep(20);
60+ count++;
61+ proceed += velocity;
62+ velocity += accel;
63+ if ( accel < 0 && velocity < vRange/2 || accel > 0 && velocity > vRange*3/2 || count % 100 == 0 )
64+ {
65+ do
66+ {
67+ accel = rand.Next(aRange + Math.Max(vRange - velocity, 1)) - rand.Next(aRange + Math.Max(velocity - vRange, 0));
68+ } while (accel < 0 && velocity < vRange / 2 || accel > 0 && velocity > vRange * 3 / 2);
69+ if (debugLog) Debug.WriteLine("{0}Demo Process: proceed={1}, v={2}, a={3}", prefix, proceed, velocity, accel);
70+ }
71+ }
72+ notifyProgress.OnNext(MAX_PROCEED);
73+ notifyProgress.OnCompleted();
74+ }
75+
76+ public IObservable<int> NotifyProgress
77+ {
78+ get
79+ {
80+ return notifyProgress;
81+ }
82+ }
83+ }
84+}
--- trunk/TestLauncher/test/RxPractice/ReactiveExtensionsPatterns.cs (revision 64)
+++ trunk/TestLauncher/test/RxPractice/ReactiveExtensionsPatterns.cs (revision 65)
@@ -21,7 +21,7 @@
2121 static class RxSamples
2222 {
2323
24- [TestEntry("RxSample#イベント間引き",new object[]{100})]
24+ [TestEntry("RxSample#イベント間引き",new object[]{100}, true)]
2525 static private void TestMouseMove1(int span) {
2626 RxPracticeForm form = new RxPracticeForm();
2727 form.SetMessage(String.Format("{0}ミリ秒以内の更新はまとめて、マウス座標をステータスバーに表示します", span));
@@ -35,7 +35,7 @@
3535 disposer.Dispose();
3636 }
3737
38- [TestEntry("RxSample#イベント間引き2", new object[] { 500 })]
38+ [TestEntry("RxSample#イベント間引き2", new object[] { 500 }, true)]
3939 static private void TestMouseMove2(int span) {
4040 RxPracticeForm form = new RxPracticeForm();
4141 form.SetMessage(String.Format("{0}ミリ秒に一回、マウス座標をステータスバーに表示します", span));
@@ -49,7 +49,7 @@
4949 disposer.Dispose();
5050 }
5151
52- [TestEntry("RxSample#ドラッグ描画パターン")]
52+ [TestEntry("RxSample#ドラッグ描画パターン", true)]
5353 static private void TestMouseMove3() {
5454 RxPracticeForm form = new RxPracticeForm();
5555 form.SetMessage("Aボタンで開始、Bボタンで停止、マウス移動差分をステータスバーに表示します");
@@ -76,13 +76,13 @@
7676 disposer.Dispose();
7777 }
7878
79- [TestEntry("RxSample#タイマーと後始末テスト")]
79+ [TestEntry("RxSample#タイマーと後始末テスト", true)]
8080 static private void TestTimerAndDispose() {
8181 RxPracticeForm form = new RxPracticeForm();
8282 form.SetMessage("A・Bボタンでタイマー登録をステータスバーに表示します");
8383
84- var disposerA = form.ObserveClickA().Subscribe( _ => Console.WriteLine("A button Clicked"));
85- var disposerB = form.ObserveClickA().Subscribe( _ => Console.WriteLine("B button Clicked"));
84+ var disposerA = form.ObserveClickA().Subscribe( _ => Debug.WriteLine("A button Clicked"));
85+ var disposerB = form.ObserveClickA().Subscribe( _ => Debug.WriteLine("B button Clicked"));
8686 int numA = 0, numB = 0;
8787
8888 using (var collection = new CompositeDisposable(disposerA,disposerB)) //? 登録
@@ -92,16 +92,17 @@
9292 String id = String.Format("A{0}", ++numA);
9393 var disposer2 = Observable.Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1))
9494 .Select(v => counter++)
95- .Subscribe(v => Console.WriteLine(String.Format("{0}={1}", id, v)));
95+ .Subscribe(v => Debug.WriteLine(String.Format("{0}={1}", id, v)));
9696 collection.Add(disposer2);
9797 });
9898 collection.Add(disposerA);
99- disposerB = form.ObserveClickB().Subscribe(_ => {
99+ disposerB = form.ObserveClickB().Subscribe(_ =>
100+ {
100101 int counter = 0;
101102 String id = String.Format("B{0}", ++numB);
102103 var disposer2 = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
103104 .Select(v => counter++)
104- .Subscribe(v => Console.WriteLine(String.Format("{0}={1}", id, v)));
105+ .Subscribe(v => Debug.WriteLine(String.Format("{0}={1}", id, v)));
105106 collection.Add(disposer2);
106107 });
107108 collection.Add(disposerB);
--- trunk/TestLauncher/test/RxPractice/ReactiveExtensionsPractice.cs (revision 64)
+++ trunk/TestLauncher/test/RxPractice/ReactiveExtensionsPractice.cs (revision 65)
@@ -14,6 +14,8 @@
1414 using System.Reactive.Concurrency;
1515 using System.Windows.Forms;
1616 using System.Reactive.Subjects;
17+using System.Reactive.Disposables;
18+using System.Reactive;
1719
1820 namespace nft.test.test.RxPractice
1921 {
@@ -20,6 +22,190 @@
2022 static class RxTestMain
2123 {
2224
25+ [TestEntry("RxTest#HeavyTaskModel同期処理テスト")]
26+ static private void DoSyncHeavyTask()
27+ {
28+ HeavyTaskModel model = new HeavyTaskModel();
29+ model.NotifyProgress.Subscribe(v => Debug.WriteLine(v,"progress0"));
30+ Debug.WriteLine("Start process!");
31+ model.Process();
32+ }
33+
34+ [TestEntry("RxTest#HeavyTaskModel非同期処理テスト")]
35+ static private void DoAsyncHeavyTask()
36+ {
37+ HeavyTaskModel model = new HeavyTaskModel();
38+ // SynchronizationContext.Currentは Observable.Startから呼ぶとnullになるので使えない
39+ // メインスレッドから直接この関数を呼ぶ場合には有効
40+ model.NotifyProgress/*.ObserveOn(SynchronizationContext.Current)*/
41+ .Subscribe(v => Debug.WriteLine(v, "progress1"), ()=> Debug.WriteLine("Subscriber completed"));
42+ var o = Observable.Start(() =>
43+ {
44+ Debug.WriteLine("Start process!");
45+ model.Process();
46+ }).Finally(() => Debug.WriteLine("Process completed."));
47+ //o.Wait(); // 終了待機1、Finallyが呼ばれたあと戻る
48+ o.GetAwaiter().GetResult(); // 終了待機2、Finallyが呼ばれる前に戻る
49+ }
50+
51+ [TestEntry("RxTest#HeavyTaskModel非同期処理テスト2")]
52+ static private async Task DoAsyncHeavyTask2()
53+ {
54+ int i = 0;
55+ var o = Observable.FromAsync(() => Task.Run(() => {
56+ HeavyTaskModel model = new HeavyTaskModel();
57+ model.NotifyProgress.Subscribe(v => Debug.WriteLine(v, "progress1"), () => Debug.WriteLine("Subscriber completed"));
58+ Debug.WriteLine("Start process!");
59+ model.Process();
60+ return ++i;
61+ }));
62+ o.Repeat(3).Subscribe(x => Debug.WriteLine(x,"Task result"), () => Debug.WriteLine("Task Complete"));
63+ await o.LastAsync(); // Repeatの一週目でくる
64+ Debug.WriteLine("Await1");
65+ await o.LastAsync(); // Repeatの二週目でくる
66+ Debug.WriteLine("Await2");
67+ await o.LastAsync(); // Repeatの三週目でくる
68+ Debug.WriteLine("Await3");
69+ }
70+
71+ [TestEntry("RxTest#HeavyTaskModel非同期並行処理")]
72+ static private async Task DoAsyncHeavyTaskMulti()
73+ {
74+ var o = Observable.CombineLatest(
75+ Observable.Start(() => { HeavyTaskModel model = new HeavyTaskModel("A"); model.Process(); return "A End"; }),
76+ Observable.Start(() => { HeavyTaskModel model = new HeavyTaskModel("B"); model.Process(); return "B End"; }),
77+ Observable.Start(() => { HeavyTaskModel model = new HeavyTaskModel("C"); model.Process(); return "C End"; })
78+ ).Finally(() => Debug.WriteLine("All Process completed.")); ;
79+ foreach (string r in await o.FirstAsync())
80+ {
81+ Debug.WriteLine("Result = " + r);
82+ }
83+ }
84+
85+ [TestEntry("RxTestMain#別スレッド処理&キャンセル")]
86+ static private bool TestScedhulerAndCancel()
87+ {
88+ IObservable<int> ob =
89+ Observable.Create<int>(o => {
90+ var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
91+ NewThreadScheduler.Default.Schedule(() => {
92+ int i = 0;
93+ for (;;)
94+ {
95+ Thread.Sleep(100);
96+ if (cancel.Token.IsCancellationRequested) // check cancel token periodically
97+ {
98+ Debug.WriteLine("Abort by cancel!");
99+ o.OnCompleted(); // will not make it to the subscriber
100+ return;
101+ }
102+ o.OnNext(i++);
103+ }
104+ });
105+ return cancel;
106+ });
107+ IDisposable subscription = ob.Subscribe(i => Debug.WriteLine(i));
108+ Debug.WriteLine("Wait for 1 sec.");
109+ Thread.Sleep(1000);
110+ Debug.WriteLine("Dispose will cause cancel.");
111+ subscription.Dispose();
112+ return true;
113+ }
114+
115+ static private AsyncCallback Callback;
116+ [TestEntry("RxTestMain#ストリーム操作サンプル")]
117+ static private async Task<bool> TestStream()
118+ {
119+ // Obsolated
120+ //var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
121+ //IObservable<int> observable = read(someBytes, 0, 10);
122+
123+ string file = "nftfw.resource.xml";
124+ byte[] buff = new byte[200];
125+
126+ /****************** Old style before .NET Framework4.5 *******************/
127+ Subject<Unit> sub = new Subject<Unit>();
128+ FileStream fs = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
129+ Callback = result =>
130+ {
131+ int length = fs.EndRead(result);
132+ if (length > 0)
133+ {
134+ Debug.WriteLine("Read Byte = " + length, "Old");
135+ fs.BeginRead(buff, 0, 200, Callback, fs);
136+ }
137+ else
138+ {
139+ Debug.WriteLine("Read Completed.", "Old");
140+ fs.Close();
141+ sub.OnCompleted();
142+ }
143+ };
144+ fs.BeginRead(buff, 0, 200, Callback, fs);
145+ await sub.FirstOrDefaultAsync(); // ここで待たないと同時並行で下の読み込みが始まる。
146+ sub.Dispose();
147+
148+ /****************** .NET Framework4.5 or later *******************/
149+ FileStream fs2 = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
150+ StreamReader sr = new StreamReader(fs2);
151+ string s = await sr.ReadLineAsync();
152+ while (s != null)
153+ {
154+ Debug.WriteLine(s, "New");
155+ s = await sr.ReadLineAsync();
156+ }
157+
158+ return true;
159+ }
160+
161+ [TestEntry("RxTestMain#IEnumerable操作サンプル")]
162+ static private async Task<bool> TestEnumeration()
163+ {
164+ IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };
165+
166+ // To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
167+ IObservable<int> observable = someInts.ToObservable();
168+
169+ var o = Observable.Start(() => {
170+ observable.Subscribe<int>(i => Debug.WriteLine("Enumerated from list = " + i));
171+ });
172+ await o.Finally(() => Debug.WriteLine("Completed."));
173+ return true;
174+ }
175+
176+ [TestEntry("RxTestMain#周期タイマー&スイッチ")]
177+ static private bool TestIntervalTimer()
178+ {
179+ var switches = new BehaviorSubject<bool>(false);
180+ var interval = Observable.Interval(TimeSpan.FromMilliseconds(500)).TakeUntil(switches.Where(on => !on));
181+ var query = switches.DistinctUntilChanged().Where(on => on).SelectMany(interval);
182+
183+ using (query.TimeInterval().Subscribe(count => Debug.WriteLine(count, "timer")))
184+ {
185+ Debug.WriteLine("start");
186+ Debug.WriteLine("wait {0}s", 1);
187+ Thread.Sleep(TimeSpan.FromSeconds(1));
188+ Debug.WriteLine("ON","switch"); switches.OnNext(true);
189+ Thread.Sleep(TimeSpan.FromSeconds(2));
190+ Debug.WriteLine("OFF", "switch"); switches.OnNext(false);
191+ Debug.WriteLine("wait {0}s", 2);
192+ Thread.Sleep(TimeSpan.FromSeconds(2));
193+ Debug.WriteLine("ON", "switch"); switches.OnNext(true);
194+ Thread.Sleep(TimeSpan.FromSeconds(3));
195+ Debug.WriteLine("OFF", "switch"); switches.OnNext(false);
196+ Debug.WriteLine("wait {0}s", 1);
197+ Thread.Sleep(TimeSpan.FromSeconds(1));
198+ Debug.WriteLine("OFF", "switch"); switches.OnNext(false);
199+ Debug.WriteLine("wait {0}s", 1);
200+ Thread.Sleep(TimeSpan.FromSeconds(1));
201+ Debug.WriteLine("ON", "switch"); switches.OnNext(true);
202+ Thread.Sleep(TimeSpan.FromSeconds(1));
203+ Debug.WriteLine("ON", "switch"); switches.OnNext(true);
204+ Thread.Sleep(TimeSpan.FromSeconds(2));
205+ }
206+ return true;
207+ }
208+
23209 [TestEntry("RxTestMain#ノーマルSubject")]
24210 static private bool TestSubject() {
25211 return TestSubjectMain(new Subject<int>());
--- trunk/framework/framework/TestEntryAttribute.cs (revision 64)
+++ trunk/framework/framework/TestEntryAttribute.cs (revision 65)
@@ -10,19 +10,21 @@
1010 {
1111 protected object[] _arguments = null;
1212 protected string _caps;
13+ protected bool _onUIThread = false;
1314
14- public TestEntryAttribute()
15- : this(null, null) {
15+ public TestEntryAttribute(bool onUIThread = false)
16+ : this(null, null, onUIThread) {
1617 }
17- public TestEntryAttribute(String caption)
18- : this(caption, null) {
18+ public TestEntryAttribute(String caption, bool onUIThread = false)
19+ : this(caption, null, onUIThread) {
1920 }
20- public TestEntryAttribute(object[] args)
21- : this(null, args) {
21+ public TestEntryAttribute(object[] args, bool onUIThread = false)
22+ : this(null, args, onUIThread) {
2223 }
23- public TestEntryAttribute(String caption, object[] args) {
24+ public TestEntryAttribute(String caption, object[] args, bool onUIThread = false) {
2425 this._arguments = args;
2526 this._caps = caption;
27+ this._onUIThread = onUIThread;
2628 }
2729
2830 public String Caption {
@@ -34,5 +36,11 @@
3436 get { return _arguments; }
3537 set { _arguments = value; }
3638 }
39+
40+ public bool ShouldRunOnUIThread
41+ {
42+ get { return _onUIThread; }
43+ set { _onUIThread = value; }
44+ }
3745 }
3846 }
--- trunk/framework/framework/TestUtil.cs (revision 64)
+++ trunk/framework/framework/TestUtil.cs (revision 65)
@@ -5,6 +5,7 @@
55 using nft.util;
66 using System.Diagnostics;
77 using System.Windows.Forms;
8+using System.Threading.Tasks;
89
910 namespace nft.framework
1011 {
@@ -85,6 +86,7 @@
8586 public object Run() {
8687 try {
8788 LastResult = this.Method.Invoke(null, TestAttribute.Arguments);
89+ if (LastResult is Task) ((Task)LastResult).Wait();
8890 _state = TestState.Success;
8991 } catch (TargetInvocationException ex) {
9092 _state = TestState.Failed;