1 // Written in the D programming language.
2 /*
3 NYSL Version 0.9982
4 
5 A. This software is "Everyone'sWare". It means:
6   Anybody who has this software can use it as if he/she is
7   the author.
8 
9   A-1. Freeware. No fee is required.
10   A-2. You can freely redistribute this software.
11   A-3. You can freely modify this software. And the source
12       may be used in any software with no limitation.
13   A-4. When you release a modified version to public, you
14       must publish it with your name.
15 
16 B. The author is not responsible for any kind of damages or loss
17   while using or misusing this software, which is distributed
18   "AS IS". No warranty of any kind is expressed or implied.
19   You use AT YOUR OWN RISK.
20 
21 C. Copyrighted to Kazuki KOMATSU
22 
23 D. Above three clauses are applied both to source and binary
24   form of this software.
25 */
26 
27 /**
28 このモジュールは、信号処理のための試験的なモジュールです。
29 このモジュールで定義されているストリームは、Rangeに拡張を加える事で大量のデータを処理することに最適です。
30 また、状況によってはSIMD最適化を狙えるよう設計になっています。
31 */
32 module carbon.stream;
33 
34 import carbon.math,
35        carbon.functional,
36        carbon.nonametype,
37        carbon.simd;
38 
39 import std.algorithm,
40        std.complex,
41        std.container,
42        std.file,
43        std.functional,
44        std.math,
45        std.range,
46        std.stdio,
47        std.traits,
48        std.typecons;
49 
50 
51 /**
52 入力ストリームは、`read`メソッドに与えられたバッファに要素を格納可能な型です。
53 実行時には、次のような特徴を満たさなければいけません。
54 
55 + `stream.empty`は、そのストリームからノンブロッキングで一要素でも取り出せる場合には`false`となる。
56 + `stream.front`は、そのストリームからノンブロッキングで一要素取り出す。
57 + `stream.popFront`は、そのストリームをノンブロッキングでひとつ進める。
58 + `stream.length`が有効である場合、この値はノンブロッキングで読み出すことができる要素数となる。
59 + `stream.read(buf)`は、ノンブロッキングで処理しなければいけない。
60 + `stream.read(buf)`は、bufのうち先頭から読み込みに成功した要素数だけのスライスを返す。
61     - `stream.length >= buf.length`のとき、常に、`buf.length == stream.read(buf).length`となる。
62 //+ `stream.closed`は、いくら待ってもそれ以上要素が取り出せない状況で`true`となる。
63 //    - `stream.closed`が真の場合、`stream.empty`であり、`lenght`が有効であれば`stream.length == 0`を満たす。
64 + `stream.fetch()`は、`stream.empty`が偽になるまで、
65     もしくはそのストリームからは今後一切データを取り出せないと判断した時点で処理を返す。
66     `stream.fetch()`は、待機後の`stream.empty()`を返す。
67 + `stream`が無限レンジの場合、`stream.read(buf)`は常に`buf`の全要素に読み込む。
68 */
69 enum bool isInputStream(T) = isInputRange!T && is(typeof((T t){
70         alias E = Unqual!(ElementType!T);
71         while(!t.empty || !t.fetch()) E[] var = t.read(new E[1024]);
72     }));
73 
74 
75 /**
76 入力ストリームから、ストリーム終端になるまで可能な限り読み出します。
77 */
78 E[] fillBuffer(S, E)(ref S s, E[] buf)
79 if(isInputStream!S)
80 {
81     E[] rem = buf;
82     while(rem.length && (!s.empty || !s.fetch()))
83         rem = rem[s.read(rem).length .. $];
84     
85     return buf[0 .. $ - rem.length];
86 }
87 
88 
89 /**
90 演算可能なストリームとは、`readOp`メソッドに与えられたバッファ上に、計算した要素を格納可能な入力ストリームです。
91 これは、次のような実行時条件を満たしている必要があります。
92 
93 + `stream.readOp!"op"(buf)`は、`a op= b`という演算をbuf上で行う。
94     つまり、
95     - op : "+"のとき、streamから読み出された値を、bufに加える演算を表す。
96     - op : "-"のとき、streamから読み出された値を、bufから引く演算を表す。
97     + op : ""のときは、`stream.read(buf)`と等価です。
98 + `stream.readOp!func(buf)`は、`func(a, b)`という演算結果をbufに格納する
99     - `stream.readOp!((a, b) => b)(buf)`は、`stream.read(buf)`と等価です。
100     - `stream.readOp!((a, b) => a + b)(buf)`は、`stream.readOp!"+"(buf)`と等価です。
101 */
102 enum bool isInplaceComputableStream(T, alias op = "", U = Unqual!(ElementType!T)) = isInputStream!T &&
103     is(typeof((T t, U[] buf){
104         buf = t.readOp!op(buf);   // as read(buf)
105     }));
106 
107 
108 /**
109 出力ストリームは、`t.write`メソッドに与えられたバッファを出力可能な型です。
110 実行時には次のような条件を満たしている必要があります。
111 
112 + `stream.write(buf)`は、ノンブロッキングでバッファを出力し、まだ出力できていないバッファを返す。
113 + `stream.fill`が偽であれば、一要素でも`stream.write`でノンブロッキング出力可能である
114 + `stream.flush()`は、ブロッキング処理で`t.fill`が偽になるか、
115     もしくはそれ以上出力不可能だと判断した時点で、`t.fill`を返す。
116 */
117 enum bool isOutputStream(T, E) = 
118     is(typeof((T t, E e){
119         E[] writeData = new E[1024];
120         while(writeData.length && (!t.fill || !t.flush())) writeData = t.write(writeData);
121     }));
122 
123 
124 /**
125 */
126 E[] fillBufferOp(alias op, S, E)(ref S s, E[] buffer)
127 {
128     auto rem = buffer;
129     while(rem.length && (!s.empty || !s.fetch()))
130         rem = s.readOp!op(rem);
131     return buffer[0 .. $ - rem.length];
132 }
133 
134 
135 /**
136 内部にバッファを持つような入力ストリームです。
137 
138 + `stream.availables`は内部のバッファを返す。
139 + `stream.consume(size)`は、内部バッファの先頭から`size`要素だけを捨てる。
140 */
141 enum bool isBufferedInputStream(T) = isInputStream!T && 
142     is(typeof((T t){
143         alias E = ElementType!T;
144         const(E)[] buf = t.availables;
145         t.consume(buf.length);
146     }));
147 
148 
149 /**
150 内部にバッファを持つような出力ストリームです。
151 
152 + `stream.buffer`は内部のバッファを返す。
153 + `stream.flush(size)`は、内部バッファの先頭から`size`要素だけ出力する。
154 */
155 enum bool isBufferedOutputStream(T) = isOutputStream!T &&
156     is(typeof((T t){
157         auto buf = t.buffer;
158         static assert(is(typeof(buf) : E[], E));
159         static assert(is(typeof(buf) == Unqual!(typeof(buf))));
160         t.flush(buf.length);
161     }));
162 
163 
164 /**
165 InplaceComputableStreamを作成する際に有用な、std.functional.binaryFunの拡張です
166 */
167 auto ref A binaryFunExt(alias op, A, B)(auto ref A a, auto ref B b)
168 if(is(typeof(mixin(`a` ~ op ~ `=b`))))
169 {
170     mixin(`a` ~ op ~ "=b;");
171     return a;
172 }
173 
174 
175 /// ditto
176 auto ref A binaryFunExt(alias func, A, B)(auto ref A a, auto ref B b)
177 if(is(typeof(a = naryFun!func(a, forward!b))))
178 {
179     a = naryFun!func(a, forward!b);
180     return a;
181 }
182 
183 
184 /**
185 レンジをisInplaceComputableStreamに変換します
186 */
187 auto toStream(R)(R range)
188 if(isInputRange!R && !isInputStream!R)
189 {
190     alias E = Unqual!(ElementType!R);
191 
192     static struct InputStreamRange()
193     {
194         R range;
195         alias range this;
196         alias closed = range.empty;
197         bool fetch(){ return range.empty; }
198 
199         T[] readOp(alias func, T)(T[] buf)
200         {
201             auto p = buf.ptr;
202             const end = () @trusted { return p + buf.length; }();
203             while(p != end && !range.empty){
204                 binaryFunExt!func(*p, range.front);
205                 range.popFront();
206                 () @trusted { ++p; }();
207             }
208 
209             return () @trusted { return buf[0 .. cast(size_t)(p - buf.ptr)]; }();
210         }
211 
212 
213         T[] read(T)(T[] buf) { return readOp!""(buf); }
214 
215         //mixin(defaultStreamSIMDOperator);
216     }
217 
218 
219     return InputStreamRange!()(range);
220 }
221 
222 
223 /**
224 正確なComplexNCO
225 */
226 auto preciseComplexNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
227 {
228     static struct PreciseComplexNCO()
229     {
230         Complex!real front() const @property { return this[0]; }
231         void popFront() { _theta += _dt2PI * _freq; }
232         enum bool empty = false;
233         PreciseComplexNCO!() save() const @property { return this; }
234         Complex!real opIndex(size_t i) const @property { return std.complex.expi(_theta + i * _dt2PI * _freq); }
235         struct OpDollar{} enum OpDollar opDollar = OpDollar();
236         PreciseComplexNCO!() opSlice() const { return this; }
237         PreciseComplexNCO!() opSlice(size_t i, OpDollar) const { PreciseComplexNCO!() dst = this; dst._theta += i * _dt2PI * _freq; return dst; }
238         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
239 
240         bool fetch() { return false; }
241 
242         T[] readOp(alias op, T)(T[] buf)
243         {
244             auto p = buf.ptr;
245             const end = () @trusted { return p + buf.length; }();
246             while(p != end){
247                 binaryFunExt!op(*p, std.math.expi(_theta));
248                 _theta += _dt2PI * _freq;
249                 () @trusted { ++p; }();
250             }
251 
252             return buf;
253         }
254 
255 
256         Cpx[] read(Cpx)(Cpx[] buf){ return readOp!""(buf); }
257 
258         //mixin(defaultStreamSIMDOperator);
259 
260       @property
261       {
262         real freq() const @property { return _freq; }
263         void freq(real f) @property { _freq = f; }
264 
265         real deltaT() const @property { return _dt2PI / 2 / PI; }
266         void deltaT(real dt) @property { _dt2PI = dt * 2 * PI; }
267       }
268 
269       private:
270         real _freq;
271         real _dt2PI;
272         real _theta;
273     }
274 
275     return PreciseComplexNCO!()(freq, deltaT * 2 * PI, theta);
276 }
277 
278 
279 ///
280 unittest
281 {
282     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
283     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
284 
285     auto sig1 = preciseComplexNCO(1000, 0.001, 0);
286     static assert(isInputStream!(typeof(sig1)));
287     static assert(isInplaceComputableStream!(typeof(sig1)));
288     static assert(is(ElementType!(typeof(sig1)) == creal));
289 
290     assert(equal!((a, b) => isClose(a.re, b.re) && isClose(a.im, b.im))
291                 (sig1[0 .. 4], cast(creal[])[1, 1, 1, 1]));
292 
293     sig1 = preciseComplexNCO(1, 0.25, 0);
294     assert(equal!((a, b) => isClose(a.re, b.re) && isClose(a.im, b.im))
295                 (sig1[0 .. 4], cast(creal[])[1, 0+1i, -1, -1i]));
296 
297     sig1 = preciseComplexNCO(1000, 10.0L^^-6, std.math.E);
298     auto buf = sig1[0 .. 1024].array;
299     assert(sig1.readOp!"-"(buf).length == buf.length);
300     assert(equal!"a == 0"(buf, buf));
301 
302     sig1.readOp!"a+b"(buf);
303 }
304 
305 
306 
307 /**
308 Lookup-Table方式の高速な局部発振器を提供します。
309 テンプレートパラメータの`func`には周期2PIの周期関数を与えることが出来ます。
310 たとえば、`std.math.expi`を与えれば複素発振器となり、`std.math.sin`であれば正弦波を出力します。
311 
312 この局部発振器は周波数の動的な制御が可能なので、信号の周波数をフィードバック制御する際に用いることができます。
313 Lookup-Tableは、テンプレートパラメータ毎にプログラムの初期化時に生成されるので、
314 最初の初期化が終われば、実行コストはテーブルの参照のみになり、高速にアクセス可能です。
315 また、テーブル長を伸ばしても初期化コストが増加するだけで、実行コストはあまり大きくならないことも特徴です。
316 */
317 template lutNCO(alias func, size_t divN)
318 if(isPowOf2(divN))
319 {
320     alias E = typeof(func(0.0));
321     immutable(E[]) table;
322 
323     shared static this()
324     {
325         E[divN] t;
326         foreach(i, ref e; t)
327             e = func(i * 2 * PI / divN);
328 
329         table = t.dup;
330     }
331 
332 
333     struct LutNCO()
334     {
335         E front() const @property { return table[cast(ptrdiff_t)(_phase) & (divN - 1)]; }
336         void popFront() { _phase += _freq * _deltaT * divN; _phase %= divN; }
337         enum bool empty = false;
338         LutNCO save() const @property { return this; }
339         E opIndex(size_t i) const { return table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * divN) & (divN - 1)]; }
340         struct OpDollar{} enum opDollar = OpDollar();
341         LutNCO opSlice() const { return this; }
342         LutNCO opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
343         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
344 
345         real phase() const @property { return _phase; }
346         void phase(real p) @property { _phase = p; }
347         real freq() const @property { return _freq; }
348         void freq(real f) @property { _freq = f; }
349         real deltaT() const @property { return _deltaT; }
350         void deltaT(real t) @property { _deltaT = t; }
351 
352         bool fetch() { return false; }
353 
354         T[] readOp(alias op, T)(T[] buf)
355         if(is(typeof(binaryFunExt!op(buf[0], table[0]))))
356         {
357             immutable dph = _freq * _deltaT * divN;
358             auto p = buf.ptr;
359             const end = () @trusted { return p + buf.length; }();
360             while(p != end){
361                 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]);
362                 _phase += dph;
363                 () @trusted { ++p; }();
364             }
365 
366             return buf[0 .. $];
367         }
368 
369         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
370         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
371         {
372             immutable dph = _freq * _deltaT * divN * N;
373             auto p = buf[].ptr;
374             const end = () @trusted { return p + buf.length; }();
375             while(p != end){
376                 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]);
377                 _phase += dph;
378                 () @trusted { ++p; }();
379             }
380 
381             return buf;
382         }
383 
384         T[] read(T)(T[] buf) @safe { return readOp!""(buf); }
385 
386         auto read(E, size_t N)(SIMDArray!(E, N) buf) { return readOp!""(buf); }
387 
388       private:
389         real _phase;
390         real _freq;
391         real _deltaT;
392     }
393 
394 
395     auto lutNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
396     {
397         return .lutNCO(table, freq, deltaT, theta);
398     }
399 }
400 
401 
402 ///
403 unittest{
404     auto sig1 = lutNCO!(std.math.expi, 4)(1, 0.25, 0);
405     static assert(isInputStream!(typeof(sig1)));
406     static assert(isInplaceComputableStream!(typeof(sig1)));
407     static assert(is(ElementType!(typeof(sig1)) : creal));
408 
409     assert(equal!((a, b) => isClose(a.re, b.re) && isClose(a.im, b.im))
410         (sig1[0 .. 1024], preciseComplexNCO(1, 0.25, 0)[0 .. 1024]));
411 
412     sig1 = lutNCO!(std.math.expi, 4)(1000, 10.0L^^-6, std.math.E);
413     auto buf = sig1[0 .. 1024].array;
414     assert(sig1.readOp!"-"(buf).length == buf.length);
415     assert(equal!"a == 0"(buf, buf));
416 }
417 
418 
419 /**
420 */
421 auto lutNCO(R)(R range, real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
422 if(isRandomAccessRange!R && hasLength!R)
423 {
424     alias E = Unqual!(std.range.ElementType!R);
425 
426     static struct LutNCO()
427     {
428         E front() @property { return _table[cast(ptrdiff_t)(_phase) % $]; }
429         void popFront() { _phase += _freq * _deltaT * _table.length; _phase %= _table.length; }
430         enum bool empty = false;
431         auto save() @property { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
432         E opIndex(size_t i) { return _table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * $) % $]; }
433         struct OpDollar{} enum opDollar = OpDollar();
434         auto opSlice() { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
435         auto opSlice(size_t i, OpDollar) { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
436         auto opSlice(size_t i, size_t j) { return this[i .. $].take(j - i); }
437 
438       static if(is(typeof((const R r, size_t i){auto e = r[i];})))
439       {
440         E front() const @property { return _table[cast(ptrdiff_t)(_phase) % $]; }
441         E opIndex(size_t i) const { return _table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * $) % $]; }
442       }
443 
444       static if(is(typeof((const R r){r.save();})))
445       {
446         auto save() const @property { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
447         auto opSlice() const { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
448         auto opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
449         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
450       }
451 
452         real phase() const @property { return _phase; }
453         void phase(real p) @property { _phase = p; }
454         real freq() const @property { return _freq; }
455         void freq(real f) @property { _freq = f; }
456         real deltaT() const @property { return _deltaT; }
457         void deltaT(real t) @property { _deltaT = t; }
458 
459         bool fetch() { return false; }
460 
461         T[] readOp(alias op, T)(T[] buf)
462         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
463         {
464             immutable dph = _freq * _deltaT * _table.length;
465             auto p = buf.ptr;
466             const end = () @trusted { return p + buf.length; }();
467             while(p != end){
468                 binaryFunExt!op(*p, _table[cast(ptrdiff_t)(_phase) % $]);
469                 _phase += dph;
470                 () @trusted { ++p; }();
471             }
472 
473             return buf[0 .. $];
474         }
475 
476 
477         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
478         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
479         {
480             immutable divN = _table.length;
481 
482             immutable dph = _freq * _deltaT * divN * N;
483             auto p = buf[].ptr;
484             const end = () @trusted { return p + buf.length; }();
485             while(p != end){
486                 binaryFunExt!op(*p, _table[cast(ptrdiff_t)(_phase) % $]);
487                 _phase += dph;
488                 () @trusted { ++p; }();
489             }
490 
491             return buf;
492         }
493 
494 
495         T[] read(T)(T[] buf) { return readOp!""(buf); }
496 
497         auto read(E, size_t N)(SIMDArray!(E, N) buf) { return readOp!""(buf); }
498 
499         //mixin(defaultStreamSIMDOperator);
500 
501 
502       private:
503         R _table;
504         real _phase;
505         real _freq;
506         real _deltaT;
507     }
508 
509 
510     return LutNCO!()(range, theta, freq, deltaT);
511 }
512 
513 
514 /**
515 永遠とその配列をループし続けるストリーム
516 */
517 auto repeatStream(E)(const E[] array)
518 in{
519     assert(array.length);
520 }
521 do{
522     static struct RepeatStream()
523     {
524         const(E) front() const @property { return _arr[_pos]; }
525         enum empty = false;
526         void popFront() { ++_pos; _pos %= _arr.length; }
527         RepeatStream!() save() const @property { return this; }
528         const(E) opIndex(size_t i) const { return _arr[_pos % $]; }
529         RepeatStream!() opSlice(){ return this; }
530         struct OpDollar{} enum opDollar = OpDollar();
531         RepeatStream!() opSlice(size_t i, OpDollar){ typeof(return) dst; dst._pos = (_pos + i) % _arr.length; return dst; }
532         auto opSlice(size_t i, size_t j){ return this[i .. $].take(j - i); }
533 
534         bool fetch() { return false; }
535 
536         U[] readOp(alias op, U)(U[] buf)
537         {
538             auto rem = buf;
539             while(rem.length){
540                 auto minL = min(rem.length, _arr.length - _pos);
541 
542               static if(is(typeof({ mixin(`rem[]` ~ op ~ `=_arr[];`); })))
543                 mixin(`rem[0 .. minL] `~ op ~ "= _arr[_pos .. _pos + minL];");
544               else{
545                 auto rem_ptr = rem.ptr,
546                      buf_ptr = () @trusted { return _arr.ptr + _pos; }();
547                 const rem_end = () @trusted { return rem_ptr + minL; }();
548 
549                 while(rem_ptr != rem_end){
550                     binaryFunExt!op(*rem_ptr, *buf_ptr);
551                     () @trusted { ++rem_ptr; ++buf_ptr; }();
552                 }
553               }
554               _pos += minL;
555               _pos %= _arr.length;
556               rem = rem[minL .. $];
557             }
558 
559             return buf;
560         }
561 
562 
563         U[] read(U)(U[] buf) { return readOp!""(buf); }
564 
565         //mixin(defaultStreamSIMDOperator);
566 
567 
568         const(E)[] availables() @property { return _arr[_pos .. $]; }
569         void consume(size_t n) {
570             _pos += n;
571             _pos %= _arr.length;
572         }
573 
574       private:
575         const(E)[] _arr;
576         size_t _pos;
577     }
578 
579 
580     return RepeatStream!()(array, 0);
581 }
582 
583 
584 /// ditto
585 auto repeatStream(E)(E element)
586 if(!is(E : T[], T))
587 {
588     static struct RepeatStream()
589     {
590         enum bool empty = false;
591         auto front() const @property { return _e; }
592         void popFront() {}
593         auto save() @property { return this; }
594         auto save() const @property { return .repeatStream(_e); }
595         auto save() immutable @property { return .repeatStream(_e); }
596         auto opIndex(size_t) const @property { return _e; }
597         RepeatStream!() opSlice() const { return this; }
598         struct OpDollar{} enum opDollar = OpDollar();
599         RepeatStream!() opSlice(size_t, OpDollar) const { return this; }
600         auto opSlice(size_t i , size_t j) const { return this.take(j - i); }
601 
602         bool fetch() { return false; }
603 
604 
605         U[] readOp(alias op, U)(U[] buf)
606         {
607           static if(is(typeof({ mixin(`buf[] ` ~ op ~ "= _e;"); })))
608             mixin(`buf[] ` ~ op ~ "= _e;");
609           else
610           {
611             auto p = buf.ptr,
612                  e = () @trusted { return p + buf.length; }();
613 
614             while(p != e){
615                 binaryFunExt!op(*p, _e);
616                 () @trusted { ++p; }();
617             }
618           }
619 
620             return buf;
621         }
622 
623 
624         U[] read(U)(U[] buf)
625         {
626             return readOp!""(buf);
627         }
628 
629         //mixin(defaultStreamSIMDOperator);
630 
631       private:
632         E _e;
633     }
634 
635 
636     return RepeatStream!()(element);
637 }
638 
639 ///
640 unittest{
641     int[] arr = [0, 1, 2, 3, 4, 5, 6, 7];
642     auto rs = arr.repeatStream;
643     static assert(isInfinite!(typeof(rs)));
644     static assert(isInputStream!(typeof(rs)));
645     static assert(isInplaceComputableStream!(typeof(rs)));
646     static assert(isBufferedInputStream!(typeof(rs)));
647 
648 
649     int[] buf1 = new int[24];
650     assert(rs.readOp!"+"(buf1) == arr ~ arr ~ arr);
651 
652     short[] buf2 = new short[17];
653     assert(rs.readOp!"cast(short)b"(buf2) == buf1[0 .. 17]);
654 }
655 
656 ///
657 unittest{
658     auto rs = 1.repeatStream;
659     static assert(isInfinite!(typeof(rs)));
660     static assert(isInputStream!(typeof(rs)));
661     static assert(isInplaceComputableStream!(typeof(rs)));
662 
663     assert(rs.read(new int[4]) == [1, 1, 1, 1]);
664 }
665 
666 
667 /**
668 一度に巨大なファイルを読み込むことに特化した,バッファ持ち入力ストリームです。
669 */
670 auto rawFileStream(T)(string filename, size_t bufferSize = 1024 * 1024)
671 if(is(Unqual!T == T))
672 {
673     static struct RawFileStream()
674     {
675         private void refill()
676         {
677             _pos = 0;
678             if(!_file.isOpen){
679                 _buffer = null;
680                 return;
681             }
682 
683             _pos = 0;
684             immutable beforeSize = _buffer.length;
685             _buffer = _file.rawRead(_buffer);
686             if(_buffer.length != beforeSize)
687                 _file.detach();
688         }
689 
690 
691         this(File file, T[] buf)
692         {
693             _file = file;
694             _buffer = buf;
695             _pos = 0;
696         }
697 
698 
699         /** 入力レンジのプリミティブ
700         */
701         T front() const @property { return _buffer[_pos]; }
702         bool empty() const @property { return _buffer.length == _pos; }     /// ditto
703         void popFront() { ++_pos; }                                         /// ditto
704         size_t length() const @property { return _buffer.length - _pos; }   /// ditto
705 
706 
707         /**
708         入力ストリームのプリミティブ
709         */
710         bool fetch()
711         {
712             if(!this.empty) return false;
713             refill();
714             return this.empty;
715         }
716 
717 
718         /// ditto
719         E[] readOp(alias func, E)(E[] buf)
720         if(is(typeof(binaryFunExt!func(buf[0], _buffer[0]))))
721         {
722             immutable minL = min(buf.length, _buffer.length - _pos);
723           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
724             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
725           else
726           {
727             auto rem_ptr = buf.ptr,
728                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
729             const rem_end = () @trusted { return rem_ptr + minL; }();
730 
731             while(rem_ptr != rem_end){
732                 binaryFunExt!func(*rem_ptr, *buf_ptr);
733                 () @trusted { ++rem_ptr; ++buf_ptr; }();
734             }
735           }
736             _pos += minL;
737             return buf[0 .. minL];
738         }
739 
740 
741         /// ditto
742         E[] read(E)(E[] buf)
743         //if(isAssignable!(E, T))
744         { return readOp!""(buf); }
745 
746 
747         //mixin(defaultStreamSIMDOperator);
748 
749 
750         /**
751         バッファ持ち入力レンジのプリミティブ
752         */
753         const(T)[] availables() const @property { return _buffer[_pos .. $]; }
754 
755 
756         /**
757         バッファ持ち入力レンジのプリミティブ
758         */
759         void consume(size_t n)
760         {
761             while(n && !this.empty){
762                 if(n >= _buffer.length - _pos){
763                     n -= _buffer.length - _pos;
764                     if(_file.isOpen)
765                         refill();
766                     else{
767                         _buffer = null;
768                         _pos = 0;
769                     }
770                 }
771                 else{
772                     _pos += n;
773                     n = 0;
774                 }
775             }
776         }
777 
778 
779       private:
780         T[] _buffer;
781         size_t _pos;
782         File _file;
783     }
784 
785 
786     auto dst = RefCounted!(RawFileStream!())(File(filename), new T[bufferSize]);
787     dst.refill();
788     return dst;
789 }
790 
791 ///
792 unittest{
793     immutable fname = "testData.dat";
794     std.file.write(fname, cast(ubyte[])[0, 1, 2, 3, 4, 5]); // 6byte
795     scope(exit)
796         std.file.remove(fname);
797 
798     auto sig1 = rawFileStream!int(fname);
799     static assert(isInputStream!(typeof(sig1)));
800     static assert(isInplaceComputableStream!(typeof(sig1)));
801     static assert(isBufferedInputStream!(typeof(sig1)));
802 
803     assert(sig1.availables.length == 1);
804     sig1.consume(1);
805     assert(sig1.empty && sig1.fetch());
806 }
807 
808 ///
809 unittest{
810     immutable fname = "testData.dat";
811     std.file.write(fname, [0, 1, 2, 3, 4, 5]); // 24byte
812     scope(exit)
813         std.file.remove(fname);
814 
815     auto sig1 = rawFileStream!int(fname);
816     assert(sig1.availables == [0, 1, 2, 3, 4, 5]);
817     sig1.consume(6);
818     assert(sig1.empty && sig1.fetch());
819 }
820 
821 
822 /**
823 二つの信号の積を返します。
824 2つ目の信号は演算による理想信号でなければいけません。
825 */
826 auto mixer(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
827 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "*") && isInfinite!Sg2)
828 {
829     static struct Mixer()
830     {
831         auto ref front() const @property { return _sg1.front * _sg2.front; }
832 
833       static if(isInfinite!Sg1)
834         enum bool empty = false;
835       else
836         bool empty() const @property { return _sg1.empty; }
837 
838         void popFront() { _sg1.popFront(); _sg2.popFront(); }
839 
840         bool fetch()
841         {
842           static if(isInfinite!Sg1)
843             return false;
844           else
845           {
846             if(!_sg1.empty) return false;
847             //_sg2.fetch();
848             return _sg1.fetch();
849           }
850         }
851 
852 
853         E[] read(E)(E[] buf)
854         {
855             return readOp!""(buf);
856         }
857 
858 
859         auto read(E, size_t N)(SIMDArray!(E, N) buf)
860         {
861             return readOp!""(buf);
862         }
863 
864 
865         E[] readOp(alias op, E)(E[] buf)
866         if((op == "*" || op == "/")
867         && isInplaceComputableStream!(Sg1, op)
868         && isInplaceComputableStream!(Sg2, op))
869         {
870             return _sg2.readOp!op(_sg1.readOp!op(buf));
871         }
872 
873 
874         E[] readOp(alias op : "", E)(E[] buf)
875         {
876             return _sg2.readOp!"*"(_sg1.read(buf));
877         }
878 
879 
880         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
881         if((op == "*" || op == "/")
882         && isInplaceComputableStream!(Sg1, op)
883         && isInplaceComputableStream!(Sg2, op))
884         {
885             return _sg2.readOp!op(_sg1.readOp!op(buf));
886         }
887 
888 
889         auto readOp(alias op : "", E, size_t N)(SIMDArray!(E, N) buf)
890         {
891             return _sg2.readOp!"*"(_sg1.read(buf));
892         }
893 
894 
895       private:
896         Sg1 _sg1;
897         Sg2 _sg2;
898     }
899 
900 
901     return Mixer!()(sg1, sg2);
902 }
903 
904 
905 ///
906 unittest
907 {
908     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
909     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
910 
911     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
912     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
913     auto mx1 = arr1.mixer(arr2);
914     static assert(isInfinite!(typeof(mx1)));
915     static assert(isInputStream!(typeof(mx1)));
916 
917     int[] buf1 = new int[16];
918     assert(mx1.read(buf1) == [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1]);
919 
920     auto arr3 = [0, 0, 0, 0, 1, 1, 1, 1].repeatStream;
921     auto mx2 = mx1.mixer(arr3);
922 
923     int[] buf2 = new int[16];
924     assert(mx2.read(buf2) == [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1]);
925 }
926 
927 
928 /**
929 二つの信号を足します。
930 2つ目の信号は、演算により生成された理想信号でなければいけません。
931 */
932 auto adder(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
933 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "+") && isInfinite!Sg2)
934 {
935     static struct Adder()
936     {
937         auto ref front() const @property { return _sg1.front + _sg2.front; }
938 
939       static if(isInfinite!Sg1)
940         enum bool empty = false;
941       else
942         bool empty() const @property { return _sg1.empty; }
943 
944         void popFront() { _sg1.popFront(); _sg2.popFront(); }
945 
946         bool fetch() {
947           static if(isInfinite!Sg1)
948             return false;
949           else
950           {
951             if(!_sg1.empty) return false;
952             //_sg2.fetch();
953             return _sg1.fetch();
954           }
955         }
956 
957         E[] read(E)(E[] buf)
958         {
959             return readOp!""(buf);
960         }
961 
962 
963         E[] readOp(alias op, E)(E[] buf)
964         if((op == "+" || op == "-")
965         && isInplaceComputableStream!(Sg1, op)
966         && isInplaceComputableStream!(Sg2, op))
967         {
968             return _sg2.readOp!op(_sg1.readOp!op(buf));
969         }
970 
971 
972         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
973         {
974             return _sg2.readOp!op(_sg1.readOp!op(buf));
975         }
976 
977 
978         E[] readOp(alias op : "", E)(E[] buf)
979         {
980             return _sg2.readOp!"+"(_sg1.read(buf));
981         }
982 
983 
984         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
985         {
986             return _sg2.readOp!"+"(_sg1.readOp(buf));
987         }
988 
989 
990       private:
991         Sg1 _sg1;
992         Sg2 _sg2;
993     }
994 
995 
996     return Adder!()(sg1, sg2);
997 }
998 
999 
1000 ///
1001 unittest
1002 {
1003     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1004     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1005 
1006     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
1007     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
1008     auto mx1 = arr1.adder(arr2);
1009     static assert(isInfinite!(typeof(mx1)));
1010     static assert(isInputStream!(typeof(mx1)));
1011 
1012     int[] buf1 = new int[16];
1013     assert(mx1.read(buf1) == [0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2]);
1014 }
1015 
1016 ///
1017 unittest
1018 {
1019     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1020     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1021 
1022     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
1023     auto buf1 = arr1.adder(arr1).adder(arr1).read(new int[16]);
1024     auto buf2 = arr1.amplifier(3).read(new int[16]);
1025 
1026     assert(buf1 == buf2);
1027 }
1028 
1029 
1030 /**
1031 積分器です。
1032 積分するサンプル量が大きい場合の使用に適しています。
1033 */
1034 auto accumulator(E)(size_t integN)
1035 {
1036     static struct Accumulator
1037     {
1038         E[] write(E[] buf)
1039         {
1040             while(buf.length){
1041                 immutable n = min(_integN - _cnt, buf.length);
1042                 auto ptr = buf.ptr;
1043                 const end = buf.ptr + n;
1044                 auto val = _buffer.back;
1045                 while(ptr != end){
1046                     val += *ptr;
1047                     ++ptr;
1048                 }
1049 
1050                 _cnt += n;
1051                 _buffer.back = val;
1052                 buf = buf[n .. $];
1053                 if(_cnt == _integN){
1054                     _buffer ~= cast(E)0;
1055                     _cnt = 0;
1056                     ++_avaN;
1057                 }
1058             }
1059 
1060             return buf;
1061         }
1062 
1063 
1064         enum bool fill = false;
1065         bool flush() { return false; }
1066 
1067 
1068         auto opSlice(){ return _buffer[].take(_avaN); }
1069 
1070         E front() const @property { return _buffer.front; }
1071         void popFront() { _buffer.removeFront(); --_avaN; }
1072         bool empty() const @property { return _avaN == 0; }
1073 
1074 
1075       private:
1076         DList!E _buffer;
1077         size_t _cnt;
1078         size_t _integN;
1079         size_t _avaN;
1080     }
1081 
1082 
1083     return Accumulator(DList!E(cast(E)0), 0, integN, 0);
1084 }
1085 
1086 ///
1087 unittest
1088 {
1089     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1090     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1091 
1092     auto acc = accumulator!int(4);
1093     static assert(isOutputStream!(typeof(acc), int));
1094 
1095     assert(acc.write([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).empty);
1096     assert(equal(acc[], [0+1+2+3, 4+5+6+7]));
1097 
1098     acc.write([10, 11]);
1099     assert(equal(acc[], [0+1+2+3, 4+5+6+7, 8+9+10+11]));
1100 }
1101 
1102 
1103 /**
1104 積分器です。
1105 たとえば、連続する少数の個数のサンプルの和を取るような用途に適しています。
1106 */
1107 auto accumulator(size_t N, Sg)(Sg sg, size_t bufSize = 1024 * 1024)
1108 {
1109     alias E = Unqual!(ElementType!Sg);
1110     return accumulator(sg, new E[bufSize]);
1111 }
1112 
1113 
1114 /// ditto
1115 auto accumulator(size_t N, Sg, E)(Sg sg, E[] buffer)
1116 if(isInputStream!Sg && is(ElementType!Sg : E))
1117 {
1118     static struct Accumulator()
1119     {
1120         auto front() const @property { return availables[0]; }
1121         void popFront() { consume(1); }
1122         bool empty() const @property { return _pos == _end; }
1123 
1124         bool fetch()
1125         {
1126             if(!this.empty) return false;
1127             auto p = _buffer.ptr + _buffer.length - _remN,
1128                  f = _buffer.ptr;
1129             const e = _buffer.ptr + _buffer.length;
1130             while(p != e) { *f = *p; ++f; ++p; }
1131             _pos = 0;
1132             _end = 0;
1133 
1134             while(this.empty && (!_sg.empty || !_sg.fetch())){
1135                 auto buf = _sg.read(_buffer[_remN .. $]);
1136                 immutable size_t len = (_remN + buf.length) / N;
1137                 _end += len;
1138 
1139                 auto pp = _buffer.ptr,
1140                      ff = _buffer.ptr;
1141                 auto ee = _buffer.ptr + _end;
1142 
1143                 while(pp != ee){
1144                     size_t s = 1;
1145                     if(pp != _buffer.ptr) { *pp = 0; s = 0; }
1146 
1147                     ff += s;
1148                     foreach(i; s .. N){
1149                         *pp += *ff;
1150                         ++ff;
1151                     }
1152 
1153                     ++pp;
1154                 }
1155                 _remN = (_remN + buf.length) - _end * N;
1156             }
1157 
1158             return this.empty;
1159         }
1160 
1161 
1162         T[] readOp(alias func, T)(T[] buf)
1163         {
1164             immutable minL = min(buf.length, _end - _pos);
1165           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
1166             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
1167           else
1168           {
1169             auto rem_ptr = buf.ptr,
1170                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
1171             const rem_end = () @trusted { return rem_ptr + minL; }();
1172 
1173             while(rem_ptr != rem_end){
1174                 binaryFunExt!func(*rem_ptr, *buf_ptr);
1175                 () @trusted { ++rem_ptr; ++buf_ptr; }();
1176             }
1177           }
1178             _pos += minL;
1179             return buf[0 .. minL];
1180         }
1181 
1182 
1183         auto readOp(alias func, E, size_t N)(SIMDArray!(E, N) buf)
1184         {
1185             auto b = readOp(buf.array);
1186             return buf[0 .. b.length];
1187         }
1188 
1189 
1190         T[] read(T)(T[] buf) { return readOp!""(buf); }
1191 
1192         //mixin(defaultStreamSIMDOperator);
1193 
1194 
1195         const(E)[] availables() const @property { return _buffer[_pos .. _end]; }
1196         void consume(size_t n)
1197         {
1198             while(n && (!this.empty || !this.fetch())){
1199                 if(n <= _end - _pos){
1200                     _pos += n;
1201                     n = 0;
1202                 }else{
1203                     n -= _end - _pos;
1204                     _pos = _end;
1205                 }
1206             }
1207         }
1208 
1209 
1210       private:
1211         Sg _sg;
1212         E[] _buffer;
1213         size_t _pos;
1214         size_t _end;
1215         size_t _remN;
1216     }
1217 
1218 
1219     return Accumulator!()(sg, buffer, 0, 0, 0);
1220 }
1221 
1222 ///
1223 unittest
1224 {
1225     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1226     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1227 
1228     auto arr1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].repeatStream;
1229     auto acc = accumulator!4(arr1, new int[14]);
1230     static assert(isBufferedInputStream!(typeof(acc)));
1231 
1232     int[] buf = new int[3];
1233     assert(!acc.empty || !acc.fetch());
1234     assert(acc.read(buf) == [0+1+2+3, 4+5+6+7, 8+9+0+1]);
1235     assert(!acc.empty || !acc.fetch());
1236     assert(acc.read(buf) == [2+3+4+5, 6+7+8+9, 0+1+2+3]);
1237 }
1238 
1239 /+ ベンチマーク, rangeの3倍程度の性能
1240 unittest {
1241     static real[] vector;
1242     vector = new real[1024 * 1024];
1243     foreach(ref e; vector) e = 0;
1244 
1245 
1246     void funcStream()
1247     {
1248         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1249 
1250         auto mx = mixer(sin128LutNCO(6.5E6L, 1E-6L, 0),
1251                         sin128LutNCO(6.6E6L, 1E-6L, 0));
1252 
1253         mx.readOp!"*"(vector);
1254     }
1255 
1256 
1257     void funcRange()
1258     {
1259         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1260 
1261         auto mx = zip(sin128LutNCO(6.5E6L, 1E-6L, 0), sin128LutNCO(6.6E6L, 1E-6L, 0))
1262                  .map!"a[0]*a[1]";
1263 
1264         auto ptr = vector.ptr, end = ptr + vector.length;
1265         while(ptr != end){
1266             *ptr *= mx.front;
1267             mx.popFront();
1268             ++ptr;
1269         }
1270     }
1271 
1272     
1273     import std.datetime;
1274     import std.container;
1275     auto ts = benchmark!(funcStream, funcRange)(26);
1276 
1277     writefln("%(\n%s%)", ts[].map!"a.usecs");
1278 }
1279 +/
1280 
1281 
1282 auto amplifier(Sg, F)(Sg sg, F gain)
1283 {
1284     static struct Amplifier()
1285     {
1286         auto front() const @property { return _sg.front * _gain; }
1287         void popFront() { _sg.popFront(); }
1288         bool empty() const { return _sg.empty; }
1289         bool fetch() { return _sg.fetch(); }
1290 
1291 
1292         E[] readOp(string op, E)(E[] buf)
1293         if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E))
1294         {
1295             buf = _sg.readOp!op(buf);
1296             mixin("buf[]" ~ op ~ "= _gain;");
1297 
1298             return buf;
1299         }
1300 
1301 
1302         auto readOp(string op, E, size_t N)(SIMDArray!(E, N) buf)
1303         if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E))
1304         {
1305             buf = _sg.readOp!op(buf);
1306             mixin("buf" ~ op ~ "= _gain;");
1307 
1308             return buf;
1309         }
1310 
1311 
1312         E[] read(E)(E[] buf)
1313         {
1314             buf = _sg.read(buf);
1315             buf[] *= _gain;
1316 
1317             return buf;
1318         }
1319 
1320 
1321         auto read(E, size_t N)(SIMDArray!(E, N) buf)
1322         {
1323             buf = _sg.read(buf);
1324             buf *= _gain;
1325         }
1326 
1327 
1328         F gain() const @property pure nothrow @safe @nogc { return _gain; }
1329         void gain(F g) @property pure nothrow @safe @nogc { _gain = g; }
1330 
1331 
1332       private:
1333         Sg _sg;
1334         F _gain;
1335     }
1336 
1337 
1338     return Amplifier!()(sg, gain);
1339 }
1340 
1341 ///
1342 unittest
1343 {
1344     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1345     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1346 
1347     auto arr4 = [0, 1, 2, 3].repeatStream,
1348          amped = arr4.amplifier(4);
1349 
1350     assert(amped.read(new int[4]) == [0, 4, 8, 12]);
1351     static assert(isInplaceComputableStream!(typeof(arr4)));
1352     static assert(isInplaceComputableStream!(typeof(amped), "*"));
1353     static assert(isInplaceComputableStream!(typeof(amped), "/"));
1354 
1355     auto arr = [3, 4, 1, 2];
1356     assert(amped.readOp!"*"(arr) == [0, 16, 8, 24]);
1357 }
1358 
1359 
1360 auto selector(Sg...)(Sg sgs)
1361 {
1362     static struct Selector()
1363     {
1364         import std.string : format;
1365 
1366 
1367         auto front() const @property
1368         {
1369             switch(_selectIndex){
1370               foreach(i, E; Sg)
1371                 mixin(format("case %s: return _sgs[%s].front;", i, i));
1372               default: assert(0);
1373             }
1374         }
1375 
1376 
1377         void popFront()
1378         {
1379             switch(_selectIndex){
1380               foreach(i, E; Sg)
1381                 mixin(format("case %s: _sgs[%s].popFront();", i, i));
1382               default: assert(0);
1383             }
1384         }
1385 
1386 
1387         bool empty() const @property
1388         {
1389             switch(_selectIndex){
1390               foreach(i, E; Sg)
1391                 mixin(format("case %s: return _sgs[%s].empty;", i, i));
1392               default: assert(0);
1393             }
1394         }
1395 
1396 
1397         bool fetch()
1398         {
1399             switch(_selectIndex){
1400               foreach(i, E; Sg)
1401                 mixin(format("case %s: return _sgs[%s].fetch();", i, i));
1402               default: assert(0);
1403             }
1404         }
1405 
1406 
1407         E[] read(E)(E[] buf)
1408         {
1409             switch(_selectIndex){
1410               foreach(i, E; Sg)
1411                 mixin(format("case %s: return _sgs[%s].read(buf);", i, i));
1412               default: assert(0);
1413             }
1414         }
1415 
1416 
1417         E[] readOp(alias op, E)(E[] buf)
1418         {
1419             switch(_selectIndex){
1420               foreach(i, E; Sg)
1421                 mixin(format("case %s: return _sgs[%s].readOp!op(buf);", i, i));
1422               default: assert(0);
1423             }
1424         }
1425 
1426 
1427         //mixin(defaultStreamSIMDOperator);
1428 
1429 
1430         void select(size_t i)
1431         in{
1432             assert(i < Sg.length);
1433         }
1434         do{
1435             _selectIndex = i;
1436         }
1437 
1438 
1439         void select(size_t i)()
1440         {
1441             static assert(i < Sg.length);
1442             _selectIndex = i;
1443         }
1444 
1445 
1446       private:
1447         Sg _sgs;
1448         size_t _selectIndex;
1449     }
1450 
1451 
1452     return Selector!()(sgs, 0);
1453 }
1454 
1455 ///
1456 unittest
1457 {
1458     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1459     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1460 
1461     auto arr1 = 0.repeatStream,
1462          arr2 = [0, 1].repeatStream,
1463          arr3 = [0, 1, 2].repeatStream,
1464          arr4 = [0, 1, 2, 3].repeatStream;
1465 
1466     auto slt = selector(arr1, arr2, arr3, arr4);
1467     assert(slt.read(new int[4]) == [0, 0, 0, 0]);
1468     slt.select(1); // or slt.select!1;
1469     assert(slt.read(new int[4]) == [0, 1, 0, 1]);
1470     slt.select(2); // or slt.select!2;
1471     assert(slt.read(new int[4]) == [0, 1, 2, 0]);
1472     slt.select(3); // or slt.select!3;
1473     assert(slt.read(new int[4]) == [0, 1, 2, 3]);
1474 }
1475 
1476 
1477 /**
1478 FIRフィルタを構成します。
1479 
1480 FIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1481 H[z] = Σ(k[m]*z^(-m))
1482 
1483 この関数には、各タップの係数`k[m]`を指定することで任意のFIRフィルタを構築することができます。
1484 */
1485 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1486 if(isInputStream!Sg)
1487 {
1488     return firFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E[](taps.length));
1489 }
1490 
1491 
1492 ///
1493 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] tap, E[] buf)
1494 if(isInputStream!Sg && is(E == Unqual!E))
1495 in{
1496     assert(tap.length == buf.length);
1497 }
1498 do{
1499     static struct FIRFiltered()
1500     {
1501         E front() const @property @trusted
1502         {
1503             E a = 0;
1504 
1505             {
1506                 auto p1 = _tap[$ - 1 .. $].ptr,
1507                      p2 = _buf[_idx .. $].ptr,
1508                      e2 = _buf[$ .. $].ptr;
1509 
1510                 while(p2 != e2){
1511                     a = naryFun!reduceFn(a, *p2, *p1);
1512                     --p1;
1513                     ++p2;
1514                 }
1515             }
1516 
1517             {
1518                 auto p1 = _tap.ptr + _idx-1,
1519                      p2 = _buf[0 .. _idx].ptr,
1520                      e2 = _buf[_idx .. $].ptr;
1521 
1522                 while(p2 != e2){
1523                     a = naryFun!reduceFn(a, *p2, *p1);
1524                     --p1;
1525                     ++p2;
1526                 }
1527             }
1528 
1529             return a;
1530         }
1531 
1532 
1533         void popFront()
1534         {
1535             _buf[_idx] = _sg.front;
1536             ++_idx;
1537             _idx %= _buf.length;
1538             _sg.popFront();
1539         }
1540 
1541       static if(isInfinite!Sg)
1542         enum bool empty = false;
1543       else
1544         bool empty() const @property { return _sg.empty; }
1545 
1546 
1547         bool fetch() { return _sg.fetch(); }
1548 
1549 
1550         U[] readOp(alias op, U)(U[] buf)
1551         {
1552             auto p = buf.ptr,
1553                  e = buf[$ .. $].ptr;
1554 
1555             while(p != e && !this.empty){
1556                 binaryFunExt!op(*p, this.front);
1557                 this.popFront();
1558                 () @trusted { ++p; }();
1559             }
1560 
1561             return () @trusted { return buf[0 .. p - buf.ptr]; }();
1562         }
1563 
1564 
1565         U[] read(U)(U[] buf)
1566         {
1567             return readOp!""(buf);
1568         }
1569 
1570 
1571         //mixin(defaultStreamSIMDOperator);
1572 
1573 
1574       private:
1575         Sg _sg;
1576         const(E)[] _tap;
1577         E[] _buf;
1578         size_t _idx;
1579     }
1580 
1581 
1582     return FIRFiltered!()(sg, tap, buf, 0);
1583 }
1584 
1585 ///
1586 unittest
1587 {
1588     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1589     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1590 
1591     auto arr = [0, 1, 2, 3].repeatStream,
1592          flt1 = arr.firFilter([0, 1]);
1593 
1594     assert(flt1.read(new int[8]) == [0, 0, 0*1+1*0, 1, 2, 3, 0, 1]);
1595 
1596     auto flt2 = arr.firFilter([1, 2]);
1597     assert(flt2.read(new int[6]) == [0, 0, 0*2+1*1, 1*2+2*1, 2*2+3*1, 3*2+0*1]);
1598 }
1599 
1600 
1601 /*
1602 IIRフィルタを構成します。
1603 
1604 IIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1605 H(z) = 1/Σ(k[m]*z^(-m))
1606 
1607 この関数には、各タップの係数`k[m]`を指定することで任意のIIRフィルタを構築することができます。
1608 */
1609 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1610 //if(isInputStream!Sg)
1611 //{
1612 //    return iirFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E()(taps.length));
1613 //}
1614 
1615 /////
1616 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps, E[] buf)
1617 //if(isInputStream!Sg && is(E == Unqual!E))
1618 //{
1619 
1620 //}
1621 
1622 
1623 /**
1624 信号の絶対値の最大値を`limit`にするように線形に振幅を小さく、もしくは大きくします。
1625 */
1626 auto normalizer(Sg, E)(Sg sg, E limit)
1627 if(isFloatingPoint!E)
1628 {
1629     static struct Normalizer()
1630     {
1631         E front() @property
1632         {
1633             auto f = _sg.front,
1634                  a = abs(f);
1635 
1636             if(f == 0) return 0;
1637 
1638             if(a > _max){
1639                 _max = a;
1640 
1641                 return _lim;
1642             }
1643 
1644             return f / _max * _lim;
1645         }
1646 
1647 
1648         auto empty() const @property { return _sg.empty; }
1649         void popFront() { _sg.popFront(); }
1650 
1651         bool fetch() { return _sg.fetch(); }
1652 
1653 
1654         U[] read(U)(U[] buf)
1655         {
1656             buf = _sg.read(buf);
1657 
1658             auto p = buf.ptr,
1659                  e = buf[$ .. $].ptr;
1660 
1661             while(p != e){
1662                 immutable v = abs(*p);
1663                 if(v == 0)
1664                     *p = 0;
1665                 else{
1666                     if(v > _max)
1667                         _max = v;
1668 
1669                     *p *= _lim / _max;
1670                 }
1671 
1672                 () @trusted { ++p; }();
1673             }
1674 
1675             return buf;
1676         }
1677 
1678 
1679         U[] readOp(alias op : "", U)(U[] buf)
1680         {
1681             return this.read(buf);
1682         }
1683 
1684 
1685         E[] readOp(alias op, E)(E[] buf)
1686         {
1687             auto p = buf.ptr,
1688                  e = buf[$ .. $].ptr;
1689 
1690             while(p != e && !this.empty){
1691                 binaryFunExt!op(*p, this.front);
1692                 this.popFront();
1693                 () @trusted { ++p; }();
1694             }
1695 
1696             return buf;
1697         }
1698 
1699         //mixin(defaultStreamSIMDOperator);
1700 
1701 
1702       private:
1703         Sg _sg;
1704         E _lim;
1705         E _max;
1706     }
1707 
1708     return Normalizer!()(sg, limit, 0);
1709 }
1710 
1711 ///
1712 unittest
1713 {
1714     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1715     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1716 
1717     auto arr = [0, 1, -2, 3].repeatStream,
1718          nlz = arr.normalizer(1.5);
1719 
1720     assert(equal!isClose(nlz.read(new float[8]), [0, 1.5, -1.5, 1.5, 0, 0.5, -1.0, 1.5]));
1721 }
1722 
1723 
1724 
1725 /**
1726 信号をマッピングします
1727 */
1728 auto mapper(alias fn, Sg)(Sg sg, ElementType!Sg[] buf = null, size_t bufSize = 1024)
1729 if(isInputStream!Sg && is(typeof({sg.read(buf);})))
1730 {
1731     if(buf is null)
1732         buf.length = bufSize;
1733 
1734     static struct Result
1735     {
1736         alias E = ElementType!Sg;
1737 
1738         auto front() @property { return unaryFun!fn(_sg.front); }
1739         bool empty() const @property { return _sg.empty; }
1740         void popFront() { _sg.popFront(); }
1741 
1742         bool fetch() { return _sg.fetch(); }
1743 
1744         U[] readOp(alias op, U)(U[] outbuf)
1745         {
1746             if(_buf.length < outbuf.length)
1747                 _buf.length = outbuf.length;
1748 
1749             auto ib = _buf[0 .. outbuf.length];
1750             ib = _sg.read(ib);
1751 
1752             auto p = ib.ptr,
1753                  e = ib[$ .. $].ptr,
1754                  q = outbuf.ptr;
1755 
1756             while(p != e){
1757                 binaryFunExt!op(*q, unaryFun!fn(*p));
1758                 () @trusted { ++p; ++q; }();
1759             }
1760 
1761             return outbuf[0 .. ib.length];
1762         }
1763 
1764 
1765         U[] read(U)(U[] buf)
1766         {
1767             return readOp!""(buf);
1768         }
1769 
1770         //mixin(defaultStreamSIMDOperator);
1771 
1772 
1773       private:
1774         Sg _sg;
1775         E[] _buf;
1776     }
1777 
1778     return Result(sg, buf);
1779 }
1780 
1781 
1782 /**
1783 
1784 */
1785 auto arrayMapper(Sg, E)(Sg sg, in E[] arr, size_t[] buf = null, size_t bufSize = 1024)
1786 if(isInputStream!Sg && is(ElementType!Sg : size_t))
1787 {
1788     if(buf is null)
1789         buf.length = bufSize;
1790 
1791     static struct Result
1792     {
1793         auto front() @property { return _arr[_sg.front]; }
1794         bool empty() const @property { return _sg.empty; }
1795         void popFront() { _sg.popFront(); }
1796 
1797         bool fetch() { return _sg.fetch(); }
1798 
1799         U[] readOp(alias op, U)(U[] outbuf)
1800         {
1801             if(_buf.length < outbuf.length)
1802                 _buf.length = outbuf.length;
1803 
1804             auto ib = _buf[0 .. outbuf.length];
1805             ib = _sg.read(ib);
1806 
1807             auto p = ib.ptr,
1808                  e = ib[$ .. $].ptr,
1809                  q = outbuf.ptr;
1810 
1811             while(p != e){
1812                 binaryFunExt!op(*q, _arr[*p]);
1813                 () @trusted { ++p; ++q; }();
1814             }
1815 
1816             return outbuf[0 .. ib.length];
1817         }
1818 
1819 
1820         U[] read(U)(U[] buf)
1821         {
1822             return readOp!""(buf);
1823         }
1824 
1825 
1826         //mixin(defaultStreamSIMDOperator);
1827 
1828 
1829       private:
1830         Sg _sg;
1831         const(E)[] _arr;
1832         size_t[] _buf;
1833     }
1834 
1835     return Result(sg, arr, buf);
1836 }